xref: /aosp_15_r20/external/crosvm/cros_async/src/sys/linux/uring_source.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::ops::Deref;
6 use std::ops::DerefMut;
7 use std::sync::Arc;
8 
9 use base::sys::FallocateMode;
10 use base::AsRawDescriptor;
11 
12 use super::uring_executor::RegisteredSource;
13 use super::uring_executor::Result;
14 use super::uring_executor::UringReactor;
15 use crate::common_executor::RawExecutor;
16 use crate::mem::BackingMemory;
17 use crate::mem::MemRegion;
18 use crate::mem::VecIoWrapper;
19 use crate::AsyncResult;
20 
/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
/// registering an IO source with the uring that provides an `IoSource` implementation.
pub struct UringSource<F: AsRawDescriptor> {
    // Handle obtained from registering `source` with the uring reactor; all async ops
    // (read/write/fallocate/fsync/poll) are submitted through it.
    registered_source: RegisteredSource,
    // The underlying FD-backed IO source. Owned here so the descriptor outlives the
    // registration; recoverable via `into_source`.
    source: F,
}
27 
28 impl<F: AsRawDescriptor> UringSource<F> {
29     /// Creates a new `UringSource` that wraps the given `io_source` object.
new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>>30     pub fn new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>> {
31         let r = ex.reactor.register_source(ex, &io_source)?;
32         Ok(UringSource {
33             registered_source: r,
34             source: io_source,
35         })
36     }
37 
38     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>39     pub async fn read_to_vec(
40         &self,
41         file_offset: Option<u64>,
42         vec: Vec<u8>,
43     ) -> AsyncResult<(usize, Vec<u8>)> {
44         let buf = Arc::new(VecIoWrapper::from(vec));
45         let op = self.registered_source.start_read_to_mem(
46             file_offset,
47             buf.clone(),
48             [MemRegion {
49                 offset: 0,
50                 len: buf.len(),
51             }],
52         )?;
53         let len = op.await?;
54         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
55             v.into()
56         } else {
57             panic!("too many refs on buf");
58         };
59 
60         Ok((len as usize, bytes))
61     }
62 
63     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> AsyncResult<()>64     pub async fn wait_readable(&self) -> AsyncResult<()> {
65         let op = self.registered_source.poll_fd_readable()?;
66         op.await?;
67         Ok(())
68     }
69 
70     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>71     pub async fn read_to_mem(
72         &self,
73         file_offset: Option<u64>,
74         mem: Arc<dyn BackingMemory + Send + Sync>,
75         mem_offsets: impl IntoIterator<Item = MemRegion>,
76     ) -> AsyncResult<usize> {
77         let op = self
78             .registered_source
79             .start_read_to_mem(file_offset, mem, mem_offsets)?;
80         let len = op.await?;
81         Ok(len as usize)
82     }
83 
84     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>85     pub async fn write_from_vec(
86         &self,
87         file_offset: Option<u64>,
88         vec: Vec<u8>,
89     ) -> AsyncResult<(usize, Vec<u8>)> {
90         let buf = Arc::new(VecIoWrapper::from(vec));
91         let op = self.registered_source.start_write_from_mem(
92             file_offset,
93             buf.clone(),
94             [MemRegion {
95                 offset: 0,
96                 len: buf.len(),
97             }],
98         )?;
99         let len = op.await?;
100         let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
101             v.into()
102         } else {
103             panic!("too many refs on buf");
104         };
105 
106         Ok((len as usize, bytes))
107     }
108 
109     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>110     pub async fn write_from_mem(
111         &self,
112         file_offset: Option<u64>,
113         mem: Arc<dyn BackingMemory + Send + Sync>,
114         mem_offsets: impl IntoIterator<Item = MemRegion>,
115     ) -> AsyncResult<usize> {
116         let op = self
117             .registered_source
118             .start_write_from_mem(file_offset, mem, mem_offsets)?;
119         let len = op.await?;
120         Ok(len as usize)
121     }
122 
123     /// Deallocates the given range of a file.
punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()>124     pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
125         let op = self.registered_source.start_fallocate(
126             file_offset,
127             len,
128             FallocateMode::PunchHole.into(),
129         )?;
130         let _ = op.await?;
131         Ok(())
132     }
133 
134     /// Fills the given range with zeroes.
write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()>135     pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
136         let op = self.registered_source.start_fallocate(
137             file_offset,
138             len,
139             FallocateMode::ZeroRange.into(),
140         )?;
141         let _ = op.await?;
142         Ok(())
143     }
144 
145     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>146     pub async fn fsync(&self) -> AsyncResult<()> {
147         let op = self.registered_source.start_fsync()?;
148         let _ = op.await?;
149         Ok(())
150     }
151 
152     /// Sync all data of completed write operations to the backing storage. Currently, the
153     /// implementation is equivalent to fsync.
fdatasync(&self) -> AsyncResult<()>154     pub async fn fdatasync(&self) -> AsyncResult<()> {
155         // Currently io_uring does not implement fdatasync. Fall back to fsync.
156         // TODO(b/281609112): Implement real fdatasync with io_uring.
157         self.fsync().await
158     }
159 
160     /// Yields the underlying IO source.
into_source(self) -> F161     pub fn into_source(self) -> F {
162         self.source
163     }
164 
165     /// Provides a mutable ref to the underlying IO source.
as_source(&self) -> &F166     pub fn as_source(&self) -> &F {
167         &self.source
168     }
169 
170     /// Provides a ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F171     pub fn as_source_mut(&mut self) -> &mut F {
172         &mut self.source
173     }
174 }
175 
// Allows transparent, read-only access to the wrapped IO source's methods
// (e.g. `source.metadata()` on an `UringSource<File>`).
impl<F: AsRawDescriptor> Deref for UringSource<F> {
    type Target = F;

