1 /*
2  * Copyright (C) 2007 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG TRANSPORT
18 
19 #include "sysdeps.h"
20 
21 #include "transport.h"
22 
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <thread>
37 
38 #include <adb/crypto/rsa_2048_key.h>
39 #include <adb/crypto/x509_generator.h>
40 #include <adb/tls/tls_connection.h>
41 #include <android-base/logging.h>
42 #include <android-base/no_destructor.h>
43 #include <android-base/parsenetaddress.h>
44 #include <android-base/stringprintf.h>
45 #include <android-base/strings.h>
46 #include <android-base/thread_annotations.h>
47 
48 #include <diagnose_usb.h>
49 
50 #include "adb.h"
51 #include "adb_auth.h"
52 #include "adb_io.h"
53 #include "adb_trace.h"
54 #include "adb_utils.h"
55 #include "fdevent/fdevent.h"
56 #include "sysdeps/chrono.h"
57 
// Bring the adb crypto / TLS helpers used below into scope.
using namespace adb::crypto;
using namespace adb::tls;
using android::base::ScopedLockAssertion;
using TlsError = TlsConnection::TlsError;

// Defined further down in this file; declared here because ReconnectHandler uses them first.
static void remove_transport(atransport* transport);
static void transport_destroy(atransport* transport);

// transport_list holds transports that transport_registration_func() has processed;
// pending_list holds transports it has not moved over yet. Both are heap-allocated and never
// deleted, and the code in this file accesses them while holding transport_lock.
// TODO: unordered_map<TransportId, atransport*>
static auto& transport_list = *new std::list<atransport*>();
static auto& pending_list = *new std::list<atransport*>();

// Guards the two lists above. Recursive, so a thread that already holds it may lock it again.
static auto& transport_lock = *new std::recursive_mutex();
71 
// String identifiers for individual adb features.
// NOTE(review): presumably these are the names exchanged when host and device advertise their
// supported feature sets — confirm against the code that consumes them.
const char* const kFeatureShell2 = "shell_v2";
const char* const kFeatureCmd = "cmd";
const char* const kFeatureStat2 = "stat_v2";
const char* const kFeatureLs2 = "ls_v2";
const char* const kFeatureLibusb = "libusb";
const char* const kFeaturePushSync = "push_sync";
const char* const kFeatureApex = "apex";
const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
const char* const kFeatureAbb = "abb";
const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
const char* const kFeatureAbbExec = "abb_exec";
const char* const kFeatureRemountShell = "remount_shell";
const char* const kFeatureTrackApp = "track_app";
const char* const kFeatureSendRecv2 = "sendrecv_v2";
const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
90 
91 namespace {
92 
93 #if ADB_HOST
// Tracks and handles atransport*s that are attempting reconnection.
//
// A dedicated thread (handler_thread_) sleeps until the earliest queued attempt is due, calls
// atransport::Reconnect() for it, and either re-queues it, registers it again, or removes it
// depending on the result.
class ReconnectHandler {
  public:
    ReconnectHandler() = default;
    ~ReconnectHandler() = default;

    // Starts the ReconnectHandler thread.
    void Start();

    // Requests the ReconnectHandler thread to stop, joins it, and removes every transport that
    // is still queued.
    void Stop();

    // Adds the atransport* to the queue of reconnect attempts.
    // Does nothing once Stop() has been requested.
    void TrackTransport(atransport* transport);

    // Wake up the ReconnectHandler thread to have it check for kicked transports.
    void CheckForKicked();

  private:
    // The main thread loop.
    void Run();

    // Tracks a reconnection attempt.
    struct ReconnectAttempt {
        // Transport to reconnect.
        atransport* transport;
        // Earliest point in time at which the attempt should be made.
        std::chrono::steady_clock::time_point reconnect_time;
        // Number of further retries allowed after this attempt.
        size_t attempts_left;

        // Orders attempts by due time; ties are broken by the transport's address so that two
        // different transports due at the same instant can both live in the std::set.
        bool operator<(const ReconnectAttempt& rhs) const {
            if (reconnect_time == rhs.reconnect_time) {
                return reinterpret_cast<uintptr_t>(transport) <
                       reinterpret_cast<uintptr_t>(rhs.transport);
            }
            return reconnect_time < rhs.reconnect_time;
        }
    };

    // Only retry for up to one minute (kMaxAttempts retries spaced kDefaultTimeout apart).
    static constexpr const std::chrono::seconds kDefaultTimeout = 10s;
    static constexpr const size_t kMaxAttempts = 6;

    // Protects running_ and reconnect_queue_ (the members annotated GUARDED_BY below).
    std::mutex reconnect_mutex_;
    // Cleared by Stop(); the worker thread returns once it observes false.
    bool running_ GUARDED_BY(reconnect_mutex_) = true;
    // Thread executing Run(); created in Start() and joined in Stop().
    std::thread handler_thread_;
    // Signalled when the queue changes, when a transport may have been kicked, and on Stop().
    std::condition_variable reconnect_cv_;
    // Pending attempts, ordered by reconnect_time (earliest first).
    std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);

    DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
};
144 
Start()145 void ReconnectHandler::Start() {
146     check_main_thread();
147     handler_thread_ = std::thread(&ReconnectHandler::Run, this);
148 }
149 
Stop()150 void ReconnectHandler::Stop() {
151     check_main_thread();
152     {
153         std::lock_guard<std::mutex> lock(reconnect_mutex_);
154         running_ = false;
155     }
156     reconnect_cv_.notify_one();
157     handler_thread_.join();
158 
159     // Drain the queue to free all resources.
160     std::lock_guard<std::mutex> lock(reconnect_mutex_);
161     while (!reconnect_queue_.empty()) {
162         ReconnectAttempt attempt = *reconnect_queue_.begin();
163         reconnect_queue_.erase(reconnect_queue_.begin());
164         remove_transport(attempt.transport);
165     }
166 }
167 
TrackTransport(atransport * transport)168 void ReconnectHandler::TrackTransport(atransport* transport) {
169     check_main_thread();
170     {
171         std::lock_guard<std::mutex> lock(reconnect_mutex_);
172         if (!running_) return;
173         // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
174         auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
175         reconnect_queue_.emplace(
176                 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
177     }
178     reconnect_cv_.notify_one();
179 }
180 
CheckForKicked()181 void ReconnectHandler::CheckForKicked() {
182     reconnect_cv_.notify_one();
183 }
184 
Run()185 void ReconnectHandler::Run() {
186     while (true) {
187         ReconnectAttempt attempt;
188         {
189             std::unique_lock<std::mutex> lock(reconnect_mutex_);
190             ScopedLockAssertion assume_lock(reconnect_mutex_);
191 
192             if (!reconnect_queue_.empty()) {
193                 // FIXME: libstdc++ (used on Windows) implements condition_variable with
194                 //        system_clock as its clock, so we're probably hosed if the clock changes,
195                 //        even if we use steady_clock throughout. This problem goes away once we
196                 //        switch to libc++.
197                 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
198             } else {
199                 reconnect_cv_.wait(lock);
200             }
201 
202             if (!running_) return;
203 
204             // Scan the whole list for kicked transports, so that we immediately handle an explicit
205             // disconnect request.
206             bool kicked = false;
207             for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
208                 if (it->transport->kicked()) {
209                     D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
210                     remove_transport(it->transport);
211                     it = reconnect_queue_.erase(it);
212                 } else {
213                     ++it;
214                 }
215                 kicked = true;
216             }
217 
218             if (reconnect_queue_.empty()) continue;
219 
220             // Go back to sleep if we either woke up spuriously, or we were woken up to remove
221             // a kicked transport, and the first transport isn't ready for reconnection yet.
222             auto now = std::chrono::steady_clock::now();
223             if (reconnect_queue_.begin()->reconnect_time > now) {
224                 continue;
225             }
226 
227             attempt = *reconnect_queue_.begin();
228             reconnect_queue_.erase(reconnect_queue_.begin());
229         }
230         D("attempting to reconnect %s", attempt.transport->serial.c_str());
231 
232         switch (attempt.transport->Reconnect()) {
233             case ReconnectResult::Retry: {
234                 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
235                 if (attempt.attempts_left == 0) {
236                     D("transport %s exceeded the number of retry attempts. giving up on it.",
237                       attempt.transport->serial.c_str());
238                     remove_transport(attempt.transport);
239                     continue;
240                 }
241 
242                 std::lock_guard<std::mutex> lock(reconnect_mutex_);
243                 reconnect_queue_.emplace(ReconnectAttempt{
244                         attempt.transport,
245                         std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
246                         attempt.attempts_left - 1});
247                 continue;
248             }
249 
250             case ReconnectResult::Success:
251                 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
252                 register_transport(attempt.transport);
253                 continue;
254 
255             case ReconnectResult::Abort:
256                 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
257                 remove_transport(attempt.transport);
258                 continue;
259         }
260     }
261 }
262 
// File-local ReconnectHandler instance; allocated with new and never deleted.
static auto& reconnect_handler = *new ReconnectHandler();
264 
265 #endif
266 
267 }  // namespace
268 
NextTransportId()269 TransportId NextTransportId() {
270     static std::atomic<TransportId> next(1);
271     return next++;
272 }
273 
Reset()274 void Connection::Reset() {
275     LOG(INFO) << "Connection::Reset(): stopping";
276     Stop();
277 }
278 
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)279 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
280     : underlying_(std::move(connection)) {}
281 
~BlockingConnectionAdapter()282 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
283     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): destructing";
284     Stop();
285 }
286 
// Starts the adapter: spawns the read thread (via StartReadThread()) and a write thread that
// drains write_queue_ into the underlying connection. Starting twice is a fatal error.
void BlockingConnectionAdapter::Start() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (started_) {
        LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_
                   << "): started multiple times";
    }

    StartReadThread();

    write_thread_ = std::thread([this]() {
        LOG(INFO) << this->transport_name_ << ": write thread spawning";
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            ScopedLockAssertion assume_locked(mutex_);
            // Sleep until there is a packet to send or the adapter has been stopped.
            cv_.wait(lock, [this]() REQUIRES(mutex_) {
                return this->stopped_ || !this->write_queue_.empty();
            });

            // A stop request exits quietly, without reporting an error.
            if (this->stopped_) {
                return;
            }

            // Dequeue under the lock, then release it so the write itself runs unlocked.
            std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
            this->write_queue_.pop_front();
            lock.unlock();

            if (!this->underlying_->Write(packet.get())) {
                break;
            }
        }
        // Only reached after a failed write; error_flag_ ensures the error callback runs once.
        std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); });
    });

    started_ = true;
}
322 
StartReadThread()323 void BlockingConnectionAdapter::StartReadThread() {
324     read_thread_ = std::thread([this]() {
325         LOG(INFO) << this->transport_name_ << ": read thread spawning";
326         while (true) {
327             auto packet = std::make_unique<apacket>();
328             if (!underlying_->Read(packet.get())) {
329                 PLOG(INFO) << this->transport_name_ << ": read failed";
330                 break;
331             }
332 
333             bool got_stls_cmd = false;
334             if (packet->msg.command == A_STLS) {
335                 got_stls_cmd = true;
336             }
337 
338             read_callback_(this, std::move(packet));
339 
340             // If we received the STLS packet, we are about to perform the TLS
341             // handshake. So this read thread must stop and resume after the
342             // handshake completes otherwise this will interfere in the process.
343             if (got_stls_cmd) {
344                 LOG(INFO) << this->transport_name_
345                           << ": Received STLS packet. Stopping read thread.";
346                 return;
347             }
348         }
349         std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "read failed"); });
350     });
351 }
352 
DoTlsHandshake(RSA * key,std::string * auth_key)353 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
354     std::lock_guard<std::mutex> lock(mutex_);
355     if (read_thread_.joinable()) {
356         read_thread_.join();
357     }
358     bool success = this->underlying_->DoTlsHandshake(key, auth_key);
359     StartReadThread();
360     return success;
361 }
362 
Reset()363 void BlockingConnectionAdapter::Reset() {
364     {
365         std::lock_guard<std::mutex> lock(mutex_);
366         if (!started_) {
367             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
368             return;
369         }
370 
371         if (stopped_) {
372             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
373                       << "): already stopped";
374             return;
375         }
376     }
377 
378     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): resetting";
379     this->underlying_->Reset();
380     Stop();
381 }
382 
Stop()383 void BlockingConnectionAdapter::Stop() {
384     {
385         std::lock_guard<std::mutex> lock(mutex_);
386         if (!started_) {
387             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
388             return;
389         }
390 
391         if (stopped_) {
392             LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
393                       << "): already stopped";
394             return;
395         }
396 
397         stopped_ = true;
398     }
399 
400     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping";
401 
402     this->underlying_->Close();
403     this->cv_.notify_one();
404 
405     // Move the threads out into locals with the lock taken, and then unlock to let them exit.
406     std::thread read_thread;
407     std::thread write_thread;
408 
409     {
410         std::lock_guard<std::mutex> lock(mutex_);
411         read_thread = std::move(read_thread_);
412         write_thread = std::move(write_thread_);
413     }
414 
415     read_thread.join();
416     write_thread.join();
417 
418     LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped";
419     std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); });
420 }
421 
Write(std::unique_ptr<apacket> packet)422 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
423     {
424         std::lock_guard<std::mutex> lock(this->mutex_);
425         write_queue_.emplace_back(std::move(packet));
426     }
427 
428     cv_.notify_one();
429     return true;
430 }
431 
FdConnection(unique_fd fd)432 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
433 
~FdConnection()434 FdConnection::~FdConnection() {}
435 
DispatchRead(void * buf,size_t len)436 bool FdConnection::DispatchRead(void* buf, size_t len) {
437     if (tls_ != nullptr) {
438         // The TlsConnection doesn't allow 0 byte reads
439         if (len == 0) {
440             return true;
441         }
442         return tls_->ReadFully(buf, len);
443     }
444 
445     return ReadFdExactly(fd_.get(), buf, len);
446 }
447 
DispatchWrite(void * buf,size_t len)448 bool FdConnection::DispatchWrite(void* buf, size_t len) {
449     if (tls_ != nullptr) {
450         // The TlsConnection doesn't allow 0 byte writes
451         if (len == 0) {
452             return true;
453         }
454         return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
455     }
456 
457     return WriteFdExactly(fd_.get(), buf, len);
458 }
459 
Read(apacket * packet)460 bool FdConnection::Read(apacket* packet) {
461     if (!DispatchRead(&packet->msg, sizeof(amessage))) {
462         D("remote local: read terminated (message)");
463         return false;
464     }
465 
466     if (packet->msg.data_length > MAX_PAYLOAD) {
467         D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
468         return false;
469     }
470 
471     packet->payload.resize(packet->msg.data_length);
472 
473     if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
474         D("remote local: terminated (data)");
475         return false;
476     }
477 
478     return true;
479 }
480 
Write(apacket * packet)481 bool FdConnection::Write(apacket* packet) {
482     if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
483         D("remote local: write terminated");
484         return false;
485     }
486 
487     if (packet->msg.data_length) {
488         if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
489             D("remote local: write terminated");
490             return false;
491         }
492     }
493 
494     return true;
495 }
496 
DoTlsHandshake(RSA * key,std::string * auth_key)497 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
498     bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
499     if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
500         LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
501         return false;
502     }
503     auto x509 = GenerateX509Certificate(evp_pkey.get());
504     auto x509_str = X509ToPEMString(x509.get());
505     auto evp_str = Key::ToPEMString(evp_pkey.get());
506 
507     int osh = cast_handle_to_int(adb_get_os_handle(fd_));
508 #if ADB_HOST
509     tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
510 #else
511     tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
512 #endif
513     CHECK(tls_);
514 #if ADB_HOST
515     // TLS 1.3 gives the client no message if the server rejected the
516     // certificate. This will enable a check in the tls connection to check
517     // whether the client certificate got rejected. Note that this assumes
518     // that, on handshake success, the server speaks first.
519     tls_->EnableClientPostHandshakeCheck(true);
520     // Add callback to set the certificate when server issues the
521     // CertificateRequest.
522     tls_->SetCertificateCallback(adb_tls_set_certificate);
523     // Allow any server certificate
524     tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
525 #else
526     // Add callback to check certificate against a list of known public keys
527     tls_->SetCertVerifyCallback(
528             [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
529     // Add the list of allowed client CA issuers
530     auto ca_list = adbd_tls_client_ca_list();
531     tls_->SetClientCAList(ca_list.get());
532 #endif
533 
534     auto err = tls_->DoHandshake();
535     if (err == TlsError::Success) {
536         return true;
537     }
538 
539     tls_.reset();
540     return false;
541 }
542 
Close()543 void FdConnection::Close() {
544     adb_shutdown(fd_.get());
545     fd_.reset();
546 }
547 
send_packet(apacket * p,atransport * t)548 void send_packet(apacket* p, atransport* t) {
549     p->msg.magic = p->msg.command ^ 0xffffffff;
550     // compute a checksum for connection/auth packets for compatibility reasons
551     if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
552         p->msg.data_check = 0;
553     } else {
554         p->msg.data_check = calculate_apacket_checksum(p);
555     }
556 
557     VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
558 
559     if (t == nullptr) {
560         LOG(FATAL) << "Transport is null";
561     }
562 
563     if (t->Write(p) != 0) {
564         D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
565         t->Kick();
566     }
567 }
568 
kick_transport(atransport * t,bool reset)569 void kick_transport(atransport* t, bool reset) {
570     std::lock_guard<std::recursive_mutex> lock(transport_lock);
571     // As kick_transport() can be called from threads without guarantee that t is valid,
572     // check if the transport is in transport_list first.
573     //
574     // TODO(jmgao): WTF? Is this actually true?
575     if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
576         if (reset) {
577             t->Reset();
578         } else {
579             t->Kick();
580         }
581     }
582 
583 #if ADB_HOST
584     reconnect_handler.CheckForKicked();
585 #endif
586 }
587 
// The two ends of the socketpair created by init_transport_registration(): tmsg records are
// written to transport_registration_send and read back from transport_registration_recv.
static int transport_registration_send = -1;
static int transport_registration_recv = -1;
// fdevent created for transport_registration_recv with FDE_READ set.
static fdevent* transport_registration_fde;
591 
592 #if ADB_HOST
593 
/* This adds the support required by the 'track-devices' service.
 * It is used to send the output of list_transports() to any
 * number of client connections that want it, each through a single
 * live TCP connection.
 */
