1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG USB
18 
19 #include "sysdeps.h"
20 
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/ioctl.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29 
30 #include <linux/usb/functionfs.h>
31 #include <sys/eventfd.h>
32 
33 #include <algorithm>
34 #include <array>
35 #include <future>
36 #include <memory>
37 #include <mutex>
38 #include <optional>
39 #include <vector>
40 
41 #include <asyncio/AsyncIO.h>
42 
43 #include <android-base/logging.h>
44 #include <android-base/macros.h>
45 #include <android-base/parsebool.h>
46 #include <android-base/properties.h>
47 #include <android-base/thread_annotations.h>
48 
49 #include "adb_unique_fd.h"
50 #include "adb_utils.h"
51 #include "apacket_reader.h"
52 #include "daemon/property_monitor.h"
53 #include "daemon/usb_ffs.h"
54 #include "sysdeps/chrono.h"
55 #include "transfer_id.h"
56 #include "transport.h"
57 #include "types.h"
58 
59 using android::base::StringPrintf;
60 
61 // Not all USB controllers support operations larger than 16k, so don't go above that.
62 // Also, each submitted operation does an allocation in the kernel of that size, so we want to
63 // minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
64 static constexpr size_t kUsbReadQueueDepth = 8;
65 static constexpr size_t kUsbReadSize = 16384;
66 
67 static constexpr size_t kUsbWriteQueueDepth = 8;
68 static constexpr size_t kUsbWriteSize = 16384;
69 
to_string(enum usb_functionfs_event_type type)70 static const char* to_string(enum usb_functionfs_event_type type) {
71     switch (type) {
72         case FUNCTIONFS_BIND:
73             return "FUNCTIONFS_BIND";
74         case FUNCTIONFS_UNBIND:
75             return "FUNCTIONFS_UNBIND";
76         case FUNCTIONFS_ENABLE:
77             return "FUNCTIONFS_ENABLE";
78         case FUNCTIONFS_DISABLE:
79             return "FUNCTIONFS_DISABLE";
80         case FUNCTIONFS_SETUP:
81             return "FUNCTIONFS_SETUP";
82         case FUNCTIONFS_SUSPEND:
83             return "FUNCTIONFS_SUSPEND";
84         case FUNCTIONFS_RESUME:
85             return "FUNCTIONFS_RESUME";
86     }
87 }
88 
89 template <class Payload>
90 struct IoBlock {
91     bool pending = false;
92     struct iocb control = {};
93     Payload payload;
94 
idIoBlock95     TransferId id() const { return TransferId::from_value(control.aio_data); }
96 };
97 
98 using IoReadBlock = IoBlock<Block>;
99 using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;
100 
101 struct ScopedAioContext {
102     ScopedAioContext() = default;
~ScopedAioContextScopedAioContext103     ~ScopedAioContext() { reset(); }
104 
ScopedAioContextScopedAioContext105     ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
106     ScopedAioContext(const ScopedAioContext& copy) = delete;
107 
operator =ScopedAioContext108     ScopedAioContext& operator=(ScopedAioContext&& move) {
109         reset(move.release());
110         return *this;
111     }
112     ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;
113 
CreateScopedAioContext114     static ScopedAioContext Create(size_t max_events) {
115         aio_context_t ctx = 0;
116         if (io_setup(max_events, &ctx) != 0) {
117             PLOG(FATAL) << "failed to create aio_context_t";
118         }
119         ScopedAioContext result;
120         result.reset(ctx);
121         return result;
122     }
123 
releaseScopedAioContext124     aio_context_t release() {
125         aio_context_t result = context_;
126         context_ = 0;
127         return result;
128     }
129 
resetScopedAioContext130     void reset(aio_context_t new_context = 0) {
131         if (context_ != 0) {
132             io_destroy(context_);
133         }
134 
135         context_ = new_context;
136     }
137 
getScopedAioContext138     aio_context_t get() { return context_; }
139 
140   private:
141     aio_context_t context_ = 0;
142 };
143 
144 struct UsbFfsConnection : public Connection {
UsbFfsConnectionUsbFfsConnection145     UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
146                      std::promise<void> destruction_notifier)
147         : worker_started_(false),
148           stopped_(false),
149           destruction_notifier_(std::move(destruction_notifier)),
150           control_fd_(std::move(control)),
151           read_fd_(std::move(read)),
152           write_fd_(std::move(write)) {
153         LOG(INFO) << "UsbFfsConnection constructed";
154         worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
155         if (worker_event_fd_ == -1) {
156             PLOG(FATAL) << "failed to create eventfd";
157         }
158 
159         monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
160         if (monitor_event_fd_ == -1) {
161             PLOG(FATAL) << "failed to create eventfd";
162         }
163 
164         aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
165     }
166 
~UsbFfsConnectionUsbFfsConnection167     ~UsbFfsConnection() {
168         LOG(INFO) << "UsbFfsConnection being destroyed";
169         Stop();
170         monitor_thread_.join();
171 
172         // We need to explicitly close our file descriptors before we notify our destruction,
173         // because the thread listening on the future will immediately try to reopen the endpoint.
174         aio_context_.reset();
175         control_fd_.reset();
176         read_fd_.reset();
177         write_fd_.reset();
178 
179         destruction_notifier_.set_value();
180     }
181 
WriteUsbFfsConnection182     virtual bool Write(std::unique_ptr<apacket> packet) override final {
183         LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
184         auto header = std::make_shared<Block>(sizeof(packet->msg));
185         memcpy(header->data(), &packet->msg, sizeof(packet->msg));
186 
187         std::lock_guard<std::mutex> lock(write_mutex_);
188         write_requests_.push_back(
189                 CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
190         if (!packet->payload.empty()) {
191             // The kernel attempts to allocate a contiguous block of memory for each write,
192             // which can fail if the write is large and the kernel heap is fragmented.
193             // Split large writes into smaller chunks to avoid this.
194             auto payload = std::make_shared<Block>(std::move(packet->payload));
195             size_t offset = 0;
196             size_t len = payload->size();
197 
198             while (len > 0) {
199                 size_t write_size = std::min(kUsbWriteSize, len);
200                 write_requests_.push_back(
201                         CreateWriteBlock(payload, offset, write_size, next_write_id_++));
202                 len -= write_size;
203                 offset += write_size;
204             }
205         }
206 
207         // Wake up the worker thread to submit writes.
208         uint64_t notify = 1;
209         ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
210         if (rc < 0) {
211             PLOG(FATAL) << "failed to notify worker eventfd to submit writes";
212         }
213 
214         return true;
215     }
216 
StartUsbFfsConnection217     virtual bool Start() override final {
218         StartMonitor();
219         return true;
220     }
221 
StopUsbFfsConnection222     virtual void Stop() override final {
223         if (stopped_.exchange(true)) {
224             return;
225         }
226         stopped_ = true;
227         uint64_t notify = 1;
228         ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
229         if (rc < 0) {
230             PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
231         }
232         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
233 
234         rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
235         if (rc < 0) {
236             PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
237         }
238 
239         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
240     }
241 
DoTlsHandshakeUsbFfsConnection242     virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final {
243         // TODO: support TLS for usb connections.
244         LOG(FATAL) << "Not supported yet.";
245         return false;
246     }
247 
248   private:
StartMonitorUsbFfsConnection249     void StartMonitor() {
250         // This is a bit of a mess.
251         // It's possible for io_submit to end up blocking, if we call it as the endpoint
252         // becomes disabled. Work around this by having a monitor thread to listen for functionfs
253         // lifecycle events. If we notice an error condition (either we've become disabled, or we
254         // were never enabled in the first place), we send interruption signals to the worker thread
255         // until it dies, and then report failure to the transport via HandleError, which will
256         // eventually result in the transport being destroyed, which will result in UsbFfsConnection
257         // being destroyed, which unblocks the open thread and restarts this entire process.
258         static std::once_flag handler_once;
259         std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });
260 
261         monitor_thread_ = std::thread([this]() {
262             adb_thread_setname("UsbFfs-monitor");
263             LOG(INFO) << "UsbFfs-monitor thread spawned";
264 
265             bool bound = false;
266             bool enabled = false;
267             bool running = true;
268             while (running) {
269                 adb_pollfd pfd[2] = {
270                   { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
271                   { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
272                 };
273 
274                 // If we don't see our first bind within a second, try again.
275                 int timeout_ms = bound ? -1 : 1000;
276 
277                 int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
278                 if (rc == -1) {
279                     PLOG(FATAL) << "poll on USB control fd failed";
280                 } else if (rc == 0) {
281                     LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
282                     break;
283                 }
284 
285                 if (pfd[1].revents) {
286                     // We were told to die.
287                     break;
288                 }
289 
290                 struct usb_functionfs_event event;
291                 rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
292                 if (rc == -1) {
293                     PLOG(FATAL) << "failed to read functionfs event";
294                 } else if (rc == 0) {
295                     LOG(WARNING) << "hit EOF on functionfs control fd";
296                     break;
297                 } else if (rc != sizeof(event)) {
298                     LOG(FATAL) << "read functionfs event of unexpected size, expected "
299                                << sizeof(event) << ", got " << rc;
300                 }
301 
302                 LOG(INFO) << "USB event: "
303                           << to_string(static_cast<usb_functionfs_event_type>(event.type));
304 
305                 switch (event.type) {
306                     case FUNCTIONFS_BIND:
307                         if (bound) {
308                             LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
309                             running = false;
310                             break;
311                         }
312 
313                         if (enabled) {
314                             LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
315                             running = false;
316                             break;
317                         }
318 
319                         bound = true;
320                         break;
321 
322                     case FUNCTIONFS_ENABLE:
323                         if (!bound) {
324                             LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
325                             running = false;
326                             break;
327                         }
328 
329                         if (enabled) {
330                             LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
331                             running = false;
332                             break;
333                         }
334 
335                         enabled = true;
336                         StartWorker();
337                         break;
338 
339                     case FUNCTIONFS_DISABLE:
340                         if (!bound) {
341                             LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
342                         }
343 
344                         if (!enabled) {
345                             LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
346                         }
347 
348                         enabled = false;
349                         running = false;
350                         break;
351 
352                     case FUNCTIONFS_UNBIND:
353                         if (enabled) {
354                             LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
355                         }
356 
357                         if (!bound) {
358                             LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
359                         }
360 
361                         bound = false;
362                         running = false;
363                         break;
364 
365                     case FUNCTIONFS_SETUP: {
366                         LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
367                                   << static_cast<int>(event.u.setup.bRequestType)
368                                   << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
369                                   << ", wValue = " << static_cast<int>(event.u.setup.wValue)
370                                   << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
371                                   << ", wLength = " << static_cast<int>(event.u.setup.wLength);
372 
373                         if ((event.u.setup.bRequestType & USB_DIR_IN)) {
374                             LOG(INFO) << "acking device-to-host control transfer";
375                             ssize_t rc = adb_write(control_fd_.get(), "", 0);
376                             if (rc != 0) {
377                                 PLOG(ERROR) << "failed to write empty packet to host";
378                                 break;
379                             }
380                         } else {
381                             std::string buf;
382                             buf.resize(event.u.setup.wLength + 1);
383 
384                             ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
385                             if (rc != event.u.setup.wLength) {
386                                 LOG(ERROR)
387                                         << "read " << rc
388                                         << " bytes when trying to read control request, expected "
389                                         << event.u.setup.wLength;
390                             }
391 
392                             LOG(INFO) << "control request contents: " << buf;
393                             break;
394                         }
395                     }
396                 }
397             }
398 
399             StopWorker();
400             HandleError("monitor thread finished");
401         });
402     }
403 
StartWorkerUsbFfsConnection404     void StartWorker() {
405         CHECK(!worker_started_);
406         worker_started_ = true;
407         worker_thread_ = std::thread([this]() {
408             adb_thread_setname("UsbFfs-worker");
409             LOG(INFO) << "UsbFfs-worker thread spawned";
410 
411             for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
412                 read_requests_[i] = CreateReadBlock(next_read_id_++);
413                 if (!SubmitRead(&read_requests_[i])) {
414                     return;
415                 }
416             }
417 
418             while (!stopped_) {
419                 uint64_t dummy;
420                 ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
421                 if (rc == -1) {
422                     PLOG(FATAL) << "failed to read from eventfd";
423                 } else if (rc == 0) {
424                     LOG(FATAL) << "hit EOF on eventfd";
425                 }
426 
427                 HandleEvents();
428 
429                 std::lock_guard<std::mutex> lock(write_mutex_);
430                 SubmitWrites();
431             }
432         });
433     }
434 
StopWorkerUsbFfsConnection435     void StopWorker() {
436         if (!worker_started_) {
437             return;
438         }
439 
440         pthread_t worker_thread_handle = worker_thread_.native_handle();
441         while (true) {
442             int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
443             if (rc != 0) {
444                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
445                 break;
446             }
447 
448             std::this_thread::sleep_for(100ms);
449 
450             rc = pthread_kill(worker_thread_handle, 0);
451             if (rc == 0) {
452                 continue;
453             } else if (rc == ESRCH) {
454                 break;
455             } else {
456                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
457             }
458         }
459 
460         worker_thread_.join();
461     }
462 
PrepareReadBlockUsbFfsConnection463     void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
464         block->pending = false;
465         if (block->payload.capacity() >= kUsbReadSize) {
466             block->payload.resize(kUsbReadSize);
467         } else {
468             block->payload = Block(kUsbReadSize);
469         }
470         block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
471         block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
472         block->control.aio_nbytes = block->payload.size();
473     }
474 
CreateReadBlockUsbFfsConnection475     IoReadBlock CreateReadBlock(uint64_t id) {
476         IoReadBlock block;
477         PrepareReadBlock(&block, id);
478         block.control.aio_rw_flags = 0;
479         block.control.aio_lio_opcode = IOCB_CMD_PREAD;
480         block.control.aio_reqprio = 0;
481         block.control.aio_fildes = read_fd_.get();
482         block.control.aio_offset = 0;
483         block.control.aio_flags = IOCB_FLAG_RESFD;
484         block.control.aio_resfd = worker_event_fd_.get();
485         return block;
486     }
487 
HandleEventsUsbFfsConnection488     void HandleEvents() {
489         static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
490         struct io_event events[kMaxEvents];
491         struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
492         int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
493         if (rc == -1) {
494             HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
495             return;
496         }
497 
498         for (int event_idx = 0; event_idx < rc; ++event_idx) {
499             auto& event = events[event_idx];
500             TransferId id = TransferId::from_value(event.data);
501 
502             if (event.res < 0) {
503                 // On initial connection, some clients will send a ClearFeature(HALT) to
504                 // attempt to resynchronize host and device after the adb server is killed.
505                 // On newer device kernels, the reads we've already dispatched will be cancelled.
506                 // Instead of treating this as a failure, which will tear down the interface and
507                 // lead to the client doing the same thing again, just resubmit if this happens
508                 // before we've actually read anything.
509                 if (!connection_started_ && event.res == -EPIPE &&
510                     id.direction == TransferDirection::READ) {
511                     uint64_t read_idx = id.id % kUsbReadQueueDepth;
512                     SubmitRead(&read_requests_[read_idx]);
513                     continue;
514                 } else {
515                     std::string error =
516                             StringPrintf("%s %" PRIu64 " failed with error %s",
517                                          id.direction == TransferDirection::READ ? "read" : "write",
518                                          id.id, strerror(-event.res));
519                     HandleError(error);
520                     return;
521                 }
522             }
523 
524             if (id.direction == TransferDirection::READ) {
525                 connection_started_ = true;
526                 if (!HandleRead(id, event.res)) {
527                     return;
528                 }
529             } else {
530                 HandleWrite(id);
531             }
532         }
533     }
534 
HandleReadUsbFfsConnection535     bool HandleRead(TransferId id, int64_t size) {
536         uint64_t read_idx = id.id % kUsbReadQueueDepth;
537         IoReadBlock* block = &read_requests_[read_idx];
538         block->pending = false;
539         VLOG(USB) << "HandleRead, resizing from " << block->payload.size() << " to " << size;
540         block->payload.resize(size);
541 
542         // Notification for completed reads can be received out of order.
543         if (block->id().id != needed_read_id_) {
544             LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
545                          << needed_read_id_;
546             return true;
547         }
548 
549         for (uint64_t id = needed_read_id_;; ++id) {
550             size_t read_idx = id % kUsbReadQueueDepth;
551             IoReadBlock* current_block = &read_requests_[read_idx];
552             if (current_block->pending) {
553                 break;
554             }
555             if (!ProcessRead(current_block)) {
556                 return false;
557             }
558             ++needed_read_id_;
559         }
560 
561         return true;
562     }
563 
ProcessReadUsbFfsConnection564     bool ProcessRead(IoReadBlock* block) {
565         if (!block->payload.empty()) {
566             if (packet_reader_.add_bytes(std::move(block->payload)) != APacketReader::OK) {
567                 HandleError("Error while reading USB block");
568                 return false;
569             }
570 
571             auto packets = packet_reader_.get_packets();
572             for (auto& p : packets) {
573                 if (p->msg.command == A_CNXN) {
574                     CancelWrites();
575                 }
576                 transport_->HandleRead(std::move(p));
577             }
578 
579             block->payload.clear();
580         }
581 
582         PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
583         SubmitRead(block);
584         return true;
585     }
586 
SubmitReadUsbFfsConnection587     bool SubmitRead(IoReadBlock* block) {
588         block->pending = true;
589         struct iocb* iocb = &block->control;
590         if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
591             HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
592             return false;
593         }
594 
595         return true;
596     }
597 
HandleWriteUsbFfsConnection598     void HandleWrite(TransferId id) {
599         std::lock_guard<std::mutex> lock(write_mutex_);
600         auto it =
601                 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
602                     return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
603                 });
604         CHECK(it != write_requests_.end());
605 
606         write_requests_.erase(it);
607         size_t outstanding_writes = --writes_submitted_;
608         LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
609     }
610 
CreateWriteBlockUsbFfsConnection611     IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
612                                   uint64_t id) {
613         auto block = IoWriteBlock();
614         block.payload = std::move(payload);
615         block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
616         block.control.aio_rw_flags = 0;
617         block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
618         block.control.aio_reqprio = 0;
619         block.control.aio_fildes = write_fd_.get();
620         block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
621         block.control.aio_nbytes = len;
622         block.control.aio_offset = 0;
623         block.control.aio_flags = IOCB_FLAG_RESFD;
624         block.control.aio_resfd = worker_event_fd_.get();
625         return block;
626     }
627 
CreateWriteBlockUsbFfsConnection628     IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
629         size_t len = payload.size();
630         return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
631     }
632 
SubmitWritesUsbFfsConnection633     void SubmitWrites() REQUIRES(write_mutex_) {
634         if (writes_submitted_ == kUsbWriteQueueDepth) {
635             return;
636         }
637 
638         ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
639                                             write_requests_.size() - writes_submitted_);
640         CHECK_GE(writes_to_submit, 0);
641         if (writes_to_submit == 0) {
642             return;
643         }
644 
645         struct iocb* iocbs[kUsbWriteQueueDepth];
646         for (int i = 0; i < writes_to_submit; ++i) {
647             CHECK(!write_requests_[writes_submitted_ + i].pending);
648             write_requests_[writes_submitted_ + i].pending = true;
649             iocbs[i] = &write_requests_[writes_submitted_ + i].control;
650             LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
651         }
652 
653         writes_submitted_ += writes_to_submit;
654 
655         int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
656         if (rc == -1) {
657             HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
658             return;
659         } else if (rc != writes_to_submit) {
660             LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
661                        << ", actually submitted " << rc;
662         }
663     }
664 
CancelWritesUsbFfsConnection665     void CancelWrites() {
666         std::lock_guard<std::mutex> lock(write_mutex_);
667         for (size_t i = 0; i < writes_submitted_; ++i) {
668             struct io_event res;
669             if (write_requests_[i].pending == true) {
670                 LOG(INFO) << "cancelling pending write# " << i;
671                 io_cancel(aio_context_.get(), &write_requests_[i].control, &res);
672             }
673         }
674     }
675 
HandleErrorUsbFfsConnection676     void HandleError(const std::string& error) {
677         std::call_once(error_flag_, [&]() {
678             if (transport_) {
679                 transport_->HandleError(error);
680             }
681 
682             if (!stopped_) {
683                 Stop();
684             }
685         });
686     }
687 
688     std::thread monitor_thread_;
689 
690     bool worker_started_;
691     std::thread worker_thread_;
692 
693     std::atomic<bool> stopped_;
694     std::promise<void> destruction_notifier_;
695     std::once_flag error_flag_;
696 
697     unique_fd worker_event_fd_;
698     unique_fd monitor_event_fd_;
699 
700     ScopedAioContext aio_context_;
701     unique_fd control_fd_;
702     unique_fd read_fd_;
703     unique_fd write_fd_;
704 
705     bool connection_started_ = false;
706     APacketReader packet_reader_;
707 
708     std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
709     IOVector read_data_;
710 
711     // ID of the next request that we're going to send out.
712     size_t next_read_id_ = 0;
713 
714     // ID of the next packet we're waiting for.
715     size_t needed_read_id_ = 0;
716 
717     std::mutex write_mutex_;
718     std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
719     size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
720     size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;
721 
722     static constexpr int kInterruptionSignal = SIGUSR1;
723 };
724 
usb_ffs_open_thread()725 static void usb_ffs_open_thread() {
726     adb_thread_setname("usb ffs open");
727 
728     // When the device is acting as a USB host, we'll be unable to bind to the USB gadget on kernels
729     // that don't carry a downstream patch to enable that behavior.
730     //
731     // This property is copied from vendor.sys.usb.adb.disabled by an init.rc script.
732     //
733     // Note that this property only disables rebinding the USB gadget: setting it while an interface
734     // is already bound will do nothing.
735     static const char* kPropertyUsbDisabled = "sys.usb.adb.disabled";
736     PropertyMonitor prop_mon;
737     prop_mon.Add(kPropertyUsbDisabled, [](std::string value) {
738         // Return false (i.e. break out of PropertyMonitor::Run) when the property != 1.
739         return android::base::ParseBool(value) == android::base::ParseBoolResult::kTrue;
740     });
741 
742     while (true) {
743         unique_fd control;
744         unique_fd bulk_out;
745         unique_fd bulk_in;
746         if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
747             std::this_thread::sleep_for(1s);
748             continue;
749         }
750 
751         if (android::base::GetBoolProperty(kPropertyUsbDisabled, false)) {
752             LOG(INFO) << "pausing USB due to " << kPropertyUsbDisabled;
753             prop_mon.Run();
754             LOG(INFO) << "resuming USB";
755         }
756 
757         atransport* transport = new atransport(kTransportUsb);
758         transport->serial = "UsbFfs";
759         std::promise<void> destruction_notifier;
760         std::future<void> future = destruction_notifier.get_future();
761         transport->SetConnection(std::make_unique<UsbFfsConnection>(
762                 std::move(control), std::move(bulk_out), std::move(bulk_in),
763                 std::move(destruction_notifier)));
764         register_transport(transport);
765         future.wait();
766     }
767 }
768 
usb_init()769 void usb_init() {
770     std::thread(usb_ffs_open_thread).detach();
771 }
772