1 /*
2 * Copyright (C) 2007 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define TRACE_TAG TRANSPORT
18
19 #include "sysdeps.h"
20
21 #include "transport.h"
22
23 #include <ctype.h>
24 #include <errno.h>
25 #include <inttypes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30
31 #include <algorithm>
32 #include <list>
33 #include <memory>
34 #include <mutex>
35 #include <set>
36 #include <thread>
37
38 #include <adb/crypto/rsa_2048_key.h>
39 #include <adb/crypto/x509_generator.h>
40 #include <adb/tls/tls_connection.h>
41 #include <android-base/logging.h>
42 #include <android-base/no_destructor.h>
43 #include <android-base/parsenetaddress.h>
44 #include <android-base/stringprintf.h>
45 #include <android-base/strings.h>
46 #include <android-base/thread_annotations.h>
47
48 #include <diagnose_usb.h>
49
50 #include "adb.h"
51 #include "adb_auth.h"
52 #include "adb_io.h"
53 #include "adb_trace.h"
54 #include "adb_utils.h"
55 #include "fdevent/fdevent.h"
56 #include "sysdeps/chrono.h"
57
58 using namespace adb::crypto;
59 using namespace adb::tls;
60 using android::base::ScopedLockAssertion;
61 using TlsError = TlsConnection::TlsError;
62
63 static void remove_transport(atransport* transport);
64 static void transport_destroy(atransport* transport);
65
66 // TODO: unordered_map<TransportId, atransport*>
67 static auto& transport_list = *new std::list<atransport*>();
68 static auto& pending_list = *new std::list<atransport*>();
69
70 static auto& transport_lock = *new std::recursive_mutex();
71
// Feature name strings. The literal values are runtime data and must not be altered.
// NOTE(review): presumably these are the names exchanged between host and device when
// features are negotiated -- confirm against the declarations in transport.h.
const char* const kFeatureShell2 = "shell_v2";
const char* const kFeatureCmd = "cmd";
const char* const kFeatureStat2 = "stat_v2";
const char* const kFeatureLs2 = "ls_v2";
const char* const kFeatureLibusb = "libusb";
const char* const kFeaturePushSync = "push_sync";
const char* const kFeatureApex = "apex";
const char* const kFeatureFixedPushMkdir = "fixed_push_mkdir";
const char* const kFeatureAbb = "abb";
const char* const kFeatureFixedPushSymlinkTimestamp = "fixed_push_symlink_timestamp";
const char* const kFeatureAbbExec = "abb_exec";
const char* const kFeatureRemountShell = "remount_shell";
const char* const kFeatureTrackApp = "track_app";
const char* const kFeatureSendRecv2 = "sendrecv_v2";
const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd";
const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send";
90
91 namespace {
92
93 #if ADB_HOST
94 // Tracks and handles atransport*s that are attempting reconnection.
95 class ReconnectHandler {
96 public:
97 ReconnectHandler() = default;
98 ~ReconnectHandler() = default;
99
100 // Starts the ReconnectHandler thread.
101 void Start();
102
103 // Requests the ReconnectHandler thread to stop.
104 void Stop();
105
106 // Adds the atransport* to the queue of reconnect attempts.
107 void TrackTransport(atransport* transport);
108
109 // Wake up the ReconnectHandler thread to have it check for kicked transports.
110 void CheckForKicked();
111
112 private:
113 // The main thread loop.
114 void Run();
115
116 // Tracks a reconnection attempt.
117 struct ReconnectAttempt {
118 atransport* transport;
119 std::chrono::steady_clock::time_point reconnect_time;
120 size_t attempts_left;
121
operator <__anon294a789f0111::ReconnectHandler::ReconnectAttempt122 bool operator<(const ReconnectAttempt& rhs) const {
123 if (reconnect_time == rhs.reconnect_time) {
124 return reinterpret_cast<uintptr_t>(transport) <
125 reinterpret_cast<uintptr_t>(rhs.transport);
126 }
127 return reconnect_time < rhs.reconnect_time;
128 }
129 };
130
131 // Only retry for up to one minute.
132 static constexpr const std::chrono::seconds kDefaultTimeout = 10s;
133 static constexpr const size_t kMaxAttempts = 6;
134
135 // Protects all members.
136 std::mutex reconnect_mutex_;
137 bool running_ GUARDED_BY(reconnect_mutex_) = true;
138 std::thread handler_thread_;
139 std::condition_variable reconnect_cv_;
140 std::set<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
141
142 DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
143 };
144
Start()145 void ReconnectHandler::Start() {
146 check_main_thread();
147 handler_thread_ = std::thread(&ReconnectHandler::Run, this);
148 }
149
Stop()150 void ReconnectHandler::Stop() {
151 check_main_thread();
152 {
153 std::lock_guard<std::mutex> lock(reconnect_mutex_);
154 running_ = false;
155 }
156 reconnect_cv_.notify_one();
157 handler_thread_.join();
158
159 // Drain the queue to free all resources.
160 std::lock_guard<std::mutex> lock(reconnect_mutex_);
161 while (!reconnect_queue_.empty()) {
162 ReconnectAttempt attempt = *reconnect_queue_.begin();
163 reconnect_queue_.erase(reconnect_queue_.begin());
164 remove_transport(attempt.transport);
165 }
166 }
167
TrackTransport(atransport * transport)168 void ReconnectHandler::TrackTransport(atransport* transport) {
169 check_main_thread();
170 {
171 std::lock_guard<std::mutex> lock(reconnect_mutex_);
172 if (!running_) return;
173 // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited.
174 auto reconnect_time = std::chrono::steady_clock::now() + 250ms;
175 reconnect_queue_.emplace(
176 ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts});
177 }
178 reconnect_cv_.notify_one();
179 }
180
CheckForKicked()181 void ReconnectHandler::CheckForKicked() {
182 reconnect_cv_.notify_one();
183 }
184
Run()185 void ReconnectHandler::Run() {
186 while (true) {
187 ReconnectAttempt attempt;
188 {
189 std::unique_lock<std::mutex> lock(reconnect_mutex_);
190 ScopedLockAssertion assume_lock(reconnect_mutex_);
191
192 if (!reconnect_queue_.empty()) {
193 // FIXME: libstdc++ (used on Windows) implements condition_variable with
194 // system_clock as its clock, so we're probably hosed if the clock changes,
195 // even if we use steady_clock throughout. This problem goes away once we
196 // switch to libc++.
197 reconnect_cv_.wait_until(lock, reconnect_queue_.begin()->reconnect_time);
198 } else {
199 reconnect_cv_.wait(lock);
200 }
201
202 if (!running_) return;
203
204 // Scan the whole list for kicked transports, so that we immediately handle an explicit
205 // disconnect request.
206 bool kicked = false;
207 for (auto it = reconnect_queue_.begin(); it != reconnect_queue_.end();) {
208 if (it->transport->kicked()) {
209 D("transport %s was kicked. giving up on it.", it->transport->serial.c_str());
210 remove_transport(it->transport);
211 it = reconnect_queue_.erase(it);
212 } else {
213 ++it;
214 }
215 kicked = true;
216 }
217
218 if (reconnect_queue_.empty()) continue;
219
220 // Go back to sleep if we either woke up spuriously, or we were woken up to remove
221 // a kicked transport, and the first transport isn't ready for reconnection yet.
222 auto now = std::chrono::steady_clock::now();
223 if (reconnect_queue_.begin()->reconnect_time > now) {
224 continue;
225 }
226
227 attempt = *reconnect_queue_.begin();
228 reconnect_queue_.erase(reconnect_queue_.begin());
229 }
230 D("attempting to reconnect %s", attempt.transport->serial.c_str());
231
232 switch (attempt.transport->Reconnect()) {
233 case ReconnectResult::Retry: {
234 D("attempting to reconnect %s failed.", attempt.transport->serial.c_str());
235 if (attempt.attempts_left == 0) {
236 D("transport %s exceeded the number of retry attempts. giving up on it.",
237 attempt.transport->serial.c_str());
238 remove_transport(attempt.transport);
239 continue;
240 }
241
242 std::lock_guard<std::mutex> lock(reconnect_mutex_);
243 reconnect_queue_.emplace(ReconnectAttempt{
244 attempt.transport,
245 std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout,
246 attempt.attempts_left - 1});
247 continue;
248 }
249
250 case ReconnectResult::Success:
251 D("reconnection to %s succeeded.", attempt.transport->serial.c_str());
252 register_transport(attempt.transport);
253 continue;
254
255 case ReconnectResult::Abort:
256 D("cancelling reconnection attempt to %s.", attempt.transport->serial.c_str());
257 remove_transport(attempt.transport);
258 continue;
259 }
260 }
261 }
262
263 static auto& reconnect_handler = *new ReconnectHandler();
264
265 #endif
266
267 } // namespace
268
NextTransportId()269 TransportId NextTransportId() {
270 static std::atomic<TransportId> next(1);
271 return next++;
272 }
273
Reset()274 void Connection::Reset() {
275 LOG(INFO) << "Connection::Reset(): stopping";
276 Stop();
277 }
278
BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)279 BlockingConnectionAdapter::BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection)
280 : underlying_(std::move(connection)) {}
281
~BlockingConnectionAdapter()282 BlockingConnectionAdapter::~BlockingConnectionAdapter() {
283 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): destructing";
284 Stop();
285 }
286
Start()287 void BlockingConnectionAdapter::Start() {
288 std::lock_guard<std::mutex> lock(mutex_);
289 if (started_) {
290 LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_
291 << "): started multiple times";
292 }
293
294 StartReadThread();
295
296 write_thread_ = std::thread([this]() {
297 LOG(INFO) << this->transport_name_ << ": write thread spawning";
298 while (true) {
299 std::unique_lock<std::mutex> lock(mutex_);
300 ScopedLockAssertion assume_locked(mutex_);
301 cv_.wait(lock, [this]() REQUIRES(mutex_) {
302 return this->stopped_ || !this->write_queue_.empty();
303 });
304
305 if (this->stopped_) {
306 return;
307 }
308
309 std::unique_ptr<apacket> packet = std::move(this->write_queue_.front());
310 this->write_queue_.pop_front();
311 lock.unlock();
312
313 if (!this->underlying_->Write(packet.get())) {
314 break;
315 }
316 }
317 std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); });
318 });
319
320 started_ = true;
321 }
322
StartReadThread()323 void BlockingConnectionAdapter::StartReadThread() {
324 read_thread_ = std::thread([this]() {
325 LOG(INFO) << this->transport_name_ << ": read thread spawning";
326 while (true) {
327 auto packet = std::make_unique<apacket>();
328 if (!underlying_->Read(packet.get())) {
329 PLOG(INFO) << this->transport_name_ << ": read failed";
330 break;
331 }
332
333 bool got_stls_cmd = false;
334 if (packet->msg.command == A_STLS) {
335 got_stls_cmd = true;
336 }
337
338 read_callback_(this, std::move(packet));
339
340 // If we received the STLS packet, we are about to perform the TLS
341 // handshake. So this read thread must stop and resume after the
342 // handshake completes otherwise this will interfere in the process.
343 if (got_stls_cmd) {
344 LOG(INFO) << this->transport_name_
345 << ": Received STLS packet. Stopping read thread.";
346 return;
347 }
348 }
349 std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "read failed"); });
350 });
351 }
352
DoTlsHandshake(RSA * key,std::string * auth_key)353 bool BlockingConnectionAdapter::DoTlsHandshake(RSA* key, std::string* auth_key) {
354 std::lock_guard<std::mutex> lock(mutex_);
355 if (read_thread_.joinable()) {
356 read_thread_.join();
357 }
358 bool success = this->underlying_->DoTlsHandshake(key, auth_key);
359 StartReadThread();
360 return success;
361 }
362
Reset()363 void BlockingConnectionAdapter::Reset() {
364 {
365 std::lock_guard<std::mutex> lock(mutex_);
366 if (!started_) {
367 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
368 return;
369 }
370
371 if (stopped_) {
372 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
373 << "): already stopped";
374 return;
375 }
376 }
377
378 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): resetting";
379 this->underlying_->Reset();
380 Stop();
381 }
382
Stop()383 void BlockingConnectionAdapter::Stop() {
384 {
385 std::lock_guard<std::mutex> lock(mutex_);
386 if (!started_) {
387 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
388 return;
389 }
390
391 if (stopped_) {
392 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
393 << "): already stopped";
394 return;
395 }
396
397 stopped_ = true;
398 }
399
400 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping";
401
402 this->underlying_->Close();
403 this->cv_.notify_one();
404
405 // Move the threads out into locals with the lock taken, and then unlock to let them exit.
406 std::thread read_thread;
407 std::thread write_thread;
408
409 {
410 std::lock_guard<std::mutex> lock(mutex_);
411 read_thread = std::move(read_thread_);
412 write_thread = std::move(write_thread_);
413 }
414
415 read_thread.join();
416 write_thread.join();
417
418 LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped";
419 std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); });
420 }
421
Write(std::unique_ptr<apacket> packet)422 bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
423 {
424 std::lock_guard<std::mutex> lock(this->mutex_);
425 write_queue_.emplace_back(std::move(packet));
426 }
427
428 cv_.notify_one();
429 return true;
430 }
431
FdConnection(unique_fd fd)432 FdConnection::FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
433
~FdConnection()434 FdConnection::~FdConnection() {}
435
DispatchRead(void * buf,size_t len)436 bool FdConnection::DispatchRead(void* buf, size_t len) {
437 if (tls_ != nullptr) {
438 // The TlsConnection doesn't allow 0 byte reads
439 if (len == 0) {
440 return true;
441 }
442 return tls_->ReadFully(buf, len);
443 }
444
445 return ReadFdExactly(fd_.get(), buf, len);
446 }
447
DispatchWrite(void * buf,size_t len)448 bool FdConnection::DispatchWrite(void* buf, size_t len) {
449 if (tls_ != nullptr) {
450 // The TlsConnection doesn't allow 0 byte writes
451 if (len == 0) {
452 return true;
453 }
454 return tls_->WriteFully(std::string_view(reinterpret_cast<const char*>(buf), len));
455 }
456
457 return WriteFdExactly(fd_.get(), buf, len);
458 }
459
Read(apacket * packet)460 bool FdConnection::Read(apacket* packet) {
461 if (!DispatchRead(&packet->msg, sizeof(amessage))) {
462 D("remote local: read terminated (message)");
463 return false;
464 }
465
466 if (packet->msg.data_length > MAX_PAYLOAD) {
467 D("remote local: read overflow (data length = %" PRIu32 ")", packet->msg.data_length);
468 return false;
469 }
470
471 packet->payload.resize(packet->msg.data_length);
472
473 if (!DispatchRead(&packet->payload[0], packet->payload.size())) {
474 D("remote local: terminated (data)");
475 return false;
476 }
477
478 return true;
479 }
480
Write(apacket * packet)481 bool FdConnection::Write(apacket* packet) {
482 if (!DispatchWrite(&packet->msg, sizeof(packet->msg))) {
483 D("remote local: write terminated");
484 return false;
485 }
486
487 if (packet->msg.data_length) {
488 if (!DispatchWrite(&packet->payload[0], packet->msg.data_length)) {
489 D("remote local: write terminated");
490 return false;
491 }
492 }
493
494 return true;
495 }
496
DoTlsHandshake(RSA * key,std::string * auth_key)497 bool FdConnection::DoTlsHandshake(RSA* key, std::string* auth_key) {
498 bssl::UniquePtr<EVP_PKEY> evp_pkey(EVP_PKEY_new());
499 if (!EVP_PKEY_set1_RSA(evp_pkey.get(), key)) {
500 LOG(ERROR) << "EVP_PKEY_set1_RSA failed";
501 return false;
502 }
503 auto x509 = GenerateX509Certificate(evp_pkey.get());
504 auto x509_str = X509ToPEMString(x509.get());
505 auto evp_str = Key::ToPEMString(evp_pkey.get());
506
507 int osh = cast_handle_to_int(adb_get_os_handle(fd_));
508 #if ADB_HOST
509 tls_ = TlsConnection::Create(TlsConnection::Role::Client, x509_str, evp_str, osh);
510 #else
511 tls_ = TlsConnection::Create(TlsConnection::Role::Server, x509_str, evp_str, osh);
512 #endif
513 CHECK(tls_);
514 #if ADB_HOST
515 // TLS 1.3 gives the client no message if the server rejected the
516 // certificate. This will enable a check in the tls connection to check
517 // whether the client certificate got rejected. Note that this assumes
518 // that, on handshake success, the server speaks first.
519 tls_->EnableClientPostHandshakeCheck(true);
520 // Add callback to set the certificate when server issues the
521 // CertificateRequest.
522 tls_->SetCertificateCallback(adb_tls_set_certificate);
523 // Allow any server certificate
524 tls_->SetCertVerifyCallback([](X509_STORE_CTX*) { return 1; });
525 #else
526 // Add callback to check certificate against a list of known public keys
527 tls_->SetCertVerifyCallback(
528 [auth_key](X509_STORE_CTX* ctx) { return adbd_tls_verify_cert(ctx, auth_key); });
529 // Add the list of allowed client CA issuers
530 auto ca_list = adbd_tls_client_ca_list();
531 tls_->SetClientCAList(ca_list.get());
532 #endif
533
534 auto err = tls_->DoHandshake();
535 if (err == TlsError::Success) {
536 return true;
537 }
538
539 tls_.reset();
540 return false;
541 }
542
Close()543 void FdConnection::Close() {
544 adb_shutdown(fd_.get());
545 fd_.reset();
546 }
547
send_packet(apacket * p,atransport * t)548 void send_packet(apacket* p, atransport* t) {
549 p->msg.magic = p->msg.command ^ 0xffffffff;
550 // compute a checksum for connection/auth packets for compatibility reasons
551 if (t->get_protocol_version() >= A_VERSION_SKIP_CHECKSUM) {
552 p->msg.data_check = 0;
553 } else {
554 p->msg.data_check = calculate_apacket_checksum(p);
555 }
556
557 VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "to remote", p);
558
559 if (t == nullptr) {
560 LOG(FATAL) << "Transport is null";
561 }
562
563 if (t->Write(p) != 0) {
564 D("%s: failed to enqueue packet, closing transport", t->serial.c_str());
565 t->Kick();
566 }
567 }
568
kick_transport(atransport * t,bool reset)569 void kick_transport(atransport* t, bool reset) {
570 std::lock_guard<std::recursive_mutex> lock(transport_lock);
571 // As kick_transport() can be called from threads without guarantee that t is valid,
572 // check if the transport is in transport_list first.
573 //
574 // TODO(jmgao): WTF? Is this actually true?
575 if (std::find(transport_list.begin(), transport_list.end(), t) != transport_list.end()) {
576 if (reset) {
577 t->Reset();
578 } else {
579 t->Kick();
580 }
581 }
582
583 #if ADB_HOST
584 reconnect_handler.CheckForKicked();
585 #endif
586 }
587
588 static int transport_registration_send = -1;
589 static int transport_registration_recv = -1;
590 static fdevent* transport_registration_fde;
591
592 #if ADB_HOST
593
/* This adds the support required by the 'track-devices' service.
 * It is used to send the output of list_transports() to any
 * number of client connections that want it through a single
 * live TCP connection.
 */