struct device_tracker {
    // Must remain the first member: the socket callbacks cast an asocket* back to
    // a device_tracker*.
    asocket socket;
    // True when the tracker still has to send the device list once it becomes ready.
    bool update_needed = false;
    // Forwarded to list_transports() to select the long output format.
    bool long_output = false;
    // Next tracker in the singly linked device_tracker_list.
    device_tracker* next = nullptr;
};

/* linked list of all device trackers */
static device_tracker* device_tracker_list;
608 
device_tracker_remove(device_tracker * tracker)609 static void device_tracker_remove(device_tracker* tracker) {
610     device_tracker** pnode = &device_tracker_list;
611     device_tracker* node = *pnode;
612 
613     std::lock_guard<std::recursive_mutex> lock(transport_lock);
614     while (node) {
615         if (node == tracker) {
616             *pnode = node->next;
617             break;
618         }
619         pnode = &node->next;
620         node = *pnode;
621     }
622 }
623 
device_tracker_close(asocket * socket)624 static void device_tracker_close(asocket* socket) {
625     device_tracker* tracker = (device_tracker*)socket;
626     asocket* peer = socket->peer;
627 
628     D("device tracker %p removed", tracker);
629     if (peer) {
630         peer->peer = nullptr;
631         peer->close(peer);
632     }
633     device_tracker_remove(tracker);
634     delete tracker;
635 }
636 
device_tracker_enqueue(asocket * socket,apacket::payload_type)637 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
638     /* you can't read from a device tracker, close immediately */
639     device_tracker_close(socket);
640     return -1;
641 }
642 
device_tracker_send(device_tracker * tracker,const std::string & string)643 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
644     asocket* peer = tracker->socket.peer;
645 
646     apacket::payload_type data;
647     data.resize(4 + string.size());
648     char buf[5];
649     snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
650     memcpy(&data[0], buf, 4);
651     memcpy(&data[4], string.data(), string.size());
652     return peer->enqueue(peer, std::move(data));
653 }
654 
device_tracker_ready(asocket * socket)655 static void device_tracker_ready(asocket* socket) {
656     device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
657 
658     // We want to send the device list when the tracker connects
659     // for the first time, even if no update occurred.
660     if (tracker->update_needed) {
661         tracker->update_needed = false;
662         device_tracker_send(tracker, list_transports(tracker->long_output));
663     }
664 }
665 
create_device_tracker(bool long_output)666 asocket* create_device_tracker(bool long_output) {
667     device_tracker* tracker = new device_tracker();
668     if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
669 
670     D("device tracker %p created", tracker);
671 
672     tracker->socket.enqueue = device_tracker_enqueue;
673     tracker->socket.ready = device_tracker_ready;
674     tracker->socket.close = device_tracker_close;
675     tracker->update_needed = true;
676     tracker->long_output = long_output;
677 
678     tracker->next = device_tracker_list;
679     device_tracker_list = tracker;
680 
681     return &tracker->socket;
682 }
683 
684 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)685 bool iterate_transports(std::function<bool(const atransport*)> fn) {
686     std::lock_guard<std::recursive_mutex> lock(transport_lock);
687     for (const auto& t : transport_list) {
688         if (!fn(t)) {
689             return false;
690         }
691     }
692     for (const auto& t : pending_list) {
693         if (!fn(t)) {
694             return false;
695         }
696     }
697     return true;
698 }
699 
700 // Call this function each time the transport list has changed.
update_transports()701 void update_transports() {
702     update_transport_status();
703 
704     // Notify `adb track-devices` clients.
705     device_tracker* tracker = device_tracker_list;
706     while (tracker != nullptr) {
707         device_tracker* next = tracker->next;
708         // This may destroy the tracker if the connection is closed.
709         device_tracker_send(tracker, list_transports(tracker->long_output));
710         tracker = next;
711     }
712 }
713 
714 #else
715 
// Device-side counterpart of the host implementation above; intentionally empty.
void update_transports() {
    // Nothing to do on the device side.
}
719 
720 #endif  // ADB_HOST
721 
// Message sent over the transport registration socketpair to transport_registration_func().
struct tmsg {
    // Transport the request refers to.
    atransport* transport;
    // 1 to register the transport (see register_transport()), 0 to remove and delete it
    // (see remove_transport()).
    int action;
};
726 
transport_read_action(int fd,struct tmsg * m)727 static int transport_read_action(int fd, struct tmsg* m) {
728     char* p = (char*)m;
729     int len = sizeof(*m);
730     int r;
731 
732     while (len > 0) {
733         r = adb_read(fd, p, len);
734         if (r > 0) {
735             len -= r;
736             p += r;
737         } else {
738             D("transport_read_action: on fd %d: %s", fd, strerror(errno));
739             return -1;
740         }
741     }
742     return 0;
743 }
744 
transport_write_action(int fd,struct tmsg * m)745 static int transport_write_action(int fd, struct tmsg* m) {
746     char* p = (char*)m;
747     int len = sizeof(*m);
748     int r;
749 
750     while (len > 0) {
751         r = adb_write(fd, p, len);
752         if (r > 0) {
753             len -= r;
754             p += r;
755         } else {
756             D("transport_write_action: on fd %d: %s", fd, strerror(errno));
757             return -1;
758         }
759     }
760     return 0;
761 }
762 
// fdevent callback for the receiving end of the transport registration socketpair.
//
// Reads one tmsg. For action 0 the transport is taken out of transport_list and deleted. For any
// other action the transport's connection is wired up and started (unless the device is in the
// kCsNoPerm state) and the transport is moved from pending_list to transport_list. In both
// cases update_transports() is called afterwards.
static void transport_registration_func(int _fd, unsigned ev, void*) {
    tmsg m;
    atransport* t;

    // Only readability events are of interest.
    if (!(ev & FDE_READ)) {
        return;
    }

    if (transport_read_action(_fd, &m)) {
        PLOG(FATAL) << "cannot read transport registration socket";
    }

    t = m.transport;

    // Removal request: unlink under the lock, then destroy the transport.
    if (m.action == 0) {
        D("transport: %s deleting", t->serial.c_str());

        {
            std::lock_guard<std::recursive_mutex> lock(transport_lock);
            transport_list.remove(t);
        }

        delete t;

        update_transports();
        return;
    }

    /* don't create transport threads for inaccessible devices */
    if (t->GetConnectionState() != kCsNoPerm) {
        // The connection gets a reference to the atransport. It will release it
        // upon a read/write error.
        t->connection()->SetTransportName(t->serial_name());
        t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) {
            if (!check_header(p.get(), t)) {
                D("%s: remote read: bad header", t->serial.c_str());
                return false;
            }

            VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "from remote", p.get());
            // Ownership of the packet passes to handle_packet() via the raw pointer.
            apacket* packet = p.release();

            // TODO: Does this need to run on the main thread?
            fdevent_run_on_main_thread([packet, t]() { handle_packet(packet, t); });
            return true;
        });
        t->connection()->SetErrorCallback([t](Connection*, const std::string& error) {
            LOG(INFO) << t->serial_name() << ": connection terminated: " << error;
            // Offline handling and teardown are deferred to the main thread.
            fdevent_run_on_main_thread([t]() {
                handle_offline(t);
                transport_destroy(t);
            });
        });

        t->connection()->Start();
