xref: /aosp_15_r20/external/crosvm/cros_async/src/sys/windows/handle_source.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fs::File;
6 use std::io;
7 use std::io::Read;
8 use std::io::Seek;
9 use std::io::SeekFrom;
10 use std::io::Write;
11 use std::mem::ManuallyDrop;
12 use std::ptr::null_mut;
13 use std::sync::Arc;
14 use std::time::Duration;
15 
16 use base::error;
17 use base::warn;
18 use base::AsRawDescriptor;
19 use base::Descriptor;
20 use base::Error as SysUtilError;
21 use base::FileReadWriteAtVolatile;
22 use base::FileReadWriteVolatile;
23 use base::FromRawDescriptor;
24 use base::PunchHole;
25 use base::VolatileSlice;
26 use base::WriteZeroesAt;
27 use smallvec::SmallVec;
28 use sync::Mutex;
29 use thiserror::Error as ThisError;
30 use winapi::um::ioapiset::CancelIoEx;
31 
32 use crate::mem::BackingMemory;
33 use crate::mem::MemRegion;
34 use crate::AsyncError;
35 use crate::AsyncResult;
36 use crate::CancellableBlockingPool;
37 
/// Errors produced by `HandleSource` operations.
///
/// The `Io*` and `Handle*` variants carry the underlying [`io::Error`]; when converted back into
/// an `io::Error` (see the `From<Error> for io::Error` impl) that inner error is returned as-is.
#[derive(ThisError, Debug)]
pub enum Error {
    /// Seeking the handle to the requested file offset failed.
    #[error("An error occurred trying to seek: {0}.")]
    IoSeekError(io::Error),
    /// Reading from the handle failed.
    #[error("An error occurred trying to read: {0}.")]
    IoReadError(io::Error),
    /// Writing to the handle failed.
    #[error("An error occurred trying to write: {0}.")]
    IoWriteError(io::Error),
    /// Flushing/syncing the handle failed.
    #[error("An error occurred trying to flush: {0}.")]
    IoFlushError(io::Error),
    /// Punching a hole (deallocating a range) failed.
    #[error("An error occurred trying to punch hole: {0}.")]
    IoPunchHoleError(io::Error),
    /// Zeroing a range failed.
    #[error("An error occurred trying to write zeroes: {0}.")]
    IoWriteZeroesError(io::Error),
    /// Duplicating the source handle(s) failed.
    #[error("An error occurred trying to duplicate source handles: {0}.")]
    HandleDuplicationFailed(io::Error),
    /// Waiting on the source handle failed.
    #[error("An error occurred trying to wait on source handles: {0}.")]
    HandleWaitFailed(io::Error),
    /// A requested `MemRegion` could not be turned into a `VolatileSlice`.
    #[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]
    BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),
    /// The `HandleSource` no longer exists, so the request cannot be serviced.
    #[error("HandleSource is gone, so no handles are available to fulfill the IO request.")]
    NoHandleSource,
    /// The operation was cancelled; returned by the cancellation callbacks passed to the
    /// blocking pool.
    #[error("Operation on HandleSource is cancelled.")]
    OperationCancelled,
    /// The operation was aborted unexpectedly.
    #[error("Operation on HandleSource was aborted (unexpected).")]
    OperationAborted,
}
65 
66 impl From<Error> for io::Error {
from(e: Error) -> Self67     fn from(e: Error) -> Self {
68         use Error::*;
69         match e {
70             IoSeekError(e) => e,
71             IoReadError(e) => e,
72             IoWriteError(e) => e,
73             IoFlushError(e) => e,
74             IoPunchHoleError(e) => e,
75             IoWriteZeroesError(e) => e,
76             HandleDuplicationFailed(e) => e,
77             HandleWaitFailed(e) => e,
78             BackingMemoryVolatileSliceFetchFailed(e) => io::Error::new(io::ErrorKind::Other, e),
79             NoHandleSource => io::Error::new(io::ErrorKind::Other, NoHandleSource),
80             OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),
81             OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),
82         }
83     }
84 }
85 
86 impl From<Error> for AsyncError {
from(e: Error) -> AsyncError87     fn from(e: Error) -> AsyncError {
88         AsyncError::SysVariants(e.into())
89     }
90 }
91 
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
93 
/// Used to shutdown IO running on a CancellableBlockingPool.
///
/// Holds the descriptor whose in-flight IO is cancelled by `cancel_sync_io`.
pub struct HandleWrapper {
    // Descriptor handed to `CancelIoEx` when cancellation is requested.
    handle: Descriptor,
}
98 
99 impl HandleWrapper {
new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>>100     pub fn new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>> {
101         Arc::new(Mutex::new(Self { handle }))
102     }
103 
cancel_sync_io<T>(&mut self, ret: T) -> T104     pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
105         // There isn't much we can do if cancel fails.
106         // SAFETY: trivially safe
107         if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 {
108             warn!(
109                 "Cancel IO for handle:{:?} failed with {}",
110                 self.handle.as_raw_descriptor(),
111                 SysUtilError::last()
112             );
113         }
114         ret
115     }
116 }
117 
/// Async IO source for Windows, such as a file.
///
/// IO is carried out by spawning blocking closures onto `blocking_pool`; each closure accesses
/// the handle through `source_descriptor`.
pub struct HandleSource<F: AsRawDescriptor> {
    // The owned IO source; returned by `into_source` / borrowed by `as_source*`.
    source: F,
    // Raw descriptor of `source`, captured once in `new` and copied into each spawned closure.
    source_descriptor: Descriptor,
    // Pool that runs the blocking IO closures and their cancellation callbacks.
    blocking_pool: CancellableBlockingPool,
}
124 
125 impl<F: AsRawDescriptor> HandleSource<F> {
126     /// Create a new `HandleSource` from the given IO source.
127     ///
128     /// Each HandleSource uses its own thread pool, with one thread per source supplied. Since these
129     /// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
130     /// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
131     ///
132     /// WARNING: `source` MUST be a unique file object (e.g. separate handles
133     /// each created by CreateFile), and point at the same file on disk. This is because IO
134     /// operations on the HandleSource are randomly distributed to each source.
135     ///
136     /// # Safety
137     /// The caller must guarantee that `F`'s handle is compatible with the underlying functions
138     /// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
139     /// by this struct. Note that most winapis will fail with reasonable errors.
new(source: F) -> Result<Self>140     pub fn new(source: F) -> Result<Self> {
141         let source_descriptor = Descriptor(source.as_raw_descriptor());
142 
143         Ok(Self {
144             source,
145             source_descriptor,
146             blocking_pool: CancellableBlockingPool::new(
147                 // WARNING: this is a safety requirement! Threads are 1:1 with sources.
148                 1,
149                 Duration::from_secs(10),
150             ),
151         })
152     }
153 
154     #[inline]
get_slices( mem: &Arc<dyn BackingMemory + Send + Sync>, mem_offsets: Vec<MemRegion>, ) -> Result<SmallVec<[VolatileSlice<'_>; 16]>>155     fn get_slices(
156         mem: &Arc<dyn BackingMemory + Send + Sync>,
157         mem_offsets: Vec<MemRegion>,
158     ) -> Result<SmallVec<[VolatileSlice<'_>; 16]>> {
159         mem_offsets
160             .into_iter()
161             .map(|region| {
162                 mem.get_volatile_slice(region)
163                     .map_err(Error::BackingMemoryVolatileSliceFetchFailed)
164             })
165             .collect::<Result<SmallVec<[VolatileSlice; 16]>>>()
166     }
167 }
168 
get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File>169 fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File> {
170     // SAFETY: trivially safe
171     // Safe because all callers must exit *before* these handles will be closed (guaranteed by
172     // HandleSource's Drop impl.).
173     unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) }
174 }
175 
176 impl<F: AsRawDescriptor> HandleSource<F> {
177     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec( &self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>178     pub async fn read_to_vec(
179         &self,
180         file_offset: Option<u64>,
181         mut vec: Vec<u8>,
182     ) -> AsyncResult<(usize, Vec<u8>)> {
183         let handles = HandleWrapper::new(self.source_descriptor);
184         let descriptors = self.source_descriptor;
185 
186         Ok(self
187             .blocking_pool
188             .spawn(
189                 move || {
190                     let mut file = get_thread_file(descriptors);
191                     if let Some(file_offset) = file_offset {
192                         file.seek(SeekFrom::Start(file_offset))
193                             .map_err(Error::IoSeekError)?;
194                     }
195                     Ok((
196                         file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,
197                         vec,
198                     ))
199                 },
200                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
201             )
202             .await?)
203     }
204 
205     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>206     pub async fn read_to_mem(
207         &self,
208         file_offset: Option<u64>,
209         mem: Arc<dyn BackingMemory + Send + Sync>,
210         mem_offsets: impl IntoIterator<Item = MemRegion>,
211     ) -> AsyncResult<usize> {
212         let mem_offsets = mem_offsets.into_iter().collect();
213         let handles = HandleWrapper::new(self.source_descriptor);
214         let descriptors = self.source_descriptor;
215 
216         Ok(self
217             .blocking_pool
218             .spawn(
219                 move || {
220                     let mut file = get_thread_file(descriptors);
221                     let memory_slices = Self::get_slices(&mem, mem_offsets)?;
222 
223                     match file_offset {
224                         Some(file_offset) => file
225                             .read_vectored_at_volatile(memory_slices.as_slice(), file_offset)
226                             .map_err(Error::IoReadError),
227                         None => file
228                             .read_vectored_volatile(memory_slices.as_slice())
229                             .map_err(Error::IoReadError),
230                     }
231                 },
232                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
233             )
234             .await?)
235     }
236 
    /// Wait for the handle of `self` to be readable.
    ///
    /// # Panics
    ///
    /// Always panics: this operation is not implemented for `HandleSource`.
    pub async fn wait_readable(&self) -> AsyncResult<()> {
        unimplemented!()
    }
241 
    /// Reads a single u64 from the current offset.
    ///
    /// # Panics
    ///
    /// Always panics: this operation is not implemented for `HandleSource`.
    pub async fn read_u64(&self) -> AsyncResult<u64> {
        unimplemented!()
    }
246 
247     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>248     pub async fn write_from_vec(
249         &self,
250         file_offset: Option<u64>,
251         vec: Vec<u8>,
252     ) -> AsyncResult<(usize, Vec<u8>)> {
253         let handles = HandleWrapper::new(self.source_descriptor);
254         let descriptors = self.source_descriptor;
255 
256         Ok(self
257             .blocking_pool
258             .spawn(
259                 move || {
260                     let mut file = get_thread_file(descriptors);
261                     if let Some(file_offset) = file_offset {
262                         file.seek(SeekFrom::Start(file_offset))
263                             .map_err(Error::IoSeekError)?;
264                     }
265                     Ok((
266                         file.write(vec.as_slice()).map_err(Error::IoWriteError)?,
267                         vec,
268                     ))
269                 },
270                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
271             )
272             .await?)
273     }
274 
275     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>276     pub async fn write_from_mem(
277         &self,
278         file_offset: Option<u64>,
279         mem: Arc<dyn BackingMemory + Send + Sync>,
280         mem_offsets: impl IntoIterator<Item = MemRegion>,
281     ) -> AsyncResult<usize> {
282         let mem_offsets = mem_offsets.into_iter().collect();
283         let handles = HandleWrapper::new(self.source_descriptor);
284         let descriptors = self.source_descriptor;
285 
286         Ok(self
287             .blocking_pool
288             .spawn(
289                 move || {
290                     let mut file = get_thread_file(descriptors);
291                     let memory_slices = Self::get_slices(&mem, mem_offsets)?;
292 
293                     match file_offset {
294                         Some(file_offset) => file
295                             .write_vectored_at_volatile(memory_slices.as_slice(), file_offset)
296                             .map_err(Error::IoWriteError),
297                         None => file
298                             .write_vectored_volatile(memory_slices.as_slice())
299                             .map_err(Error::IoWriteError),
300                     }
301                 },
302                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
303             )
304             .await?)
305     }
306 
307     /// Deallocates the given range of a file.
punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()>308     pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
309         let handles = HandleWrapper::new(self.source_descriptor);
310         let descriptors = self.source_descriptor;
311         Ok(self
312             .blocking_pool
313             .spawn(
314                 move || {
315                     let file = get_thread_file(descriptors);
316                     file.punch_hole(file_offset, len)
317                         .map_err(Error::IoPunchHoleError)?;
318                     Ok(())
319                 },
320                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
321             )
322             .await?)
323     }
324 
325     /// Fills the given range with zeroes.
write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()>326     pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
327         let handles = HandleWrapper::new(self.source_descriptor);
328         let descriptors = self.source_descriptor;
329         Ok(self
330             .blocking_pool
331             .spawn(
332                 move || {
333                     let file = get_thread_file(descriptors);
334                     // ZeroRange calls `punch_hole` which doesn't extend the File size if it needs
335                     // to. Will fix if it becomes a problem.
336                     file.write_zeroes_at(file_offset, len as usize)
337                         .map_err(Error::IoWriteZeroesError)?;
338                     Ok(())
339                 },
340                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
341             )
342             .await?)
343     }
344 
345     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>346     pub async fn fsync(&self) -> AsyncResult<()> {
347         let handles = HandleWrapper::new(self.source_descriptor);
348         let descriptors = self.source_descriptor;
349 
350         Ok(self
351             .blocking_pool
352             .spawn(
353                 move || {
354                     let mut file = get_thread_file(descriptors);
355                     file.flush().map_err(Error::IoFlushError)
356                 },
357                 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
358             )
359             .await?)
360     }
361 
    /// Sync all data of completed write operations to the backing storage. Currently, the
    /// implementation is equivalent to fsync.
    ///
    /// Delegates directly to [`Self::fsync`] and returns its result.
    pub async fn fdatasync(&self) -> AsyncResult<()> {
        // TODO(b/282003931): Fall back to regular fsync.
        self.fsync().await
    }
