1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG USB
18 
19 #include "sysdeps.h"
20 
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/ioctl.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 
30 #include <linux/usb/functionfs.h>
31 #include <sys/eventfd.h>
32 
33 #include <algorithm>
34 #include <array>
35 #include <future>
36 #include <memory>
37 #include <mutex>
38 #include <optional>
39 #include <vector>
40 
41 #include <asyncio/AsyncIO.h>
42 
43 #include <android-base/logging.h>
44 #include <android-base/macros.h>
45 #include <android-base/properties.h>
46 #include <android-base/thread_annotations.h>
47 
48 #include "adb_unique_fd.h"
49 #include "adb_utils.h"
50 #include "daemon/usb_ffs.h"
51 #include "sysdeps/chrono.h"
52 #include "transport.h"
53 #include "types.h"
54 
55 using android::base::StringPrintf;
56 
// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
// NOTE(review): the sizes below are expressed as 4 * PAGE_SIZE, which is 16k only when PAGE_SIZE
// is 4096 -- confirm this is intended on targets with larger pages.

// Number of slots in the read request ring.
static constexpr size_t kUsbReadQueueDepth = 8;
// Size in bytes of the buffer backing each read request.
static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE;

// Upper bound on the number of write requests handed to the kernel at the same time.
static constexpr size_t kUsbWriteQueueDepth = 8;
// Largest chunk a packet payload is split into when it is queued for writing.
static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE;
65 
// Returns a human-readable name for a functionfs event type, for use in log messages.
//
// Callers pass a value obtained by casting the raw `type` field of an event read from the
// control endpoint, so it is not guaranteed to match one of the enumerators. Unrecognized values
// map to a fixed placeholder string; without that fallback, control would run off the end of a
// non-void function, which is undefined behavior.
//
// The switch deliberately has no `default:` label so that the compiler can still warn when a new
// enumerator is added and not handled here.
static const char* to_string(enum usb_functionfs_event_type type) {
    switch (type) {
        case FUNCTIONFS_BIND:
            return "FUNCTIONFS_BIND";
        case FUNCTIONFS_UNBIND:
            return "FUNCTIONFS_UNBIND";
        case FUNCTIONFS_ENABLE:
            return "FUNCTIONFS_ENABLE";
        case FUNCTIONFS_DISABLE:
            return "FUNCTIONFS_DISABLE";
        case FUNCTIONFS_SETUP:
            return "FUNCTIONFS_SETUP";
        case FUNCTIONFS_SUSPEND:
            return "FUNCTIONFS_SUSPEND";
        case FUNCTIONFS_RESUME:
            return "FUNCTIONFS_RESUME";
    }

    // Not one of the known event types.
    return "FUNCTIONFS_UNKNOWN";
}
84 
// Direction of an asynchronous transfer. The underlying type is uint64_t so the value can sit in
// a one-bit bitfield next to the 63-bit sequence number inside TransferId.
enum class TransferDirection : uint64_t {
    READ = 0,
    WRITE = 1,
};

// Identifier attached to every submitted transfer: one direction bit plus a 63-bit id. The
// struct is exactly as large as a uint64_t (checked below), so it can be converted to and from a
// plain integer with memcpy.
struct TransferId {
    TransferDirection direction : 1;
    uint64_t id : 63;

    // A default-constructed id is read transfer number 0.
    TransferId() : TransferId(TransferDirection::READ, 0) {}

    // Named constructors, one per direction.
    static TransferId read(uint64_t id) { return TransferId{TransferDirection::READ, id}; }
    static TransferId write(uint64_t id) { return TransferId{TransferDirection::WRITE, id}; }

    // Rebuilds a TransferId from the integer produced by the conversion operator below.
    static TransferId from_value(uint64_t value) {
        TransferId decoded;
        memcpy(&decoded, &value, sizeof(value));
        return decoded;
    }

    // Copies the object representation out as a single 64-bit integer.
    explicit operator uint64_t() const {
        uint64_t raw;
        static_assert(sizeof(TransferId) == sizeof(raw));
        memcpy(&raw, this, sizeof(raw));
        return raw;
    }

