// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![cfg(any(target_os = "android", target_os = "linux"))]

use std::collections::BTreeSet;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::IoSlice;
use std::io::IoSliceMut;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::mem;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
use std::time::Duration;

use base::pipe;
use base::EventType;
use base::IoBufMut;
use base::WaitContext;
use io_uring::Error;
use io_uring::URingAllowlist;
use io_uring::URingContext;
use io_uring::UserData;
use libc::EACCES;
use sync::Condvar;
use sync::Mutex;
use tempfile::tempfile;
use tempfile::TempDir;

fn append_file_name(path: &Path, name: &str) -> PathBuf {
    let mut joined = path.to_path_buf();
    joined.push(name);
    joined
}

// TODO(b/315998194): Add safety comment
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe fn add_one_read(
    uring: &URingContext,
    ptr: *mut u8,
    len: usize,
    fd: RawFd,
    offset: Option<u64>,
    user_data: UserData,
) -> Result<(), Error> {
    uring.add_readv(
        Pin::from(vec![IoBufMut::from_raw_parts(ptr, len)].into_boxed_slice()),
        fd,
        offset,
        user_data,
    )
}

// TODO(b/315998194): Add safety comment
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe fn add_one_write(
    uring: &URingContext,
    ptr: *const u8,
    len: usize,
    fd: RawFd,
    offset: Option<u64>,
    user_data: UserData,
) -> Result<(), Error> {
    uring.add_writev(
        Pin::from(vec![IoBufMut::from_raw_parts(ptr as *mut u8, len)].into_boxed_slice()),
        fd,
        offset,
        user_data,
    )
}

fn create_test_file(size: u64) -> std::fs::File {
    let f = tempfile().unwrap();
    f.set_len(size).unwrap();
    f
}

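// A minimal usage sketch, not part of the original suite, tying the helpers
// above together: write a pattern with `add_one_write`, read it back with
// `add_one_read`, and check both completions. It only uses APIs already
// exercised by the tests below; the user_data values are arbitrary.
#[test]
fn helpers_roundtrip_sketch() {
    let uring = URingContext::new(16, None).unwrap();
    let f = create_test_file(4096);
    let out = [0xabu8; 4096];
    let mut read_back = [0u8; 4096];

    // SAFETY: each `wait` call blocks until the kernel is done with the buffer
    // queued just before it, so `out` and `read_back` remain valid throughout.
    unsafe {
        add_one_write(&uring, out.as_ptr(), out.len(), f.as_raw_fd(), Some(0), 1).unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 1);
        assert_eq!(res.unwrap(), out.len() as u32);

        add_one_read(
            &uring,
            read_back.as_mut_ptr(),
            read_back.len(),
            f.as_raw_fd(),
            Some(0),
            2,
        )
        .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 2);
        assert_eq!(res.unwrap(), read_back.len() as u32);
    }
    assert_eq!(read_back, out);
}
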
#[test]
// Queue as many reads as possible, reaping a completion whenever the submit
// queue runs out of space.
fn read_parallel() {
    const QUEUE_SIZE: usize = 10;
    const BUF_SIZE: usize = 0x1000;

    let uring = URingContext::new(QUEUE_SIZE, None).unwrap();
    let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
    let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);

    // Check that the whole file can be read and that queue wrapping is handled
    // by issuing many times (64x) the queue depth of reads.
    for i in 0..QUEUE_SIZE * 64 {
        let index = i as u64;
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            let offset = (i % QUEUE_SIZE) * BUF_SIZE;
            match add_one_read(
                &uring,
                buf[offset..].as_mut_ptr(),
                BUF_SIZE,
                f.as_raw_fd(),
                Some(offset as u64),
                index,
            ) {
                Ok(_) => (),
                Err(Error::NoSpace) => {
                    let _ = uring.wait().unwrap().next().unwrap();
                }
                Err(_) => panic!("unexpected error adding read"),
            }
        }
    }
}

#[test]
fn read_readv() {
    let queue_size = 128;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; 0x1000];
    let f = create_test_file(0x1000 * 2);

    // Check that the whole file can be read and that queue wrapping is handled
    // by reading double the queue depth of buffers.
    for i in 0..queue_size * 2 {
        let index = i as u64;
        // SAFETY:
        // Safe to transmute from IoSliceMut to iovec.
        let io_vecs = unsafe {
            vec![IoSliceMut::new(&mut buf)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        };
        // SAFETY:
        // Safe because the `wait` call waits until the kernel is done with `buf`.
        let (user_data_ret, res) = unsafe {
            uring
                .add_readv_iter(io_vecs, f.as_raw_fd(), Some((index % 2) * 0x1000), index)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, index);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

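// The tests above and below repeat a transmute from `IoSliceMut`/`IoSlice` to
// `libc::iovec`. A hedged helper sketch that centralizes the conversion; it is
// illustrative only and is not called by the original tests.
#[allow(dead_code)]
fn as_iovec(slice: IoSliceMut) -> libc::iovec {
    // SAFETY: std documents `IoSliceMut` as ABI-compatible with `iovec` on
    // Unix platforms, so reinterpreting the bits is sound. The caller must
    // keep the underlying buffer alive for as long as the kernel may use the
    // resulting iovec.
    unsafe { std::mem::transmute::<IoSliceMut, libc::iovec>(slice) }
}
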
#[test]
fn readv_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; BUF_SIZE];
    let mut buf2 = [0u8; BUF_SIZE];
    let mut buf3 = [0u8; BUF_SIZE];
    // SAFETY:
    // Safe to transmute from IoSliceMut to iovec.
    let io_vecs = unsafe {
        vec![
            IoSliceMut::new(&mut buf),
            IoSliceMut::new(&mut buf2),
            IoSliceMut::new(&mut buf3),
        ]
        .into_iter()
        .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        .collect::<Vec<libc::iovec>>()
    };
    let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
    let f = create_test_file(total_len as u64 * 2);
    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done with the buffers.
    let (user_data_ret, res) = unsafe {
        uring
            .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(0), 55)
            .unwrap();
        uring.wait().unwrap().next().unwrap()
    };
    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);
}

#[test]
fn write_one_block() {
    let uring = URingContext::new(16, None).unwrap();
    let mut buf = [0u8; 4096];
    let mut f = create_test_file(0);
    f.write_all(&buf).unwrap();
    f.write_all(&buf).unwrap();

    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done reading `buf`.
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            55,
        )
        .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 55_u64);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

#[test]
fn write_one_submit_poll() {
    let uring = URingContext::new(16, None).unwrap();
    let mut buf = [0u8; 4096];
    let mut f = create_test_file(0);
    f.write_all(&buf).unwrap();
    f.write_all(&buf).unwrap();

    let ctx: WaitContext<u64> = WaitContext::build_with(&[(&uring, 1)]).unwrap();
    {
        // Test that the uring context isn't readable before any events are complete.
        let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
        assert!(events.iter().next().is_none());
    }

    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done reading `buf`.
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            55,
        )
        .unwrap();
        uring.submit().unwrap();
        // Poll for completion with epoll.
        let events = ctx.wait().unwrap();
        let event = events.iter().next().unwrap();
        assert!(event.is_readable);
        assert_eq!(event.token, 1);
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 55_u64);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

#[test]
fn writev_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;
    const OFFSET: u64 = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let buf = [0xaau8; BUF_SIZE];
    let buf2 = [0xffu8; BUF_SIZE];
    let buf3 = [0x55u8; BUF_SIZE];
    // SAFETY:
    // Safe to transmute from IoSlice to iovec.
    let io_vecs = unsafe {
        vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
    };
    let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
    let mut f = create_test_file(total_len as u64 * 2);
    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done reading the buffers.
    let (user_data_ret, res) = unsafe {
        uring
            .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(OFFSET), 55)
            .unwrap();
        uring.wait().unwrap().next().unwrap()
    };
    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);

    let mut read_back = [0u8; BUF_SIZE];
    f.seek(SeekFrom::Start(OFFSET)).unwrap();
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0xaa));
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0xff));
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0x55));
}

#[test]
fn fallocate_fsync() {
    let tempdir = TempDir::new().unwrap();
    let file_path = append_file_name(tempdir.path(), "test");

    {
        let buf = [0u8; 4096];
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&file_path)
            .unwrap();
        f.write_all(&buf).unwrap();
    }

    let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    let set_size = init_size + 1024 * 1024 * 50;
    let f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&file_path)
        .unwrap();

    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 66_u64);
    match res {
        Err(e) => {
            if e.kind() == std::io::ErrorKind::InvalidInput {
                // Skip on kernels that don't support fallocate.
                return;
            }
            panic!("Unexpected fallocate error: {}", e);
        }
        Ok(val) => assert_eq!(val, 0_u32),
    }

    // Add a few writes and then fsync.
    let buf = [0u8; 4096];
    let mut pending = std::collections::BTreeSet::new();
    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 67).unwrap();
        pending.insert(67u64);
        add_one_write(
            &uring,
            buf.as_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(4096),
            68,
        )
        .unwrap();
        pending.insert(68);
        add_one_write(
            &uring,
            buf.as_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(8192),
            69,
        )
        .unwrap();
        pending.insert(69);
    }
    uring.add_fsync(f.as_raw_fd(), 70).unwrap();
    pending.insert(70);

    let mut wait_calls = 0;

    while !pending.is_empty() && wait_calls < 5 {
        let events = uring.wait().unwrap();
        for (user_data, res) in events {
            assert!(res.is_ok());
            assert!(pending.contains(&user_data));
            pending.remove(&user_data);
        }
        wait_calls += 1;
    }
    assert!(pending.is_empty());

    uring
        .add_fallocate(
            f.as_raw_fd(),
            init_size as u64,
            (set_size - init_size) as u64,
            (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32,
            68,
        )
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 68_u64);
    assert_eq!(res.unwrap(), 0_u32);

    drop(f); // Close to ensure directory entries for metadata are updated.

    let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    assert_eq!(new_size, set_size);
}

#[test]
fn dev_zero_readable() {
    let f = File::open(Path::new("/dev/zero")).unwrap();
    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 454_u64);
    assert_eq!(res.unwrap(), 1_u32);
}

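// A hedged companion sketch, not in the original suite: poll a pipe that
// starts out empty, make it readable from another thread, and confirm the
// poll completion fires. It reuses `base::pipe` and `add_poll_fd` exactly as
// the surrounding tests do; the user_data value 455 is arbitrary.
#[test]
fn pipe_readable_after_write_sketch() {
    let (pipe_out, mut pipe_in) = pipe().unwrap();
    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_poll_fd(pipe_out.as_raw_fd(), EventType::Read, 455)
        .unwrap();

    // Make the read end readable; the poll should complete whether this write
    // lands before or after the `wait` call below.
    let writer = thread::spawn(move || {
        pipe_in.write_all(&[0xaa]).unwrap();
    });

    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 455_u64);
    assert_eq!(res.unwrap(), 1_u32);
    writer.join().unwrap();
}
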
#[test]
fn queue_many_ebusy_retry() {
    let num_entries = 16;
    let f = File::open(Path::new("/dev/zero")).unwrap();
    let uring = URingContext::new(num_entries, None).unwrap();
    // Fill the submit ring.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            uring
                .add_poll_fd(
                    f.as_raw_fd(),
                    EventType::Read,
                    (sqe_batch * num_entries + i) as u64,
                )
                .unwrap();
        }
        uring.submit().unwrap();
    }
    // Adding more than the number of cqes will cause the uring to return EBUSY;
    // make sure that is handled cleanly and wait still returns the completed
    // entries.
    uring
        .add_poll_fd(f.as_raw_fd(), EventType::Read, (num_entries * 3) as u64)
        .unwrap();
    // The first wait call should return the cqes that are already filled.
    {
        let mut results = uring.wait().unwrap();
        for _i in 0..num_entries * 2 {
            assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
        }
        assert!(results.next().is_none());
    }
    // The second will finish submitting any remaining sqes and return the rest.
    let mut results = uring.wait().unwrap();
    for _i in 0..num_entries + 1 {
        assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
    }
    assert!(results.next().is_none());
}

#[test]
fn wake_with_nop() {
    const PIPE_READ: UserData = 0;
    const NOP: UserData = 1;
    const BUF_DATA: [u8; 16] = [0xf4; 16];

    let uring = URingContext::new(4, None).map(Arc::new).unwrap();
    let (pipe_out, mut pipe_in) = pipe().unwrap();
    let (tx, rx) = channel();

    let uring2 = uring.clone();
    let wait_thread = thread::spawn(move || {
        let mut buf = [0u8; BUF_DATA.len()];
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            add_one_read(
                &uring2,
                buf.as_mut_ptr(),
                buf.len(),
                pipe_out.as_raw_fd(),
                Some(0),
                0,
            )
            .unwrap();
        }

        // This is still a bit racy, as the other thread may end up adding the
        // NOP before we make the syscall, but I'm not aware of a mechanism that
        // will notify the other thread exactly when we make the syscall.
        tx.send(()).unwrap();
        let mut events = uring2.wait().unwrap();
        let (user_data, result) = events.next().unwrap();
        assert_eq!(user_data, NOP);
        assert_eq!(result.unwrap(), 0);

        tx.send(()).unwrap();
        let mut events = uring2.wait().unwrap();
        let (user_data, result) = events.next().unwrap();
        assert_eq!(user_data, PIPE_READ);
        assert_eq!(result.unwrap(), buf.len() as u32);
        assert_eq!(&buf, &BUF_DATA);
    });

    // Wait until the other thread is about to make the syscall.
    rx.recv_timeout(Duration::from_secs(10)).unwrap();

    // Now add a NOP operation. This should wake up the other thread even though it cannot yet
    // read from the pipe.
    uring.add_nop(NOP).unwrap();
    uring.submit().unwrap();

    // Wait for the other thread to process the NOP result.
    rx.recv_timeout(Duration::from_secs(10)).unwrap();

    // Now write to the pipe to finish the uring read.
    pipe_in.write_all(&BUF_DATA).unwrap();

    wait_thread.join().unwrap();
}

#[test]
fn complete_from_any_thread() {
    let num_entries = 16;
    let uring = URingContext::new(num_entries, None).map(Arc::new).unwrap();

    // Fill the submit ring.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
        }
        uring.submit().unwrap();
    }

    // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see a
    // duplicate.
    const NUM_THREADS: usize = 7;
    let completed = Arc::new(Mutex::new(BTreeSet::new()));
    let cv = Arc::new(Condvar::new());
    let barrier = Arc::new(Barrier::new(NUM_THREADS));

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for _ in 0..NUM_THREADS {
        let uring = uring.clone();
        let completed = completed.clone();
        let barrier = barrier.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            barrier.wait();

            'wait: while completed.lock().len() < num_entries * 3 {
                for (user_data, result) in uring.wait().unwrap() {
                    assert_eq!(result.unwrap(), 0);

                    let mut completed = completed.lock();
                    assert!(completed.insert(user_data));
                    if completed.len() >= num_entries * 3 {
                        break 'wait;
                    }
                }
            }

            cv.notify_one();
        }));
    }

    // Wait until all the operations have completed.
    let mut c = completed.lock();
    while c.len() < num_entries * 3 {
        c = cv.wait(c);
    }
    mem::drop(c);

    // Let the OS clean up the still-waiting threads after the test run.
}

#[test]
fn submit_from_any_thread() {
    const NUM_THREADS: usize = 7;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;

    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut in_flight = in_flight.lock();
        while *in_flight > NUM_ENTRIES as isize {
            in_flight = cv.wait(in_flight);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0));
    let cv = Arc::new(Condvar::new());

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for idx in 0..NUM_THREADS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            for iter in 0..ITERATIONS {
                loop {
                    match uring.add_nop(((idx * ITERATIONS) + iter) as UserData) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {}", e),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get submitted eventually.
                    match uring.submit() {
                        Ok(()) => break,
                        Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {}", e),
                    }
                }
            }
        }));
    }

    let mut completed = 0;
    while completed < NUM_THREADS * ITERATIONS {
        for (_, res) in uring.wait().unwrap() {
            assert_eq!(res.unwrap(), 0);
            completed += 1;

            let mut in_flight = in_flight.lock();
            *in_flight -= 1;
            let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
            mem::drop(in_flight);

            if notify_submitters {
                cv.notify_all();
            }

            if completed >= NUM_THREADS * ITERATIONS {
                break;
            }
        }
    }

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected.
    assert_eq!(*in_flight.lock(), 0);
    assert_eq!(uring.submit_ring.lock().added, 0);
    assert_eq!(uring.complete_ring.num_ready(), 0);
}

// TODO(b/183722981): Fix and re-enable test
#[test]
#[ignore]
fn multi_thread_submit_and_complete() {
    const NUM_SUBMITTERS: usize = 7;
    const NUM_COMPLETERS: usize = 3;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;

    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut in_flight = in_flight.lock();
        while *in_flight > NUM_ENTRIES as isize {
            in_flight = cv.wait(in_flight);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0));
    let cv = Arc::new(Condvar::new());

    let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
    for idx in 0..NUM_SUBMITTERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            for iter in 0..ITERATIONS {
                loop {
                    match uring.add_nop(((idx * ITERATIONS) + iter) as UserData) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {}", e),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get submitted eventually.
                    match uring.submit() {
                        Ok(()) => break,
                        Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {}", e),
                    }
                }
            }
        }));
    }

    let completed = Arc::new(AtomicUsize::new(0));
    for _ in 0..NUM_COMPLETERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        let completed = completed.clone();
        threads.push(thread::spawn(move || {
            while completed.load(Ordering::Relaxed) < NUM_SUBMITTERS * ITERATIONS {
                for (_, res) in uring.wait().unwrap() {
                    assert_eq!(res.unwrap(), 0);
                    completed.fetch_add(1, Ordering::Relaxed);

                    let mut in_flight = in_flight.lock();
                    *in_flight -= 1;
                    let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                    mem::drop(in_flight);

                    if notify_submitters {
                        cv.notify_all();
                    }

                    if completed.load(Ordering::Relaxed) >= NUM_SUBMITTERS * ITERATIONS {
                        break;
                    }
                }
            }
        }));
    }

    for t in threads.drain(..NUM_SUBMITTERS) {
        t.join().unwrap();
    }

    // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
    // syscall.
    for i in 0..NUM_COMPLETERS {
        uring
            .add_nop((NUM_SUBMITTERS * ITERATIONS + i) as UserData)
            .unwrap();
    }
    uring.submit().unwrap();

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
    // wake up the completer threads may still be in the completion ring.
    assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
    assert_eq!(
        in_flight.lock().unsigned_abs() as u32 + uring.complete_ring.num_ready(),
        NUM_COMPLETERS as u32
    );
    assert_eq!(uring.submit_ring.lock().added, 0);
}

#[test]
fn restrict_ops() {
    const TEST_DATA: &[u8; 4] = b"foo!";

    let queue_size = 128;

    // Allow only the Readv operation.
    let mut restriction = URingAllowlist::new();
    restriction.allow_submit_operation(io_uring::URingOperation::Readv);

    let uring = URingContext::new(queue_size, Some(&restriction)).unwrap();

    let mut buf = [0u8; 4];
    let mut f = create_test_file(4);
    f.write_all(TEST_DATA).unwrap();

    // add_one_read, which submits a Readv operation, should succeed.

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_read(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            0,
        )
        .unwrap();
    }
    let result = uring.wait().unwrap().next().unwrap();
    assert!(result.1.is_ok(), "uring read should succeed");
    assert_eq!(&buf, TEST_DATA, "file should be read into buf");
    drop(f);

    // add_one_write, which submits a Writev operation, should be rejected.

    let mut buf: [u8; 4] = TEST_DATA.to_owned(); // Fake data, which should not be written.
    let mut f = create_test_file(4);

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            0,
        )
        .unwrap();
    }
    let result = uring.wait().unwrap().next().unwrap();
    assert!(result.1.is_err(), "uring write should fail");
    assert_eq!(
        result.1.unwrap_err().raw_os_error(),
        Some(EACCES),
        "the error should be permission denied"
    );
    let mut result_f = vec![];
    f.seek(SeekFrom::Start(0)).unwrap(); // Rewind to read from the beginning.
    f.read_to_end(&mut result_f).unwrap();
    assert_eq!(
        result_f.as_slice(),
        &[0, 0, 0, 0],
        "file should not be written and should remain zero-filled"
    );
}

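// A hedged sketch, not part of the original suite, showing an allowlist with
// more than one permitted opcode. It assumes `URingOperation` exposes a
// `Writev` variant alongside the `Readv` variant used above.
#[test]
fn restrict_ops_allow_read_and_write_sketch() {
    let mut allowlist = URingAllowlist::new();
    allowlist.allow_submit_operation(io_uring::URingOperation::Readv);
    // Assumption: Writev is a variant of URingOperation, mirroring Readv.
    allowlist.allow_submit_operation(io_uring::URingOperation::Writev);

    let uring = URingContext::new(16, Some(&allowlist)).unwrap();
    let f = create_test_file(4096);
    let buf = [0x5au8; 4096];

    // SAFETY: the `wait` call below blocks until the kernel is done reading `buf`.
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 1).unwrap();
    }
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 1);
    assert_eq!(
        res.unwrap(),
        buf.len() as u32,
        "writev should be permitted by the allowlist"
    );
}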