368 
369     /// Yields the underlying IO source.
into_source(self) -> F370     pub fn into_source(self) -> F {
371         self.source
372     }
373 
    /// Provides a mutable ref to the underlying IO source.
    ///
    /// The borrow is tied to `&mut self`, so no other method of this `HandleSource` can be
    /// called while it is held.
    pub fn as_source_mut(&mut self) -> &mut F {
        &mut self.source
    }
378 
    /// Provides a ref to the underlying IO source.
    ///
    /// If sources are not interchangeable, behavior is undefined.
    ///
    /// NOTE(review): this struct holds a single `source`, so the "interchangeable sources"
    /// caveat looks like a leftover from an earlier multi-source design. Confirm before relying
    /// on it.
    pub fn as_source(&self) -> &F {
        &self.source
    }
385 
386     /// If sources are not interchangeable, behavior is undefined.
wait_for_handle(&self) -> AsyncResult<()>387     pub async fn wait_for_handle(&self) -> AsyncResult<()> {
388         base::sys::windows::async_wait_for_single_object(&self.source)
389             .await
390             .map_err(Error::HandleWaitFailed)?;
391         Ok(())
392     }
393 }
394 
395 // NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::NamedTempFile;

    use super::super::HandleReactor;
    use super::*;
    use crate::common_executor::RawExecutor;
    use crate::ExecutorTrait;

    /// Punching a hole inside the file zeroes exactly the requested bytes.
    #[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
    #[test]
    fn test_punch_holes() {
        // Seed an 11-byte file and rewind so the later read starts at the beginning.
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(b"abcdefghijk").unwrap();
        temp_file.flush().unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();

        let ex = RawExecutor::<HandleReactor>::new().unwrap();
        let writable = fs::OpenOptions::new()
            .write(true)
            .open(temp_file.path())
            .unwrap();
        let handle_src = HandleSource::new(writable).unwrap();

        // Deallocate 3 bytes starting at offset 1.
        ex.run_until(async {
            handle_src.punch_hole(1, 3).await.unwrap();
        })
        .unwrap();

        let mut buf = vec![0; 11];
        temp_file.read_exact(&mut buf).unwrap();
        let contents = std::str::from_utf8(&buf).unwrap();
        assert_eq!(contents, "a\0\0\0efghijk");
    }

    /// Test should fail because punch hole should not be allowed to allocate more memory
    #[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
    #[test]
    fn test_punch_holes_fail_out_of_bounds() {
        // Seed an 11-byte file and rewind so the later read starts at the beginning.
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(b"abcdefghijk").unwrap();
        temp_file.flush().unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();

        let ex = RawExecutor::<HandleReactor>::new().unwrap();
        let writable = fs::OpenOptions::new()
            .write(true)
            .open(temp_file.path())
            .unwrap();
        let handle_src = HandleSource::new(writable).unwrap();

        // The hole (offset 9, length 4) runs 2 bytes past the end of the 11-byte file.
        ex.run_until(async {
            handle_src.punch_hole(9, 4).await.unwrap();
        })
        .unwrap();

        // The file must not have grown, so reading 13 bytes has to fail.
        let mut buf = vec![0; 13];
        let read_outcome = temp_file.read_exact(&mut buf);
        assert!(read_outcome.is_err());
    }

    // TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the
    // bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.
    // #[test]
    // fn test_write_zeroes() {
    //     let mut temp_file = NamedTempFile::new().unwrap();
    //     temp_file.write("abcdefghijk".as_bytes()).unwrap();
    //     temp_file.flush().unwrap();
    //     temp_file.seek(SeekFrom::Start(0)).unwrap();

    //     async fn punch_hole(handle_src: &HandleSource<File>) {
    //         let offset = 9;
    //         let len = 4;
    //         handle_src
    //             .fallocate(offset, len, AllocateMode::ZeroRange)
    //             .await
    //             .unwrap();
    //     }

    //     let ex = RawExecutor::<HandleReactor>::new();
    //     let f = fs::OpenOptions::new()
    //         .write(true)
    //         .open(temp_file.path())
    //         .unwrap();
    //     let handle_src = HandleSource::new(f).unwrap();
    //     ex.run_until(punch_hole(&handle_src)).unwrap();

    //     let mut buf = vec![0; 13];
    //     temp_file.read_exact(&mut buf).unwrap();
    //     assert_eq!(
    //         std::str::from_utf8(buf.as_slice()).unwrap(),
    //         "abcdefghi\0\0\0\0"
    //     );
    // }
}
498