// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring {0}")]
    Setup(libc::c_int),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize,  // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}

// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }
    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events was 16
            // bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
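// e.g. Some(4096) => 4096; None => -1 as off64_t reinterpreted as u64 (u64::MAX).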
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe);

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }
}

/// Enum to represent all io_uring operations
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Create a new `URingAllowlist` which allows no operation.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
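    ///
    /// A minimal usage sketch (illustrative only; assumes these types are importable from the
    /// crate root, as `URingContext` is in the polling example below):
    ///
    /// ```no_run
    /// # use io_uring::{URingAllowlist, URingContext, URingOperation};
    /// // Build a ring that only accepts no-op submissions.
    /// let mut allowlist = URingAllowlist::new();
    /// allowlist.allow_submit_operation(URingOperation::Nop);
    /// let uring = URingContext::new(16, Some(&allowlist)).unwrap();
    /// ```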
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 454).unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // SAFETY:
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if given.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
                // `restrictions` contains a valid pointer and length.
                io_uring_register(
                    fd,
                    IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enable the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify the memory.
                io_uring_register(fd, IORING_REGISTER_ENABLE_RINGS, null::<c_void>(), 0)
                    .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
            // is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
            })
        }
    }

    /// # Safety
    /// See `add_writev`, but accepts an iterator instead of a vector if there isn't already a
    /// vector in existence.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function.  In
    /// addition there must not be any mutable references to the data pointed to by `iovecs` until
    /// the operation completes.  Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// # Safety
    /// See `add_readv`, but accepts an iterator instead of a vector if there isn't already a
    /// vector in existence.
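    ///
    /// A minimal sketch (illustrative only; the buffer must stay alive and untouched until the
    /// completion has been returned from `wait()`):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// let mut buf = vec![0u8; 4096];
    /// let iov = libc::iovec {
    ///     iov_base: buf.as_mut_ptr() as *mut libc::c_void,
    ///     iov_len: buf.len(),
    /// };
    /// // Safe because `buf` outlives the operation and is not accessed again until the
    /// // completion has been returned from `wait()`.
    /// unsafe { uring.add_readv_iter(std::iter::once(iov), f.as_raw_fd(), Some(0), 7).unwrap() };
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 7 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 4096);
    /// ```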
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function.  In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes.  Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
    /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
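    ///
    /// A minimal sketch (illustrative only):
    ///
    /// ```no_run
    /// # use io_uring::URingContext;
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_nop(42).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 42 as io_uring::UserData);
    /// assert_eq!(res.unwrap(), 0);
    /// ```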
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
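    ///
    /// A minimal sketch using `submit` to queue the fsync without waiting (illustrative only;
    /// the path is arbitrary and the file must stay open until the op completes):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::create("/tmp/example").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_fsync(f.as_raw_fd(), 88).unwrap();
    /// uring.submit().unwrap();
    /// ```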
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
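    ///
    /// A minimal sketch that reserves 4 KiB at the start of a file (illustrative only; mode 0 is
    /// the default allocate mode of fallocate and the path is arbitrary):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use io_uring::URingContext;
    /// let f = File::create("/tmp/example").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_fallocate(f.as_raw_fd(), 0, 4096, 0, 33).unwrap();
    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
    /// assert_eq!(user_data, 33 as io_uring::UserData);
    /// assert!(res.is_ok());
    /// ```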
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempt to cancel an already issued request. addr must contain the user_data field of the
    /// request that should be cancelled. The cancellation request will complete with one of the
    /// following result codes. If found, the res field of the cqe will contain 0. If not found,
    /// res will contain -ENOENT. If found and cancellation was attempted, the res field will
    /// contain -EALREADY. In this case, the request may or may not terminate. In general, requests
    /// that are interruptible (like socket IO) will get cancelled, while disk IO requests cannot be
    /// cancelled if already started.
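    ///
    /// A minimal sketch that cancels a pending poll by its user_data (illustrative only; whether
    /// the poll is cancelled or completes first depends on timing):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use base::EventType;
    /// # use io_uring::URingContext;
    /// let f = File::open("/dev/zero").unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// // Queue a poll with user_data 1, then ask the kernel to cancel it with user_data 2.
    /// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 1).unwrap();
    /// uring.async_cancel(1, 2).unwrap();
    /// // Both the poll and the cancel eventually produce their own completion entries.
    /// for (user_data, res) in uring.wait().unwrap() {
    ///     println!("op {} completed with {:?}", user_data, res);
    /// }
    /// ```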
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res =
            // SAFETY:
            // Safe because the only memory modified is in the completion queue.
            unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };

        // An EINTR means we did successfully submit the events.
        if res.is_ok() || res == Err(libc::EINTR) {
            self.submit_ring.lock().complete_submit(added);
        } else {
            self.submit_ring.lock().fail_submit(added);
        }

        match res {
            Ok(()) => Ok(()),
            // EBUSY means that some completed events need to be processed before more can
            // be submitted, so wait for some sqes to finish without submitting new ones.
            // EINTR means we were interrupted while waiting, so start waiting again.
            Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
                loop {
                    let res =
                        // SAFETY:
                        // Safe because the only memory modified is in the completion queue.
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
            Err(e) => Err(Error::RingEnter(e)),
        }
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator to
    /// any completed operations. `wait` blocks until at least one completion is ready.  If
    /// called without any new events added, this simply waits for any existing events to
    /// complete and returns as soon as one or more are ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the completion
            // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
            // we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        // SAFETY:
        // Safe because the mut borrow of self restricts to one mutable reference at a time and
        // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
        let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        // SAFETY:
        // Safe because we trust that the kernel has returned enough memory in io_uring_setup
        // and mmap and index is checked within range by the ring_mask.
        unsafe {
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
        // from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// SAFETY:
// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
// SAFETY: See safety comments for impl Send
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // entries up to the given head pointer have been consumed and their slots may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}
919