1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fs::File;
6 use std::io;
7 use std::io::Read;
8 use std::io::Seek;
9 use std::io::SeekFrom;
10 use std::io::Write;
11 use std::mem::ManuallyDrop;
12 use std::ptr::null_mut;
13 use std::sync::Arc;
14 use std::time::Duration;
15
16 use base::error;
17 use base::warn;
18 use base::AsRawDescriptor;
19 use base::Descriptor;
20 use base::Error as SysUtilError;
21 use base::FileReadWriteAtVolatile;
22 use base::FileReadWriteVolatile;
23 use base::FromRawDescriptor;
24 use base::PunchHole;
25 use base::VolatileSlice;
26 use base::WriteZeroesAt;
27 use smallvec::SmallVec;
28 use sync::Mutex;
29 use thiserror::Error as ThisError;
30 use winapi::um::ioapiset::CancelIoEx;
31
32 use crate::mem::BackingMemory;
33 use crate::mem::MemRegion;
34 use crate::AsyncError;
35 use crate::AsyncResult;
36 use crate::CancellableBlockingPool;
37
/// Errors produced by the Windows `HandleSource` backend.
///
/// Most variants wrap the `io::Error` returned by the underlying blocking
/// file operation; the conversion back to `io::Error` below unwraps them.
#[derive(ThisError, Debug)]
pub enum Error {
    #[error("An error occurred trying to seek: {0}.")]
    IoSeekError(io::Error),
    #[error("An error occurred trying to read: {0}.")]
    IoReadError(io::Error),
    #[error("An error occurred trying to write: {0}.")]
    IoWriteError(io::Error),
    #[error("An error occurred trying to flush: {0}.")]
    IoFlushError(io::Error),
    #[error("An error occurred trying to punch hole: {0}.")]
    IoPunchHoleError(io::Error),
    #[error("An error occurred trying to write zeroes: {0}.")]
    IoWriteZeroesError(io::Error),
    #[error("An error occurred trying to duplicate source handles: {0}.")]
    HandleDuplicationFailed(io::Error),
    #[error("An error occurred trying to wait on source handles: {0}.")]
    HandleWaitFailed(io::Error),
    #[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]
    BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),
    #[error("HandleSource is gone, so no handles are available to fulfill the IO request.")]
    NoHandleSource,
    // Returned by the cancellation callback passed to the blocking pool.
    #[error("Operation on HandleSource is cancelled.")]
    OperationCancelled,
    #[error("Operation on HandleSource was aborted (unexpected).")]
    OperationAborted,
}
65
66 impl From<Error> for io::Error {
from(e: Error) -> Self67 fn from(e: Error) -> Self {
68 use Error::*;
69 match e {
70 IoSeekError(e) => e,
71 IoReadError(e) => e,
72 IoWriteError(e) => e,
73 IoFlushError(e) => e,
74 IoPunchHoleError(e) => e,
75 IoWriteZeroesError(e) => e,
76 HandleDuplicationFailed(e) => e,
77 HandleWaitFailed(e) => e,
78 BackingMemoryVolatileSliceFetchFailed(e) => io::Error::new(io::ErrorKind::Other, e),
79 NoHandleSource => io::Error::new(io::ErrorKind::Other, NoHandleSource),
80 OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),
81 OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),
82 }
83 }
84 }
85
impl From<Error> for AsyncError {
    fn from(e: Error) -> AsyncError {
        // `e.into()` targets whatever payload type `SysVariants` carries,
        // reusing the `From<Error>` conversion defined above.
        AsyncError::SysVariants(e.into())
    }
}
91
92 pub type Result<T> = std::result::Result<T, Error>;
93
/// Used to shutdown IO running on a CancellableBlockingPool.
pub struct HandleWrapper {
    // Raw handle that in-flight blocking IO was issued against; the target of
    // `CancelIoEx` when an operation is cancelled.
    handle: Descriptor,
}
98
99 impl HandleWrapper {
new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>>100 pub fn new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>> {
101 Arc::new(Mutex::new(Self { handle }))
102 }
103
cancel_sync_io<T>(&mut self, ret: T) -> T104 pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
105 // There isn't much we can do if cancel fails.
106 // SAFETY: trivially safe
107 if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 {
108 warn!(
109 "Cancel IO for handle:{:?} failed with {}",
110 self.handle.as_raw_descriptor(),
111 SysUtilError::last()
112 );
113 }
114 ret
115 }
116 }
117
/// Async IO source for Windows, such as a file.
pub struct HandleSource<F: AsRawDescriptor> {
    // The underlying IO source; handed back via `into_source`/`as_source`.
    source: F,
    // Raw descriptor copied from `source` at construction time; `Copy`, so it
    // can be moved cheaply into the blocking closures that do the actual IO.
    source_descriptor: Descriptor,
    // Pool that runs the blocking IO off the async executor's threads.
    blocking_pool: CancellableBlockingPool,
}
124
125 impl<F: AsRawDescriptor> HandleSource<F> {
126 /// Create a new `HandleSource` from the given IO source.
127 ///
128 /// Each HandleSource uses its own thread pool, with one thread per source supplied. Since these
129 /// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
130 /// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
131 ///
132 /// WARNING: `source` MUST be a unique file object (e.g. separate handles
133 /// each created by CreateFile), and point at the same file on disk. This is because IO
134 /// operations on the HandleSource are randomly distributed to each source.
135 ///
136 /// # Safety
137 /// The caller must guarantee that `F`'s handle is compatible with the underlying functions
138 /// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
139 /// by this struct. Note that most winapis will fail with reasonable errors.
new(source: F) -> Result<Self>140 pub fn new(source: F) -> Result<Self> {
141 let source_descriptor = Descriptor(source.as_raw_descriptor());
142
143 Ok(Self {
144 source,
145 source_descriptor,
146 blocking_pool: CancellableBlockingPool::new(
147 // WARNING: this is a safety requirement! Threads are 1:1 with sources.
148 1,
149 Duration::from_secs(10),
150 ),
151 })
152 }
153
154 #[inline]
get_slices( mem: &Arc<dyn BackingMemory + Send + Sync>, mem_offsets: Vec<MemRegion>, ) -> Result<SmallVec<[VolatileSlice<'_>; 16]>>155 fn get_slices(
156 mem: &Arc<dyn BackingMemory + Send + Sync>,
157 mem_offsets: Vec<MemRegion>,
158 ) -> Result<SmallVec<[VolatileSlice<'_>; 16]>> {
159 mem_offsets
160 .into_iter()
161 .map(|region| {
162 mem.get_volatile_slice(region)
163 .map_err(Error::BackingMemoryVolatileSliceFetchFailed)
164 })
165 .collect::<Result<SmallVec<[VolatileSlice; 16]>>>()
166 }
167 }
168
// Reconstructs a `File` from the raw descriptor for use on a pool thread. The
// result is wrapped in `ManuallyDrop` so the handle is NOT closed when the
// returned value goes out of scope — ownership stays with the source.
fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File> {
    // SAFETY: all callers must exit *before* this handle is closed (stated to
    // be guaranteed by HandleSource's Drop impl; NOTE(review): no Drop impl is
    // visible in this file — confirm it exists elsewhere).
    unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) }
}
175
176 impl<F: AsRawDescriptor> HandleSource<F> {
177 /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec( &self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>178 pub async fn read_to_vec(
179 &self,
180 file_offset: Option<u64>,
181 mut vec: Vec<u8>,
182 ) -> AsyncResult<(usize, Vec<u8>)> {
183 let handles = HandleWrapper::new(self.source_descriptor);
184 let descriptors = self.source_descriptor;
185
186 Ok(self
187 .blocking_pool
188 .spawn(
189 move || {
190 let mut file = get_thread_file(descriptors);
191 if let Some(file_offset) = file_offset {
192 file.seek(SeekFrom::Start(file_offset))
193 .map_err(Error::IoSeekError)?;
194 }
195 Ok((
196 file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,
197 vec,
198 ))
199 },
200 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
201 )
202 .await?)
203 }
204
205 /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>206 pub async fn read_to_mem(
207 &self,
208 file_offset: Option<u64>,
209 mem: Arc<dyn BackingMemory + Send + Sync>,
210 mem_offsets: impl IntoIterator<Item = MemRegion>,
211 ) -> AsyncResult<usize> {
212 let mem_offsets = mem_offsets.into_iter().collect();
213 let handles = HandleWrapper::new(self.source_descriptor);
214 let descriptors = self.source_descriptor;
215
216 Ok(self
217 .blocking_pool
218 .spawn(
219 move || {
220 let mut file = get_thread_file(descriptors);
221 let memory_slices = Self::get_slices(&mem, mem_offsets)?;
222
223 match file_offset {
224 Some(file_offset) => file
225 .read_vectored_at_volatile(memory_slices.as_slice(), file_offset)
226 .map_err(Error::IoReadError),
227 None => file
228 .read_vectored_volatile(memory_slices.as_slice())
229 .map_err(Error::IoReadError),
230 }
231 },
232 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
233 )
234 .await?)
235 }
236
    /// Wait for the handle of `self` to be readable.
    ///
    /// Not supported by this backend; panics if called.
    pub async fn wait_readable(&self) -> AsyncResult<()> {
        unimplemented!()
    }
241
    /// Reads a single u64 from the current offset.
    ///
    /// Not supported by this backend; panics if called.
    pub async fn read_u64(&self) -> AsyncResult<u64> {
        unimplemented!()
    }
246
247 /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>248 pub async fn write_from_vec(
249 &self,
250 file_offset: Option<u64>,
251 vec: Vec<u8>,
252 ) -> AsyncResult<(usize, Vec<u8>)> {
253 let handles = HandleWrapper::new(self.source_descriptor);
254 let descriptors = self.source_descriptor;
255
256 Ok(self
257 .blocking_pool
258 .spawn(
259 move || {
260 let mut file = get_thread_file(descriptors);
261 if let Some(file_offset) = file_offset {
262 file.seek(SeekFrom::Start(file_offset))
263 .map_err(Error::IoSeekError)?;
264 }
265 Ok((
266 file.write(vec.as_slice()).map_err(Error::IoWriteError)?,
267 vec,
268 ))
269 },
270 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
271 )
272 .await?)
273 }
274
275 /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>276 pub async fn write_from_mem(
277 &self,
278 file_offset: Option<u64>,
279 mem: Arc<dyn BackingMemory + Send + Sync>,
280 mem_offsets: impl IntoIterator<Item = MemRegion>,
281 ) -> AsyncResult<usize> {
282 let mem_offsets = mem_offsets.into_iter().collect();
283 let handles = HandleWrapper::new(self.source_descriptor);
284 let descriptors = self.source_descriptor;
285
286 Ok(self
287 .blocking_pool
288 .spawn(
289 move || {
290 let mut file = get_thread_file(descriptors);
291 let memory_slices = Self::get_slices(&mem, mem_offsets)?;
292
293 match file_offset {
294 Some(file_offset) => file
295 .write_vectored_at_volatile(memory_slices.as_slice(), file_offset)
296 .map_err(Error::IoWriteError),
297 None => file
298 .write_vectored_volatile(memory_slices.as_slice())
299 .map_err(Error::IoWriteError),
300 }
301 },
302 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
303 )
304 .await?)
305 }
306
307 /// Deallocates the given range of a file.
punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()>308 pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
309 let handles = HandleWrapper::new(self.source_descriptor);
310 let descriptors = self.source_descriptor;
311 Ok(self
312 .blocking_pool
313 .spawn(
314 move || {
315 let file = get_thread_file(descriptors);
316 file.punch_hole(file_offset, len)
317 .map_err(Error::IoPunchHoleError)?;
318 Ok(())
319 },
320 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
321 )
322 .await?)
323 }
324
325 /// Fills the given range with zeroes.
write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()>326 pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
327 let handles = HandleWrapper::new(self.source_descriptor);
328 let descriptors = self.source_descriptor;
329 Ok(self
330 .blocking_pool
331 .spawn(
332 move || {
333 let file = get_thread_file(descriptors);
334 // ZeroRange calls `punch_hole` which doesn't extend the File size if it needs
335 // to. Will fix if it becomes a problem.
336 file.write_zeroes_at(file_offset, len as usize)
337 .map_err(Error::IoWriteZeroesError)?;
338 Ok(())
339 },
340 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
341 )
342 .await?)
343 }
344
345 /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>346 pub async fn fsync(&self) -> AsyncResult<()> {
347 let handles = HandleWrapper::new(self.source_descriptor);
348 let descriptors = self.source_descriptor;
349
350 Ok(self
351 .blocking_pool
352 .spawn(
353 move || {
354 let mut file = get_thread_file(descriptors);
355 file.flush().map_err(Error::IoFlushError)
356 },
357 move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
358 )
359 .await?)
360 }
361
    /// Sync all data of completed write operations to the backing storage. Currently, the
    /// implementation is equivalent to fsync.
    pub async fn fdatasync(&self) -> AsyncResult<()> {
        // TODO(b/282003931): Fall back to regular fsync.
        // Delegates to `fsync` above; a data-only sync is not implemented.
        self.fsync().await
    }