  private:
    TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {}
};
116 
117 template <class Payload>
118 struct IoBlock {
119     bool pending = false;
120     struct iocb control = {};
121     Payload payload;
122 
idIoBlock123     TransferId id() const { return TransferId::from_value(control.aio_data); }
124 };
125 
126 using IoReadBlock = IoBlock<Block>;
127 using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;
128 
129 struct ScopedAioContext {
130     ScopedAioContext() = default;
~ScopedAioContextScopedAioContext131     ~ScopedAioContext() { reset(); }
132 
ScopedAioContextScopedAioContext133     ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
134     ScopedAioContext(const ScopedAioContext& copy) = delete;
135 
operator =ScopedAioContext136     ScopedAioContext& operator=(ScopedAioContext&& move) {
137         reset(move.release());
138         return *this;
139     }
140     ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;
141 
CreateScopedAioContext142     static ScopedAioContext Create(size_t max_events) {
143         aio_context_t ctx = 0;
144         if (io_setup(max_events, &ctx) != 0) {
145             PLOG(FATAL) << "failed to create aio_context_t";
146         }
147         ScopedAioContext result;
148         result.reset(ctx);
149         return result;
150     }
151 
releaseScopedAioContext152     aio_context_t release() {
153         aio_context_t result = context_;
154         context_ = 0;
155         return result;
156     }
157 
resetScopedAioContext158     void reset(aio_context_t new_context = 0) {
159         if (context_ != 0) {
160             io_destroy(context_);
161         }
162 
163         context_ = new_context;
164     }
165 
getScopedAioContext166     aio_context_t get() { return context_; }
167 
168   private:
169     aio_context_t context_ = 0;
170 };
171 
172 struct UsbFfsConnection : public Connection {
UsbFfsConnectionUsbFfsConnection173     UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
174                      std::promise<void> destruction_notifier)
175         : worker_started_(false),
176           stopped_(false),
177           destruction_notifier_(std::move(destruction_notifier)),
178           control_fd_(std::move(control)),
179           read_fd_(std::move(read)),
180           write_fd_(std::move(write)) {
181         LOG(INFO) << "UsbFfsConnection constructed";
182         worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
183         if (worker_event_fd_ == -1) {
184             PLOG(FATAL) << "failed to create eventfd";
185         }
186 
187         monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
188         if (monitor_event_fd_ == -1) {
189             PLOG(FATAL) << "failed to create eventfd";
190         }
191 
192         aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
193     }
194 
~UsbFfsConnectionUsbFfsConnection195     ~UsbFfsConnection() {
196         LOG(INFO) << "UsbFfsConnection being destroyed";
197         Stop();
198         monitor_thread_.join();
199 
200         // We need to explicitly close our file descriptors before we notify our destruction,
201         // because the thread listening on the future will immediately try to reopen the endpoint.
202         aio_context_.reset();
203         control_fd_.reset();
204         read_fd_.reset();
205         write_fd_.reset();
206 
207         destruction_notifier_.set_value();
208     }
209 
WriteUsbFfsConnection210     virtual bool Write(std::unique_ptr<apacket> packet) override final {
211         LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
212         auto header = std::make_shared<Block>(sizeof(packet->msg));
213         memcpy(header->data(), &packet->msg, sizeof(packet->msg));
214 
215         std::lock_guard<std::mutex> lock(write_mutex_);
216         write_requests_.push_back(
217                 CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
218         if (!packet->payload.empty()) {
219             // The kernel attempts to allocate a contiguous block of memory for each write,
220             // which can fail if the write is large and the kernel heap is fragmented.
221             // Split large writes into smaller chunks to avoid this.
222             auto payload = std::make_shared<Block>(std::move(packet->payload));
223             size_t offset = 0;
224             size_t len = payload->size();
225 
226             while (len > 0) {
227                 size_t write_size = std::min(kUsbWriteSize, len);
228                 write_requests_.push_back(
229                         CreateWriteBlock(payload, offset, write_size, next_write_id_++));
230                 len -= write_size;
231                 offset += write_size;
232             }
233         }
234 
235         // Wake up the worker thread to submit writes.
236         uint64_t notify = 1;
237         ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
238         if (rc < 0) {
239             PLOG(FATAL) << "failed to notify worker eventfd to submit writes";
240         }
241 
242         return true;
243     }
244 
    // Starts the connection by launching the monitor thread (see StartMonitor()).
    virtual void Start() override final { StartMonitor(); }
246 
StopUsbFfsConnection247     virtual void Stop() override final {
248         if (stopped_.exchange(true)) {
249             return;
250         }
251         stopped_ = true;
252         uint64_t notify = 1;
253         ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
254         if (rc < 0) {
255             PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
256         }
257         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
258 
259         rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
260         if (rc < 0) {
261             PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
262         }
263 
264         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
265     }
266 
    // TLS is not implemented for USB connections: calling this logs at FATAL severity. Neither
    // `key` nor `auth_key` is used. The `return false` only exists to satisfy the signature.
    virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final {
        // TODO: support TLS for usb connections.
        LOG(FATAL) << "Not supported yet.";
        return false;
    }
272 
273   private:
    // Spawns the monitor thread. The thread polls the functionfs control fd and the monitor
    // eventfd, tracks the bound/enabled state from the lifecycle events it reads, and starts the
    // worker thread on FUNCTIONFS_ENABLE. When its loop ends for any reason it stops the worker
    // and reports an error through HandleError().
    void StartMonitor() {
        // This is a bit of a mess.
        // It's possible for io_submit to end up blocking, if we call it as the endpoint
        // becomes disabled. Work around this by having a monitor thread to listen for functionfs
        // lifecycle events. If we notice an error condition (either we've become disabled, or we
        // were never enabled in the first place), we send interruption signals to the worker thread
        // until it dies, and then report failure to the transport via HandleError, which will
        // eventually result in the transport being destroyed, which will result in UsbFfsConnection
        // being destroyed, which unblocks the open thread and restarts this entire process.

        // Install an empty handler for kInterruptionSignal, once for the whole process.
        static std::once_flag handler_once;
        std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });

        monitor_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-monitor");
            LOG(INFO) << "UsbFfs-monitor thread spawned";

            bool bound = false;
            bool enabled = false;
            bool running = true;
            while (running) {
                // pfd[0]: functionfs events; pfd[1]: shutdown request written by Stop().
                adb_pollfd pfd[2] = {
                  { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
                  { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
                };

                // If we don't see our first bind within a second, try again.
                int timeout_ms = bound ? -1 : 1000;

                int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
                if (rc == -1) {
                    PLOG(FATAL) << "poll on USB control fd failed";
                } else if (rc == 0) {
                    // "Trying again" happens by ending this thread: the code after the loop
                    // reports an error for this connection.
                    LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
                    break;
                }

                if (pfd[1].revents) {
                    // We were told to die.
                    break;
                }

                struct usb_functionfs_event event;
                rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read functionfs event";
                } else if (rc == 0) {
                    LOG(WARNING) << "hit EOF on functionfs control fd";
                    break;
                } else if (rc != sizeof(event)) {
                    LOG(FATAL) << "read functionfs event of unexpected size, expected "
                               << sizeof(event) << ", got " << rc;
                }

                LOG(INFO) << "USB event: "
                          << to_string(static_cast<usb_functionfs_event_type>(event.type));

                // State machine over (bound, enabled). An out-of-sequence BIND or ENABLE, and
                // any DISABLE or UNBIND, clears `running` and so ends the loop. Event types
                // without a case label (e.g. SUSPEND/RESUME) are ignored.
                switch (event.type) {
                    case FUNCTIONFS_BIND:
                        if (bound) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
                            running = false;
                            break;
                        }

                        bound = true;
                        break;

                    case FUNCTIONFS_ENABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
                            running = false;
                            break;
                        }

                        enabled = true;
                        StartWorker();
                        break;

                    case FUNCTIONFS_DISABLE:
                        // The warnings below are informational only; DISABLE always ends the
                        // loop.
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
                        }

                        if (!enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
                        }

                        enabled = false;
                        running = false;
                        break;

                    case FUNCTIONFS_UNBIND:
                        // As with DISABLE, the warnings do not change the outcome.
                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
                        }

                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
                        }

                        bound = false;
                        running = false;
                        break;

                    case FUNCTIONFS_SETUP: {
                        LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
                                  << static_cast<int>(event.u.setup.bRequestType)
                                  << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
                                  << ", wValue = " << static_cast<int>(event.u.setup.wValue)
                                  << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
                                  << ", wLength = " << static_cast<int>(event.u.setup.wLength);

                        if ((event.u.setup.bRequestType & USB_DIR_IN)) {
                            // Device-to-host request: answer with a zero-length write.
                            LOG(INFO) << "acking device-to-host control transfer";
                            ssize_t rc = adb_write(control_fd_.get(), "", 0);
                            if (rc != 0) {
                                PLOG(ERROR) << "failed to write empty packet to host";
                                break;
                            }
                        } else {
                            // Host-to-device request: read the data stage and log it.
                            // NOTE(review): the buffer is one byte larger than wLength,
                            // presumably so that an over-long transfer shows up as a size
                            // mismatch -- confirm.
                            std::string buf;
                            buf.resize(event.u.setup.wLength + 1);

                            ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
                            if (rc != event.u.setup.wLength) {
                                LOG(ERROR)
                                        << "read " << rc
                                        << " bytes when trying to read control request, expected "
                                        << event.u.setup.wLength;
                            }

                            // NOTE(review): buf is logged at its full resized length, regardless
                            // of how many bytes were actually read.
                            LOG(INFO) << "control request contents: " << buf;
                            break;
                        }
                    }
                }
            }

            // The loop is over: take the worker down first, then report the failure.
            StopWorker();
            HandleError("monitor thread finished");
        });
    }