    fn deref(&self) -> &Self::Target {
        &self.source
    }
}
183 
// Mutable counterpart of the `Deref` impl: grants mutable access to the wrapped source.
impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.source
    }
}
189 
190 // NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;

    use sync::Mutex;

    use super::super::uring_executor::is_uring_stable;
    use super::super::UringSource;
    use super::*;
    use crate::sys::linux::ExecutorKindSys;
    use crate::Executor;
    use crate::ExecutorTrait;
    use crate::IoSource;

    // Helper: reads 8 bytes from `source` and returns them as a native-endian u64.
    // Asserts that exactly `size_of::<u64>()` bytes were read.
    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
        // Init a vec that translates to u64::max;
        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
        assert_eq!(ret, std::mem::size_of::<u64>());
        let mut val = 0u64.to_ne_bytes();
        val.copy_from_slice(&u64_mem);
        u64::from_ne_bytes(val)
    }

    // Verifies that counter values written to an eventfd are observed, in order, by uring
    // reads. A second event is used to hand control back and forth between the two tasks.
    #[test]
    fn event() {
        if !is_uring_stable() {
            return;
        }

        use base::Event;
        use base::EventExt;

        async fn write_event(ev: Event, wait: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let wait = UringSource::new(wait, ex).unwrap();
            ev.write_count(55).unwrap();
            read_u64(&wait).await;
            ev.write_count(66).unwrap();
            read_u64(&wait).await;
            ev.write_count(77).unwrap();
            read_u64(&wait).await;
        }

        async fn read_events(ev: Event, signal: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let source = UringSource::new(ev, ex).unwrap();
            assert_eq!(read_u64(&source).await, 55);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 66);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 77);
            signal.signal().unwrap();
        }

        let event = Event::new().unwrap();
        let signal_wait = Event::new().unwrap();
        let ex = RawExecutor::<UringReactor>::new().unwrap();
        let write_task = write_event(
            event.try_clone().unwrap(),
            signal_wait.try_clone().unwrap(),
            &ex,
        );
        let read_task = read_events(event, signal_wait, &ex);
        ex.run_until(futures::future::join(read_task, write_task))
            .unwrap();
    }

    // Verifies that a uring read that never completes can be dropped safely: the completed
    // branch of the select wins, then the pipe is written so the kernel releases the buffer
    // before the pending future is dropped.
    #[test]
    fn pend_on_pipe() {
        if !is_uring_stable() {
            return;
        }

        use std::io::Write;

        use futures::future::Either;

        async fn do_test(ex: &Arc<RawExecutor<UringReactor>>) {
            let (read_source, mut w) = base::pipe().unwrap();
            let source = UringSource::new(read_source, ex).unwrap();
            let done = Box::pin(async { 5usize });
            let pending = Box::pin(read_u64(&source));
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => {
                    // Write to the pipe so that the kernel will release the memory associated with
                    // the uring read operation.
                    w.write_all(&[0]).expect("failed to write to pipe");
                    ::std::mem::drop(pending);
                }
                _ => panic!("unexpected select result"),
            };
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();
    }

    // Verifies that a read targeting a region past the end of the backing buffer
    // (offset 32 + len 33 > 64-byte buffer) fails instead of corrupting memory.
    #[test]
    fn range_error() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 64];
            let vw = Arc::new(VecIoWrapper::from(v));
            let ret = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    [MemRegion {
                        offset: 32,
                        len: 33,
                    }],
                )
                .await;
            assert!(ret.is_err());
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // Verifies wait_readable completes on an always-readable FD (/dev/zero).
    #[test]
    fn wait_read() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.wait_readable().await.unwrap();
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    // Shared state for the hand-rolled `Quit` future used by the cross-executor tests below.
    struct State {
        // Set to true when the `Quit` future should resolve.
        should_quit: bool,
        // Waker stored by `Quit::poll` so `wake()` can re-schedule it.
        waker: Option<Waker>,
    }

    impl State {
        // Marks the future as done and wakes it if it is currently parked.
        fn wake(&mut self) {
            self.should_quit = true;
            let waker = self.waker.take();

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    // A future that stays pending until `State::wake` is called; used to keep an executor
    // running on a background thread until the test is finished with it.
    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    // Verifies a uring-backed operation can be awaited from an Fd (poll) executor: the uring
    // executor runs on a background thread while the poll executor drives the read.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_uring_from_poll() {
        if !is_uring_stable() {
            return;
        }
        // Start a uring operation and then await the result from an FdExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0xa4u8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = uring_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    // Mirror image of `await_uring_from_poll`: a poll-backed operation awaited from a uring
    // executor, with the poll executor kept alive on a background thread.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_poll_from_uring() {
        if !is_uring_stable() {
            return;
        }
        // Start a poll operation and then await the result
        async fn go(source: IoSource<File>) {
            let v = vec![0x2cu8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = poll_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }
}
440