xref: /aosp_15_r20/external/crosvm/cros_async/src/sys/linux/uring_executor.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // TODO: Move this doc to one of the public APIs, it isn't io_uring specific.
6 
7 //! `UringReactor`
8 //!
9 //! ## Read/Write buffer management.
10 //!
11 //! There are two key issues in managing asynchronous IO buffers in Rust.
12 //! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
13 //!    not have any references to it during that time.
14 //! 2) The memory must remain valid as long as the kernel has a reference to it.
15 //!
16 //! ### The kernel's mutable borrow of the buffer
17 //!
18 //! Because the buffers used for read and write must be passed to the kernel for an unknown
19 //! duration, the functions must maintain ownership of the memory.  The core of this problem is that
20 //! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
21 //! future has a reference to.  The buffer can be modified at any point from submission until the
22 //! operation completes. The operation can't be synchronously canceled when the future is dropped,
23 //! and Drop can't be used for safety guarantees. To ensure the kernel never modifies memory that
24 //! Rust still holds a reference to, only memory that implements `BackingMemory` is accepted.  For
25 //! implementors of `BackingMemory` the mut borrow isn't an issue because they already tolerate
26 //! external modifications to the memory (like a `VolatileSlice`).
27 //!
28 //! ### Buffer lifetime
29 //!
30 //! What if the kernel's reference to the buffer outlives the buffer itself?  This could happen
31 //! if a read operation is submitted and the memory is then dropped.  To solve this, the executor
32 //! takes an Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being
33 //! passed to the executor.  The executor holds the Arc and ensures all operations are complete
34 //! before dropping it, which guarantees the memory is valid for the duration.
35 //!
36 //! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
37 //! dropped (we can't rely on Drop running), there is no way to ensure the kernel's buffer remains
38 //! valid until the operation completes unless the executor holds an Arc to the memory on the heap.
39 //!
40 //! ## Using `Vec` for reads/writes.
41 //!
42 //! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
43 //! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
44 //! ensure it lives long enough.
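//!
//! A minimal usage sketch of this ownership model (illustrative only, not compiled here;
//! `registered_source` stands in for a source obtained from `UringReactor::register_source`):
//!
//! ```ignore
//! // Wrap the buffer in an Arc so the reactor can keep it alive until the kernel is done
//! // with it, even if the future is dropped before the operation completes.
//! let buf: Arc<dyn BackingMemory + Send + Sync> = Arc::new(VecIoWrapper::from(vec![0u8; 4096]));
//! let op = registered_source.start_read_to_mem(
//!     None,
//!     Arc::clone(&buf),
//!     [MemRegion { offset: 0, len: 4096 }],
//! )?;
//! // Awaiting the returned `PendingOperation` yields the number of bytes transferred.
//! let bytes_read: u32 = op.await?;
//! ```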
45 
46 use std::convert::TryInto;
47 use std::ffi::CStr;
48 use std::fs::File;
49 use std::future::Future;
50 use std::io;
51 use std::mem;
52 use std::mem::MaybeUninit;
53 use std::os::unix::io::FromRawFd;
54 use std::os::unix::io::RawFd;
55 use std::pin::Pin;
56 use std::sync::Arc;
57 use std::sync::Weak;
58 use std::task::Context;
59 use std::task::Poll;
60 use std::task::Waker;
61 use std::thread;
62 use std::thread::ThreadId;
63 
64 use base::trace;
65 use base::warn;
66 use base::AsRawDescriptor;
67 use base::EventType;
68 use base::IoBufMut;
69 use base::RawDescriptor;
70 use io_uring::URingAllowlist;
71 use io_uring::URingContext;
72 use io_uring::URingOperation;
73 use once_cell::sync::Lazy;
74 use remain::sorted;
75 use slab::Slab;
76 use sync::Mutex;
77 use thiserror::Error as ThisError;
78 
79 use crate::common_executor::RawExecutor;
80 use crate::common_executor::RawTaskHandle;
81 use crate::common_executor::Reactor;
82 use crate::mem::BackingMemory;
83 use crate::waker::WakerToken;
84 use crate::waker::WeakWake;
85 use crate::AsyncError;
86 use crate::AsyncResult;
87 use crate::IoSource;
88 use crate::MemRegion;
89 use crate::TaskHandle;
90 
91 #[sorted]
92 #[derive(Debug, ThisError)]
93 pub enum Error {
94     /// Creating a context to wait on FDs failed.
95     #[error("Error creating the fd waiting context: {0}")]
96     CreatingContext(io_uring::Error),
97     /// Failed to discard a block.
98     #[error("Failed to discard a block: {0}")]
99     Discard(base::Error),
100     /// Failed to copy the FD for the polling context.
101     #[error("Failed to copy the FD for the polling context: {0}")]
102     DuplicatingFd(base::Error),
103     /// Enabling a context failed.
104     #[error("Error enabling the URing context: {0}")]
105     EnablingContext(io_uring::Error),
106     /// The Executor is gone.
107     #[error("The executor is gone")]
108     ExecutorGone,
109     /// Invalid offset or length given for an iovec in backing memory.
110     #[error("Invalid offset/len for getting an iovec")]
111     InvalidOffset,
112     /// Invalid FD source specified.
113     #[error("Invalid source, FD not registered for use")]
114     InvalidSource,
115     /// Error doing the IO.
116     #[error("Error during IO: {0}")]
117     Io(io::Error),
118     /// Registering operation restrictions to a uring failed.
119     #[error("Error registering restrictions to the URing context: {0}")]
120     RegisteringURingRestriction(io_uring::Error),
121     /// Failed to remove the waker from the polling context.
122     #[error("Error removing from the URing context: {0}")]
123     RemovingWaker(io_uring::Error),
124     /// Failed to submit the operation to the polling context.
125     #[error("Error adding to the URing context: {0}")]
126     SubmittingOp(io_uring::Error),
127     /// URingContext failure.
128     #[error("URingContext failure: {0}")]
129     URingContextError(io_uring::Error),
130     /// Failed to submit or wait for io_uring events.
131     #[error("URing::enter: {0}")]
132     URingEnter(io_uring::Error),
133 }
134 pub type Result<T> = std::result::Result<T, Error>;
135 
136 impl From<Error> for io::Error {
137     fn from(e: Error) -> Self {
138         use Error::*;
139         match e {
140             Discard(e) => e.into(),
141             DuplicatingFd(e) => e.into(),
142             ExecutorGone => io::Error::new(io::ErrorKind::Other, ExecutorGone),
143             InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
144             InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
145             Io(e) => e,
146             CreatingContext(e) => e.into(),
147             RemovingWaker(e) => e.into(),
148             SubmittingOp(e) => e.into(),
149             URingContextError(e) => e.into(),
150             URingEnter(e) => e.into(),
151             EnablingContext(e) => e.into(),
152             RegisteringURingRestriction(e) => e.into(),
153         }
154     }
155 }
156 
157 impl From<Error> for AsyncError {
158     fn from(e: Error) -> AsyncError {
159         AsyncError::SysVariants(e.into())
160     }
161 }
162 
163 static IS_URING_STABLE: Lazy<bool> = Lazy::new(|| {
164     let mut utsname = MaybeUninit::zeroed();
165 
166     // SAFETY:
167     // Safe because this will only modify `utsname` and we check the return value.
168     let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
169     if res < 0 {
170         return false;
171     }
172 
173     // SAFETY:
174     // Safe because the kernel has initialized `utsname`.
175     let utsname = unsafe { utsname.assume_init() };
176 
177     // SAFETY:
178     // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
179     let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
180 
181     let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
182         Ok(c) => c,
183         Err(_) => return false,
184     };
185 
186     // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
187     match (components.next(), components.next()) {
188         (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
189             // The kernel version is new enough so check if we can actually make a uring context.
190             URingContext::new(8, None).is_ok()
191         }
192         _ => false,
193     }
194 });
195 
196 // Checks if the uring executor is stable.
197 // Caches the result so that the check is only run once.
198 // Useful for falling back to the FD executor on pre-uring kernels.
199 pub fn is_uring_stable() -> bool {
200     *IS_URING_STABLE
201 }
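// Illustrative fallback sketch (not part of this module's API): callers that construct an
// executor directly can use `is_uring_stable()` to pick a reactor at startup. `EpollReactor`
// here names cros_async's FD-based reactor and is shown only as an assumption of how the
// selection could look.
//
// ```ignore
// let ex = if is_uring_stable() {
//     RawExecutor::<UringReactor>::new()?
// } else {
//     RawExecutor::<EpollReactor>::new()?
// };
// ```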
202 
203 // Checks the uring availability by checking if the uring creation succeeds.
204 // If uring creation succeeds, it returns `Ok(())`. It returns an `URingContextError` otherwise.
205 // It fails if the kernel does not support io_uring, though that is not the only possible cause of failure.
206 pub(crate) fn check_uring_availability() -> Result<()> {
207     URingContext::new(8, None)
208         .map(drop)
209         .map_err(Error::URingContextError)
210 }
211 
212 pub struct RegisteredSource {
213     tag: usize,
214     ex: Weak<RawExecutor<UringReactor>>,
215 }
216 
217 impl RegisteredSource {
218     pub fn start_read_to_mem(
219         &self,
220         file_offset: Option<u64>,
221         mem: Arc<dyn BackingMemory + Send + Sync>,
222         addrs: impl IntoIterator<Item = MemRegion>,
223     ) -> Result<PendingOperation> {
224         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
225         let token = ex
226             .reactor
227             .submit_read_to_vectored(self, mem, file_offset, addrs)?;
228 
229         Ok(PendingOperation {
230             waker_token: Some(token),
231             ex: self.ex.clone(),
232             submitted: false,
233         })
234     }
235 
236     pub fn start_write_from_mem(
237         &self,
238         file_offset: Option<u64>,
239         mem: Arc<dyn BackingMemory + Send + Sync>,
240         addrs: impl IntoIterator<Item = MemRegion>,
241     ) -> Result<PendingOperation> {
242         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
243         let token = ex
244             .reactor
245             .submit_write_from_vectored(self, mem, file_offset, addrs)?;
246 
247         Ok(PendingOperation {
248             waker_token: Some(token),
249             ex: self.ex.clone(),
250             submitted: false,
251         })
252     }
253 
254     pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
255         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
256         let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;
257 
258         Ok(PendingOperation {
259             waker_token: Some(token),
260             ex: self.ex.clone(),
261             submitted: false,
262         })
263     }
264 
265     pub fn start_fsync(&self) -> Result<PendingOperation> {
266         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
267         let token = ex.reactor.submit_fsync(self)?;
268 
269         Ok(PendingOperation {
270             waker_token: Some(token),
271             ex: self.ex.clone(),
272             submitted: false,
273         })
274     }
275 
276     pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
277         let events = EventType::Read;
278 
279         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
280         let token = ex.reactor.submit_poll(self, events)?;
281 
282         Ok(PendingOperation {
283             waker_token: Some(token),
284             ex: self.ex.clone(),
285             submitted: false,
286         })
287     }
288 }
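// Illustrative note: each `start_*` method above queues an SQE immediately and returns a
// `PendingOperation` future; the SQE reaches the kernel on the executor's next
// `UringContext::submit`/`wait` call, and the future resolves with the CQE result. A hedged
// sketch of driving one such operation (`file` is any `AsRawDescriptor`):
//
// ```ignore
// let ex = RawExecutor::<UringReactor>::new()?;
// let source = ex.reactor.register_source(&ex, &file)?;
// let op = source.start_fsync()?;
// let status = ex.run_until(op)?; // yields the inner `Result<u32>` from the completion
// ```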
289 
290 impl Drop for RegisteredSource {
291     fn drop(&mut self) {
292         if let Some(ex) = self.ex.upgrade() {
293             ex.reactor.deregister_source(self);
294         }
295     }
296 }
297 
298 // Number of entries in the ring.
299 const NUM_ENTRIES: usize = 256;
300 
301 // An operation that has been submitted to the uring and is potentially being waited on.
302 struct OpData {
303     _file: Arc<File>,
304     _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
305     waker: Option<Waker>,
306     canceled: bool,
307 }
308 
309 // The current status of an operation that's been submitted to the uring.
310 enum OpStatus {
311     Nop,
312     Pending(OpData),
313     Completed(Option<::std::io::Result<u32>>),
314 }
315 
316 struct Ring {
317     ops: Slab<OpStatus>,
318     registered_sources: Slab<Arc<File>>,
319 }
320 
321 /// `Reactor` that manages async IO work using io_uring.
322 pub struct UringReactor {
323     // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
324     // releasing the resources borrowed by the kernel before we free them.
325     ctx: URingContext,
326     ring: Mutex<Ring>,
327     thread_id: Mutex<Option<ThreadId>>,
328 }
329 
330 impl UringReactor {
331     fn new() -> Result<UringReactor> {
332         // To enhance security, only allow the operations that the UringReactor actually submits.
333         let mut restrictions = URingAllowlist::new();
334         let ops = [
335             URingOperation::Writev,
336             URingOperation::Readv,
337             URingOperation::Nop,
338             URingOperation::Fsync,
339             URingOperation::Fallocate,
340             URingOperation::PollAdd,
341             URingOperation::PollRemove,
342             URingOperation::AsyncCancel,
343         ];
344         for op in ops {
345             restrictions.allow_submit_operation(op);
346         }
347 
348         let ctx =
349             URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;
350 
351         Ok(UringReactor {
352             ctx,
353             ring: Mutex::new(Ring {
354                 ops: Slab::with_capacity(NUM_ENTRIES),
355                 registered_sources: Slab::with_capacity(NUM_ENTRIES),
356             }),
357             thread_id: Mutex::new(None),
358         })
359     }
360 
361     fn runs_tasks_on_current_thread(&self) -> bool {
362         let executor_thread = self.thread_id.lock();
363         executor_thread
364             .map(|id| id == thread::current().id())
365             .unwrap_or(false)
366     }
367 
368     fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
369         let mut ring = self.ring.lock();
370 
371         let op = ring
372             .ops
373             .get_mut(token.0)
374             .expect("`get_result` called on unknown operation");
375         match op {
376             OpStatus::Nop => panic!("`get_result` called on nop"),
377             OpStatus::Pending(data) => {
378                 if data.canceled {
379                     panic!("`get_result` called on canceled operation");
380                 }
381                 data.waker = Some(cx.waker().clone());
382                 None
383             }
384             OpStatus::Completed(res) => {
385                 let out = res.take();
386                 ring.ops.remove(token.0);
387                 Some(out.expect("Missing result in completed operation"))
388             }
389         }
390     }
391 
392     // Cancel the operation for the given token if it hasn't completed yet.
393     fn cancel_operation(&self, token: WakerToken) {
394         let mut ring = self.ring.lock();
395         let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
396             match op {
397                 OpStatus::Nop => panic!("`cancel_operation` called on nop"),
398                 OpStatus::Pending(data) => {
399                     if data.canceled {
400                         panic!("uring operation canceled more than once");
401                     }
402 
403                     if let Some(waker) = data.waker.take() {
404                         waker.wake();
405                     }
406                     // The waker was consumed by `take()` above, so there is nothing further
407                     // to clear here.
408                     data.canceled = true;
409 
410                     // Keep the rest of the op data, as the uring might still be accessing
411                     // either the source or the backing memory, so both need to live until the
412                     // kernel completes the operation.
413                     true
414                 }
415                 OpStatus::Completed(_) => {
416                     ring.ops.remove(token.0);
417                     false
418                 }
419             }
420         } else {
421             false
422         };
423         std::mem::drop(ring);
424         if submit_cancel {
425             let _best_effort = self.submit_cancel_async(token.0);
426         }
427     }
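    // Note on lifetimes (descriptive only): dropping a `PendingOperation` routes through
    // `cancel_operation`. The op slot, along with the Arcs to the file and backing memory in
    // its `OpData`, is intentionally retained until `wait_for_work` sees the kernel's
    // completion for the original operation; the async-cancel SQE submitted above is only a
    // best-effort hint to the kernel.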
428 
429     pub(crate) fn register_source<F: AsRawDescriptor>(
430         &self,
431         raw: &Arc<RawExecutor<UringReactor>>,
432         fd: &F,
433     ) -> Result<RegisteredSource> {
434         // SAFETY:
435         // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
436         // will only be added to the poll loop.
437         let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };
438 
439         Ok(RegisteredSource {
440             tag: self
441                 .ring
442                 .lock()
443                 .registered_sources
444                 .insert(Arc::new(duped_fd)),
445             ex: Arc::downgrade(raw),
446         })
447     }
448 
449     fn deregister_source(&self, source: &RegisteredSource) {
450         // There isn't any need to pull pending ops out; they all have Arcs to the file and mem
451         // they need, so let them complete. Deregistering with pending ops is not a common path,
452         // so there is no need to optimize that case yet.
453         self.ring.lock().registered_sources.remove(source.tag);
454     }
455 
456     fn submit_poll(
457         &self,
458         source: &RegisteredSource,
459         events: base::EventType,
460     ) -> Result<WakerToken> {
461         let mut ring = self.ring.lock();
462         let src = ring
463             .registered_sources
464             .get(source.tag)
465             .ok_or(Error::InvalidSource)?
466             .clone();
467         let entry = ring.ops.vacant_entry();
468         let next_op_token = entry.key();
469         self.ctx
470             .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
471             .map_err(Error::SubmittingOp)?;
472         entry.insert(OpStatus::Pending(OpData {
473             _file: src,
474             _mem: None,
475             waker: None,
476             canceled: false,
477         }));
478 
479         Ok(WakerToken(next_op_token))
480     }
481 
482     fn submit_fallocate(
483         &self,
484         source: &RegisteredSource,
485         offset: u64,
486         len: u64,
487         mode: u32,
488     ) -> Result<WakerToken> {
489         let mut ring = self.ring.lock();
490         let src = ring
491             .registered_sources
492             .get(source.tag)
493             .ok_or(Error::InvalidSource)?
494             .clone();
495         let entry = ring.ops.vacant_entry();
496         let next_op_token = entry.key();
497         self.ctx
498             .add_fallocate(
499                 src.as_raw_descriptor(),
500                 offset,
501                 len,
502                 mode,
503                 usize_to_u64(next_op_token),
504             )
505             .map_err(Error::SubmittingOp)?;
506 
507         entry.insert(OpStatus::Pending(OpData {
508             _file: src,
509             _mem: None,
510             waker: None,
511             canceled: false,
512         }));
513 
514         Ok(WakerToken(next_op_token))
515     }
516 
517     fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
518         let mut ring = self.ring.lock();
519         let entry = ring.ops.vacant_entry();
520         let next_op_token = entry.key();
521         self.ctx
522             .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
523             .map_err(Error::SubmittingOp)?;
524 
525         entry.insert(OpStatus::Nop);
526 
527         Ok(WakerToken(next_op_token))
528     }
529 
530     fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
531         let mut ring = self.ring.lock();
532         let src = ring
533             .registered_sources
534             .get(source.tag)
535             .ok_or(Error::InvalidSource)?
536             .clone();
537         let entry = ring.ops.vacant_entry();
538         let next_op_token = entry.key();
539         self.ctx
540             .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
541             .map_err(Error::SubmittingOp)?;
542         entry.insert(OpStatus::Pending(OpData {
543             _file: src,
544             _mem: None,
545             waker: None,
546             canceled: false,
547         }));
548 
549         Ok(WakerToken(next_op_token))
550     }
551 
552     fn submit_read_to_vectored(
553         &self,
554         source: &RegisteredSource,
555         mem: Arc<dyn BackingMemory + Send + Sync>,
556         offset: Option<u64>,
557         addrs: impl IntoIterator<Item = MemRegion>,
558     ) -> Result<WakerToken> {
559         let iovecs = addrs
560             .into_iter()
561             .map(|mem_range| {
562                 let vslice = mem
563                     .get_volatile_slice(mem_range)
564                     .map_err(|_| Error::InvalidOffset)?;
565                 // SAFETY:
566                 // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
567                 // transaction is complete and the completion has been returned from `wait()`.
568                 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
569             })
570             .collect::<Result<Vec<_>>>()?;
571         let iovecs = Pin::from(iovecs.into_boxed_slice());
572 
573         let mut ring = self.ring.lock();
574         let src = ring
575             .registered_sources
576             .get(source.tag)
577             .ok_or(Error::InvalidSource)?
578             .clone();
579 
580         let entry = ring.ops.vacant_entry();
581         let next_op_token = entry.key();
582 
583         // SAFETY:
584         // Safe because all the addresses are within the Memory that an Arc is kept for the
585         // duration to ensure the memory is valid while the kernel accesses it.
586         // Tested by `dont_drop_backing_mem_read` unit test.
587         unsafe {
588             self.ctx
589                 .add_readv(
590                     iovecs,
591                     src.as_raw_descriptor(),
592                     offset,
593                     usize_to_u64(next_op_token),
594                 )
595                 .map_err(Error::SubmittingOp)?;
596         }
597 
598         entry.insert(OpStatus::Pending(OpData {
599             _file: src,
600             _mem: Some(mem),
601             waker: None,
602             canceled: false,
603         }));
604 
605         Ok(WakerToken(next_op_token))
606     }
607 
608     fn submit_write_from_vectored(
609         &self,
610         source: &RegisteredSource,
611         mem: Arc<dyn BackingMemory + Send + Sync>,
612         offset: Option<u64>,
613         addrs: impl IntoIterator<Item = MemRegion>,
614     ) -> Result<WakerToken> {
615         let iovecs = addrs
616             .into_iter()
617             .map(|mem_range| {
618                 let vslice = mem
619                     .get_volatile_slice(mem_range)
620                     .map_err(|_| Error::InvalidOffset)?;
621                 // SAFETY:
622                 // Safe because we guarantee that the memory pointed to by `iovecs` lives until the
623                 // transaction is complete and the completion has been returned from `wait()`.
624                 Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
625             })
626             .collect::<Result<Vec<_>>>()?;
627         let iovecs = Pin::from(iovecs.into_boxed_slice());
628 
629         let mut ring = self.ring.lock();
630         let src = ring
631             .registered_sources
632             .get(source.tag)
633             .ok_or(Error::InvalidSource)?
634             .clone();
635 
636         let entry = ring.ops.vacant_entry();
637         let next_op_token = entry.key();
638 
639         // SAFETY:
640         // Safe because all the addresses are within the Memory that an Arc is kept for the
641         // duration to ensure the memory is valid while the kernel accesses it.
642         // Tested by `dont_drop_backing_mem_write` unit test.
643         unsafe {
644             self.ctx
645                 .add_writev(
646                     iovecs,
647                     src.as_raw_descriptor(),
648                     offset,
649                     usize_to_u64(next_op_token),
650                 )
651                 .map_err(Error::SubmittingOp)?;
652         }
653 
654         entry.insert(OpStatus::Pending(OpData {
655             _file: src,
656             _mem: Some(mem),
657             waker: None,
658             canceled: false,
659         }));
660 
661         Ok(WakerToken(next_op_token))
662     }
663 }
664 
665 impl Reactor for UringReactor {
666     fn new() -> std::io::Result<Self> {
667         Ok(UringReactor::new()?)
668     }
669 
670     fn wake(&self) {
671         let mut ring = self.ring.lock();
672         let entry = ring.ops.vacant_entry();
673         let next_op_token = entry.key();
674         if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
675             warn!("Failed to add NOP for waking up executor: {}", e);
676         }
677         entry.insert(OpStatus::Nop);
678         mem::drop(ring);
679 
680         match self.ctx.submit() {
681             Ok(()) => {}
682             // If the kernel's submit ring is full then we know we won't block when calling
683             // io_uring_enter, which is all we really care about.
684             Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
685             Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
686         }
687     }
688 
689     fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
690         // At this point, there are no strong references to the executor (see `on_executor_drop`
691         // docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
692         // work can be submitted.
693 
694         // Submit cancellations for all operations
695         #[allow(clippy::needless_collect)]
696         let ops: Vec<_> = self
697             .ring
698             .lock()
699             .ops
700             .iter_mut()
701             .filter_map(|op| match op.1 {
702                 OpStatus::Pending(data) if !data.canceled => Some(op.0),
703                 _ => None,
704             })
705             .collect();
706         for token in ops {
707             self.cancel_operation(WakerToken(token));
708         }
709 
710         // Since the UringReactor is wrapped in an Arc it may end up being dropped from a different
711         // thread than the one that called `run` or `run_until`. Since we know there are no other
712         // references, just clear the thread id so that we don't panic.
713         *self.thread_id.lock() = None;
714 
715         // Make sure all pending uring operations are completed, as the kernel may try to
716         // write to memory that we are about to drop.
717         //
718         // This future doesn't use the waker, it assumes the future will always be polled after
719         // processing other woken futures.
720         // TODO: Find a more robust solution.
721         Box::pin(futures::future::poll_fn(|_cx| {
722             if self.ring.lock().ops.is_empty() {
723                 Poll::Ready(())
724             } else {
725                 Poll::Pending
726             }
727         }))
728     }
729 
730     fn on_thread_start(&self) {
731         let current_thread = thread::current().id();
732         let mut thread_id = self.thread_id.lock();
733         assert_eq!(
734             *thread_id.get_or_insert(current_thread),
735             current_thread,
736             "`UringReactor::wait_for_work` cannot be called from more than one thread"
737         );
738     }
739 
740     fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
741         trace!(
742             "Waiting on events, {} pending ops",
743             self.ring.lock().ops.len()
744         );
745         let events = self.ctx.wait().map_err(Error::URingEnter)?;
746 
747         // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
748         // writing to the eventfd.
749         set_processing();
750 
751         let mut ring = self.ring.lock();
752         for (raw_token, result) in events {
753             // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
754             // something that we originally gave to the kernel and that was created from a
755             // `usize` so we should always be able to convert it back into a `usize`.
756             let token = raw_token
757                 .try_into()
758                 .expect("`u64` doesn't fit inside a `usize`");
759 
760             let op = ring
761                 .ops
762                 .get_mut(token)
763                 .expect("Received completion token for unexpected operation");
764             match mem::replace(op, OpStatus::Completed(Some(result))) {
765                 // No one is waiting on a Nop.
766                 OpStatus::Nop => mem::drop(ring.ops.remove(token)),
767                 OpStatus::Pending(data) => {
768                     if data.canceled {
769                         // No one is waiting for this operation and the uring is done with
770                         // it so it's safe to remove.
771                         ring.ops.remove(token);
772                     }
773                     if let Some(waker) = data.waker {
774                         waker.wake();
775                     }
776                 }
777                 OpStatus::Completed(_) => panic!("uring operation completed more than once"),
778             }
779         }
780 
781         Ok(())
782     }
783 
784     fn new_source<F: AsRawDescriptor>(
785         &self,
786         ex: &Arc<RawExecutor<Self>>,
787         f: F,
788     ) -> AsyncResult<IoSource<F>> {
789         Ok(IoSource::Uring(super::UringSource::new(f, ex)?))
790     }
791 
792     fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {
793         TaskHandle::Uring(task)
794     }
795 }
796 
797 impl AsRawDescriptor for UringReactor {
798     fn as_raw_descriptor(&self) -> RawDescriptor {
799         self.ctx.as_raw_descriptor()
800     }
801 }
802 
803 impl WeakWake for UringReactor {
804     fn wake_by_ref(weak_self: &Weak<Self>) {
805         if let Some(arc_self) = weak_self.upgrade() {
806             Reactor::wake(&*arc_self);
807         }
808     }
809 }
810 
811 impl Drop for UringReactor {
812     fn drop(&mut self) {
813         // The ring should have been drained when the executor's Drop ran.
814         assert!(self.ring.lock().ops.is_empty());
815     }
816 }
817 
818 // SAFETY:
819 // Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
820 // waiting in TLS to be added to the main polling context.
821 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
822     let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
823     if ret < 0 {
824         Err(Error::DuplicatingFd(base::Error::last()))
825     } else {
826         Ok(ret)
827     }
828 }
829 
830 // Converts a `usize` into a `u64` and panics if the conversion fails.
831 #[inline]
832 fn usize_to_u64(val: usize) -> u64 {
833     val.try_into().expect("`usize` doesn't fit inside a `u64`")
834 }
835 
836 pub struct PendingOperation {
837     waker_token: Option<WakerToken>,
838     ex: Weak<RawExecutor<UringReactor>>,
839     submitted: bool,
840 }
841 
842 impl Future for PendingOperation {
843     type Output = Result<u32>;
844 
845     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
846         let token = self
847             .waker_token
848             .as_ref()
849             .expect("PendingOperation polled after returning Poll::Ready");
850         if let Some(ex) = self.ex.upgrade() {
851             if let Some(result) = ex.reactor.get_result(token, cx) {
852                 self.waker_token = None;
853                 Poll::Ready(result.map_err(Error::Io))
854             } else {
855                 // If we haven't submitted the operation yet, and the executor runs on a different
856                 // thread then submit it now. Otherwise the executor will submit it automatically
857                 // the next time it calls UringContext::wait.
858                 if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {
859                     match ex.reactor.ctx.submit() {
860                         Ok(()) => self.submitted = true,
861                         // If the kernel ring is full then wait until some ops are removed from the
862                         // completion queue. This op should get submitted the next time the executor
863                         // calls UringContext::wait.
864                         Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
865                         Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
866                     }
867                 }
868                 Poll::Pending
869             }
870         } else {
871             Poll::Ready(Err(Error::ExecutorGone))
872         }
873     }
874 }
875 
876 impl Drop for PendingOperation {
877     fn drop(&mut self) {
878         if let Some(waker_token) = self.waker_token.take() {
879             if let Some(ex) = self.ex.upgrade() {
880                 ex.reactor.cancel_operation(waker_token);
881             }
882         }
883     }
884 }
885 
886 #[cfg(test)]
887 mod tests {
888     use std::future::Future;
889     use std::io::Read;
890     use std::io::Write;
891     use std::mem;
892     use std::pin::Pin;
893     use std::rc::Rc;
894     use std::task::Context;
895     use std::task::Poll;
896 
897     use futures::executor::block_on;
898 
899     use super::*;
900     use crate::mem::BackingMemory;
901     use crate::mem::MemRegion;
902     use crate::mem::VecIoWrapper;
903     use crate::BlockingPool;
904     use crate::ExecutorTrait;
905 
906     // A future that returns ready when the uring queue is empty.
907     struct UringQueueEmpty<'a> {
908         ex: &'a Arc<RawExecutor<UringReactor>>,
909     }
910 
911     impl<'a> Future for UringQueueEmpty<'a> {
912         type Output = ();
913 
914         fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
915             if self.ex.reactor.ring.lock().ops.is_empty() {
916                 Poll::Ready(())
917             } else {
918                 Poll::Pending
919             }
920         }
921     }
922 
923     #[test]
924     fn dont_drop_backing_mem_read() {
925         if !is_uring_stable() {
926             return;
927         }
928 
929         // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
930         // op is pending.
931         let bm =
932             Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
933 
934         // Use pipes to create a future that will block forever.
935         let (rx, mut tx) = base::pipe().unwrap();
936 
937         // Set up the TLS for the uring_executor by creating one.
938         let ex = RawExecutor::<UringReactor>::new().unwrap();
939 
940         // Register the receive side of the pipe with the executor.
941         let registered_source = ex
942             .reactor
943             .register_source(&ex, &rx)
944             .expect("register source failed");
945 
946         // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
947         // duration of the op.
948         let pending_op = registered_source
949             .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
950             .expect("failed to start read to mem");
951 
952         // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
953         // reference while the op is active.
954         assert_eq!(Arc::strong_count(&bm), 2);
955 
956         // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
957         // it.
958         drop(pending_op);
959         assert_eq!(Arc::strong_count(&bm), 2);
960 
961         // Finishing the operation should put the Arc count back to 1.
962         // Write to the pipe to complete the blocked read, then wait for the uring result in
963         // the executor.
964         tx.write_all(&[0u8; 8]).expect("write failed");
965         ex.run_until(UringQueueEmpty { ex: &ex })
966             .expect("Failed to wait for read pipe ready");
967         assert_eq!(Arc::strong_count(&bm), 1);
968     }
969 
970     #[test]
971     fn dont_drop_backing_mem_write() {
972         if !is_uring_stable() {
973             return;
974         }
975 
976         // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
977         // op is pending.
978         let bm =
979             Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
980 
981         // Use pipes to create a future that will block forever.
982         let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
983 
984         // Set up the TLS for the uring_executor by creating one.
985         let ex = RawExecutor::<UringReactor>::new().unwrap();
986 
987         // Register the send side of the pipe with the executor.
988         let registered_source = ex
989             .reactor
990             .register_source(&ex, &tx)
991             .expect("register source failed");
992 
993         // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
994         // duration of the op.
995         let pending_op = registered_source
996             .start_write_from_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
997             .expect("failed to start write to mem");
998 
999         // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
1000         // reference while the op is active.
1001         assert_eq!(Arc::strong_count(&bm), 2);
1002 
1003         // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
1004         // it.
1005         drop(pending_op);
1006         assert_eq!(Arc::strong_count(&bm), 2);
1007 
1008         // Finishing the operation should put the Arc count back to 1.
1009         // Read from the pipe to unblock the blocked write, then wait for the uring result in
1010         // the executor.
1011         let mut buf = vec![0u8; base::round_up_to_page_size(1)];
1012         rx.read_exact(&mut buf).expect("read to empty failed");
1013         ex.run_until(UringQueueEmpty { ex: &ex })
1014             .expect("Failed to wait for write pipe ready");
1015         assert_eq!(Arc::strong_count(&bm), 1);
1016     }
1017 
1018     #[test]
1019     fn canceled_before_completion() {
1020         if !is_uring_stable() {
1021             return;
1022         }
1023 
1024         async fn cancel_io(op: PendingOperation) {
1025             mem::drop(op);
1026         }
1027 
1028         async fn check_result(op: PendingOperation, expected: u32) {
1029             let actual = op.await.expect("operation failed to complete");
1030             assert_eq!(expected, actual);
1031         }
1032 
1033         let bm =
1034             Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1035 
1036         let (rx, tx) = base::pipe().expect("Pipe failed");
1037 
1038         let ex = RawExecutor::<UringReactor>::new().unwrap();
1039 
1040         let rx_source = ex
1041             .reactor
1042             .register_source(&ex, &rx)
1043             .expect("register source failed");
1044         let tx_source = ex
1045             .reactor
1046             .register_source(&ex, &tx)
1047             .expect("register source failed");
1048 
1049         let read_task = rx_source
1050             .start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
1051             .expect("failed to start read to mem");
1052 
1053         ex.spawn_local(cancel_io(read_task)).detach();
1054 
1055         // Write to the pipe so that the kernel operation will complete.
1056         let buf =
1057             Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1058         let write_task = tx_source
1059             .start_write_from_mem(None, Arc::clone(&buf), [MemRegion { offset: 0, len: 8 }])
1060             .expect("failed to start write from mem");
1061 
1062         ex.run_until(check_result(write_task, 8))
1063             .expect("Failed to run executor");
1064     }
1065 
1066     // We will drain all ops on drop, and it's not guaranteed that the operation won't finish.
1067     #[ignore]
1068     #[test]
1069     fn drop_before_completion() {
1070         if !is_uring_stable() {
1071             return;
1072         }
1073 
1074         const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
1075 
1076         async fn check_op(op: PendingOperation) {
1077             let err = op.await.expect_err("Op completed successfully");
1078             match err {
1079                 Error::ExecutorGone => {}
1080                 e => panic!("Unexpected error from op: {}", e),
1081             }
1082         }
1083 
1084         let (mut rx, mut tx) = base::pipe().expect("Pipe failed");
1085 
1086         let ex = RawExecutor::<UringReactor>::new().unwrap();
1087 
1088         let tx_source = ex
1089             .reactor
1090             .register_source(&ex, &tx)
1091             .expect("Failed to register source");
1092         let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
1093         let op = tx_source
1094             .start_write_from_mem(
1095                 None,
1096                 bm,
1097                 [MemRegion {
1098                     offset: 0,
1099                     len: mem::size_of::<u64>(),
1100                 }],
1101             )
1102             .expect("Failed to start write from mem");
1103 
1104         ex.spawn_local(check_op(op)).detach();
1105 
1106         // Now drop the executor. It shouldn't run the write operation.
1107         mem::drop(ex);
1108 
1109         // Make sure the executor did not complete the uring operation.
1110         let new_val = [0x2e; 8];
1111         tx.write_all(&new_val).unwrap();
1112 
1113         let mut buf = 0u64.to_ne_bytes();
1114         rx.read_exact(&mut buf[..])
1115             .expect("Failed to read from pipe");
1116 
1117         assert_eq!(buf, new_val);
1118     }
1119 
1120     // Dropping a task that owns a BlockingPool shouldn't leak the pool.
1121     #[test]
1122     fn drop_detached_blocking_pool() {
1123         if !is_uring_stable() {
1124             return;
1125         }
1126 
1127         struct Cleanup(BlockingPool);
1128 
1129         impl Drop for Cleanup {
1130             fn drop(&mut self) {
1131                 // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
1132                 self.0
1133                     .shutdown(Some(
1134                         std::time::Instant::now() + std::time::Duration::from_secs(1),
1135                     ))
1136                     .unwrap();
1137             }
1138         }
1139 
1140         let rc = Rc::new(std::cell::Cell::new(0));
1141         {
1142             let ex = RawExecutor::<UringReactor>::new().unwrap();
1143             let rc_clone = rc.clone();
1144             ex.spawn_local(async move {
1145                 rc_clone.set(1);
1146                 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
1147                 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
1148                 // Spawn a blocking task.
1149                 let blocking_task = pool.0.spawn(move || {
1150                     // Rendezvous.
1151                     assert_eq!(recv.recv(), Ok(()));
1152                     // Wait for drop.
1153                     assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
1154                 });
1155                 // Make sure it has actually started (using a "rendezvous channel" send).
1156                 //
1157                 // Without this step, we'll have a race where we can shutdown the blocking pool
1158                 // before the worker thread pops off the task.
1159                 send.send(()).unwrap();
1160                 // Wait for it to finish
1161                 blocking_task.await;
1162                 rc_clone.set(2);
1163             })
1164             .detach();
1165             ex.run_until(async {}).unwrap();
1166             // `ex` is dropped here. If everything is working as expected, it should drop all of
1167             // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
1168             // `Drop` impl will try to join all the worker threads, which should work because the
1169             // send half of the channel was closed.
1170         }
1171         assert_eq!(rc.get(), 1);
1172         Rc::try_unwrap(rc).expect("Rc had too many refs");
1173     }
1174 
1175     #[test]
1176     fn drop_on_different_thread() {
1177         if !is_uring_stable() {
1178             return;
1179         }
1180 
1181         let ex = RawExecutor::<UringReactor>::new().unwrap();
1182 
1183         let ex2 = ex.clone();
1184         let t = thread::spawn(move || ex2.run_until(async {}));
1185 
1186         t.join().unwrap().unwrap();
1187 
1188         // Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
1189         // completion.
1190         let (_rx, tx) = base::pipe().expect("Pipe failed");
1191         let tx = ex
1192             .reactor
1193             .register_source(&ex, &tx)
1194             .expect("Failed to register source");
1195         let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
1196         let op = tx
1197             .start_write_from_mem(
1198                 None,
1199                 bm,
1200                 [MemRegion {
1201                     offset: 0,
1202                     len: mem::size_of::<u64>(),
1203                 }],
1204             )
1205             .expect("Failed to start write from mem");
1206 
1207         mem::drop(ex);
1208 
1209         match block_on(op).expect_err("Pending operation completed after executor was dropped") {
1210             Error::ExecutorGone => {}
1211             e => panic!("Unexpected error after dropping executor: {}", e),
1212         }
1213     }
1214 }
1215