368
    /// Yields the underlying IO source, consuming `self` (and dropping the
    /// backing blocking pool with it).
    pub fn into_source(self) -> F {
        self.source
    }
373
    /// Provides a mutable ref to the underlying IO source.
    pub fn as_source_mut(&mut self) -> &mut F {
        &mut self.source
    }
378
    /// Provides a ref to the underlying IO source.
    ///
    /// If sources are not interchangeable, behavior is undefined.
    pub fn as_source(&self) -> &F {
        &self.source
    }
385
386 /// If sources are not interchangeable, behavior is undefined.
wait_for_handle(&self) -> AsyncResult<()>387 pub async fn wait_for_handle(&self) -> AsyncResult<()> {
388 base::sys::windows::async_wait_for_single_object(&self.source)
389 .await
390 .map_err(Error::HandleWaitFailed)?;
391 Ok(())
392 }
393 }
394
395 // NOTE: Prefer adding tests to io_source.rs if not backend specific.
396 #[cfg(test)]
397 mod tests {
398 use std::fs;
399
400 use tempfile::NamedTempFile;
401
402 use super::super::HandleReactor;
403 use super::*;
404 use crate::common_executor::RawExecutor;
405 use crate::ExecutorTrait;
406
407 #[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
408 #[test]
test_punch_holes()409 fn test_punch_holes() {
410 let mut temp_file = NamedTempFile::new().unwrap();
411 temp_file.write_all("abcdefghijk".as_bytes()).unwrap();
412 temp_file.flush().unwrap();
413 temp_file.seek(SeekFrom::Start(0)).unwrap();
414
415 async fn punch_hole(handle_src: &HandleSource<File>) {
416 let offset = 1;
417 let len = 3;
418 handle_src.punch_hole(offset, len).await.unwrap();
419 }
420
421 let ex = RawExecutor::<HandleReactor>::new().unwrap();
422 let f = fs::OpenOptions::new()
423 .write(true)
424 .open(temp_file.path())
425 .unwrap();
426 let handle_src = HandleSource::new(f).unwrap();
427 ex.run_until(punch_hole(&handle_src)).unwrap();
428
429 let mut buf = vec![0; 11];
430 temp_file.read_exact(&mut buf).unwrap();
431 assert_eq!(
432 std::str::from_utf8(buf.as_slice()).unwrap(),
433 "a\0\0\0efghijk"
434 );
435 }
436
437 /// Test should fail because punch hole should not be allowed to allocate more memory
438 #[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
439 #[test]
test_punch_holes_fail_out_of_bounds()440 fn test_punch_holes_fail_out_of_bounds() {
441 let mut temp_file = NamedTempFile::new().unwrap();
442 temp_file.write_all("abcdefghijk".as_bytes()).unwrap();
443 temp_file.flush().unwrap();
444 temp_file.seek(SeekFrom::Start(0)).unwrap();
445
446 async fn punch_hole(handle_src: &HandleSource<File>) {
447 let offset = 9;
448 let len = 4;
449 handle_src.punch_hole(offset, len).await.unwrap();
450 }
451
452 let ex = RawExecutor::<HandleReactor>::new().unwrap();
453 let f = fs::OpenOptions::new()
454 .write(true)
455 .open(temp_file.path())
456 .unwrap();
457 let handle_src = HandleSource::new(f).unwrap();
458 ex.run_until(punch_hole(&handle_src)).unwrap();
459
460 let mut buf = vec![0; 13];
461 assert!(temp_file.read_exact(&mut buf).is_err());
462 }
463
464 // TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the
465 // bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.
466 // #[test]
467 // fn test_write_zeroes() {
468 // let mut temp_file = NamedTempFile::new().unwrap();
469 // temp_file.write("abcdefghijk".as_bytes()).unwrap();
470 // temp_file.flush().unwrap();
471 // temp_file.seek(SeekFrom::Start(0)).unwrap();
472
473 // async fn punch_hole(handle_src: &HandleSource<File>) {
474 // let offset = 9;
475 // let len = 4;
476 // handle_src
477 // .fallocate(offset, len, AllocateMode::ZeroRange)
478 // .await
479 // .unwrap();
480 // }
481
482 // let ex = RawExecutor::<HandleReactor>::new();
483 // let f = fs::OpenOptions::new()
484 // .write(true)
485 // .open(temp_file.path())
486 // .unwrap();
487 // let handle_src = HandleSource::new(f).unwrap();
488 // ex.run_until(punch_hole(&handle_src)).unwrap();
489
490 // let mut buf = vec![0; 13];
491 // temp_file.read_exact(&mut buf).unwrap();
492 // assert_eq!(
493 // std::str::from_utf8(buf.as_slice()).unwrap(),
494 // "abcdefghi\0\0\0\0"
495 // );
496 // }
497 }
498