1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // TODO: Move this doc to one of the public APIs; it isn't io_uring specific.
6
7 //! `UringReactor`
8 //!
9 //! ## Read/Write buffer management.
10 //!
11 //! There are two key issues when managing asynchronous IO buffers in Rust:
12 //! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
13 //! not have any references to it during that time.
14 //! 2) The memory must remain valid as long as the kernel has a reference to it.
15 //!
16 //! ### The kernel's mutable borrow of the buffer
17 //!
18 //! Because the buffers used for read and write must be passed to the kernel for an unknown
19 //! duration, the functions must maintain ownership of the memory. The core of this problem is that
20 //! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
21 //! future has a reference to. The buffer can be modified at any point from submission until the
22 //! operation completes. The operation can't be synchronously canceled when the future is dropped,
23 //! and Drop can't be used for safety guarantees. To ensure a Rust reference never coexists with the
24 //! kernel's writes, only memory that implements `BackingMemory` is accepted. For implementors of
25 //! `BackingMemory` the mut borrow isn't an issue because they are already OK with external
26 //! modifications to the memory (like a `VolatileSlice`).
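//!
//! A short sketch of the hazard this rules out (the `read_async` function below is hypothetical and
//! not part of this crate):
//!
//! ```ignore
//! // Hypothetical, rejected-by-design API: the future borrows a stack buffer.
//! async fn broken(file: &File) {
//!     let mut buf = [0u8; 16];
//!     read_async(file, &mut buf).await; // The kernel now holds a raw pointer into `buf`.
//!     // If this future is dropped before the read completes, `buf` is freed while the kernel
//!     // may still be writing to it. Accepting only `BackingMemory` avoids handing out such
//!     // short-lived references.
//! }
//! ```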
27 //!
28 //! ### Buffer lifetime
29 //!
30 //! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
31 //! read operation was submitted and then the memory was dropped. To solve this, the executor takes
32 //! an Arc to the backing memory. Vecs being read into are also wrapped in an Arc before being passed
33 //! to the executor. The executor holds the Arc and ensures all operations are complete before
34 //! dropping it, which guarantees the memory is valid for the duration.
35 //!
36 //! The buffers _have_ to be on the heap. Because we don't have a way to cancel the operation when a
37 //! future is dropped (we can't rely on drop running), there is no way to ensure the kernel's buffer
38 //! remains valid until the operation completes unless the executor holds an Arc to the memory on the heap.
39 //!
40 //! ## Using `Vec` for reads/writes.
41 //!
42 //! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
43 //! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
44 //! ensure it lives long enough.
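//!
//! A condensed sketch of the resulting usage, mirroring the unit tests at the bottom of this file
//! (`ex` is an executor created via `RawExecutor::<UringReactor>::new()`, `file` is any type
//! implementing `AsRawDescriptor`, and the code is written as if inside an async fn returning this
//! module's `Result`):
//!
//! ```ignore
//! // Wrap the Vec so the executor can hold an Arc to the buffer while the kernel uses it.
//! let mem = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//! let source = ex.reactor.register_source(&ex, &file)?;
//! let op = source.start_read_to_mem(None, Arc::clone(&mem), [MemRegion { offset: 0, len: 4096 }])?;
//! let bytes_read = op.await?;
//! ```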
45
46 use std::convert::TryInto;
47 use std::ffi::CStr;
48 use std::fs::File;
49 use std::future::Future;
50 use std::io;
51 use std::mem;
52 use std::mem::MaybeUninit;
53 use std::os::unix::io::FromRawFd;
54 use std::os::unix::io::RawFd;
55 use std::pin::Pin;
56 use std::sync::Arc;
57 use std::sync::Weak;
58 use std::task::Context;
59 use std::task::Poll;
60 use std::task::Waker;
61 use std::thread;
62 use std::thread::ThreadId;
63
64 use base::trace;
65 use base::warn;
66 use base::AsRawDescriptor;
67 use base::EventType;
68 use base::IoBufMut;
69 use base::RawDescriptor;
70 use io_uring::URingAllowlist;
71 use io_uring::URingContext;
72 use io_uring::URingOperation;
73 use once_cell::sync::Lazy;
74 use remain::sorted;
75 use slab::Slab;
76 use sync::Mutex;
77 use thiserror::Error as ThisError;
78
79 use crate::common_executor::RawExecutor;
80 use crate::common_executor::RawTaskHandle;
81 use crate::common_executor::Reactor;
82 use crate::mem::BackingMemory;
83 use crate::waker::WakerToken;
84 use crate::waker::WeakWake;
85 use crate::AsyncError;
86 use crate::AsyncResult;
87 use crate::IoSource;
88 use crate::MemRegion;
89 use crate::TaskHandle;
90
91 #[sorted]
92 #[derive(Debug, ThisError)]
93 pub enum Error {
94 /// Creating a context to wait on FDs failed.
95 #[error("Error creating the fd waiting context: {0}")]
96 CreatingContext(io_uring::Error),
97 /// Failed to discard a block
98 #[error("Failed to discard a block: {0}")]
99 Discard(base::Error),
100 /// Failed to copy the FD for the polling context.
101 #[error("Failed to copy the FD for the polling context: {0}")]
102 DuplicatingFd(base::Error),
103 /// Enabling a context failed.
104 #[error("Error enabling the URing context: {0}")]
105 EnablingContext(io_uring::Error),
106 /// The Executor is gone.
107 #[error("The executor is gone")]
108 ExecutorGone,
109 /// Invalid offset or length given for an iovec in backing memory.
110 #[error("Invalid offset/len for getting an iovec")]
111 InvalidOffset,
112 /// Invalid FD source specified.
113 #[error("Invalid source, FD not registered for use")]
114 InvalidSource,
115 /// Error doing the IO.
116 #[error("Error during IO: {0}")]
117 Io(io::Error),
118 /// Registering operation restrictions to a uring failed.
119 #[error("Error registering restrictions to the URing context: {0}")]
120 RegisteringURingRestriction(io_uring::Error),
121 /// Failed to remove the waker from the polling context.
122 #[error("Error removing from the URing context: {0}")]
123 RemovingWaker(io_uring::Error),
124 /// Failed to submit the operation to the polling context.
125 #[error("Error adding to the URing context: {0}")]
126 SubmittingOp(io_uring::Error),
127 /// URingContext failure.
128 #[error("URingContext failure: {0}")]
129 URingContextError(io_uring::Error),
130 /// Failed to submit or wait for io_uring events.
131 #[error("URing::enter: {0}")]
132 URingEnter(io_uring::Error),
133 }
134 pub type Result<T> = std::result::Result<T, Error>;
135
136 impl From<Error> for io::Error {
137 fn from(e: Error) -> Self {
138 use Error::*;
139 match e {
140 Discard(e) => e.into(),
141 DuplicatingFd(e) => e.into(),
142 ExecutorGone => io::Error::new(io::ErrorKind::Other, ExecutorGone),
143 InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
144 InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
145 Io(e) => e,
146 CreatingContext(e) => e.into(),
147 RemovingWaker(e) => e.into(),
148 SubmittingOp(e) => e.into(),
149 URingContextError(e) => e.into(),
150 URingEnter(e) => e.into(),
151 EnablingContext(e) => e.into(),
152 RegisteringURingRestriction(e) => e.into(),
153 }
154 }
155 }
156
157 impl From<Error> for AsyncError {
158 fn from(e: Error) -> AsyncError {
159 AsyncError::SysVariants(e.into())
160 }
161 }
162
163 static IS_URING_STABLE: Lazy<bool> = Lazy::new(|| {
164 let mut utsname = MaybeUninit::zeroed();
165
166 // SAFETY:
167 // Safe because this will only modify `utsname` and we check the return value.
168 let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
169 if res < 0 {
170 return false;
171 }
172
173 // SAFETY:
174 // Safe because the kernel has initialized `utsname`.
175 let utsname = unsafe { utsname.assume_init() };
176
177 // SAFETY:
178 // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
179 let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
180
181 let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
182 Ok(c) => c,
183 Err(_) => return false,
184 };
185
186 // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
187 match (components.next(), components.next()) {
188 (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
189 // The kernel version is new enough so check if we can actually make a uring context.
190 URingContext::new(8, None).is_ok()
191 }
192 _ => false,
193 }
194 });
195
196 /// Checks if the uring executor is stable.
197 /// Caches the result so that the check is only run once.
198 /// Useful for falling back to the FD executor on pre-uring kernels.
199 pub fn is_uring_stable() -> bool {
200 *IS_URING_STABLE
201 }
202
203 // Checks the uring availability by checking if the uring creation succeeds.
204 // If uring creation succeeds, it returns `Ok(())`. It returns an `URingContextError` otherwise.
205 // It fails if the kernel does not support io_uring, though that is not the only possible cause of failure.
206 pub(crate) fn check_uring_availability() -> Result<()> {
207 URingContext::new(8, None)
208 .map(drop)
209 .map_err(Error::URingContextError)
210 }
211
212 pub struct RegisteredSource {
213 tag: usize,
214 ex: Weak<RawExecutor<UringReactor>>,
215 }
216
217 impl RegisteredSource {
218 pub fn start_read_to_mem(
219 &self,
220 file_offset: Option<u64>,
221 mem: Arc<dyn BackingMemory + Send + Sync>,
222 addrs: impl IntoIterator<Item = MemRegion>,
223 ) -> Result<PendingOperation> {
224 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
225 let token = ex
226 .reactor
227 .submit_read_to_vectored(self, mem, file_offset, addrs)?;
228
229 Ok(PendingOperation {
230 waker_token: Some(token),
231 ex: self.ex.clone(),
232 submitted: false,
233 })
234 }
235
236 pub fn start_write_from_mem(
237 &self,
238 file_offset: Option<u64>,
239 mem: Arc<dyn BackingMemory + Send + Sync>,
240 addrs: impl IntoIterator<Item = MemRegion>,
241 ) -> Result<PendingOperation> {
242 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
243 let token = ex
244 .reactor
245 .submit_write_from_vectored(self, mem, file_offset, addrs)?;
246
247 Ok(PendingOperation {
248 waker_token: Some(token),
249 ex: self.ex.clone(),
250 submitted: false,
251 })
252 }
253
254 pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
255 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
256 let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;
257
258 Ok(PendingOperation {
259 waker_token: Some(token),
260 ex: self.ex.clone(),
261 submitted: false,
262 })
263 }
264
265 pub fn start_fsync(&self) -> Result<PendingOperation> {
266 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
267 let token = ex.reactor.submit_fsync(self)?;
268
269 Ok(PendingOperation {
270 waker_token: Some(token),
271 ex: self.ex.clone(),
272 submitted: false,
273 })
274 }
275
276 pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
277 let events = EventType::Read;
278
279 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
280 let token = ex.reactor.submit_poll(self, events)?;
281
282 Ok(PendingOperation {
283 waker_token: Some(token),
284 ex: self.ex.clone(),
285 submitted: false,
286 })
287 }
288 }
289
290 impl Drop for RegisteredSource {
291 fn drop(&mut self) {
292 if let Some(ex) = self.ex.upgrade() {
293 ex.reactor.deregister_source(self);
294 }
295 }
296 }
297
298 // Number of entries in the ring.
299 const NUM_ENTRIES: usize = 256;
300
301 // An operation that has been submitted to the uring and is potentially being waited on.
302 struct OpData {
303 _file: Arc<File>,
304 _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
305 waker: Option<Waker>,
306 canceled: bool,
307 }
308
309 // The current status of an operation that's been submitted to the uring.
310 enum OpStatus {
311 Nop,
312 Pending(OpData),
313 Completed(Option<::std::io::Result<u32>>),
314 }
315
316 struct Ring {
317 ops: Slab<OpStatus>,
318 registered_sources: Slab<Arc<File>>,
319 }
320
321 /// `Reactor` that manages async IO work using io_uring.
322 pub struct UringReactor {
323 // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
324 // releasing the resources borrowed by the kernel before we free them.
325 ctx: URingContext,
326 ring: Mutex<Ring>,
327 thread_id: Mutex<Option<ThreadId>>,
328 }
329
330 impl UringReactor {
331 fn new() -> Result<UringReactor> {
332 // To enhance security, allow only the operations that the UringReactor actually submits.
333 let mut restrictions = URingAllowlist::new();
334 let ops = [
335 URingOperation::Writev,
336 URingOperation::Readv,
337 URingOperation::Nop,
338 URingOperation::Fsync,
339 URingOperation::Fallocate,
340 URingOperation::PollAdd,
341 URingOperation::PollRemove,
342 URingOperation::AsyncCancel,
343 ];
344 for op in ops {
345 restrictions.allow_submit_operation(op);
346 }
347
348 let ctx =
349 URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;
350
351 Ok(UringReactor {
352 ctx,
353 ring: Mutex::new(Ring {
354 ops: Slab::with_capacity(NUM_ENTRIES),
355 registered_sources: Slab::with_capacity(NUM_ENTRIES),
356 }),
357 thread_id: Mutex::new(None),
358 })
359 }
360
361 fn runs_tasks_on_current_thread(&self) -> bool {
362 let executor_thread = self.thread_id.lock();
363 executor_thread
364 .map(|id| id == thread::current().id())
365 .unwrap_or(false)
366 }
367
368 fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
369 let mut ring = self.ring.lock();
370
371 let op = ring
372 .ops
373 .get_mut(token.0)
374 .expect("`get_result` called on unknown operation");
375 match op {
376 OpStatus::Nop => panic!("`get_result` called on nop"),
377 OpStatus::Pending(data) => {
378 if data.canceled {
379 panic!("`get_result` called on canceled operation");
380 }
381 data.waker = Some(cx.waker().clone());
382 None
383 }
384 OpStatus::Completed(res) => {
385 let out = res.take();
386 ring.ops.remove(token.0);
387 Some(out.expect("Missing result in completed operation"))
388 }
389 }
390 }
391
392 // Remove the waker for the given token if it hasn't fired yet.
393 fn cancel_operation(&self, token: WakerToken) {
394 let mut ring = self.ring.lock();
395 let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
396 match op {
397 OpStatus::Nop => panic!("`cancel_operation` called on nop"),
398 OpStatus::Pending(data) => {
399 if data.canceled {
400 panic!("uring operation canceled more than once");
401 }
402
403 if let Some(waker) = data.waker.take() {
404 waker.wake();
405 }
406 // Clear the waker as it is no longer needed.
407 data.waker = None;
408 data.canceled = true;
409
410 // Keep the rest of the op data as the uring might still be accessing either
411 // the source or the backing memory, so it needs to live until the kernel
412 // completes the operation.
413 true
414 }
415 OpStatus::Completed(_) => {
416 ring.ops.remove(token.0);
417 false
418 }
419 }
420 } else {
421 false
422 };
423 std::mem::drop(ring);
424 if submit_cancel {
425 let _best_effort = self.submit_cancel_async(token.0);
426 }
427 }
428
429 pub(crate) fn register_source<F: AsRawDescriptor>(
430 &self,
431 raw: &Arc<RawExecutor<UringReactor>>,
432 fd: &F,
433 ) -> Result<RegisteredSource> {
434 // SAFETY:
435 // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
436 // will only be added to the poll loop.
437 let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };
438
439 Ok(RegisteredSource {
440 tag: self
441 .ring
442 .lock()
443 .registered_sources
444 .insert(Arc::new(duped_fd)),
445 ex: Arc::downgrade(raw),
446 })
447 }
448
449 fn deregister_source(&self, source: &RegisteredSource) {
450 // There isn't any need to pull pending ops out; they all have Arcs to the file and memory they
451 // need, so let them complete. Deregistering with pending ops is not a common path, so there is
452 // no need to optimize that case yet.
453 self.ring.lock().registered_sources.remove(source.tag);
454 }
455
456 fn submit_poll(
457 &self,
458 source: &RegisteredSource,
459 events: base::EventType,
460 ) -> Result<WakerToken> {
461 let mut ring = self.ring.lock();
462 let src = ring
463 .registered_sources
464 .get(source.tag)
465 .ok_or(Error::InvalidSource)?
466 .clone();
467 let entry = ring.ops.vacant_entry();
468 let next_op_token = entry.key();
469 self.ctx
470 .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
471 .map_err(Error::SubmittingOp)?;
472 entry.insert(OpStatus::Pending(OpData {
473 _file: src,
474 _mem: None,
475 waker: None,
476 canceled: false,
477 }));
478
479 Ok(WakerToken(next_op_token))
480 }
481
482 fn submit_fallocate(
483 &self,
484 source: &RegisteredSource,
485 offset: u64,
486 len: u64,
487 mode: u32,
488 ) -> Result<WakerToken> {
489 let mut ring = self.ring.lock();
490 let src = ring
491 .registered_sources
492 .get(source.tag)
493 .ok_or(Error::InvalidSource)?
494 .clone();
495 let entry = ring.ops.vacant_entry();
496 let next_op_token = entry.key();
497 self.ctx
498 .add_fallocate(
499 src.as_raw_descriptor(),
500 offset,
501 len,
502 mode,
503 usize_to_u64(next_op_token),
504 )
505 .map_err(Error::SubmittingOp)?;
506
507 entry.insert(OpStatus::Pending(OpData {
508 _file: src,
509 _mem: None,
510 waker: None,
511 canceled: false,
512 }));
513
514 Ok(WakerToken(next_op_token))
515 }
516
517 fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
518 let mut ring = self.ring.lock();
519 let entry = ring.ops.vacant_entry();
520 let next_op_token = entry.key();
521 self.ctx
522 .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
523 .map_err(Error::SubmittingOp)?;
524
525 entry.insert(OpStatus::Nop);
526
527 Ok(WakerToken(next_op_token))
528 }
529
530 fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
531 let mut ring = self.ring.lock();
532 let src = ring
533 .registered_sources
534 .get(source.tag)
535 .ok_or(Error::InvalidSource)?
536 .clone();
537 let entry = ring.ops.vacant_entry();
538 let next_op_token = entry.key();
539 self.ctx
540 .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
541 .map_err(Error::SubmittingOp)?;
542 entry.insert(OpStatus::Pending(OpData {
543 _file: src,
544 _mem: None,
545 waker: None,
546 canceled: false,
547 }));
548
549 Ok(WakerToken(next_op_token))
550 }
551
552 fn submit_read_to_vectored(
553 &self,
554 source: &RegisteredSource,
555 mem: Arc<dyn BackingMemory + Send + Sync>,
556 offset: Option<u64>,
557 addrs: impl IntoIterator<Item = MemRegion>,
558 ) -> Result<WakerToken> {
559 let iovecs = addrs
560 .into_iter()
561 .map(|mem_range| {
562 let vslice = mem
563 .get_volatile_slice(mem_range)
564 .map_err(|_| Error::InvalidOffset)?;
565 // SAFETY:
566 // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
567 // transaction is complete and the completion has been returned from `wait()`.
568 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
569 })
570 .collect::<Result<Vec<_>>>()?;
571 let iovecs = Pin::from(iovecs.into_boxed_slice());
572
573 let mut ring = self.ring.lock();
574 let src = ring
575 .registered_sources
576 .get(source.tag)
577 .ok_or(Error::InvalidSource)?
578 .clone();
579
580 let entry = ring.ops.vacant_entry();
581 let next_op_token = entry.key();
582
583 // SAFETY:
584 // Safe because all the addresses are within the Memory that an Arc is kept for the
585 // duration to ensure the memory is valid while the kernel accesses it.
586 // Tested by `dont_drop_backing_mem_read` unit test.
587 unsafe {
588 self.ctx
589 .add_readv(
590 iovecs,
591 src.as_raw_descriptor(),
592 offset,
593 usize_to_u64(next_op_token),
594 )
595 .map_err(Error::SubmittingOp)?;
596 }
597
598 entry.insert(OpStatus::Pending(OpData {
599 _file: src,
600 _mem: Some(mem),
601 waker: None,
602 canceled: false,
603 }));
604
605 Ok(WakerToken(next_op_token))
606 }
607
608 fn submit_write_from_vectored(
609 &self,
610 source: &RegisteredSource,
611 mem: Arc<dyn BackingMemory + Send + Sync>,
612 offset: Option<u64>,
613 addrs: impl IntoIterator<Item = MemRegion>,
614 ) -> Result<WakerToken> {
615 let iovecs = addrs
616 .into_iter()
617 .map(|mem_range| {
618 let vslice = mem
619 .get_volatile_slice(mem_range)
620 .map_err(|_| Error::InvalidOffset)?;
621 // SAFETY:
622 // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
623 // transaction is complete and the completion has been returned from `wait()`.
624 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
625 })
626 .collect::<Result<Vec<_>>>()?;
627 let iovecs = Pin::from(iovecs.into_boxed_slice());
628
629 let mut ring = self.ring.lock();
630 let src = ring
631 .registered_sources
632 .get(source.tag)
633 .ok_or(Error::InvalidSource)?
634 .clone();
635
636 let entry = ring.ops.vacant_entry();
637 let next_op_token = entry.key();
638
639 // SAFETY:
640 // Safe because all the addresses are within the Memory that an Arc is kept for the
641 // duration to ensure the memory is valid while the kernel accesses it.
642 // Tested by `dont_drop_backing_mem_write` unit test.
643 unsafe {
644 self.ctx
645 .add_writev(
646 iovecs,
647 src.as_raw_descriptor(),
648 offset,
649 usize_to_u64(next_op_token),
650 )
651 .map_err(Error::SubmittingOp)?;
652 }
653
654 entry.insert(OpStatus::Pending(OpData {
655 _file: src,
656 _mem: Some(mem),
657 waker: None,
658 canceled: false,
659 }));
660
661 Ok(WakerToken(next_op_token))
662 }
663 }
664
665 impl Reactor for UringReactor {
666 fn new() -> std::io::Result<Self> {
667 Ok(UringReactor::new()?)
668 }
669
670 fn wake(&self) {
671 let mut ring = self.ring.lock();
672 let entry = ring.ops.vacant_entry();
673 let next_op_token = entry.key();
674 if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
675 warn!("Failed to add NOP for waking up executor: {}", e);
676 }
677 entry.insert(OpStatus::Nop);
678 mem::drop(ring);
679
680 match self.ctx.submit() {
681 Ok(()) => {}
682 // If the kernel's submit ring is full then we know we won't block when calling
683 // io_uring_enter, which is all we really care about.
684 Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
685 Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
686 }
687 }
688
689 fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
690 // At this point, there are no strong references to the executor (see `on_executor_drop`
691 // docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
692 // work can be submitted.
693
694 // Submit cancellations for all operations
695 #[allow(clippy::needless_collect)]
696 let ops: Vec<_> = self
697 .ring
698 .lock()
699 .ops
700 .iter_mut()
701 .filter_map(|op| match op.1 {
702 OpStatus::Pending(data) if !data.canceled => Some(op.0),
703 _ => None,
704 })
705 .collect();
706 for token in ops {
707 self.cancel_operation(WakerToken(token));
708 }
709
710 // Since the UringReactor is wrapped in an Arc it may end up being dropped from a different
711 // thread than the one that called `run` or `run_until`. Since we know there are no other
712 // references, just clear the thread id so that we don't panic.
713 *self.thread_id.lock() = None;
714
715 // Make sure all pending uring operations are completed as the kernel may try to write to
716 // memory that we are about to drop.
717 //
718 // This future doesn't use the waker; it assumes the future will always be polled after
719 // processing other woken futures.
720 // TODO: Find a more robust solution.
721 Box::pin(futures::future::poll_fn(|_cx| {
722 if self.ring.lock().ops.is_empty() {
723 Poll::Ready(())
724 } else {
725 Poll::Pending
726 }
727 }))
728 }
729
730 fn on_thread_start(&self) {
731 let current_thread = thread::current().id();
732 let mut thread_id = self.thread_id.lock();
733 assert_eq!(
734 *thread_id.get_or_insert(current_thread),
735 current_thread,
736 "`UringReactor::wait_for_work` cannot be called from more than one thread"
737 );
738 }
739
740 fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
741 trace!(
742 "Waiting on events, {} pending ops",
743 self.ring.lock().ops.len()
744 );
745 let events = self.ctx.wait().map_err(Error::URingEnter)?;
746
747 // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
748 // writing to the eventfd.
749 set_processing();
750
751 let mut ring = self.ring.lock();
752 for (raw_token, result) in events {
753 // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
754 // something that we originally gave to the kernel and that was created from a
755 // `usize` so we should always be able to convert it back into a `usize`.
756 let token = raw_token
757 .try_into()
758 .expect("`u64` doesn't fit inside a `usize`");
759
760 let op = ring
761 .ops
762 .get_mut(token)
763 .expect("Received completion token for unexpected operation");
764 match mem::replace(op, OpStatus::Completed(Some(result))) {
765 // No one is waiting on a Nop.
766 OpStatus::Nop => mem::drop(ring.ops.remove(token)),
767 OpStatus::Pending(data) => {
768 if data.canceled {
769 // No one is waiting for this operation and the uring is done with
770 // it so it's safe to remove.
771 ring.ops.remove(token);
772 }
773 if let Some(waker) = data.waker {
774 waker.wake();
775 }
776 }
777 OpStatus::Completed(_) => panic!("uring operation completed more than once"),
778 }
779 }
780
781 Ok(())
782 }
783
784 fn new_source<F: AsRawDescriptor>(
785 &self,
786 ex: &Arc<RawExecutor<Self>>,
787 f: F,
788 ) -> AsyncResult<IoSource<F>> {
789 Ok(IoSource::Uring(super::UringSource::new(f, ex)?))
790 }
791
792 fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {
793 TaskHandle::Uring(task)
794 }
795 }
796
797 impl AsRawDescriptor for UringReactor {
798 fn as_raw_descriptor(&self) -> RawDescriptor {
799 self.ctx.as_raw_descriptor()
800 }
801 }
802
803 impl WeakWake for UringReactor {
804 fn wake_by_ref(weak_self: &Weak<Self>) {
805 if let Some(arc_self) = weak_self.upgrade() {
806 Reactor::wake(&*arc_self);
807 }
808 }
809 }
810
811 impl Drop for UringReactor {
812 fn drop(&mut self) {
813 // The ring should have been drained when the executor's Drop ran.
814 assert!(self.ring.lock().ops.is_empty());
815 }
816 }
817
818 // SAFETY:
819 // Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
820 // waiting in TLS to be added to the main polling context.
821 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
822 let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
823 if ret < 0 {
824 Err(Error::DuplicatingFd(base::Error::last()))
825 } else {
826 Ok(ret)
827 }
828 }
829
830 // Converts a `usize` into a `u64` and panics if the conversion fails.
831 #[inline]
832 fn usize_to_u64(val: usize) -> u64 {
833 val.try_into().expect("`usize` doesn't fit inside a `u64`")
834 }
835
836 pub struct PendingOperation {
837 waker_token: Option<WakerToken>,
838 ex: Weak<RawExecutor<UringReactor>>,
839 submitted: bool,
840 }
841
842 impl Future for PendingOperation {
843 type Output = Result<u32>;
844
845 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
846 let token = self
847 .waker_token
848 .as_ref()
849 .expect("PendingOperation polled after returning Poll::Ready");
850 if let Some(ex) = self.ex.upgrade() {
851 if let Some(result) = ex.reactor.get_result(token, cx) {
852 self.waker_token = None;
853 Poll::Ready(result.map_err(Error::Io))
854 } else {
855 // If we haven't submitted the operation yet and the executor runs on a different
856 // thread, then submit it now. Otherwise the executor will submit it automatically
857 // the next time it calls UringContext::wait.
858 if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {
859 match ex.reactor.ctx.submit() {
860 Ok(()) => self.submitted = true,
861 // If the kernel ring is full then wait until some ops are removed from the
862 // completion queue. This op should get submitted the next time the executor
863 // calls UringContext::wait.
864 Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
865 Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
866 }
867 }
868 Poll::Pending
869 }
870 } else {
871 Poll::Ready(Err(Error::ExecutorGone))
872 }
873 }
874 }
875
876 impl Drop for PendingOperation {
877 fn drop(&mut self) {
878 if let Some(waker_token) = self.waker_token.take() {
879 if let Some(ex) = self.ex.upgrade() {
880 ex.reactor.cancel_operation(waker_token);
881 }
882 }
883 }
884 }
885
886 #[cfg(test)]
887 mod tests {
888 use std::future::Future;
889 use std::io::Read;
890 use std::io::Write;
891 use std::mem;
892 use std::pin::Pin;
893 use std::rc::Rc;
894 use std::task::Context;
895 use std::task::Poll;
896
897 use futures::executor::block_on;
898
899 use super::*;
900 use crate::mem::BackingMemory;
901 use crate::mem::MemRegion;
902 use crate::mem::VecIoWrapper;
903 use crate::BlockingPool;
904 use crate::ExecutorTrait;
905
906 // A future that returns ready when the uring queue is empty.
907 struct UringQueueEmpty<'a> {
908 ex: &'a Arc<RawExecutor<UringReactor>>,
909 }
910
911 impl<'a> Future for UringQueueEmpty<'a> {
912 type Output = ();
913
914 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
915 if self.ex.reactor.ring.lock().ops.is_empty() {
916 Poll::Ready(())
917 } else {
918 Poll::Pending
919 }
920 }
921 }
922
923 #[test]
924 fn dont_drop_backing_mem_read() {
925 if !is_uring_stable() {
926 return;
927 }
928
929 // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
930 // op is pending.
931 let bm =
932 Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
933
934 // Use pipes to create a future that will block forever.
935 let (rx, mut tx) = base::pipe().unwrap();
936
937 // Set up the TLS for the uring_executor by creating one.
938 let ex = RawExecutor::<UringReactor>::new().unwrap();
939
940 // Register the receive side of the pipe with the executor.
941 let registered_source = ex
942 .reactor
943 .register_source(&ex, &rx)
944 .expect("register source failed");
945
946 // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
947 // duration of the op.
948 let pending_op = registered_source
949 .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
950 .expect("failed to start read to mem");
951
952 // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
953 // reference while the op is active.
954 assert_eq!(Arc::strong_count(&bm), 2);
955
956 // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
957 // it.
958 drop(pending_op);
959 assert_eq!(Arc::strong_count(&bm), 2);
960
961 // Finishing the operation should put the Arc count back to 1.
962 // Write to the pipe to complete the pending read and then wait for the uring result in the
963 // executor.
964 tx.write_all(&[0u8; 8]).expect("write failed");
965 ex.run_until(UringQueueEmpty { ex: &ex })
966 .expect("Failed to wait for read pipe ready");
967 assert_eq!(Arc::strong_count(&bm), 1);
968 }
969
970 #[test]
971 fn dont_drop_backing_mem_write() {
972 if !is_uring_stable() {
973 return;
974 }
975
976 // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
977 // op is pending.
978 let bm =
979 Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
980
981 // Use pipes to create a future that will block forever.
982 let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
983
984 // Set up the TLS for the uring_executor by creating one.
985 let ex = RawExecutor::<UringReactor>::new().unwrap();
986
987 // Register the receive side of the pipe with the executor.
988 let registered_source = ex
989 .reactor
990 .register_source(&ex, &tx)
991 .expect("register source failed");
992
993 // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
994 // duration of the op.
995 let pending_op = registered_source
996 .start_write_from_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
997 .expect("failed to start write to mem");
998
999 // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
1000 // reference while the op is active.
1001 assert_eq!(Arc::strong_count(&bm), 2);
1002
1003 // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
1004 // it.
1005 drop(pending_op);
1006 assert_eq!(Arc::strong_count(&bm), 2);
1007
1008 // Finishing the operation should put the Arc count back to 1.
1009 // Read from the pipe to unblock the pending write and then wait for the uring result in the
1010 // executor.
1011 let mut buf = vec![0u8; base::round_up_to_page_size(1)];
1012 rx.read_exact(&mut buf).expect("read to empty failed");
1013 ex.run_until(UringQueueEmpty { ex: &ex })
1014 .expect("Failed to wait for write pipe ready");
1015 assert_eq!(Arc::strong_count(&bm), 1);
1016 }
1017
1018 #[test]
1019 fn canceled_before_completion() {
1020 if !is_uring_stable() {
1021 return;
1022 }
1023
1024 async fn cancel_io(op: PendingOperation) {
1025 mem::drop(op);
1026 }
1027
1028 async fn check_result(op: PendingOperation, expected: u32) {
1029 let actual = op.await.expect("operation failed to complete");
1030 assert_eq!(expected, actual);
1031 }
1032
1033 let bm =
1034 Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1035
1036 let (rx, tx) = base::pipe().expect("Pipe failed");
1037
1038 let ex = RawExecutor::<UringReactor>::new().unwrap();
1039
1040 let rx_source = ex
1041 .reactor
1042 .register_source(&ex, &rx)
1043 .expect("register source failed");
1044 let tx_source = ex
1045 .reactor
1046 .register_source(&ex, &tx)
1047 .expect("register source failed");
1048
1049 let read_task = rx_source
1050 .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
1051 .expect("failed to start read to mem");
1052
1053 ex.spawn_local(cancel_io(read_task)).detach();
1054
1055 // Write to the pipe so that the kernel operation will complete.
1056 let buf =
1057 Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1058 let write_task = tx_source
1059 .start_write_from_mem(None, Arc::clone(&buf), [MemRegion { offset: 0, len: 8 }])
1060 .expect("failed to start write from mem");
1061
1062 ex.run_until(check_result(write_task, 8))
1063 .expect("Failed to run executor");
1064 }
1065
1066 // We will drain all ops on drop and it's not guaranteed that the operation won't finish.
1067 #[ignore]
1068 #[test]
1069 fn drop_before_completion() {
1070 if !is_uring_stable() {
1071 return;
1072 }
1073
1074 const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
1075
1076 async fn check_op(op: PendingOperation) {
1077 let err = op.await.expect_err("Op completed successfully");
1078 match err {
1079 Error::ExecutorGone => {}
1080 e => panic!("Unexpected error from op: {}", e),
1081 }
1082 }
1083
1084 let (mut rx, mut tx) = base::pipe().expect("Pipe failed");
1085
1086 let ex = RawExecutor::<UringReactor>::new().unwrap();
1087
1088 let tx_source = ex
1089 .reactor
1090 .register_source(&ex, &tx)
1091 .expect("Failed to register source");
1092 let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
1093 let op = tx_source
1094 .start_write_from_mem(
1095 None,
1096 bm,
1097 [MemRegion {
1098 offset: 0,
1099 len: mem::size_of::<u64>(),
1100 }],
1101 )
1102 .expect("Failed to start write from mem");
1103
1104 ex.spawn_local(check_op(op)).detach();
1105
1106 // Now drop the executor. It shouldn't run the write operation.
1107 mem::drop(ex);
1108
1109 // Make sure the executor did not complete the uring operation.
1110 let new_val = [0x2e; 8];
1111 tx.write_all(&new_val).unwrap();
1112
1113 let mut buf = 0u64.to_ne_bytes();
1114 rx.read_exact(&mut buf[..])
1115 .expect("Failed to read from pipe");
1116
1117 assert_eq!(buf, new_val);
1118 }
1119
1120 // Dropping a task that owns a BlockingPool shouldn't leak the pool.
1121 #[test]
1122 fn drop_detached_blocking_pool() {
1123 if !is_uring_stable() {
1124 return;
1125 }
1126
1127 struct Cleanup(BlockingPool);
1128
1129 impl Drop for Cleanup {
1130 fn drop(&mut self) {
1131 // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
1132 self.0
1133 .shutdown(Some(
1134 std::time::Instant::now() + std::time::Duration::from_secs(1),
1135 ))
1136 .unwrap();
1137 }
1138 }
1139
1140 let rc = Rc::new(std::cell::Cell::new(0));
1141 {
1142 let ex = RawExecutor::<UringReactor>::new().unwrap();
1143 let rc_clone = rc.clone();
1144 ex.spawn_local(async move {
1145 rc_clone.set(1);
1146 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
1147 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
1148 // Spawn a blocking task.
1149 let blocking_task = pool.0.spawn(move || {
1150 // Rendezvous.
1151 assert_eq!(recv.recv(), Ok(()));
1152 // Wait for drop.
1153 assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
1154 });
1155 // Make sure it has actually started (using a "rendezvous channel" send).
1156 //
1157 // Without this step, we'll have a race where we can shutdown the blocking pool
1158 // before the worker thread pops off the task.
1159 send.send(()).unwrap();
1160 // Wait for it to finish
1161 blocking_task.await;
1162 rc_clone.set(2);
1163 })
1164 .detach();
1165 ex.run_until(async {}).unwrap();
1166 // `ex` is dropped here. If everything is working as expected, it should drop all of
1167 // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
1168 // `Drop` impl will try to join all the worker threads, which should work because the send
1169 // half of the channel has been closed.
1170 }
1171 assert_eq!(rc.get(), 1);
1172 Rc::try_unwrap(rc).expect("Rc had too many refs");
1173 }
1174
1175 #[test]
1176 fn drop_on_different_thread() {
1177 if !is_uring_stable() {
1178 return;
1179 }
1180
1181 let ex = RawExecutor::<UringReactor>::new().unwrap();
1182
1183 let ex2 = ex.clone();
1184 let t = thread::spawn(move || ex2.run_until(async {}));
1185
1186 t.join().unwrap().unwrap();
1187
1188 // Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
1189 // completion.
1190 let (_rx, tx) = base::pipe().expect("Pipe failed");
1191 let tx = ex
1192 .reactor
1193 .register_source(&ex, &tx)
1194 .expect("Failed to register source");
1195 let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
1196 let op = tx
1197 .start_write_from_mem(
1198 None,
1199 bm,
1200 [MemRegion {
1201 offset: 0,
1202 len: mem::size_of::<u64>(),
1203 }],
1204 )
1205 .expect("Failed to start write from mem");
1206
1207 mem::drop(ex);
1208
1209 match block_on(op).expect_err("Pending operation completed after executor was dropped") {
1210 Error::ExecutorGone => {}
1211 e => panic!("Unexpected error after dropping executor: {}", e),
1212 }
1213 }
1214 }
1215