/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG USB

#include "sysdeps.h"

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>

#include <linux/usb/functionfs.h>
#include <sys/eventfd.h>

#include <algorithm>
#include <array>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

#include <asyncio/AsyncIO.h>

#include <android-base/logging.h>
#include <android-base/macros.h>
#include <android-base/parsebool.h>
#include <android-base/properties.h>
#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "apacket_reader.h"
#include "daemon/property_monitor.h"
#include "daemon/usb_ffs.h"
#include "sysdeps/chrono.h"
#include "transfer_id.h"
#include "transport.h"
#include "types.h"

using android::base::StringPrintf;

// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
static constexpr size_t kUsbReadQueueDepth = 8;
static constexpr size_t kUsbReadSize = 16384;

static constexpr size_t kUsbWriteQueueDepth = 8;
static constexpr size_t kUsbWriteSize = 16384;
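// With these values, at most kUsbReadQueueDepth * kUsbReadSize = 128 KiB of kernel buffers can be
// outstanding for reads at any time, and the same bound applies to writes.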

static const char* to_string(enum usb_functionfs_event_type type) {
    switch (type) {
        case FUNCTIONFS_BIND:
            return "FUNCTIONFS_BIND";
        case FUNCTIONFS_UNBIND:
            return "FUNCTIONFS_UNBIND";
        case FUNCTIONFS_ENABLE:
            return "FUNCTIONFS_ENABLE";
        case FUNCTIONFS_DISABLE:
            return "FUNCTIONFS_DISABLE";
        case FUNCTIONFS_SETUP:
            return "FUNCTIONFS_SETUP";
        case FUNCTIONFS_SUSPEND:
            return "FUNCTIONFS_SUSPEND";
        case FUNCTIONFS_RESUME:
            return "FUNCTIONFS_RESUME";
    }
}

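// An IoBlock pairs a kernel AIO control block with the buffer that backs it. The iocb's aio_data
// field carries a TransferId, which lets completions reported by io_getevents() be matched back to
// the submitting block and its direction.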
template <class Payload>
struct IoBlock {
    bool pending = false;
    struct iocb control = {};
    Payload payload;

    TransferId id() const { return TransferId::from_value(control.aio_data); }
};

using IoReadBlock = IoBlock<Block>;
using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;

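// RAII wrapper around a kernel AIO context: io_setup() on creation, io_destroy() on reset or
// destruction.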
struct ScopedAioContext {
    ScopedAioContext() = default;
    ~ScopedAioContext() { reset(); }

    ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
    ScopedAioContext(const ScopedAioContext& copy) = delete;

    ScopedAioContext& operator=(ScopedAioContext&& move) {
        reset(move.release());
        return *this;
    }
    ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;

    static ScopedAioContext Create(size_t max_events) {
        aio_context_t ctx = 0;
        if (io_setup(max_events, &ctx) != 0) {
            PLOG(FATAL) << "failed to create aio_context_t";
        }
        ScopedAioContext result;
        result.reset(ctx);
        return result;
    }

    aio_context_t release() {
        aio_context_t result = context_;
        context_ = 0;
        return result;
    }

    void reset(aio_context_t new_context = 0) {
        if (context_ != 0) {
            io_destroy(context_);
        }

        context_ = new_context;
    }

    aio_context_t get() { return context_; }

  private:
    aio_context_t context_ = 0;
};

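// A UsbFfsConnection drives one FunctionFS gadget interface using Linux AIO (io_submit /
// io_getevents) on the bulk-in and bulk-out endpoints. A monitor thread watches the ep0 control
// fd for FunctionFS lifecycle events, and a worker thread submits and reaps transfers, woken via
// an eventfd whenever a transfer completes or new writes are queued.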
struct UsbFfsConnection : public Connection {
    UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
                     std::promise<void> destruction_notifier)
        : worker_started_(false),
          stopped_(false),
          destruction_notifier_(std::move(destruction_notifier)),
          control_fd_(std::move(control)),
          read_fd_(std::move(read)),
          write_fd_(std::move(write)) {
        LOG(INFO) << "UsbFfsConnection constructed";
        worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (worker_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (monitor_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
    }

    ~UsbFfsConnection() {
        LOG(INFO) << "UsbFfsConnection being destroyed";
        Stop();
        monitor_thread_.join();

        // We need to explicitly close our file descriptors before we notify our destruction,
        // because the thread listening on the future will immediately try to reopen the endpoint.
        aio_context_.reset();
        control_fd_.reset();
        read_fd_.reset();
        write_fd_.reset();

        destruction_notifier_.set_value();
    }

    virtual bool Write(std::unique_ptr<apacket> packet) override final {
        LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
        auto header = std::make_shared<Block>(sizeof(packet->msg));
        memcpy(header->data(), &packet->msg, sizeof(packet->msg));

        std::lock_guard<std::mutex> lock(write_mutex_);
        write_requests_.push_back(
                CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
        if (!packet->payload.empty()) {
            // The kernel attempts to allocate a contiguous block of memory for each write,
            // which can fail if the write is large and the kernel heap is fragmented.
            // Split large writes into smaller chunks to avoid this.
            auto payload = std::make_shared<Block>(std::move(packet->payload));
            size_t offset = 0;
            size_t len = payload->size();

            while (len > 0) {
                size_t write_size = std::min(kUsbWriteSize, len);
                write_requests_.push_back(
                        CreateWriteBlock(payload, offset, write_size, next_write_id_++));
                len -= write_size;
                offset += write_size;
            }
        }

        // Wake up the worker thread to submit writes.
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to submit writes";
        }

        return true;
    }

    virtual bool Start() override final {
        StartMonitor();
        return true;
    }

    virtual void Stop() override final {
        if (stopped_.exchange(true)) {
            return;
        }
        stopped_ = true;
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
        }
        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));

        rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
        }

        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
    }

    virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final {
        // TODO: support TLS for usb connections.
        LOG(FATAL) << "Not supported yet.";
        return false;
    }

  private:
    void StartMonitor() {
        // This is a bit of a mess.
        // It's possible for io_submit to end up blocking, if we call it as the endpoint
        // becomes disabled. Work around this by having a monitor thread to listen for functionfs
        // lifecycle events. If we notice an error condition (either we've become disabled, or we
        // were never enabled in the first place), we send interruption signals to the worker thread
        // until it dies, and then report failure to the transport via HandleError, which will
        // eventually result in the transport being destroyed, which will result in UsbFfsConnection
        // being destroyed, which unblocks the open thread and restarts this entire process.
        static std::once_flag handler_once;
        std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });

        monitor_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-monitor");
            LOG(INFO) << "UsbFfs-monitor thread spawned";

            bool bound = false;
            bool enabled = false;
            bool running = true;
            while (running) {
                adb_pollfd pfd[2] = {
                        {.fd = control_fd_.get(), .events = POLLIN, .revents = 0},
                        {.fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0},
                };

                // If we don't see our first bind within a second, try again.
                int timeout_ms = bound ? -1 : 1000;

                int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
                if (rc == -1) {
                    PLOG(FATAL) << "poll on USB control fd failed";
                } else if (rc == 0) {
                    LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
                    break;
                }

                if (pfd[1].revents) {
                    // We were told to die.
                    break;
                }

                struct usb_functionfs_event event;
                rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read functionfs event";
                } else if (rc == 0) {
                    LOG(WARNING) << "hit EOF on functionfs control fd";
                    break;
                } else if (rc != sizeof(event)) {
                    LOG(FATAL) << "read functionfs event of unexpected size, expected "
                               << sizeof(event) << ", got " << rc;
                }

                LOG(INFO) << "USB event: "
                          << to_string(static_cast<usb_functionfs_event_type>(event.type));

                switch (event.type) {
                    case FUNCTIONFS_BIND:
                        if (bound) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
                            running = false;
                            break;
                        }

                        bound = true;
                        break;

                    case FUNCTIONFS_ENABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
                            running = false;
                            break;
                        }

                        enabled = true;
                        StartWorker();
                        break;

                    case FUNCTIONFS_DISABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
                        }

                        if (!enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
                        }

                        enabled = false;
                        running = false;
                        break;

                    case FUNCTIONFS_UNBIND:
                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
                        }

                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
                        }

                        bound = false;
                        running = false;
                        break;

                    case FUNCTIONFS_SETUP: {
                        LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
                                  << static_cast<int>(event.u.setup.bRequestType)
                                  << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
                                  << ", wValue = " << static_cast<int>(event.u.setup.wValue)
                                  << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
                                  << ", wLength = " << static_cast<int>(event.u.setup.wLength);

                        if ((event.u.setup.bRequestType & USB_DIR_IN)) {
                            LOG(INFO) << "acking device-to-host control transfer";
                            ssize_t rc = adb_write(control_fd_.get(), "", 0);
                            if (rc != 0) {
                                PLOG(ERROR) << "failed to write empty packet to host";
                                break;
                            }
                        } else {
                            std::string buf;
                            buf.resize(event.u.setup.wLength + 1);

                            ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
                            if (rc != event.u.setup.wLength) {
                                LOG(ERROR)
                                        << "read " << rc
                                        << " bytes when trying to read control request, expected "
                                        << event.u.setup.wLength;
                            }

                            LOG(INFO) << "control request contents: " << buf;
                            break;
                        }
                    }
                }
            }

            StopWorker();
            HandleError("monitor thread finished");
        });
    }

    void StartWorker() {
        CHECK(!worker_started_);
        worker_started_ = true;
        worker_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-worker");
            LOG(INFO) << "UsbFfs-worker thread spawned";

            for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
                read_requests_[i] = CreateReadBlock(next_read_id_++);
                if (!SubmitRead(&read_requests_[i])) {
                    return;
                }
            }

            while (!stopped_) {
                uint64_t dummy;
                ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read from eventfd";
                } else if (rc == 0) {
                    LOG(FATAL) << "hit EOF on eventfd";
                }

                HandleEvents();

                std::lock_guard<std::mutex> lock(write_mutex_);
                SubmitWrites();
            }
        });
    }

    void StopWorker() {
        if (!worker_started_) {
            return;
        }

        pthread_t worker_thread_handle = worker_thread_.native_handle();
        while (true) {
            int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
            if (rc != 0) {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
                break;
            }

            std::this_thread::sleep_for(100ms);

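            // pthread_kill with signal 0 performs only error checking: it tells us whether the
            // worker thread still exists (ESRCH means it has already exited) without actually
            // delivering a signal.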
            rc = pthread_kill(worker_thread_handle, 0);
            if (rc == 0) {
                continue;
            } else if (rc == ESRCH) {
                break;
            } else {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
            }
        }

        worker_thread_.join();
    }

    void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
        block->pending = false;
        if (block->payload.capacity() >= kUsbReadSize) {
            block->payload.resize(kUsbReadSize);
        } else {
            block->payload = Block(kUsbReadSize);
        }
        block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
        block->control.aio_nbytes = block->payload.size();
    }

    IoReadBlock CreateReadBlock(uint64_t id) {
        IoReadBlock block;
        PrepareReadBlock(&block, id);
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PREAD;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = read_fd_.get();
        block.control.aio_offset = 0;
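        // IOCB_FLAG_RESFD asks the kernel to signal completion by incrementing worker_event_fd_,
        // which is what wakes the worker loop's adb_read() on the eventfd.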
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    void HandleEvents() {
        static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
        struct io_event events[kMaxEvents];
        struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
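        // A min_nr of 0 with a zero timeout makes this a non-blocking reap: we only collect the
        // completions that are already available.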
        int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
        if (rc == -1) {
            HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
            return;
        }

        for (int event_idx = 0; event_idx < rc; ++event_idx) {
            auto& event = events[event_idx];
            TransferId id = TransferId::from_value(event.data);

            if (event.res < 0) {
                // On initial connection, some clients will send a ClearFeature(HALT) to
                // attempt to resynchronize host and device after the adb server is killed.
                // On newer device kernels, the reads we've already dispatched will be cancelled.
                // Instead of treating this as a failure, which will tear down the interface and
                // lead to the client doing the same thing again, just resubmit if this happens
                // before we've actually read anything.
                if (!connection_started_ && event.res == -EPIPE &&
                    id.direction == TransferDirection::READ) {
                    uint64_t read_idx = id.id % kUsbReadQueueDepth;
                    SubmitRead(&read_requests_[read_idx]);
                    continue;
                } else {
                    std::string error =
                            StringPrintf("%s %" PRIu64 " failed with error %s",
                                         id.direction == TransferDirection::READ ? "read" : "write",
                                         id.id, strerror(-event.res));
                    HandleError(error);
                    return;
                }
            }

            if (id.direction == TransferDirection::READ) {
                connection_started_ = true;
                if (!HandleRead(id, event.res)) {
                    return;
                }
            } else {
                HandleWrite(id);
            }
        }
    }

    bool HandleRead(TransferId id, int64_t size) {
        uint64_t read_idx = id.id % kUsbReadQueueDepth;
        IoReadBlock* block = &read_requests_[read_idx];
        block->pending = false;
        VLOG(USB) << "HandleRead, resizing from " << block->payload.size() << " to " << size;
        block->payload.resize(size);

        // Notification for completed reads can be received out of order.
        if (block->id().id != needed_read_id_) {
            LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
                         << needed_read_id_;
            return true;
        }

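        // Process reads in order: drain the contiguous run of completed blocks starting at
        // needed_read_id_, and stop at the first block that is still pending.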
        for (uint64_t id = needed_read_id_;; ++id) {
            size_t read_idx = id % kUsbReadQueueDepth;
            IoReadBlock* current_block = &read_requests_[read_idx];
            if (current_block->pending) {
                break;
            }
            if (!ProcessRead(current_block)) {
                return false;
            }
            ++needed_read_id_;
        }

        return true;
    }

    bool ProcessRead(IoReadBlock* block) {
        if (!block->payload.empty()) {
            if (packet_reader_.add_bytes(std::move(block->payload)) != APacketReader::OK) {
                HandleError("Error while reading USB block");
                return false;
            }

            auto packets = packet_reader_.get_packets();
            for (auto& p : packets) {
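                // A CNXN packet presumably means the host is (re)starting the connection
                // handshake, so writes still in flight for the previous session are cancelled
                // before the packet is handed up to the transport.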
                if (p->msg.command == A_CNXN) {
                    CancelWrites();
                }
                transport_->HandleRead(std::move(p));
            }

            block->payload.clear();
        }

        PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
        SubmitRead(block);
        return true;
    }

    bool SubmitRead(IoReadBlock* block) {
        block->pending = true;
        struct iocb* iocb = &block->control;
        if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
            HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
            return false;
        }

        return true;
    }

    void HandleWrite(TransferId id) {
        std::lock_guard<std::mutex> lock(write_mutex_);
        auto it =
                std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
                    return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
                });
        CHECK(it != write_requests_.end());

        write_requests_.erase(it);
        size_t outstanding_writes = --writes_submitted_;
        LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
    }

    IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
                                  uint64_t id) {
        auto block = IoWriteBlock();
        block.payload = std::move(payload);
        block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = write_fd_.get();
        block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
        block.control.aio_nbytes = len;
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
        size_t len = payload.size();
        return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
    }

    void SubmitWrites() REQUIRES(write_mutex_) {
        if (writes_submitted_ == kUsbWriteQueueDepth) {
            return;
        }

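        // write_requests_ is kept in submission order: the first writes_submitted_ entries are
        // already in flight, so only the tail beyond them is eligible, capped by the remaining
        // queue depth.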
        ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
                                            write_requests_.size() - writes_submitted_);
        CHECK_GE(writes_to_submit, 0);
        if (writes_to_submit == 0) {
            return;
        }

        struct iocb* iocbs[kUsbWriteQueueDepth];
        for (int i = 0; i < writes_to_submit; ++i) {
            CHECK(!write_requests_[writes_submitted_ + i].pending);
            write_requests_[writes_submitted_ + i].pending = true;
            iocbs[i] = &write_requests_[writes_submitted_ + i].control;
            LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
        }

        writes_submitted_ += writes_to_submit;

        int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
        if (rc == -1) {
            HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
            return;
        } else if (rc != writes_to_submit) {
            LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
                       << ", actually submitted " << rc;
        }
    }

    void CancelWrites() {
        std::lock_guard<std::mutex> lock(write_mutex_);
        for (size_t i = 0; i < writes_submitted_; ++i) {
            struct io_event res;
            if (write_requests_[i].pending == true) {
                LOG(INFO) << "cancelling pending write# " << i;
                io_cancel(aio_context_.get(), &write_requests_[i].control, &res);
            }
        }
    }

    void HandleError(const std::string& error) {
        std::call_once(error_flag_, [&]() {
            if (transport_) {
                transport_->HandleError(error);
            }

            if (!stopped_) {
                Stop();
            }
        });
    }

    std::thread monitor_thread_;

    bool worker_started_;
    std::thread worker_thread_;

    std::atomic<bool> stopped_;
    std::promise<void> destruction_notifier_;
    std::once_flag error_flag_;

    unique_fd worker_event_fd_;
    unique_fd monitor_event_fd_;

    ScopedAioContext aio_context_;
    unique_fd control_fd_;
    unique_fd read_fd_;
    unique_fd write_fd_;

    bool connection_started_ = false;
    APacketReader packet_reader_;

    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
    IOVector read_data_;

    // ID of the next request that we're going to send out.
    size_t next_read_id_ = 0;

    // ID of the next packet we're waiting for.
    size_t needed_read_id_ = 0;

    std::mutex write_mutex_;
    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
    size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
    size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;

    static constexpr int kInterruptionSignal = SIGUSR1;
};

static void usb_ffs_open_thread() {
    adb_thread_setname("usb ffs open");

    // When the device is acting as a USB host, we'll be unable to bind to the USB gadget on kernels
    // that don't carry a downstream patch to enable that behavior.
    //
    // This property is copied from vendor.sys.usb.adb.disabled by an init.rc script.
    //
    // Note that this property only disables rebinding the USB gadget: setting it while an interface
    // is already bound will do nothing.
    static const char* kPropertyUsbDisabled = "sys.usb.adb.disabled";
    PropertyMonitor prop_mon;
    prop_mon.Add(kPropertyUsbDisabled, [](std::string value) {
        // Return false (i.e. break out of PropertyMonitor::Run) when the property != 1.
        return android::base::ParseBool(value) == android::base::ParseBoolResult::kTrue;
    });

    while (true) {
        unique_fd control;
        unique_fd bulk_out;
        unique_fd bulk_in;
        if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
            std::this_thread::sleep_for(1s);
            continue;
        }

        if (android::base::GetBoolProperty(kPropertyUsbDisabled, false)) {
            LOG(INFO) << "pausing USB due to " << kPropertyUsbDisabled;
            prop_mon.Run();
            LOG(INFO) << "resuming USB";
        }

        atransport* transport = new atransport(kTransportUsb);
        transport->serial = "UsbFfs";
        std::promise<void> destruction_notifier;
        std::future<void> future = destruction_notifier.get_future();
        transport->SetConnection(std::make_unique<UsbFfsConnection>(
                std::move(control), std::move(bulk_out), std::move(bulk_in),
                std::move(destruction_notifier)));
        register_transport(transport);
        future.wait();
    }
}

void usb_init() {
    std::thread(usb_ffs_open_thread).detach();
}