// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;

use base::sys::FallocateMode;
use base::AsRawDescriptor;

use super::uring_executor::RegisteredSource;
use super::uring_executor::Result;
use super::uring_executor::UringReactor;
use crate::common_executor::RawExecutor;
use crate::mem::BackingMemory;
use crate::mem::MemRegion;
use crate::mem::VecIoWrapper;
use crate::AsyncResult;

/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
/// registering an IO source with the uring that provides an `IoSource` implementation.
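///
/// # Example
///
/// A minimal sketch of driving a read through a `UringSource`, modeled on the tests at the
/// bottom of this file. It assumes a context like that test module, where `RawExecutor`,
/// `UringReactor`, and `ExecutorTrait` are in scope and the running kernel has a usable
/// io_uring, so the block is marked `ignore` rather than run as a doctest.
///
/// ```ignore
/// let ex = RawExecutor::<UringReactor>::new().unwrap();
/// let file = std::fs::File::open("/dev/zero").unwrap();
/// let source = UringSource::new(file, &ex).unwrap();
/// ex.run_until(async {
///     // The buffer is moved into the operation and handed back with the byte count.
///     let (len, buf) = source.read_to_vec(None, vec![0u8; 16]).await.unwrap();
///     assert_eq!(len, 16);
///     assert!(buf.iter().all(|&b| b == 0));
/// })
/// .unwrap();
/// ```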
pub struct UringSource<F: AsRawDescriptor> {
    registered_source: RegisteredSource,
    source: F,
}

impl<F: AsRawDescriptor> UringSource<F> {
    /// Creates a new `UringSource` that wraps the given `io_source` object.
    pub fn new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>> {
        let r = ex.reactor.register_source(ex, &io_source)?;
        Ok(UringSource {
            registered_source: r,
            source: io_source,
        })
    }

    /// Reads from the iosource at `file_offset` and fills the given `vec`.
    pub async fn read_to_vec(
        &self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        let buf = Arc::new(VecIoWrapper::from(vec));
        let op = self.registered_source.start_read_to_mem(
            file_offset,
            buf.clone(),
            [MemRegion {
                offset: 0,
                len: buf.len(),
            }],
        )?;
        let len = op.await?;
        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
            v.into()
        } else {
            panic!("too many refs on buf");
        };

        Ok((len as usize, bytes))
    }

    /// Waits for the FD of `self` to be readable.
    pub async fn wait_readable(&self) -> AsyncResult<()> {
        let op = self.registered_source.poll_fd_readable()?;
        op.await?;
        Ok(())
    }

    /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
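    ///
    /// A hedged sketch of a scatter read into a shared `VecIoWrapper`, following the pattern in
    /// the `range_error` test below but with in-bounds regions; the concrete offsets, lengths,
    /// and the preexisting `source` are illustrative assumptions, so the block is marked
    /// `ignore`.
    ///
    /// ```ignore
    /// let backing = Arc::new(VecIoWrapper::from(vec![0u8; 64]));
    /// // Fill bytes 0..16 and 32..48 of the backing buffer from the start of the file.
    /// let len = source
    ///     .read_to_mem(
    ///         None,
    ///         Arc::<VecIoWrapper>::clone(&backing),
    ///         [
    ///             MemRegion { offset: 0, len: 16 },
    ///             MemRegion { offset: 32, len: 16 },
    ///         ],
    ///     )
    ///     .await
    ///     .unwrap();
    /// // `len` is the total number of bytes read across both regions.
    /// ```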
    pub async fn read_to_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let op = self
            .registered_source
            .start_read_to_mem(file_offset, mem, mem_offsets)?;
        let len = op.await?;
        Ok(len as usize)
    }

    /// Writes from the given `vec` to the file starting at `file_offset`.
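    ///
    /// A minimal sketch of the ownership-passing write pattern, assuming a `source` backed by a
    /// writable file and an executor already driving the future; illustrative only, so the block
    /// is marked `ignore`.
    ///
    /// ```ignore
    /// let data = b"hello".to_vec();
    /// // The vec is moved into the operation and returned once the write completes.
    /// let (len, data) = source.write_from_vec(Some(0), data).await.unwrap();
    /// assert_eq!(len, data.len());
    /// source.fsync().await.unwrap();
    /// ```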
    pub async fn write_from_vec(
        &self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        let buf = Arc::new(VecIoWrapper::from(vec));
        let op = self.registered_source.start_write_from_mem(
            file_offset,
            buf.clone(),
            [MemRegion {
                offset: 0,
                len: buf.len(),
            }],
        )?;
        let len = op.await?;
        let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
            v.into()
        } else {
            panic!("too many refs on buf");
        };

        Ok((len as usize, bytes))
    }

    /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
    pub async fn write_from_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let op = self
            .registered_source
            .start_write_from_mem(file_offset, mem, mem_offsets)?;
        let len = op.await?;
        Ok(len as usize)
    }

    /// Deallocates the given range of a file.
    pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        let op = self.registered_source.start_fallocate(
            file_offset,
            len,
            FallocateMode::PunchHole.into(),
        )?;
        let _ = op.await?;
        Ok(())
    }

    /// Fills the given range with zeroes.
    pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        let op = self.registered_source.start_fallocate(
            file_offset,
            len,
            FallocateMode::ZeroRange.into(),
        )?;
        let _ = op.await?;
        Ok(())
    }

    /// Syncs all completed write operations to the backing storage.
    pub async fn fsync(&self) -> AsyncResult<()> {
        let op = self.registered_source.start_fsync()?;
        let _ = op.await?;
        Ok(())
    }

    /// Syncs all data of completed write operations to the backing storage. Currently, the
    /// implementation is equivalent to fsync.
    pub async fn fdatasync(&self) -> AsyncResult<()> {
        // Currently io_uring does not implement fdatasync. Fall back to fsync.
        // TODO(b/281609112): Implement real fdatasync with io_uring.
        self.fsync().await
    }

    /// Yields the underlying IO source.
    pub fn into_source(self) -> F {
        self.source
    }

    /// Provides a ref to the underlying IO source.
    pub fn as_source(&self) -> &F {
        &self.source
    }

    /// Provides a mutable ref to the underlying IO source.
    pub fn as_source_mut(&mut self) -> &mut F {
        &mut self.source
    }
}

impl<F: AsRawDescriptor> Deref for UringSource<F> {
    type Target = F;

    fn deref(&self) -> &Self::Target {
        &self.source
    }
}

impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.source
    }
}

// NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;

    use sync::Mutex;

    use super::super::uring_executor::is_uring_stable;
    use super::super::UringSource;
    use super::*;
    use crate::sys::linux::ExecutorKindSys;
    use crate::Executor;
    use crate::ExecutorTrait;
    use crate::IoSource;

    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
        // Init a vec that translates to u64::max;
        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
        assert_eq!(ret, std::mem::size_of::<u64>());
        let mut val = 0u64.to_ne_bytes();
        val.copy_from_slice(&u64_mem);
        u64::from_ne_bytes(val)
    }

    #[test]
    fn event() {
        if !is_uring_stable() {
            return;
        }

        use base::Event;
        use base::EventExt;

        async fn write_event(ev: Event, wait: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let wait = UringSource::new(wait, ex).unwrap();
            ev.write_count(55).unwrap();
            read_u64(&wait).await;
            ev.write_count(66).unwrap();
            read_u64(&wait).await;
            ev.write_count(77).unwrap();
            read_u64(&wait).await;
        }

        async fn read_events(ev: Event, signal: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let source = UringSource::new(ev, ex).unwrap();
            assert_eq!(read_u64(&source).await, 55);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 66);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 77);
            signal.signal().unwrap();
        }

        let event = Event::new().unwrap();
        let signal_wait = Event::new().unwrap();
        let ex = RawExecutor::<UringReactor>::new().unwrap();
        let write_task = write_event(
            event.try_clone().unwrap(),
            signal_wait.try_clone().unwrap(),
            &ex,
        );
        let read_task = read_events(event, signal_wait, &ex);
        ex.run_until(futures::future::join(read_task, write_task))
            .unwrap();
    }

    #[test]
    fn pend_on_pipe() {
        if !is_uring_stable() {
            return;
        }

        use std::io::Write;

        use futures::future::Either;

        async fn do_test(ex: &Arc<RawExecutor<UringReactor>>) {
            let (read_source, mut w) = base::pipe().unwrap();
            let source = UringSource::new(read_source, ex).unwrap();
            let done = Box::pin(async { 5usize });
            let pending = Box::pin(read_u64(&source));
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => {
                    // Write to the pipe so that the kernel will release the memory associated with
                    // the uring read operation.
                    w.write_all(&[0]).expect("failed to write to pipe");
                    ::std::mem::drop(pending);
                }
                _ => panic!("unexpected select result"),
            };
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();
    }
    #[test]
    fn range_error() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 64];
            let vw = Arc::new(VecIoWrapper::from(v));
            let ret = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    [MemRegion {
                        offset: 32,
                        len: 33,
                    }],
                )
                .await;
            assert!(ret.is_err());
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    #[test]
    fn wait_read() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.wait_readable().await.unwrap();
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    struct State {
        should_quit: bool,
        waker: Option<Waker>,
    }

    impl State {
        fn wake(&mut self) {
            self.should_quit = true;
            let waker = self.waker.take();

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_uring_from_poll() {
        if !is_uring_stable() {
            return;
        }
        // Start a uring operation and then await the result from an FdExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0xa4u8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = uring_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_poll_from_uring() {
        if !is_uring_stable() {
            return;
        }
        // Start a poll operation and then await the result from a UringExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0x2cu8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = poll_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        let handle = std::thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }
}