// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![cfg(any(target_os = "android", target_os = "linux"))]

use std::collections::BTreeSet;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::IoSlice;
use std::io::IoSliceMut;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::mem;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
use std::time::Duration;

use base::pipe;
use base::EventType;
use base::IoBufMut;
use base::WaitContext;
use io_uring::Error;
use io_uring::URingAllowlist;
use io_uring::URingContext;
use io_uring::UserData;
use libc::EACCES;
use sync::Condvar;
use sync::Mutex;
use tempfile::tempfile;
use tempfile::TempDir;

fn append_file_name(path: &Path, name: &str) -> PathBuf {
    let mut joined = path.to_path_buf();
    joined.push(name);
    joined
}

// TODO(b/315998194): Add safety comment
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe fn add_one_read(
    uring: &URingContext,
    ptr: *mut u8,
    len: usize,
    fd: RawFd,
    offset: Option<u64>,
    user_data: UserData,
) -> Result<(), Error> {
    uring.add_readv(
        Pin::from(vec![IoBufMut::from_raw_parts(ptr, len)].into_boxed_slice()),
        fd,
        offset,
        user_data,
    )
}

// TODO(b/315998194): Add safety comment
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe fn add_one_write(
    uring: &URingContext,
    ptr: *const u8,
    len: usize,
    fd: RawFd,
    offset: Option<u64>,
    user_data: UserData,
) -> Result<(), Error> {
    uring.add_writev(
        Pin::from(vec![IoBufMut::from_raw_parts(ptr as *mut u8, len)].into_boxed_slice()),
        fd,
        offset,
        user_data,
    )
}

fn create_test_file(size: u64) -> std::fs::File {
    let f = tempfile().unwrap();
    f.set_len(size).unwrap();
    f
}

#[test]
// Queue as many reads as possible and then collect the completions.
fn read_parallel() {
    const QUEUE_SIZE: usize = 10;
    const BUF_SIZE: usize = 0x1000;

    let uring = URingContext::new(QUEUE_SIZE, None).unwrap();
    let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
    let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);

    // Check that the whole file can be read and that the queue's wrapping is handled by reading
    // many times the queue depth of buffers.
    for i in 0..QUEUE_SIZE * 64 {
        let index = i as u64;
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            let offset = (i % QUEUE_SIZE) * BUF_SIZE;
            match add_one_read(
                &uring,
                buf[offset..].as_mut_ptr(),
                BUF_SIZE,
                f.as_raw_fd(),
                Some(offset as u64),
                index,
            ) {
                Ok(_) => (),
                Err(Error::NoSpace) => {
                    let _ = uring.wait().unwrap().next().unwrap();
                }
                Err(_) => panic!("unexpected error from uring wait"),
            }
        }
    }
}

#[test]
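// Read one page at a time with readv, waiting for each completion before submitting the next.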
fn read_readv() {
    let queue_size = 128;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; 0x1000];
    let f = create_test_file(0x1000 * 2);

    // Check that the whole file can be read and that the queue's wrapping is handled by reading
    // double the queue depth of buffers.
    for i in 0..queue_size * 2 {
        let index = i as u64;
        // SAFETY:
        // Safe to transmute from IoSliceMut to iovec.
        let io_vecs = unsafe {
            vec![IoSliceMut::new(&mut buf)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        };
        // SAFETY:
        // Safe because the `wait` call waits until the kernel is done with `buf`.
        let (user_data_ret, res) = unsafe {
            uring
                .add_readv_iter(io_vecs, f.as_raw_fd(), Some((index % 2) * 0x1000), index)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, index);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

#[test]
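// Read into three iovecs with a single readv and check that the full length completes.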
fn readv_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; BUF_SIZE];
    let mut buf2 = [0u8; BUF_SIZE];
    let mut buf3 = [0u8; BUF_SIZE];
    // SAFETY:
    // Safe to transmute from IoSliceMut to iovec.
    let io_vecs = unsafe {
        vec![
            IoSliceMut::new(&mut buf),
            IoSliceMut::new(&mut buf2),
            IoSliceMut::new(&mut buf3),
        ]
        .into_iter()
        .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        .collect::<Vec<libc::iovec>>()
    };
    let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
    let f = create_test_file(total_len as u64 * 2);
    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done with the buffers.
    let (user_data_ret, res) = unsafe {
        uring
            .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(0), 55)
            .unwrap();
        uring.wait().unwrap().next().unwrap()
    };
    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);
}

#[test]
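// Write a single 4 KiB buffer and check that the completion reports the full length.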
fn write_one_block() {
    let uring = URingContext::new(16, None).unwrap();
    let mut buf = [0u8; 4096];
    let mut f = create_test_file(0);
    f.write_all(&buf).unwrap();
    f.write_all(&buf).unwrap();

    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done with `buf`.
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            55,
        )
        .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 55_u64);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

#[test]
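// Submit a write, then use epoll via WaitContext to wait for the uring to become readable
// before collecting the completion.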
fn write_one_submit_poll() {
    let uring = URingContext::new(16, None).unwrap();
    let mut buf = [0u8; 4096];
    let mut f = create_test_file(0);
    f.write_all(&buf).unwrap();
    f.write_all(&buf).unwrap();

    let ctx: WaitContext<u64> = WaitContext::build_with(&[(&uring, 1)]).unwrap();
    {
        // Test that the uring context isn't readable before any events are complete.
        let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
        assert!(events.iter().next().is_none());
    }

    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done with `buf`.
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            55,
        )
        .unwrap();
        uring.submit().unwrap();
        // Poll for completion with epoll.
        let events = ctx.wait().unwrap();
        let event = events.iter().next().unwrap();
        assert!(event.is_readable);
        assert_eq!(event.token, 1);
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 55_u64);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}

#[test]
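// Write three patterned buffers with a single writev at an offset, then read the file back and
// verify each pattern landed where expected.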
fn writev_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;
    const OFFSET: u64 = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let buf = [0xaau8; BUF_SIZE];
    let buf2 = [0xffu8; BUF_SIZE];
    let buf3 = [0x55u8; BUF_SIZE];
    // SAFETY:
    // Safe to transmute from IoSlice to iovec.
    let io_vecs = unsafe {
        vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
    };
    let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
    let mut f = create_test_file(total_len as u64 * 2);
    // SAFETY:
    // Safe because the `wait` call waits until the kernel is done with the buffers.
    let (user_data_ret, res) = unsafe {
        uring
            .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(OFFSET), 55)
            .unwrap();
        uring.wait().unwrap().next().unwrap()
    };
    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);

    let mut read_back = [0u8; BUF_SIZE];
    f.seek(SeekFrom::Start(OFFSET)).unwrap();
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0xaa));
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0xff));
    f.read_exact(&mut read_back).unwrap();
    assert!(read_back.iter().all(|&b| b == 0x55));
}

#[test]
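// Grow a file with fallocate, queue several writes and an fsync, then punch a hole and check
// the resulting file size.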
fn fallocate_fsync() {
    let tempdir = TempDir::new().unwrap();
    let file_path = append_file_name(tempdir.path(), "test");

    {
        let buf = [0u8; 4096];
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&file_path)
            .unwrap();
        f.write_all(&buf).unwrap();
    }

    let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    let set_size = init_size + 1024 * 1024 * 50;
    let f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&file_path)
        .unwrap();

    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 66_u64);
    match res {
        Err(e) => {
            if e.kind() == std::io::ErrorKind::InvalidInput {
                // skip on kernels that don't support fallocate.
                return;
            }
            panic!("Unexpected fallocate error: {}", e);
        }
        Ok(val) => assert_eq!(val, 0_u32),
    }

    // Add a few writes and then fsync.
    let buf = [0u8; 4096];
    let mut pending = std::collections::BTreeSet::new();
    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 67).unwrap();
        pending.insert(67u64);
        add_one_write(
            &uring,
            buf.as_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(4096),
            68,
        )
        .unwrap();
        pending.insert(68);
        add_one_write(
            &uring,
            buf.as_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(8192),
            69,
        )
        .unwrap();
        pending.insert(69);
    }
    uring.add_fsync(f.as_raw_fd(), 70).unwrap();
    pending.insert(70);

    let mut wait_calls = 0;

    while !pending.is_empty() && wait_calls < 5 {
        let events = uring.wait().unwrap();
        for (user_data, res) in events {
            assert!(res.is_ok());
            assert!(pending.contains(&user_data));
            pending.remove(&user_data);
        }
        wait_calls += 1;
    }
    assert!(pending.is_empty());

    uring
        .add_fallocate(
            f.as_raw_fd(),
            init_size as u64,
            (set_size - init_size) as u64,
            (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32,
            68,
        )
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 68_u64);
    assert_eq!(res.unwrap(), 0_u32);

    drop(f); // Close to ensure directory entries for metadata are updated.

    let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    assert_eq!(new_size, set_size);
}

#[test]
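// Poll /dev/zero for readability; it is always readable.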
fn dev_zero_readable() {
    let f = File::open(Path::new("/dev/zero")).unwrap();
    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 454_u64);
    assert_eq!(res.unwrap(), 1_u32);
}

#[test]
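// Overfill the rings so the kernel reports EBUSY and make sure wait still drains every
// completion.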
fn queue_many_ebusy_retry() {
    let num_entries = 16;
    let f = File::open(Path::new("/dev/zero")).unwrap();
    let uring = URingContext::new(num_entries, None).unwrap();
    // Fill the submit ring.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            uring
                .add_poll_fd(
                    f.as_raw_fd(),
                    EventType::Read,
                    (sqe_batch * num_entries + i) as u64,
                )
                .unwrap();
        }
        uring.submit().unwrap();
    }
    // Adding more than the number of cqes will cause the uring to return EBUSY; make sure that
    // is handled cleanly and wait still returns the completed entries.
    uring
        .add_poll_fd(f.as_raw_fd(), EventType::Read, (num_entries * 3) as u64)
        .unwrap();
    // The first wait call should return the cqes that are already filled.
    {
        let mut results = uring.wait().unwrap();
        for _i in 0..num_entries * 2 {
            assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
        }
        assert!(results.next().is_none());
    }
    // The second will finish submitting any more sqes and return the rest.
    let mut results = uring.wait().unwrap();
    for _i in 0..num_entries + 1 {
        assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
    }
    assert!(results.next().is_none());
}

#[test]
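// Check that submitting a NOP wakes another thread blocked in wait() on a read that cannot
// complete yet.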
fn wake_with_nop() {
    const PIPE_READ: UserData = 0;
    const NOP: UserData = 1;
    const BUF_DATA: [u8; 16] = [0xf4; 16];

    let uring = URingContext::new(4, None).map(Arc::new).unwrap();
    let (pipe_out, mut pipe_in) = pipe().unwrap();
    let (tx, rx) = channel();

    let uring2 = uring.clone();
    let wait_thread = thread::spawn(move || {
        let mut buf = [0u8; BUF_DATA.len()];
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            add_one_read(
                &uring2,
                buf.as_mut_ptr(),
                buf.len(),
                pipe_out.as_raw_fd(),
                Some(0),
                0,
            )
            .unwrap();
        }

        // This is still a bit racy, as the other thread may end up adding the NOP before we make
        // the syscall, but we are not aware of a mechanism that notifies the other thread of
        // exactly when we make the syscall.
        tx.send(()).unwrap();
        let mut events = uring2.wait().unwrap();
        let (user_data, result) = events.next().unwrap();
        assert_eq!(user_data, NOP);
        assert_eq!(result.unwrap(), 0);

        tx.send(()).unwrap();
        let mut events = uring2.wait().unwrap();
        let (user_data, result) = events.next().unwrap();
        assert_eq!(user_data, PIPE_READ);
        assert_eq!(result.unwrap(), buf.len() as u32);
        assert_eq!(&buf, &BUF_DATA);
    });

    // Wait until the other thread is about to make the syscall.
    rx.recv_timeout(Duration::from_secs(10)).unwrap();

    // Now add a NOP operation. This should wake up the other thread even though it cannot yet
    // read from the pipe.
    uring.add_nop(NOP).unwrap();
    uring.submit().unwrap();

    // Wait for the other thread to process the NOP result.
    rx.recv_timeout(Duration::from_secs(10)).unwrap();

    // Now write to the pipe to finish the uring read.
    pipe_in.write_all(&BUF_DATA).unwrap();

    wait_thread.join().unwrap();
}

#[test]
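// Reap completions from several threads at once; no thread should ever see a duplicate.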
fn complete_from_any_thread() {
    let num_entries = 16;
    let uring = URingContext::new(num_entries, None).map(Arc::new).unwrap();

    // Fill the submit ring.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
        }
        uring.submit().unwrap();
    }

    // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see a
    // duplicate.
    const NUM_THREADS: usize = 7;
    let completed = Arc::new(Mutex::new(BTreeSet::new()));
    let cv = Arc::new(Condvar::new());
    let barrier = Arc::new(Barrier::new(NUM_THREADS));

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for _ in 0..NUM_THREADS {
        let uring = uring.clone();
        let completed = completed.clone();
        let barrier = barrier.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            barrier.wait();

            'wait: while completed.lock().len() < num_entries * 3 {
                for (user_data, result) in uring.wait().unwrap() {
                    assert_eq!(result.unwrap(), 0);

                    let mut completed = completed.lock();
                    assert!(completed.insert(user_data));
                    if completed.len() >= num_entries * 3 {
                        break 'wait;
                    }
                }
            }

            cv.notify_one();
        }));
    }

    // Wait until all the operations have completed.
    let mut c = completed.lock();
    while c.len() < num_entries * 3 {
        c = cv.wait(c);
    }
    mem::drop(c);

    // Let the OS clean up the still-waiting threads after the test run.
}

#[test]
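// Submit NOPs from several threads concurrently while the main thread reaps the completions.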
fn submit_from_any_thread() {
    const NUM_THREADS: usize = 7;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;

    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut in_flight = in_flight.lock();
        while *in_flight > NUM_ENTRIES as isize {
            in_flight = cv.wait(in_flight);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0));
    let cv = Arc::new(Condvar::new());

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for idx in 0..NUM_THREADS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            for iter in 0..ITERATIONS {
                loop {
                    match uring.add_nop(((idx * NUM_THREADS) + iter) as UserData) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {}", e),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get added eventually.
                    match uring.submit() {
                        Ok(()) => break,
                        Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {}", e),
                    }
                }
            }
        }));
    }

    let mut completed = 0;
    while completed < NUM_THREADS * ITERATIONS {
        for (_, res) in uring.wait().unwrap() {
            assert_eq!(res.unwrap(), 0);
            completed += 1;

            let mut in_flight = in_flight.lock();
            *in_flight -= 1;
            let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
            mem::drop(in_flight);

            if notify_submitters {
                cv.notify_all();
            }

            if completed >= NUM_THREADS * ITERATIONS {
                break;
            }
        }
    }

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected.
    assert_eq!(*in_flight.lock(), 0);
    assert_eq!(uring.submit_ring.lock().added, 0);
    assert_eq!(uring.complete_ring.num_ready(), 0);
}

// TODO(b/183722981): Fix and re-enable test
#[test]
#[ignore]
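// Submit NOPs from several threads while a separate set of threads reaps completions
// concurrently.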
fn multi_thread_submit_and_complete() {
    const NUM_SUBMITTERS: usize = 7;
    const NUM_COMPLETERS: usize = 3;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;

    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut in_flight = in_flight.lock();
        while *in_flight > NUM_ENTRIES as isize {
            in_flight = cv.wait(in_flight);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0));
    let cv = Arc::new(Condvar::new());

    let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
    for idx in 0..NUM_SUBMITTERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        threads.push(thread::spawn(move || {
            for iter in 0..ITERATIONS {
                loop {
                    match uring.add_nop(((idx * NUM_SUBMITTERS) + iter) as UserData) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {}", e),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get added eventually.
                    match uring.submit() {
                        Ok(()) => break,
                        Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {}", e),
                    }
                }
            }
        }));
    }

    let completed = Arc::new(AtomicUsize::new(0));
    for _ in 0..NUM_COMPLETERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        let completed = completed.clone();
        threads.push(thread::spawn(move || {
            while completed.load(Ordering::Relaxed) < NUM_SUBMITTERS * ITERATIONS {
                for (_, res) in uring.wait().unwrap() {
                    assert_eq!(res.unwrap(), 0);
                    completed.fetch_add(1, Ordering::Relaxed);

                    let mut in_flight = in_flight.lock();
                    *in_flight -= 1;
                    let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                    mem::drop(in_flight);

                    if notify_submitters {
                        cv.notify_all();
                    }

                    if completed.load(Ordering::Relaxed) >= NUM_SUBMITTERS * ITERATIONS {
                        break;
                    }
                }
            }
        }));
    }

    for t in threads.drain(..NUM_SUBMITTERS) {
        t.join().unwrap();
    }

    // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
    // syscall.
    for i in 0..NUM_COMPLETERS {
        uring
            .add_nop((NUM_SUBMITTERS * ITERATIONS + i) as UserData)
            .unwrap();
    }
    uring.submit().unwrap();

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
    // wake up the completer threads may still be in the completion ring.
    assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
    assert_eq!(
        in_flight.lock().unsigned_abs() as u32 + uring.complete_ring.num_ready(),
        NUM_COMPLETERS as u32
    );
    assert_eq!(uring.submit_ring.lock().added, 0);
}

#[test]
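// Build a context whose allowlist permits only Readv and check that reads succeed while writes
// fail with EACCES.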
fn restrict_ops() {
    const TEST_DATA: &[u8; 4] = b"foo!";

    let queue_size = 128;

    // Allow only the Readv operation.
    let mut restriction = URingAllowlist::new();
    restriction.allow_submit_operation(io_uring::URingOperation::Readv);

    let uring = URingContext::new(queue_size, Some(&restriction)).unwrap();

    let mut buf = [0u8; 4];
    let mut f = create_test_file(4);
    f.write_all(TEST_DATA).unwrap();

    // add_one_read, which submits Readv, should succeed.

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_read(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            0,
        )
        .unwrap();
    }
    let result = uring.wait().unwrap().next().unwrap();
    assert!(result.1.is_ok(), "uring read should succeed");
    assert_eq!(&buf, TEST_DATA, "file should be read to buf");
    drop(f);

    // add_one_write, which submits Writev, should be rejected.

    let mut buf: [u8; 4] = TEST_DATA.to_owned(); // fake data, which should not be written
    let mut f = create_test_file(4);

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_write(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            0,
        )
        .unwrap();
    }
    let result = uring.wait().unwrap().next().unwrap();
    assert!(result.1.is_err(), "uring write should fail");
    assert_eq!(
        result.1.unwrap_err().raw_os_error(),
        Some(EACCES),
        "the error should be permission denied"
    );
    let mut result_f = vec![];
    f.seek(SeekFrom::Start(0)).unwrap(); // rewind to read from the beginning
    f.read_to_end(&mut result_f).unwrap();
    assert_eq!(
        result_f.as_slice(),
        &[0, 0, 0, 0],
        "file should not be written and should remain zero-filled"
    );
}