428 
StartWorkerUsbFfsConnection429     void StartWorker() {
430         CHECK(!worker_started_);
431         worker_started_ = true;
432         worker_thread_ = std::thread([this]() {
433             adb_thread_setname("UsbFfs-worker");
434             LOG(INFO) << "UsbFfs-worker thread spawned";
435 
436             for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
437                 read_requests_[i] = CreateReadBlock(next_read_id_++);
438                 if (!SubmitRead(&read_requests_[i])) {
439                     return;
440                 }
441             }
442 
443             while (!stopped_) {
444                 uint64_t dummy;
445                 ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
446                 if (rc == -1) {
447                     PLOG(FATAL) << "failed to read from eventfd";
448                 } else if (rc == 0) {
449                     LOG(FATAL) << "hit EOF on eventfd";
450                 }
451 
452                 ReadEvents();
453 
454                 std::lock_guard<std::mutex> lock(write_mutex_);
455                 SubmitWrites();
456             }
457         });
458     }
459 
StopWorkerUsbFfsConnection460     void StopWorker() {
461         if (!worker_started_) {
462             return;
463         }
464 
465         pthread_t worker_thread_handle = worker_thread_.native_handle();
466         while (true) {
467             int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
468             if (rc != 0) {
469                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
470                 break;
471             }
472 
473             std::this_thread::sleep_for(100ms);
474 
475             rc = pthread_kill(worker_thread_handle, 0);
476             if (rc == 0) {
477                 continue;
478             } else if (rc == ESRCH) {
479                 break;
480             } else {
481                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
482             }
483         }
484 
485         worker_thread_.join();
486     }
487 
PrepareReadBlockUsbFfsConnection488     void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
489         block->pending = false;
490         if (block->payload.capacity() >= kUsbReadSize) {
491             block->payload.resize(kUsbReadSize);
492         } else {
493             block->payload = Block(kUsbReadSize);
494         }
495         block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
496         block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
497         block->control.aio_nbytes = block->payload.size();
498     }
499 
CreateReadBlockUsbFfsConnection500     IoReadBlock CreateReadBlock(uint64_t id) {
501         IoReadBlock block;
502         PrepareReadBlock(&block, id);
503         block.control.aio_rw_flags = 0;
504         block.control.aio_lio_opcode = IOCB_CMD_PREAD;
505         block.control.aio_reqprio = 0;
506         block.control.aio_fildes = read_fd_.get();
507         block.control.aio_offset = 0;
508         block.control.aio_flags = IOCB_FLAG_RESFD;
509         block.control.aio_resfd = worker_event_fd_.get();
510         return block;
511     }
512 
ReadEventsUsbFfsConnection513     void ReadEvents() {
514         static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
515         struct io_event events[kMaxEvents];
516         struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
517         int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
518         if (rc == -1) {
519             HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
520             return;
521         }
522 
523         for (int event_idx = 0; event_idx < rc; ++event_idx) {
524             auto& event = events[event_idx];
525             TransferId id = TransferId::from_value(event.data);
526 
527             if (event.res < 0) {
528                 std::string error =
529                         StringPrintf("%s %" PRIu64 " failed with error %s",
530                                      id.direction == TransferDirection::READ ? "read" : "write",
531                                      id.id, strerror(-event.res));
532                 HandleError(error);
533                 return;
534             }
535 
536             if (id.direction == TransferDirection::READ) {
537                 if (!HandleRead(id, event.res)) {
538                     return;
539                 }
540             } else {
541                 HandleWrite(id);
542             }
543         }
544     }
545 
HandleReadUsbFfsConnection546     bool HandleRead(TransferId id, int64_t size) {
547         uint64_t read_idx = id.id % kUsbReadQueueDepth;
548         IoReadBlock* block = &read_requests_[read_idx];
549         block->pending = false;
550         block->payload.resize(size);
551 
552         // Notification for completed reads can be received out of order.
553         if (block->id().id != needed_read_id_) {
554             LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
555                          << needed_read_id_;
556             return true;
557         }
558 
559         for (uint64_t id = needed_read_id_;; ++id) {
560             size_t read_idx = id % kUsbReadQueueDepth;
561             IoReadBlock* current_block = &read_requests_[read_idx];
562             if (current_block->pending) {
563                 break;
564             }
565             if (!ProcessRead(current_block)) {
566                 return false;
567             }
568             ++needed_read_id_;
569         }
570 
571         return true;
572     }
573 
ProcessReadUsbFfsConnection574     bool ProcessRead(IoReadBlock* block) {
575         if (!block->payload.empty()) {
576             if (!incoming_header_.has_value()) {
577                 if (block->payload.size() != sizeof(amessage)) {
578                     HandleError("received packet of unexpected length while reading header");
579                     return false;
580                 }
581                 amessage& msg = incoming_header_.emplace();
582                 memcpy(&msg, block->payload.data(), sizeof(msg));
583                 LOG(DEBUG) << "USB read:" << dump_header(&msg);
584                 incoming_header_ = msg;
585             } else {
586                 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
587                 Block payload = std::move(block->payload);
588                 if (block->payload.size() > bytes_left) {
589                     HandleError("received too many bytes while waiting for payload");
590                     return false;
591                 }
592                 incoming_payload_.append(std::move(payload));
593             }
594 
595             if (incoming_header_->data_length == incoming_payload_.size()) {
596                 auto packet = std::make_unique<apacket>();
597                 packet->msg = *incoming_header_;
598 
599                 // TODO: Make apacket contain an IOVector so we don't have to coalesce.
600                 packet->payload = std::move(incoming_payload_).coalesce();
601                 read_callback_(this, std::move(packet));
602 
603                 incoming_header_.reset();
604                 // reuse the capacity of the incoming payload while we can.
605                 auto free_block = incoming_payload_.clear();
606                 if (block->payload.capacity() == 0) {
607                     block->payload = std::move(free_block);
608                 }
609             }
610         }
611 
612         PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
613         SubmitRead(block);
614         return true;
615     }
616 
SubmitReadUsbFfsConnection617     bool SubmitRead(IoReadBlock* block) {
618         block->pending = true;
619         struct iocb* iocb = &block->control;
620         if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
621             HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
622             return false;
623         }
624 
625         return true;
626     }
627 
HandleWriteUsbFfsConnection628     void HandleWrite(TransferId id) {
629         std::lock_guard<std::mutex> lock(write_mutex_);
630         auto it =
631                 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
632                     return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
633                 });
634         CHECK(it != write_requests_.end());
635 
636         write_requests_.erase(it);
637         size_t outstanding_writes = --writes_submitted_;
638         LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
639     }
640 
CreateWriteBlockUsbFfsConnection641     IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
642                                   uint64_t id) {
643         auto block = IoWriteBlock();
644         block.payload = std::move(payload);
645         block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
646         block.control.aio_rw_flags = 0;
647         block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
648         block.control.aio_reqprio = 0;
649         block.control.aio_fildes = write_fd_.get();
650         block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
651         block.control.aio_nbytes = len;
652         block.control.aio_offset = 0;
653         block.control.aio_flags = IOCB_FLAG_RESFD;
654         block.control.aio_resfd = worker_event_fd_.get();
655         return block;
656     }
657 
CreateWriteBlockUsbFfsConnection658     IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
659         size_t len = payload.size();
660         return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
661     }
662 
SubmitWritesUsbFfsConnection663     void SubmitWrites() REQUIRES(write_mutex_) {
664         if (writes_submitted_ == kUsbWriteQueueDepth) {
665             return;
666         }
667 
668         ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
669                                             write_requests_.size() - writes_submitted_);
670         CHECK_GE(writes_to_submit, 0);
671         if (writes_to_submit == 0) {
672             return;
673         }
674 
675         struct iocb* iocbs[kUsbWriteQueueDepth];
676         for (int i = 0; i < writes_to_submit; ++i) {
677             CHECK(!write_requests_[writes_submitted_ + i].pending);
678             write_requests_[writes_submitted_ + i].pending = true;
679             iocbs[i] = &write_requests_[writes_submitted_ + i].control;
680             LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
681         }
682 
683         writes_submitted_ += writes_to_submit;
684 
685         int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
686         if (rc == -1) {
687             HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
688             return;
689         } else if (rc != writes_to_submit) {
690             LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
691                        << ", actually submitted " << rc;
692         }
693     }
694 
HandleErrorUsbFfsConnection695     void HandleError(const std::string& error) {
696         std::call_once(error_flag_, [&]() {
697             error_callback_(this, error);
698             if (!stopped_) {
699                 Stop();
700             }
701         });
702     }
703 
    // Thread created by StartMonitor(); joined in the destructor.
    std::thread monitor_thread_;

    // Set by StartWorker(); StopWorker() returns immediately while this is false.
    bool worker_started_;
    std::thread worker_thread_;

    // Set by the first call to Stop(); the worker loop runs only while this is false.
    std::atomic<bool> stopped_;
    // Fulfilled at the very end of the destructor, after all descriptors are closed.
    std::promise<void> destruction_notifier_;
    // Makes HandleError() act only on its first invocation.
    std::once_flag error_flag_;

    // Written by Write() and Stop(), and used as the completion eventfd of every AIO request,
    // to wake the worker thread.
    unique_fd worker_event_fd_;
    // Written by Stop() to wake the monitor thread out of poll().
    unique_fd monitor_event_fd_;

    ScopedAioContext aio_context_;
    unique_fd control_fd_;
    unique_fd read_fd_;
    unique_fd write_fd_;

    // Header of the packet currently being assembled; empty while waiting for a header.
    std::optional<amessage> incoming_header_;
    // Payload received so far for the packet described by incoming_header_.
    IOVector incoming_payload_;

    // Ring of read requests; the read with id N uses slot N % kUsbReadQueueDepth.
    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
    // NOTE(review): not referenced by any code visible in this file.
    IOVector read_data_;

    // ID of the next request that we're going to send out.
    size_t next_read_id_ = 0;

    // ID of the next packet we're waiting for.
    size_t needed_read_id_ = 0;

    // Protects the write queue and its counters below.
    std::mutex write_mutex_;
    // Queued write requests, in submission order.
    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
    // Id handed to the next write request that is created.
    size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
    // Number of entries at the front of write_requests_ already submitted to the kernel.
    size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;

    // Signal sent to the worker thread by StopWorker().
    static constexpr int kInterruptionSignal = SIGUSR1;
739 };
740 
usb_ffs_open_thread()741 static void usb_ffs_open_thread() {
742     adb_thread_setname("usb ffs open");
743 
744     while (true) {
745         unique_fd control;
746         unique_fd bulk_out;
747         unique_fd bulk_in;
748         if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
749             std::this_thread::sleep_for(1s);
750             continue;
751         }
752 
753         atransport* transport = new atransport();
754         transport->serial = "UsbFfs";
755         std::promise<void> destruction_notifier;
756         std::future<void> future = destruction_notifier.get_future();
757         transport->SetConnection(std::make_unique<UsbFfsConnection>(
758                 std::move(control), std::move(bulk_out), std::move(bulk_in),
759                 std::move(destruction_notifier)));
760         register_transport(transport);
761         future.wait();
762     }
763 }
764 
// Starts USB support by launching usb_ffs_open_thread() on a detached thread. Returns at once.
void usb_init() {
    std::thread(usb_ffs_open_thread).detach();
}
768