1 // Copyright 2023 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::sync::Arc; 6 7 use base::AsRawDescriptor; 8 9 #[cfg(any(target_os = "android", target_os = "linux"))] 10 use crate::sys::linux::PollSource; 11 #[cfg(any(target_os = "android", target_os = "linux"))] 12 use crate::sys::linux::UringSource; 13 #[cfg(feature = "tokio")] 14 use crate::sys::platform::tokio_source::TokioSource; 15 #[cfg(windows)] 16 use crate::sys::windows::HandleSource; 17 #[cfg(windows)] 18 use crate::sys::windows::OverlappedSource; 19 use crate::AsyncResult; 20 use crate::BackingMemory; 21 use crate::MemRegion; 22 23 /// Associates an IO object `F` with cros_async's runtime and exposes an API to perform async IO on 24 /// that object's descriptor. 25 pub enum IoSource<F: base::AsRawDescriptor> { 26 #[cfg(any(target_os = "android", target_os = "linux"))] 27 Uring(UringSource<F>), 28 #[cfg(any(target_os = "android", target_os = "linux"))] 29 Epoll(PollSource<F>), 30 #[cfg(windows)] 31 Handle(HandleSource<F>), 32 #[cfg(windows)] 33 Overlapped(OverlappedSource<F>), 34 #[cfg(feature = "tokio")] 35 Tokio(TokioSource<F>), 36 } 37 38 static_assertions::assert_impl_all!(IoSource<std::fs::File>: Send, Sync); 39 40 /// Invoke a method on the underlying source type and await the result. 41 /// 42 /// `await_on_inner(io_source, method, ...)` => `inner_source.method(...).await` 43 macro_rules! await_on_inner { 44 ($x:ident, $method:ident $(, $args:expr)*) => { 45 match $x { 46 #[cfg(any(target_os = "android", target_os = "linux"))] 47 IoSource::Uring(x) => UringSource::$method(x, $($args),*).await, 48 #[cfg(any(target_os = "android", target_os = "linux"))] 49 IoSource::Epoll(x) => PollSource::$method(x, $($args),*).await, 50 #[cfg(windows)] 51 IoSource::Handle(x) => HandleSource::$method(x, $($args),*).await, 52 #[cfg(windows)] 53 IoSource::Overlapped(x) => OverlappedSource::$method(x, $($args),*).await, 54 #[cfg(feature = "tokio")] 55 IoSource::Tokio(x) => TokioSource::$method(x, $($args),*).await, 56 } 57 }; 58 } 59 60 /// Invoke a method on the underlying source type. 61 /// 62 /// `on_inner(io_source, method, ...)` => `inner_source.method(...)` 63 macro_rules! on_inner { 64 ($x:ident, $method:ident $(, $args:expr)*) => { 65 match $x { 66 #[cfg(any(target_os = "android", target_os = "linux"))] 67 IoSource::Uring(x) => UringSource::$method(x, $($args),*), 68 #[cfg(any(target_os = "android", target_os = "linux"))] 69 IoSource::Epoll(x) => PollSource::$method(x, $($args),*), 70 #[cfg(windows)] 71 IoSource::Handle(x) => HandleSource::$method(x, $($args),*), 72 #[cfg(windows)] 73 IoSource::Overlapped(x) => OverlappedSource::$method(x, $($args),*), 74 #[cfg(feature = "tokio")] 75 IoSource::Tokio(x) => TokioSource::$method(x, $($args),*), 76 } 77 }; 78 } 79 80 impl<F: AsRawDescriptor> IoSource<F> { 81 /// Reads at `file_offset` and fills the given `vec`. read_to_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>82 pub async fn read_to_vec( 83 &self, 84 file_offset: Option<u64>, 85 vec: Vec<u8>, 86 ) -> AsyncResult<(usize, Vec<u8>)> { 87 await_on_inner!(self, read_to_vec, file_offset, vec) 88 } 89 90 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>91 pub async fn read_to_mem( 92 &self, 93 file_offset: Option<u64>, 94 mem: Arc<dyn BackingMemory + Send + Sync>, 95 mem_offsets: impl IntoIterator<Item = MemRegion>, 96 ) -> AsyncResult<usize> { 97 await_on_inner!(self, read_to_mem, file_offset, mem, mem_offsets) 98 } 99 100 /// Waits for the object to be readable. wait_readable(&self) -> AsyncResult<()>101 pub async fn wait_readable(&self) -> AsyncResult<()> { 102 await_on_inner!(self, wait_readable) 103 } 104 105 /// Writes from the given `vec` to the file starting at `file_offset`. write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>106 pub async fn write_from_vec( 107 &self, 108 file_offset: Option<u64>, 109 vec: Vec<u8>, 110 ) -> AsyncResult<(usize, Vec<u8>)> { 111 await_on_inner!(self, write_from_vec, file_offset, vec) 112 } 113 114 /// Writes from the given `mem` at the given offsets to the file starting at `file_offset`. write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>115 pub async fn write_from_mem( 116 &self, 117 file_offset: Option<u64>, 118 mem: Arc<dyn BackingMemory + Send + Sync>, 119 mem_offsets: impl IntoIterator<Item = MemRegion>, 120 ) -> AsyncResult<usize> { 121 await_on_inner!(self, write_from_mem, file_offset, mem, mem_offsets) 122 } 123 124 /// Deallocates the given range of a file. punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()>125 pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> { 126 await_on_inner!(self, punch_hole, file_offset, len) 127 } 128 129 /// Fills the given range with zeroes. write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()>130 pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> { 131 await_on_inner!(self, write_zeroes_at, file_offset, len) 132 } 133 134 /// Sync all completed write operations to the backing storage. fsync(&self) -> AsyncResult<()>135 pub async fn fsync(&self) -> AsyncResult<()> { 136 await_on_inner!(self, fsync) 137 } 138 139 /// Sync all data of completed write operations to the backing storage, avoiding updating extra 140 /// metadata. Note that an implementation may simply implement fsync for fdatasync. fdatasync(&self) -> AsyncResult<()>141 pub async fn fdatasync(&self) -> AsyncResult<()> { 142 await_on_inner!(self, fdatasync) 143 } 144 145 /// Yields the underlying IO source. into_source(self) -> F146 pub fn into_source(self) -> F { 147 on_inner!(self, into_source) 148 } 149 150 /// Provides a ref to the underlying IO source. as_source(&self) -> &F151 pub fn as_source(&self) -> &F { 152 on_inner!(self, as_source) 153 } 154 155 /// Provides a mutable ref to the underlying IO source. as_source_mut(&mut self) -> &mut F156 pub fn as_source_mut(&mut self) -> &mut F { 157 on_inner!(self, as_source_mut) 158 } 159 160 /// Waits on a waitable handle. 161 /// 162 /// Needed for Windows currently, and subject to a potential future upstream. 163 #[cfg(windows)] wait_for_handle(&self) -> AsyncResult<()>164 pub async fn wait_for_handle(&self) -> AsyncResult<()> { 165 await_on_inner!(self, wait_for_handle) 166 } 167 } 168 169 #[cfg(test)] 170 mod tests { 171 use std::fs::File; 172 use std::io::Read; 173 use std::io::Seek; 174 use std::io::SeekFrom; 175 use std::io::Write; 176 use std::sync::Arc; 177 178 use tempfile::tempfile; 179 180 use super::*; 181 use crate::mem::VecIoWrapper; 182 #[cfg(any(target_os = "android", target_os = "linux"))] 183 use crate::sys::linux::uring_executor::is_uring_stable; 184 use crate::sys::ExecutorKindSys; 185 use crate::Executor; 186 use crate::ExecutorKind; 187 use crate::MemRegion; 188 189 #[cfg(any(target_os = "android", target_os = "linux"))] all_kinds() -> Vec<ExecutorKind>190 fn all_kinds() -> Vec<ExecutorKind> { 191 let mut kinds = vec![ExecutorKindSys::Fd.into()]; 192 if is_uring_stable() { 193 kinds.push(ExecutorKindSys::Uring.into()); 194 } 195 kinds 196 } 197 #[cfg(windows)] all_kinds() -> Vec<ExecutorKind>198 fn all_kinds() -> Vec<ExecutorKind> { 199 // TODO: Test OverlappedSource. It requires files to be opened specially, so this test 200 // fixture needs to be refactored first. 201 vec![ExecutorKindSys::Handle.into()] 202 } 203 tmpfile_with_contents(bytes: &[u8]) -> File204 fn tmpfile_with_contents(bytes: &[u8]) -> File { 205 let mut f = tempfile().unwrap(); 206 f.write_all(bytes).unwrap(); 207 f.flush().unwrap(); 208 f.seek(SeekFrom::Start(0)).unwrap(); 209 f 210 } 211 212 #[test] readvec()213 fn readvec() { 214 for kind in all_kinds() { 215 async fn go<F: AsRawDescriptor>(async_source: IoSource<F>) { 216 let v = vec![0x55u8; 32]; 217 let v_ptr = v.as_ptr(); 218 let (n, v) = async_source.read_to_vec(None, v).await.unwrap(); 219 assert_eq!(v_ptr, v.as_ptr()); 220 assert_eq!(n, 4); 221 assert_eq!(&v[..4], "data".as_bytes()); 222 } 223 224 let f = tmpfile_with_contents("data".as_bytes()); 225 let ex = Executor::with_executor_kind(kind).unwrap(); 226 let source = ex.async_from(f).unwrap(); 227 ex.run_until(go(source)).unwrap(); 228 } 229 } 230 231 #[test] writevec()232 fn writevec() { 233 for kind in all_kinds() { 234 async fn go<F: AsRawDescriptor>(async_source: IoSource<F>) { 235 let v = "data".as_bytes().to_vec(); 236 let v_ptr = v.as_ptr(); 237 let (n, v) = async_source.write_from_vec(None, v).await.unwrap(); 238 assert_eq!(n, 4); 239 assert_eq!(v_ptr, v.as_ptr()); 240 } 241 242 let mut f = tmpfile_with_contents(&[]); 243 let ex = Executor::with_executor_kind(kind).unwrap(); 244 let source = ex.async_from(f.try_clone().unwrap()).unwrap(); 245 ex.run_until(go(source)).unwrap(); 246 247 f.rewind().unwrap(); 248 assert_eq!(std::io::read_to_string(f).unwrap(), "data"); 249 } 250 } 251 252 #[test] readmem()253 fn readmem() { 254 for kind in all_kinds() { 255 async fn go<F: AsRawDescriptor>(async_source: IoSource<F>) { 256 let mem = Arc::new(VecIoWrapper::from(vec![b' '; 10])); 257 let n = async_source 258 .read_to_mem( 259 None, 260 Arc::<VecIoWrapper>::clone(&mem), 261 [ 262 MemRegion { offset: 0, len: 2 }, 263 MemRegion { offset: 4, len: 1 }, 264 ], 265 ) 266 .await 267 .unwrap(); 268 assert_eq!(n, 3); 269 let vec: Vec<u8> = match Arc::try_unwrap(mem) { 270 Ok(v) => v.into(), 271 Err(_) => panic!("Too many vec refs"), 272 }; 273 assert_eq!(std::str::from_utf8(&vec).unwrap(), "da t "); 274 } 275 276 let f = tmpfile_with_contents("data".as_bytes()); 277 let ex = Executor::with_executor_kind(kind).unwrap(); 278 let source = ex.async_from(f).unwrap(); 279 ex.run_until(go(source)).unwrap(); 280 } 281 } 282 283 #[test] writemem()284 fn writemem() { 285 for kind in all_kinds() { 286 async fn go<F: AsRawDescriptor>(async_source: IoSource<F>) { 287 let mem = Arc::new(VecIoWrapper::from("data".as_bytes().to_vec())); 288 let ret = async_source 289 .write_from_mem( 290 None, 291 Arc::<VecIoWrapper>::clone(&mem), 292 [ 293 MemRegion { offset: 0, len: 1 }, 294 MemRegion { offset: 2, len: 2 }, 295 ], 296 ) 297 .await 298 .unwrap(); 299 assert_eq!(ret, 3); 300 } 301 302 let mut f = tmpfile_with_contents(&[]); 303 let ex = Executor::with_executor_kind(kind).unwrap(); 304 let source = ex.async_from(f.try_clone().unwrap()).unwrap(); 305 ex.run_until(go(source)).unwrap(); 306 307 f.rewind().unwrap(); 308 assert_eq!(std::io::read_to_string(f).unwrap(), "dta"); 309 } 310 } 311 312 #[test] fsync()313 fn fsync() { 314 for kind in all_kinds() { 315 async fn go<F: AsRawDescriptor>(source: IoSource<F>) { 316 let v = vec![0x55u8; 32]; 317 let v_ptr = v.as_ptr(); 318 let ret = source.write_from_vec(None, v).await.unwrap(); 319 assert_eq!(ret.0, 32); 320 let ret_v = ret.1; 321 assert_eq!(v_ptr, ret_v.as_ptr()); 322 source.fsync().await.unwrap(); 323 } 324 325 let f = tempfile::tempfile().unwrap(); 326 let ex = Executor::with_executor_kind(kind).unwrap(); 327 let source = ex.async_from(f).unwrap(); 328 329 ex.run_until(go(source)).unwrap(); 330 } 331 } 332 333 #[test] readmulti()334 fn readmulti() { 335 for kind in all_kinds() { 336 async fn go<F: AsRawDescriptor>(source: IoSource<F>) { 337 let v = vec![0x55u8; 32]; 338 let v2 = vec![0x55u8; 32]; 339 let (ret, ret2) = futures::future::join( 340 source.read_to_vec(None, v), 341 source.read_to_vec(Some(32), v2), 342 ) 343 .await; 344 345 let (count, v) = ret.unwrap(); 346 let (count2, v2) = ret2.unwrap(); 347 348 assert!(v.iter().take(count).all(|&b| b == 0xAA)); 349 assert!(v2.iter().take(count2).all(|&b| b == 0xBB)); 350 } 351 352 let mut f = tempfile::tempfile().unwrap(); 353 f.write_all(&[0xAA; 32]).unwrap(); 354 f.write_all(&[0xBB; 32]).unwrap(); 355 f.rewind().unwrap(); 356 357 let ex = Executor::with_executor_kind(kind).unwrap(); 358 let source = ex.async_from(f).unwrap(); 359 360 ex.run_until(go(source)).unwrap(); 361 } 362 } 363 364 #[test] writemulti()365 fn writemulti() { 366 for kind in all_kinds() { 367 async fn go<F: AsRawDescriptor>(source: IoSource<F>) { 368 let v = vec![0x55u8; 32]; 369 let v2 = vec![0x55u8; 32]; 370 let (r, r2) = futures::future::join( 371 source.write_from_vec(None, v), 372 source.write_from_vec(Some(32), v2), 373 ) 374 .await; 375 assert_eq!(32, r.unwrap().0); 376 assert_eq!(32, r2.unwrap().0); 377 } 378 379 let f = tempfile::tempfile().unwrap(); 380 let ex = Executor::with_executor_kind(kind).unwrap(); 381 let source = ex.async_from(f).unwrap(); 382 383 ex.run_until(go(source)).unwrap(); 384 } 385 } 386 387 #[test] read_current_file_position()388 fn read_current_file_position() { 389 for kind in all_kinds() { 390 async fn go<F: AsRawDescriptor>(source: IoSource<F>) { 391 let (count1, verify1) = source.read_to_vec(None, vec![0u8; 32]).await.unwrap(); 392 let (count2, verify2) = source.read_to_vec(None, vec![0u8; 32]).await.unwrap(); 393 assert_eq!(count1, 32); 394 assert_eq!(count2, 32); 395 assert_eq!(verify1, [0x55u8; 32]); 396 assert_eq!(verify2, [0xffu8; 32]); 397 } 398 399 let mut f = tempfile::tempfile().unwrap(); 400 f.write_all(&[0x55u8; 32]).unwrap(); 401 f.write_all(&[0xffu8; 32]).unwrap(); 402 f.rewind().unwrap(); 403 404 let ex = Executor::with_executor_kind(kind).unwrap(); 405 let source = ex.async_from(f).unwrap(); 406 407 ex.run_until(go(source)).unwrap(); 408 } 409 } 410 411 #[test] write_current_file_position()412 fn write_current_file_position() { 413 for kind in all_kinds() { 414 async fn go<F: AsRawDescriptor>(source: IoSource<F>) { 415 let count1 = source 416 .write_from_vec(None, vec![0x55u8; 32]) 417 .await 418 .unwrap() 419 .0; 420 assert_eq!(count1, 32); 421 let count2 = source 422 .write_from_vec(None, vec![0xffu8; 32]) 423 .await 424 .unwrap() 425 .0; 426 assert_eq!(count2, 32); 427 } 428 429 let mut f = tempfile::tempfile().unwrap(); 430 let ex = Executor::with_executor_kind(kind).unwrap(); 431 let source = ex.async_from(f.try_clone().unwrap()).unwrap(); 432 433 ex.run_until(go(source)).unwrap(); 434 435 f.rewind().unwrap(); 436 let mut verify1 = [0u8; 32]; 437 let mut verify2 = [0u8; 32]; 438 f.read_exact(&mut verify1).unwrap(); 439 f.read_exact(&mut verify2).unwrap(); 440 assert_eq!(verify1, [0x55u8; 32]); 441 assert_eq!(verify2, [0xffu8; 32]); 442 } 443 } 444 } 445