// One `track-devices` client.
struct device_tracker {
    // Embedded socket. It must remain the first member: the socket callbacks cast the
    // asocket* they receive directly back to device_tracker*.
    asocket socket;
    // Set by create_device_tracker() and cleared by device_tracker_ready() once the
    // device list has been sent.
    bool update_needed = false;
    // Forwarded to list_transports() when the device list is built for this client.
    bool long_output = false;
    // Next tracker in the singly linked device_tracker_list.
    device_tracker* next = nullptr;
};
605
606 /* linked list of all device trackers */
607 static device_tracker* device_tracker_list;
608
device_tracker_remove(device_tracker * tracker)609 static void device_tracker_remove(device_tracker* tracker) {
610 device_tracker** pnode = &device_tracker_list;
611 device_tracker* node = *pnode;
612
613 std::lock_guard<std::recursive_mutex> lock(transport_lock);
614 while (node) {
615 if (node == tracker) {
616 *pnode = node->next;
617 break;
618 }
619 pnode = &node->next;
620 node = *pnode;
621 }
622 }
623
device_tracker_close(asocket * socket)624 static void device_tracker_close(asocket* socket) {
625 device_tracker* tracker = (device_tracker*)socket;
626 asocket* peer = socket->peer;
627
628 D("device tracker %p removed", tracker);
629 if (peer) {
630 peer->peer = nullptr;
631 peer->close(peer);
632 }
633 device_tracker_remove(tracker);
634 delete tracker;
635 }
636
device_tracker_enqueue(asocket * socket,apacket::payload_type)637 static int device_tracker_enqueue(asocket* socket, apacket::payload_type) {
638 /* you can't read from a device tracker, close immediately */
639 device_tracker_close(socket);
640 return -1;
641 }
642
device_tracker_send(device_tracker * tracker,const std::string & string)643 static int device_tracker_send(device_tracker* tracker, const std::string& string) {
644 asocket* peer = tracker->socket.peer;
645
646 apacket::payload_type data;
647 data.resize(4 + string.size());
648 char buf[5];
649 snprintf(buf, sizeof(buf), "%04x", static_cast<int>(string.size()));
650 memcpy(&data[0], buf, 4);
651 memcpy(&data[4], string.data(), string.size());
652 return peer->enqueue(peer, std::move(data));
653 }
654
device_tracker_ready(asocket * socket)655 static void device_tracker_ready(asocket* socket) {
656 device_tracker* tracker = reinterpret_cast<device_tracker*>(socket);
657
658 // We want to send the device list when the tracker connects
659 // for the first time, even if no update occurred.
660 if (tracker->update_needed) {
661 tracker->update_needed = false;
662 device_tracker_send(tracker, list_transports(tracker->long_output));
663 }
664 }
665
create_device_tracker(bool long_output)666 asocket* create_device_tracker(bool long_output) {
667 device_tracker* tracker = new device_tracker();
668 if (tracker == nullptr) LOG(FATAL) << "cannot allocate device tracker";
669
670 D("device tracker %p created", tracker);
671
672 tracker->socket.enqueue = device_tracker_enqueue;
673 tracker->socket.ready = device_tracker_ready;
674 tracker->socket.close = device_tracker_close;
675 tracker->update_needed = true;
676 tracker->long_output = long_output;
677
678 tracker->next = device_tracker_list;
679 device_tracker_list = tracker;
680
681 return &tracker->socket;
682 }
683
684 // Check if all of the USB transports are connected.
iterate_transports(std::function<bool (const atransport *)> fn)685 bool iterate_transports(std::function<bool(const atransport*)> fn) {
686 std::lock_guard<std::recursive_mutex> lock(transport_lock);
687 for (const auto& t : transport_list) {
688 if (!fn(t)) {
689 return false;
690 }
691 }
692 for (const auto& t : pending_list) {
693 if (!fn(t)) {
694 return false;
695 }
696 }
697 return true;
698 }
699
700 // Call this function each time the transport list has changed.
update_transports()701 void update_transports() {
702 update_transport_status();
703
704 // Notify `adb track-devices` clients.
705 device_tracker* tracker = device_tracker_list;
706 while (tracker != nullptr) {
707 device_tracker* next = tracker->next;
708 // This may destroy the tracker if the connection is closed.
709 device_tracker_send(tracker, list_transports(tracker->long_output));
710 tracker = next;
711 }
712 }
713
714 #else
715
// Device-side build: there is nothing to do when the transport list changes.
void update_transports() {}
719
720 #endif // ADB_HOST
721
// Request sent over the transport registration socketpair and handled by
// transport_registration_func().
struct tmsg {
    // Transport the request is about.
    atransport* transport;
    // 0 (sent by remove_transport): remove the transport from transport_list and delete
    // it. Any other value (register_transport sends 1): register the transport.
    int action;
};
726
transport_read_action(int fd,struct tmsg * m)727 static int transport_read_action(int fd, struct tmsg* m) {
728 char* p = (char*)m;
729 int len = sizeof(*m);
730 int r;
731
732 while (len > 0) {
733 r = adb_read(fd, p, len);
734 if (r > 0) {
735 len -= r;
736 p += r;
737 } else {
738 D("transport_read_action: on fd %d: %s", fd, strerror(errno));
739 return -1;
740 }
741 }
742 return 0;
743 }
744
transport_write_action(int fd,struct tmsg * m)745 static int transport_write_action(int fd, struct tmsg* m) {
746 char* p = (char*)m;
747 int len = sizeof(*m);
748 int r;
749
750 while (len > 0) {
751 r = adb_write(fd, p, len);
752 if (r > 0) {
753 len -= r;
754 p += r;
755 } else {
756 D("transport_write_action: on fd %d: %s", fd, strerror(errno));
757 return -1;
758 }
759 }
760 return 0;
761 }
762
transport_registration_func(int _fd,unsigned ev,void *)763 static void transport_registration_func(int _fd, unsigned ev, void*) {
764 tmsg m;
765 atransport* t;
766
767 if (!(ev & FDE_READ)) {
768 return;
769 }
770
771 if (transport_read_action(_fd, &m)) {
772 PLOG(FATAL) << "cannot read transport registration socket";
773 }
774
775 t = m.transport;
776
777 if (m.action == 0) {
778 D("transport: %s deleting", t->serial.c_str());
779
780 {
781 std::lock_guard<std::recursive_mutex> lock(transport_lock);
782 transport_list.remove(t);
783 }
784
785 delete t;
786
787 update_transports();
788 return;
789 }
790
791 /* don't create transport threads for inaccessible devices */
792 if (t->GetConnectionState() != kCsNoPerm) {
793 // The connection gets a reference to the atransport. It will release it
794 // upon a read/write error.
795 t->connection()->SetTransportName(t->serial_name());
796 t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) {
797 if (!check_header(p.get(), t)) {
798 D("%s: remote read: bad header", t->serial.c_str());
799 return false;
800 }
801
802 VLOG(TRANSPORT) << dump_packet(t->serial.c_str(), "from remote", p.get());
803 apacket* packet = p.release();
804
805 // TODO: Does this need to run on the main thread?
806 fdevent_run_on_main_thread([packet, t]() { handle_packet(packet, t); });
807 return true;
808 });
809 t->connection()->SetErrorCallback([t](Connection*, const std::string& error) {
810 LOG(INFO) << t->serial_name() << ": connection terminated: " << error;
811 fdevent_run_on_main_thread([t]() {
812 handle_offline(t);
813 transport_destroy(t);
814 });
815 });
816
817 t->connection()->Start();
818 #if ADB_HOST
819 send_connect(t);
820 #endif
821 }
822
823 {
824 std::lock_guard<std::recursive_mutex> lock(transport_lock);
825 auto it = std::find(pending_list.begin(), pending_list.end(), t);
826 if (it != pending_list.end()) {
827 pending_list.remove(t);
828 transport_list.push_front(t);
829 }
830 }
831
832 update_transports();
833 }
834
835 #if ADB_HOST
init_reconnect_handler(void)836 void init_reconnect_handler(void) {
837 reconnect_handler.Start();
838 }
839 #endif
840
init_transport_registration(void)841 void init_transport_registration(void) {
842 int s[2];
843
844 if (adb_socketpair(s)) {
845 PLOG(FATAL) << "cannot open transport registration socketpair";
846 }
847 D("socketpair: (%d,%d)", s[0], s[1]);
848
849 transport_registration_send = s[0];
850 transport_registration_recv = s[1];
851
852 transport_registration_fde =
853 fdevent_create(transport_registration_recv, transport_registration_func, nullptr);
854 fdevent_set(transport_registration_fde, FDE_READ);
855 }
856
kick_all_transports()857 void kick_all_transports() {
858 #if ADB_HOST
859 reconnect_handler.Stop();
860 #endif
861 // To avoid only writing part of a packet to a transport after exit, kick all transports.
862 std::lock_guard<std::recursive_mutex> lock(transport_lock);
863 for (auto t : transport_list) {
864 t->Kick();
865 }
866 }
867
kick_all_tcp_tls_transports()868 void kick_all_tcp_tls_transports() {
869 std::lock_guard<std::recursive_mutex> lock(transport_lock);
870 for (auto t : transport_list) {
871 if (t->IsTcpDevice() && t->use_tls) {
872 t->Kick();
873 }
874 }
875 }
876
877 #if !ADB_HOST
kick_all_transports_by_auth_key(std::string_view auth_key)878 void kick_all_transports_by_auth_key(std::string_view auth_key) {
879 std::lock_guard<std::recursive_mutex> lock(transport_lock);
880 for (auto t : transport_list) {
881 if (auth_key == t->auth_key) {
882 t->Kick();
883 }
884 }
885 }
886 #endif
887
888 /* the fdevent select pump is single threaded */
register_transport(atransport * transport)889 void register_transport(atransport* transport) {
890 tmsg m;
891 m.transport = transport;
892 m.action = 1;
893 D("transport: %s registered", transport->serial.c_str());
894 if (transport_write_action(transport_registration_send, &m)) {
895 PLOG(FATAL) << "cannot write transport registration socket";
896 }
897 }
898
remove_transport(atransport * transport)899 static void remove_transport(atransport* transport) {
900 tmsg m;
901 m.transport = transport;
902 m.action = 0;
903 D("transport: %s removed", transport->serial.c_str());
904 if (transport_write_action(transport_registration_send, &m)) {
905 PLOG(FATAL) << "cannot write transport registration socket";
906 }
907 }
908
transport_destroy(atransport * t)909 static void transport_destroy(atransport* t) {
910 check_main_thread();
911 CHECK(t != nullptr);
912
913 std::lock_guard<std::recursive_mutex> lock(transport_lock);
914 LOG(INFO) << "destroying transport " << t->serial_name();
915 t->connection()->Stop();
916 #if ADB_HOST
917 if (t->IsTcpDevice() && !t->kicked()) {
918 D("transport: %s destroy (attempting reconnection)", t->serial.c_str());
919
920 // We need to clear the transport's keys, so that on the next connection, it tries
921 // again from the beginning.
922 t->ResetKeys();
923 reconnect_handler.TrackTransport(t);
924 return;
925 }
926 #endif
927
928 D("transport: %s destroy (kicking and closing)", t->serial.c_str());
929 remove_transport(t);
930 }
931
932 #if ADB_HOST
// Tests whether `to_test` is exactly `prefix` followed by `qual`.
//
// to_test:       the string to test.
// prefix:        optional literal prefix that `to_test` must start with; may be null.
// qual:          the qualifier expected after the prefix.
// sanitize_qual: when true, every non-alphanumeric character of `qual` is compared as '_'.
//
// Returns non-zero on a match and 0 otherwise. If `to_test` is empty the result is
// non-zero exactly when `qual` is empty too; a non-empty `to_test` never matches an
// empty `qual`.
static int qual_match(const std::string& to_test, const char* prefix, const std::string& qual,
                      bool sanitize_qual) {
    if (to_test.empty()) /* Return true if both the qual and to_test are empty strings. */
        return qual.empty();

    if (qual.empty()) return 0;

    const char* ptr = to_test.c_str();
    if (prefix) {
        // A mismatch (including running into the NUL of to_test) ends the comparison, so
        // ptr never advances past the terminator here.
        while (*prefix) {
            if (*prefix++ != *ptr++) return 0;
        }
    }

    for (char ch : qual) {
        // isalnum() is only defined for values representable as unsigned char (or EOF);
        // passing a negative plain char is undefined behaviour.
        if (sanitize_qual && !isalnum(static_cast<unsigned char>(ch))) ch = '_';
        // Stop at the terminator of to_test: a NUL embedded in qual must not "match" it
        // and push ptr beyond the end of the buffer.
        if (*ptr == '\0' || ch != *ptr++) return 0;
    }

    /* Everything matched so far.  Return true if *ptr is a NUL. */
    return !*ptr;
}
955
acquire_one_transport(TransportType type,const char * serial,TransportId transport_id,bool * is_ambiguous,std::string * error_out,bool accept_any_state)956 atransport* acquire_one_transport(TransportType type, const char* serial, TransportId transport_id,
957 bool* is_ambiguous, std::string* error_out,
958 bool accept_any_state) {
959 atransport* result = nullptr;
960
961 if (transport_id != 0) {
962 *error_out =
963 android::base::StringPrintf("no device with transport id '%" PRIu64 "'", transport_id);
964 } else if (serial) {
965 *error_out = android::base::StringPrintf("device '%s' not found", serial);
966 } else if (type == kTransportLocal) {
967 *error_out = "no emulators found";
968 } else if (type == kTransportAny) {
969 *error_out = "no devices/emulators found";
970 } else {
971 *error_out = "no devices found";
972 }
973
974 std::unique_lock<std::recursive_mutex> lock(transport_lock);
975 for (const auto& t : transport_list) {
976 if (t->GetConnectionState() == kCsNoPerm) {
977 *error_out = UsbNoPermissionsLongHelpText();
978 continue;
979 }
980
981 if (transport_id) {
982 if (t->id == transport_id) {
983 result = t;
984 break;
985 }
986 } else if (serial) {
987 if (t->MatchesTarget(serial)) {
988 if (result) {
989 *error_out = "more than one device";
990 if (is_ambiguous) *is_ambiguous = true;
991 result = nullptr;
992 break;
993 }
994 result = t;
995 }
996 } else {
997 if (type == kTransportUsb && t->type == kTransportUsb) {
998 if (result) {
999 *error_out = "more than one device";
1000 if (is_ambiguous) *is_ambiguous = true;
1001 result = nullptr;
1002 break;
1003 }
1004 result = t;
1005 } else if (type == kTransportLocal && t->type == kTransportLocal) {
1006 if (result) {
1007 *error_out = "more than one emulator";
1008 if (is_ambiguous) *is_ambiguous = true;
1009 result = nullptr;
1010 break;
1011 }
1012 result = t;
1013 } else if (type == kTransportAny) {
1014 if (result) {
1015 *error_out = "more than one device/emulator";
1016 if (is_ambiguous) *is_ambiguous = true;
1017 result = nullptr;
1018 break;
1019 }
1020 result = t;
1021 }
1022 }
1023 }
1024 lock.unlock();
1025
1026 if (result && !accept_any_state) {
1027 // The caller requires an active transport.
1028 // Make sure that we're actually connected.
1029 ConnectionState state = result->GetConnectionState();
1030 switch (state) {
1031 case kCsConnecting:
1032 *error_out = "device still connecting";
1033 result = nullptr;
1034 break;
1035
1036 case kCsAuthorizing:
1037 *error_out = "device still authorizing";
1038 result = nullptr;
1039 break;
1040
1041 case kCsUnauthorized: {
1042 *error_out = "device unauthorized.\n";
1043 char* ADB_VENDOR_KEYS = getenv("ADB_VENDOR_KEYS");
1044 *error_out += "This adb server's $ADB_VENDOR_KEYS is ";
1045 *error_out += ADB_VENDOR_KEYS ? ADB_VENDOR_KEYS : "not set";
1046 *error_out += "\n";
1047 *error_out += "Try 'adb kill-server' if that seems wrong.\n";
1048 *error_out += "Otherwise check for a confirmation dialog on your device.";
1049 result = nullptr;
1050 break;
1051 }
1052
1053 case kCsOffline:
1054 *error_out = "device offline";
1055 result = nullptr;
1056 break;
1057
1058 default:
1059 break;
1060 }
1061 }
1062
1063 if (result) {
1064 *error_out = "success";
1065 }
1066
1067 return result;
1068 }
1069
WaitForConnection(std::chrono::milliseconds timeout)1070 bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
1071 std::unique_lock<std::mutex> lock(mutex_);
1072 ScopedLockAssertion assume_locked(mutex_);
1073 return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
1074 return connection_established_ready_;
1075 }) && connection_established_;
1076 }
1077
SetConnectionEstablished(bool success)1078 void ConnectionWaitable::SetConnectionEstablished(bool success) {
1079 {
1080 std::lock_guard<std::mutex> lock(mutex_);
1081 if (connection_established_ready_) return;
1082 connection_established_ready_ = true;
1083 connection_established_ = success;
1084 D("connection established with %d", success);
1085 }
1086 cv_.notify_one();
1087 }
1088 #endif
1089
~atransport()1090 atransport::~atransport() {
1091 #if ADB_HOST
1092 // If the connection callback had not been run before, run it now.
1093 SetConnectionEstablished(false);
1094 #endif
1095 }
1096
Write(apacket * p)1097 int atransport::Write(apacket* p) {
1098 return this->connection()->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
1099 }
1100
Reset()1101 void atransport::Reset() {
1102 if (!kicked_.exchange(true)) {
1103 LOG(INFO) << "resetting transport " << this << " " << this->serial;
1104 this->connection()->Reset();
1105 }
1106 }
1107
Kick()1108 void atransport::Kick() {
1109 if (!kicked_.exchange(true)) {
1110 LOG(INFO) << "kicking transport " << this << " " << this->serial;
1111 this->connection()->Stop();
1112 }
1113 }
1114
GetConnectionState() const1115 ConnectionState atransport::GetConnectionState() const {
1116 return connection_state_;
1117 }
1118
SetConnectionState(ConnectionState state)1119 void atransport::SetConnectionState(ConnectionState state) {
1120 check_main_thread();
1121 connection_state_ = state;
1122 }
1123
SetConnection(std::unique_ptr<Connection> connection)1124 void atransport::SetConnection(std::unique_ptr<Connection> connection) {
1125 std::lock_guard<std::mutex> lock(mutex_);
1126 connection_ = std::shared_ptr<Connection>(std::move(connection));
1127 }
1128
connection_state_name() const1129 std::string atransport::connection_state_name() const {
1130 ConnectionState state = GetConnectionState();
1131 switch (state) {
1132 case kCsOffline:
1133 return "offline";
1134 case kCsBootloader:
1135 return "bootloader";
1136 case kCsDevice:
1137 return "device";
1138 case kCsHost:
1139 return "host";
1140 case kCsRecovery:
1141 return "recovery";
1142 case kCsRescue:
1143 return "rescue";
1144 case kCsNoPerm:
1145 return UsbNoPermissionsShortHelpText();
1146 case kCsSideload:
1147 return "sideload";
1148 case kCsUnauthorized:
1149 return "unauthorized";
1150 case kCsAuthorizing:
1151 return "authorizing";
1152 case kCsConnecting:
1153 return "connecting";
1154 default:
1155 return "unknown";
1156 }
1157 }
1158
update_version(int version,size_t payload)1159 void atransport::update_version(int version, size_t payload) {
1160 protocol_version = std::min(version, A_VERSION);
1161 max_payload = std::min(payload, MAX_PAYLOAD);
1162 }
1163
get_protocol_version() const1164 int atransport::get_protocol_version() const {
1165 return protocol_version;
1166 }
1167
get_tls_version() const1168 int atransport::get_tls_version() const {
1169 return tls_version;
1170 }
1171
get_max_payload() const1172 size_t atransport::get_max_payload() const {
1173 return max_payload;
1174 }
1175
supported_features()1176 const FeatureSet& supported_features() {
1177 static const android::base::NoDestructor<FeatureSet> features([] {
1178 return FeatureSet{
1179 kFeatureShell2,
1180 kFeatureCmd,
1181 kFeatureStat2,
1182 kFeatureLs2,
1183 kFeatureFixedPushMkdir,
1184 kFeatureApex,
1185 kFeatureAbb,
1186 kFeatureFixedPushSymlinkTimestamp,
1187 kFeatureAbbExec,
1188 kFeatureRemountShell,
1189 kFeatureTrackApp,
1190 kFeatureSendRecv2,
1191 kFeatureSendRecv2Brotli,
1192 kFeatureSendRecv2LZ4,
1193 kFeatureSendRecv2Zstd,
1194 kFeatureSendRecv2DryRunSend,
1195 // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
1196 // to know about. Otherwise, the client can be stuck running an old
1197 // version of the server even after upgrading their copy of adb.
1198 // (http://b/24370690)
1199 };
1200 }());
1201
1202 return *features;
1203 }
1204
FeatureSetToString(const FeatureSet & features)1205 std::string FeatureSetToString(const FeatureSet& features) {
1206 return android::base::Join(features, ',');
1207 }
1208
StringToFeatureSet(const std::string & features_string)1209 FeatureSet StringToFeatureSet(const std::string& features_string) {
1210 if (features_string.empty()) {
1211 return FeatureSet();
1212 }
1213
1214 return android::base::Split(features_string, ",");
1215 }
1216
// Returns true if some element of range |r| compares equal (operator==) to |v|.
template <class Range, class Value>
static bool contains(const Range& r, const Value& v) {
    return std::any_of(std::begin(r), std::end(r),
                       [&v](const auto& element) { return element == v; });
}
1221
CanUseFeature(const FeatureSet & feature_set,const std::string & feature)1222 bool CanUseFeature(const FeatureSet& feature_set, const std::string& feature) {
1223 return contains(feature_set, feature) && contains(supported_features(), feature);
1224 }
1225
has_feature(const std::string & feature) const1226 bool atransport::has_feature(const std::string& feature) const {
1227 return contains(features_, feature);
1228 }
1229
SetFeatures(const std::string & features_string)1230 void atransport::SetFeatures(const std::string& features_string) {
1231 features_ = StringToFeatureSet(features_string);
1232 }
1233
AddDisconnect(adisconnect * disconnect)1234 void atransport::AddDisconnect(adisconnect* disconnect) {
1235 disconnects_.push_back(disconnect);
1236 }
1237
RemoveDisconnect(adisconnect * disconnect)1238 void atransport::RemoveDisconnect(adisconnect* disconnect) {
1239 disconnects_.remove(disconnect);
1240 }
1241
RunDisconnects()1242 void atransport::RunDisconnects() {
1243 for (const auto& disconnect : disconnects_) {
1244 disconnect->func(disconnect->opaque, this);
1245 }
1246 disconnects_.clear();
1247 }
1248
1249 #if ADB_HOST
MatchesTarget(const std::string & target) const1250 bool atransport::MatchesTarget(const std::string& target) const {
1251 if (!serial.empty()) {
1252 if (target == serial) {
1253 return true;
1254 } else if (type == kTransportLocal) {
1255 // Local transports can match [tcp:|udp:]<hostname>[:port].
1256 const char* local_target_ptr = target.c_str();
1257
1258 // For fastboot compatibility, ignore protocol prefixes.
1259 if (android::base::StartsWith(target, "tcp:") ||
1260 android::base::StartsWith(target, "udp:")) {
1261 local_target_ptr += 4;
1262 }
1263
1264 // Parse our |serial| and the given |target| to check if the hostnames and ports match.
1265 std::string serial_host, error;
1266 int serial_port = -1;
1267 if (android::base::ParseNetAddress(serial, &serial_host, &serial_port, nullptr, &error)) {
1268 // |target| may omit the port to default to ours.
1269 std::string target_host;
1270 int target_port = serial_port;
1271 if (android::base::ParseNetAddress(local_target_ptr, &target_host, &target_port,
1272 nullptr, &error) &&
1273 serial_host == target_host && serial_port == target_port) {
1274 return true;
1275 }
1276 }
1277 }
1278 }
1279
1280 return (target == devpath) || qual_match(target, "product:", product, false) ||
1281 qual_match(target, "model:", model, true) ||
1282 qual_match(target, "device:", device, false);
1283 }
1284
SetConnectionEstablished(bool success)1285 void atransport::SetConnectionEstablished(bool success) {
1286 connection_waitable_->SetConnectionEstablished(success);
1287 }
1288
Reconnect()1289 ReconnectResult atransport::Reconnect() {
1290 return reconnect_(this);
1291 }
1292
// We use newline as our delimiter, make sure to never output it.
//
// Returns a copy of |str| with unwanted characters replaced by '_':
//   - |alphanumeric| == true:  every character for which isalnum() is false;
//   - |alphanumeric| == false: only '\n'.
static std::string sanitize(std::string str, bool alphanumeric) {
    if (alphanumeric) {
        // The <ctype.h> classifiers are only defined for values representable as unsigned char
        // (or EOF). Taking the argument as unsigned char keeps bytes >= 0x80 from being passed
        // as negative ints on platforms where plain char is signed.
        std::replace_if(
                str.begin(), str.end(), [](unsigned char c) { return !isalnum(c); }, '_');
    } else {
        std::replace(str.begin(), str.end(), '\n', '_');
    }
    return str;
}
1300
append_transport_info(std::string * result,const char * key,const std::string & value,bool alphanumeric)1301 static void append_transport_info(std::string* result, const char* key, const std::string& value,
1302 bool alphanumeric) {
1303 if (value.empty()) {
1304 return;
1305 }
1306
1307 *result += ' ';
1308 *result += key;
1309 *result += sanitize(value, alphanumeric);
1310 }
1311
append_transport(const atransport * t,std::string * result,bool long_listing)1312 static void append_transport(const atransport* t, std::string* result, bool long_listing) {
1313 std::string serial = t->serial;
1314 if (serial.empty()) {
1315 serial = "(no serial number)";
1316 }
1317
1318 if (!long_listing) {
1319 *result += serial;
1320 *result += '\t';
1321 *result += t->connection_state_name();
1322 } else {
1323 android::base::StringAppendF(result, "%-22s %s", serial.c_str(),
1324 t->connection_state_name().c_str());
1325
1326 append_transport_info(result, "", t->devpath, false);
1327 append_transport_info(result, "product:", t->product, false);
1328 append_transport_info(result, "model:", t->model, true);
1329 append_transport_info(result, "device:", t->device, false);
1330
1331 // Put id at the end, so that anyone parsing the output here can always find it by scanning
1332 // backwards from newlines, even with hypothetical devices named 'transport_id:1'.
1333 *result += " transport_id:";
1334 *result += std::to_string(t->id);
1335 }
1336 *result += '\n';
1337 }
1338
list_transports(bool long_listing)1339 std::string list_transports(bool long_listing) {
1340 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1341
1342 auto sorted_transport_list = transport_list;
1343 sorted_transport_list.sort([](atransport*& x, atransport*& y) {
1344 if (x->type != y->type) {
1345 return x->type < y->type;
1346 }
1347 return x->serial < y->serial;
1348 });
1349
1350 std::string result;
1351 for (const auto& t : sorted_transport_list) {
1352 append_transport(t, &result, long_listing);
1353 }
1354 return result;
1355 }
1356
close_usb_devices(std::function<bool (const atransport *)> predicate,bool reset)1357 void close_usb_devices(std::function<bool(const atransport*)> predicate, bool reset) {
1358 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1359 for (auto& t : transport_list) {
1360 if (predicate(t)) {
1361 if (reset) {
1362 t->Reset();
1363 } else {
1364 t->Kick();
1365 }
1366 }
1367 }
1368 }
1369
1370 /* hack for osx */
close_usb_devices(bool reset)1371 void close_usb_devices(bool reset) {
1372 close_usb_devices([](const atransport*) { return true; }, reset);
1373 }
1374 #endif
1375
register_socket_transport(unique_fd s,std::string serial,int port,int local,atransport::ReconnectCallback reconnect,bool use_tls,int * error)1376 bool register_socket_transport(unique_fd s, std::string serial, int port, int local,
1377 atransport::ReconnectCallback reconnect, bool use_tls, int* error) {
1378 atransport* t = new atransport(std::move(reconnect), kCsOffline);
1379 t->use_tls = use_tls;
1380
1381 D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port);
1382 if (init_socket_transport(t, std::move(s), port, local) < 0) {
1383 delete t;
1384 if (error) *error = errno;
1385 return false;
1386 }
1387
1388 std::unique_lock<std::recursive_mutex> lock(transport_lock);
1389 for (const auto& transport : pending_list) {
1390 if (serial == transport->serial) {
1391 VLOG(TRANSPORT) << "socket transport " << transport->serial
1392 << " is already in pending_list and fails to register";
1393 delete t;
1394 if (error) *error = EALREADY;
1395 return false;
1396 }
1397 }
1398
1399 for (const auto& transport : transport_list) {
1400 if (serial == transport->serial) {
1401 VLOG(TRANSPORT) << "socket transport " << transport->serial
1402 << " is already in transport_list and fails to register";
1403 delete t;
1404 if (error) *error = EALREADY;
1405 return false;
1406 }
1407 }
1408
1409 t->serial = std::move(serial);
1410 pending_list.push_front(t);
1411
1412 lock.unlock();
1413
1414 #if ADB_HOST
1415 auto waitable = t->connection_waitable();
1416 #endif
1417 register_transport(t);
1418
1419 if (local == 1) {
1420 // Do not wait for emulator transports.
1421 return true;
1422 }
1423
1424 #if ADB_HOST
1425 if (!waitable->WaitForConnection(std::chrono::seconds(10))) {
1426 if (error) *error = ETIMEDOUT;
1427 return false;
1428 }
1429
1430 if (t->GetConnectionState() == kCsUnauthorized) {
1431 if (error) *error = EPERM;
1432 return false;
1433 }
1434 #endif
1435
1436 return true;
1437 }
1438
1439 #if ADB_HOST
find_transport(const char * serial)1440 atransport* find_transport(const char* serial) {
1441 atransport* result = nullptr;
1442
1443 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1444 for (auto& t : transport_list) {
1445 if (strcmp(serial, t->serial.c_str()) == 0) {
1446 result = t;
1447 break;
1448 }
1449 }
1450
1451 return result;
1452 }
1453
kick_all_tcp_devices()1454 void kick_all_tcp_devices() {
1455 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1456 for (auto& t : transport_list) {
1457 if (t->IsTcpDevice()) {
1458 // Kicking breaks the read_transport thread of this transport out of any read, then
1459 // the read_transport thread will notify the main thread to make this transport
1460 // offline. Then the main thread will notify the write_transport thread to exit.
1461 // Finally, this transport will be closed and freed in the main thread.
1462 t->Kick();
1463 }
1464 }
1465 reconnect_handler.CheckForKicked();
1466 }
1467
register_usb_transport(usb_handle * usb,const char * serial,const char * devpath,unsigned writeable)1468 void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1469 unsigned writeable) {
1470 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1471
1472 D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1473 init_usb_transport(t, usb);
1474 if (serial) {
1475 t->serial = serial;
1476 }
1477
1478 if (devpath) {
1479 t->devpath = devpath;
1480 }
1481
1482 {
1483 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1484 pending_list.push_front(t);
1485 }
1486
1487 register_transport(t);
1488 }
1489
1490 // This should only be used for transports with connection_state == kCsNoPerm.
unregister_usb_transport(usb_handle * usb)1491 void unregister_usb_transport(usb_handle* usb) {
1492 std::lock_guard<std::recursive_mutex> lock(transport_lock);
1493 transport_list.remove_if([usb](atransport* t) {
1494 return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
1495 });
1496 }
1497 #endif
1498
check_header(apacket * p,atransport * t)1499 bool check_header(apacket* p, atransport* t) {
1500 if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
1501 VLOG(RWX) << "check_header(): invalid magic command = " << std::hex << p->msg.command
1502 << ", magic = " << p->msg.magic;
1503 return false;
1504 }
1505
1506 if (p->msg.data_length > t->get_max_payload()) {
1507 VLOG(RWX) << "check_header(): " << p->msg.data_length
1508 << " atransport::max_payload = " << t->get_max_payload();
1509 return false;
1510 }
1511
1512 return true;
1513 }
1514
1515 #if ADB_HOST
Key()1516 std::shared_ptr<RSA> atransport::Key() {
1517 if (keys_.empty()) {
1518 return nullptr;
1519 }
1520
1521 std::shared_ptr<RSA> result = keys_[0];
1522 return result;
1523 }
1524
NextKey()1525 std::shared_ptr<RSA> atransport::NextKey() {
1526 if (keys_.empty()) {
1527 LOG(INFO) << "fetching keys for transport " << this->serial_name();
1528 keys_ = adb_auth_get_private_keys();
1529
1530 // We should have gotten at least one key: the one that's automatically generated.
1531 CHECK(!keys_.empty());
1532 } else {
1533 keys_.pop_front();
1534 }
1535
1536 return Key();
1537 }
1538
ResetKeys()1539 void atransport::ResetKeys() {
1540 keys_.clear();
1541 }
1542 #endif
1543