#if ADB_HOST
        send_connect(t);
#endif
    }

    // Promote the transport from the pending list to the active list, if it was pending.
    {
        std::lock_guard<std::recursive_mutex> lock(transport_lock);
        auto it = std::find(pending_list.begin(), pending_list.end(), t);
        if (it != pending_list.end()) {
            pending_list.remove(t);
            transport_list.push_front(t);
        }
    }

    update_transports();
}
834 
835 #if ADB_HOST
init_reconnect_handler(void)836 void init_reconnect_handler(void) {
837     reconnect_handler.Start();
838 }
839 #endif
840 
init_transport_registration(void)841 void init_transport_registration(void) {
842     int s[2];
843 
844     if (adb_socketpair(s)) {
845         PLOG(FATAL) << "cannot open transport registration socketpair";
846     }
847     D("socketpair: (%d,%d)", s[0], s[1]);
848 
849     transport_registration_send = s[0];
850     transport_registration_recv = s[1];
851 
852     transport_registration_fde =
853         fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
854     fdevent_set(transport_registration_fde, FDE_READ);
855 }
856 
kick_all_transports()857 void kick_all_transports() {
858 #if ADB_HOST
859     reconnect_handler.Stop();
860 #endif
861     // To avoid only writing part of a packet to a transport after exit, kick all transports.
862     std::lock_guard<std::recursive_mutex> lock(transport_lock);
863     for (auto t : transport_list) {
864         t->Kick();
865     }
866 }
867 
kick_all_tcp_tls_transports()868 void kick_all_tcp_tls_transports() {
869     std::lock_guard<std::recursive_mutex> lock(transport_lock);
870     for (auto t : transport_list) {
871         if (t->IsTcpDevice() && t->use_tls) {
872             t->Kick();
873         }
874     }
875 }
876 
877 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)878 void kick_all_transports_by_auth_key(std::string_view auth_key) {
879     std::lock_guard<std::recursive_mutex> lock(transport_lock);
880     for (auto t : transport_list) {
881         if (auth_key == t->auth_key) {
882             t->Kick();
883         }
884     }
885 }
886 #endif
887 
888 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)889 void register_transport(atransport* transport) {
890     tmsg m;
891     m.transport = transport;
892     m.action = 1;
893     D("transport: %s registered", transport->serial.c_str());
894     if (transport_write_action(transport_registration_send, &m)) {
895         PLOG(FATAL) << "cannot write transport registration socket";
896     }
897 }
898 
remove_transport(atransport * transport)899 static void remove_transport(atransport* transport) {
900     tmsg m;
901     m.transport = transport;
902     m.action = 0;
903     D("transport: %s removed", transport->serial.c_str());
904     if (transport_write_action(transport_registration_send, &m)) {
905         PLOG(FATAL) << "cannot write transport registration socket";
906     }
907 }
908 
transport_destroy(atransport * t)909 static void transport_destroy(atransport* t) {
910     check_main_thread();
911     CHECK(t != nullptr);
912 
913     std::lock_guard<std::recursive_mutex> lock(transport_lock);
914     LOG(INFO) << "destroying transport " << t->serial_name();
915     t->connection()->Stop();
916 #if ADB_HOST
917     if (t->IsTcpDevice() && !t->kicked()) {
918         D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
919 
920         // We need to clear the transport's keys, so that on the next connection, it tries
921         // again from the beginning.
922         t->ResetKeys();
923         reconnect_handler.TrackTransport(t);
924         return;
925     }
926 #endif
927 
928     D("transport: %s destroy (kicking and closing)", t->serial.c_str());
929     remove_transport(t);
930 }
931 
932 #if ADB_HOST
// Tests whether |to_test| is exactly |prefix| followed by |qual|.
//
//   to_test        the user-supplied target string.
//   prefix         optional literal that must start |to_test| (may be nullptr).
//   qual           the qualifier value to compare against.
//   sanitize_qual  when true, every non-alphanumeric character of |qual| is
//                  compared as '_'.
//
// Returns 1 on a match and 0 otherwise. Two empty strings match each other; an
// empty |qual| never matches a non-empty |to_test|.
static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
                      bool sanitize_qual) {
    if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
        return qual.empty();

    if (qual.empty()) return 0;

    // Compare by length instead of walking a raw c_str() pointer, so neither
    // string can be read past its end (e.g. when |qual| has an embedded NUL).
    const size_t prefix_len = prefix ? strlen(prefix) : 0;
    if (to_test.size() != prefix_len + qual.size()) return 0;

    if (prefix_len != 0 && to_test.compare(0, prefix_len, prefix) != 0) return 0;

    for (size_t i = 0; i < qual.size(); ++i) {
        char expected = qual[i];
        // isalnum() is only defined for EOF and values representable as
        // unsigned char, so a plain (possibly negative) char must be converted.
        if (sanitize_qual && !isalnum(static_cast<unsigned char>(expected))) expected = '_';
        if (to_test[prefix_len + i] != expected) return 0;
    }

    /* Prefix and qualifier both matched and the lengths agree. */
    return 1;
}
955 
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)956 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
957                                   bool* is_ambiguous, std::string* error_out,
958                                   bool accept_any_state) {
959     atransport* result = nullptr;
960 
961     if (transport_id != 0) {
962         *error_out =
963             android::base::StringPrintf("no device with transport id '%" PRIu64 "'", transport_id);
964     } else if (serial) {
965         *error_out = android::base::StringPrintf("device '%s' not found", serial);
966     } else if (type == kTransportLocal) {
967         *error_out = "no emulators found";
968     } else if (type == kTransportAny) {
969         *error_out = "no devices/emulators found";
970     } else {
971         *error_out = "no devices found";
972     }
973 
974     std::unique_lock<std::recursive_mutex> lock(transport_lock);
975     for (const auto& t : transport_list) {
976         if (t->GetConnectionState() == kCsNoPerm) {
977             *error_out = UsbNoPermissionsLongHelpText();
978             continue;
979         }
980 
981         if (transport_id) {
982             if (t->id == transport_id) {
983                 result = t;
984                 break;
985             }
986         } else if (serial) {
987             if (t->MatchesTarget(serial)) {
988                 if (result) {
989                     *error_out = "more than one device";
990                     if (is_ambiguous) *is_ambiguous = true;
991                     result = nullptr;
992                     break;
993                 }
994                 result = t;
995             }
996         } else {
997             if (type == kTransportUsb && t->type == kTransportUsb) {
998                 if (result) {
999                     *error_out = "more than one device";
1000                     if (is_ambiguous) *is_ambiguous = true;
1001                     result = nullptr;
1002                     break;
1003                 }
1004                 result = t;
1005             } else if (type == kTransportLocal && t->type == kTransportLocal) {
1006                 if (result) {
1007                     *error_out = "more than one emulator";
1008                     if (is_ambiguous) *is_ambiguous = true;
1009                     result = nullptr;
1010                     break;
1011                 }
1012                 result = t;
1013             } else if (type == kTransportAny) {
1014                 if (result) {
1015                     *error_out = "more than one device/emulator";
1016                     if (is_ambiguous) *is_ambiguous = true;
1017                     result = nullptr;
1018                     break;
1019                 }
1020                 result = t;
1021             }
1022         }
1023     }
1024     lock.unlock();
1025 
1026     if (result && !accept_any_state) {
1027         // The caller requires an active transport.
1028         // Make sure that we're actually connected.
1029         ConnectionState state = result->GetConnectionState();
1030         switch (state) {
1031             case kCsConnecting:
1032                 *error_out = "device still connecting";
1033                 result = nullptr;
1034                 break;
1035 
1036             case kCsAuthorizing:
1037                 *error_out = "device still authorizing";
1038                 result = nullptr;
1039                 break;
1040 
1041             case kCsUnauthorized: {
1042                 *error_out = "device unauthorized.\n";
1043                 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1044                 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1045                 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1046                 *error_out += "\n";
1047                 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1048                 *error_out += "Otherwise check for a confirmation dialog on your device.";
1049                 result = nullptr;
1050                 break;
1051             }
1052 
1053             case kCsOffline:
1054                 *error_out = "device offline";
1055                 result = nullptr;
1056                 break;
1057 
1058             default:
1059                 break;
1060         }
1061     }
1062 
1063     if (result) {
1064         *error_out = "success";
1065     }
1066 
1067     return result;
1068 }
1069 
WaitForConnection(std::chrono::milliseconds timeout)1070 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1071     std::unique_lock<std::mutex> lock(mutex_);
1072     ScopedLockAssertion assume_locked(mutex_);
1073     return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1074         return connection_established_ready_;
1075     }) && connection_established_;
1076 }
1077 
SetConnectionEstablished(bool success)1078 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1079     {
1080         std::lock_guard<std::mutex> lock(mutex_);
1081         if (connection_established_ready_) return;
1082         connection_established_ready_ = true;
1083         connection_established_ = success;
1084         D("connection established with %d", success);
1085     }
1086     cv_.notify_one();
1087 }
1088 #endif
1089 
~atransport()1090 atransport::~atransport() {
1091 #if ADB_HOST
1092     // If the connection callback had not been run before, run it now.
1093     SetConnectionEstablished(false);
1094 #endif
1095 }
1096 
Write(apacket * p)1097 int atransport::Write(apacket* p) {
1098     return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1099 }
1100 
Reset()1101 void atransport::Reset() {
1102     if (!kicked_.exchange(true)) {
1103         LOG(INFO) << "resetting transport " << this << " " << this->serial;
1104         this->connection()->Reset();
1105     }
1106 }
1107 
Kick()1108 void atransport::Kick() {
1109     if (!kicked_.exchange(true)) {
1110         LOG(INFO) << "kicking transport " << this << " " << this->serial;
1111         this->connection()->Stop();
1112     }
1113 }
1114 
GetConnectionState() const1115 ConnectionState atransport::GetConnectionState() const {
1116     return connection_state_;
1117 }
1118 
SetConnectionState(ConnectionState state)1119 void atransport::SetConnectionState(ConnectionState state) {
1120     check_main_thread();
1121     connection_state_ = state;
1122 }
1123 
SetConnection(std::unique_ptr<Connection> connection)1124 void atransport::SetConnection(std::unique_ptr<Connection> connection) {
1125     std::lock_guard<std::mutex> lock(mutex_);
1126     connection_ = std::shared_ptr<Connection>(std::move(connection));
1127 }
1128 
connection_state_name() const1129 std::string atransport::connection_state_name() const {
1130     ConnectionState state = GetConnectionState();
1131     switch (state) {
1132         case kCsOffline:
1133             return "offline";
1134         case kCsBootloader:
1135             return "bootloader";
1136         case kCsDevice:
1137             return "device";
1138         case kCsHost:
1139             return "host";
1140         case kCsRecovery:
1141             return "recovery";
1142         case kCsRescue:
1143             return "rescue";
1144         case kCsNoPerm:
1145             return UsbNoPermissionsShortHelpText();
1146         case kCsSideload:
1147             return "sideload";
1148         case kCsUnauthorized:
1149             return "unauthorized";
1150         case kCsAuthorizing:
1151             return "authorizing";
1152         case kCsConnecting:
1153             return "connecting";
1154         default:
1155             return "unknown";
1156     }
1157 }
1158 
update_version(int version,size_t payload)1159 void atransport::update_version(int version, size_t payload) {
1160     protocol_version = std::min(version, A_VERSION);
1161     max_payload = std::min(payload, MAX_PAYLOAD);
1162 }
1163 
get_protocol_version() const1164 int atransport::get_protocol_version() const {
1165     return protocol_version;
1166 }
1167 
get_tls_version() const1168 int atransport::get_tls_version() const {
1169     return tls_version;
1170 }
1171 
get_max_payload() const1172 size_t atransport::get_max_payload() const {
1173     return max_payload;
1174 }
1175 
supported_features()1176 const FeatureSet& supported_features() {
1177     static const android::base::NoDestructor<FeatureSet> features([] {
1178         return FeatureSet{
1179                 kFeatureShell2,
1180                 kFeatureCmd,
1181                 kFeatureStat2,
1182                 kFeatureLs2,
1183                 kFeatureFixedPushMkdir,
1184                 kFeatureApex,
1185                 kFeatureAbb,
1186                 kFeatureFixedPushSymlinkTimestamp,
1187                 kFeatureAbbExec,
1188                 kFeatureRemountShell,
1189                 kFeatureTrackApp,
1190                 kFeatureSendRecv2,
1191                 kFeatureSendRecv2Brotli,
1192                 kFeatureSendRecv2LZ4,
1193                 kFeatureSendRecv2Zstd,
1194                 kFeatureSendRecv2DryRunSend,
1195                 // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1196                 // to know about. Otherwise, the client can be stuck running an old
1197                 // version of the server even after upgrading their copy of adb.
1198                 // (http://b/24370690)
1199         };
1200     }());
1201 
1202     return *features;
1203 }
1204 
FeatureSetToString(const FeatureSet & features)1205 std::string FeatureSetToString(const FeatureSet& features) {
1206     return android::base::Join(features, ',');
1207 }
1208 
StringToFeatureSet(const std::string & features_string)1209 FeatureSet StringToFeatureSet(const std::string& features_string) {
1210     if (features_string.empty()) {
1211         return FeatureSet();
1212     }
1213 
1214     return android::base::Split(features_string, ",");
1215 }
1216 
// Returns true if any element of |r| compares equal to |v| (linear scan).
template <class Range, class Value>
static bool contains(const Range& r, const Value& v) {
    return std::any_of(std::begin(r), std::end(r),
                       [&v](const auto& element) { return element == v; });
}
1221 
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1222 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1223     return contains(feature_set, feature) && contains(supported_features(), feature);
1224 }
1225 
has_feature(const std::string & feature) const1226 bool atransport::has_feature(const std::string& feature) const {
1227     return contains(features_, feature);
1228 }
1229 
SetFeatures(const std::string & features_string)1230 void atransport::SetFeatures(const std::string& features_string) {
1231     features_ = StringToFeatureSet(features_string);
1232 }
1233 
AddDisconnect(adisconnect * disconnect)1234 void atransport::AddDisconnect(adisconnect* disconnect) {
1235     disconnects_.push_back(disconnect);
1236 }
1237 
RemoveDisconnect(adisconnect * disconnect)1238 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1239     disconnects_.remove(disconnect);
1240 }
1241 
RunDisconnects()1242 void atransport::RunDisconnects() {
1243     for (const auto& disconnect : disconnects_) {
1244         disconnect->func(disconnect->opaque, this);
1245     }
1246     disconnects_.clear();
1247 }
1248 
1249 #if ADB_HOST
MatchesTarget(const std::string & target) const1250 bool atransport::MatchesTarget(const std::string& target) const {
1251     if (!serial.empty()) {
1252         if (target == serial) {
1253             return true;
1254         } else if (type == kTransportLocal) {
1255             // Local transports can match [tcp:|udp:]<hostname>[:port].
1256             const char* local_target_ptr = target.c_str();
1257 
1258             // For fastboot compatibility, ignore protocol prefixes.
1259             if (android::base::StartsWith(target, "tcp:") ||
1260                 android::base::StartsWith(target, "udp:")) {
1261                 local_target_ptr += 4;
1262             }
1263 
1264             // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1265             std::string serial_host, error;
1266             int serial_port = -1;
1267             if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr, &error)) {
1268                 // |target| may omit the port to default to ours.
1269                 std::string target_host;
1270                 int target_port = serial_port;
1271                 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1272                                                    nullptr, &error) &&
1273                     serial_host == target_host && serial_port == target_port) {
1274                     return true;
1275                 }
1276             }
1277         }
1278     }
1279 
1280     return (target == devpath) || qual_match(target, "product:", product, false) ||
1281            qual_match(target, "model:", model, true) ||
1282            qual_match(target, "device:", device, false);
1283 }
1284 
SetConnectionEstablished(bool success)1285 void atransport::SetConnectionEstablished(bool success) {
1286     connection_waitable_->SetConnectionEstablished(success);
1287 }
1288 
Reconnect()1289 ReconnectResult atransport::Reconnect() {
1290     return reconnect_(this);
1291 }
1292 
1293 // We use newline as our delimiter, make sure to never output it.
// Returns |str| with unsafe characters replaced by '_'.
//
// When |alphanumeric| is true, every character that is not alphanumeric is
// replaced; otherwise only '\n' is replaced.
static std::string sanitize(std::string str, bool alphanumeric) {
    for (char& c : str) {
        // isalnum() is only defined for EOF and values representable as
        // unsigned char; passing a negative plain char (any non-ASCII byte
        // where char is signed) is undefined behaviour, hence the cast.
        const bool replace =
                alphanumeric ? !isalnum(static_cast<unsigned char>(c)) : (c == '\n');
        if (replace) {
            c = '_';
        }
    }
    return str;
}
1300 
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1301 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1302                                   bool alphanumeric) {
1303     if (value.empty()) {
1304         return;
1305     }
1306 
1307     *result += ' ';
1308     *result += key;
1309     *result += sanitize(value, alphanumeric);
1310 }
1311 
append_transport(const atransport * t,std::string * result,bool long_listing)1312 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1313     std::string serial = t->serial;
1314     if (serial.empty()) {
1315         serial = "(no serial number)";
1316     }
1317 
1318     if (!long_listing) {
1319         *result += serial;
1320         *result += '\t';
1321         *result += t->connection_state_name();
1322     } else {
1323         android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1324                                      t->connection_state_name().c_str());
1325 
1326         append_transport_info(result, "", t->devpath, false);
1327         append_transport_info(result, "product:", t->product, false);
1328         append_transport_info(result, "model:", t->model, true);
1329         append_transport_info(result, "device:", t->device, false);
1330 
1331         // Put id at the end, so that anyone parsing the output here can always find it by scanning
1332         // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1333         *result += " transport_id:";
1334         *result += std::to_string(t->id);
1335     }
1336     *result += '\n';
1337 }
1338 
list_transports(bool long_listing)1339 std::string list_transports(bool long_listing) {
1340     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1341 
1342     auto sorted_transport_list = transport_list;
1343     sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1344         if (x->type != y->type) {
1345             return x->type < y->type;
1346         }
1347         return x->serial < y->serial;
1348     });
1349 
1350     std::string result;
1351     for (const auto& t : sorted_transport_list) {
1352         append_transport(t, &result, long_listing);
1353     }
1354     return result;
1355 }
1356 
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1357 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1358     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1359     for (auto& t : transport_list) {
1360         if (predicate(t)) {
1361             if (reset) {
1362                 t->Reset();
1363             } else {
1364                 t->Kick();
1365             }
1366         }
1367     }
1368 }
1369 
1370 /* hack for osx */
close_usb_devices(bool reset)1371 void close_usb_devices(bool reset) {
1372     close_usb_devices([](const atransport*) { return true; }, reset);
1373 }
1374 #endif
1375 
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1376 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1377                                atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1378     atransport* t = new atransport(std::move(reconnect), kCsOffline);
1379     t->use_tls = use_tls;
1380 
1381     D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port);
1382     if (init_socket_transport(t, std::move(s), port, local) < 0) {
1383         delete t;
1384         if (error) *error = errno;
1385         return false;
1386     }
1387 
1388     std::unique_lock<std::recursive_mutex> lock(transport_lock);
1389     for (const auto& transport : pending_list) {
1390         if (serial == transport->serial) {
1391             VLOG(TRANSPORT) << "socket transport " << transport->serial
1392                             << " is already in pending_list and fails to register";
1393             delete t;
1394             if (error) *error = EALREADY;
1395             return false;
1396         }
1397     }
1398 
1399     for (const auto& transport : transport_list) {
1400         if (serial == transport->serial) {
1401             VLOG(TRANSPORT) << "socket transport " << transport->serial
1402                             << " is already in transport_list and fails to register";
1403             delete t;
1404             if (error) *error = EALREADY;
1405             return false;
1406         }
1407     }
1408 
1409     t->serial = std::move(serial);
1410     pending_list.push_front(t);
1411 
1412     lock.unlock();
1413 
1414 #if ADB_HOST
1415     auto waitable = t->connection_waitable();
1416 #endif
1417     register_transport(t);
1418 
1419     if (local == 1) {
1420         // Do not wait for emulator transports.
1421         return true;
1422     }
1423 
1424 #if ADB_HOST
1425     if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1426         if (error) *error = ETIMEDOUT;
1427         return false;
1428     }
1429 
1430     if (t->GetConnectionState() == kCsUnauthorized) {
1431         if (error) *error = EPERM;
1432         return false;
1433     }
1434 #endif
1435 
1436     return true;
1437 }
1438 
1439 #if ADB_HOST
find_transport(const char * serial)1440 atransport* find_transport(const char* serial) {
1441     atransport* result = nullptr;
1442 
1443     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1444     for (auto& t : transport_list) {
1445         if (strcmp(serial, t->serial.c_str()) == 0) {
1446             result = t;
1447             break;
1448         }
1449     }
1450 
1451     return result;
1452 }
1453 
kick_all_tcp_devices()1454 void kick_all_tcp_devices() {
1455     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1456     for (auto& t : transport_list) {
1457         if (t->IsTcpDevice()) {
1458             // Kicking breaks the read_transport thread of this transport out of any read, then
1459             // the read_transport thread will notify the main thread to make this transport
1460             // offline. Then the main thread will notify the write_transport thread to exit.
1461             // Finally, this transport will be closed and freed in the main thread.
1462             t->Kick();
1463         }
1464     }
1465     reconnect_handler.CheckForKicked();
1466 }
1467 
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1468 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1469                             unsigned writeable) {
1470     atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1471 
1472     D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1473     init_usb_transport(t, usb);
1474     if (serial) {
1475         t->serial = serial;
1476     }
1477 
1478     if (devpath) {
1479         t->devpath = devpath;
1480     }
1481 
1482     {
1483         std::lock_guard<std::recursive_mutex> lock(transport_lock);
1484         pending_list.push_front(t);
1485     }
1486 
1487     register_transport(t);
1488 }
1489 
1490 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1491 void unregister_usb_transport(usb_handle* usb) {
1492     std::lock_guard<std::recursive_mutex> lock(transport_lock);
1493     transport_list.remove_if([usb](atransport* t) {
1494         return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1495     });
1496 }
1497 #endif
1498 
check_header(apacket * p,atransport * t)1499 bool check_header(apacket* p, atransport* t) {
1500     if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1501         VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1502                   << ", magic = " << p->msg.magic;
1503         return false;
1504     }
1505 
1506     if (p->msg.data_length > t->get_max_payload()) {
1507         VLOG(RWX) << "check_header(): " << p->msg.data_length
1508                   << " atransport::max_payload = " << t->get_max_payload();
1509         return false;
1510     }
1511 
1512     return true;
1513 }
1514 
1515 #if ADB_HOST
Key()1516 std::shared_ptr<RSA> atransport::Key() {
1517     if (keys_.empty()) {
1518         return nullptr;
1519     }
1520 
1521     std::shared_ptr<RSA> result = keys_[0];
1522     return result;
1523 }
1524 
NextKey()1525 std::shared_ptr<RSA> atransport::NextKey() {
1526     if (keys_.empty()) {
1527         LOG(INFO) << "fetching keys for transport " << this->serial_name();
1528         keys_ = adb_auth_get_private_keys();
1529 
1530         // We should have gotten at least one key: the one that's automatically generated.
1531         CHECK(!keys_.empty());
1532     } else {
1533         keys_.pop_front();
1534     }
1535 
1536     return Key();
1537 }
1538 
ResetKeys()1539 void atransport::ResetKeys() {
1540     keys_.clear();
1541 }
1542 #endif
1543