xref: /aosp_15_r20/external/crosvm/devices/src/virtio/block/asynchronous.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;
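
// Compile-time sanity check (illustrative; not part of the original source):
// the alignment above is 128 sectors of 512 bytes, i.e. 64 KiB, matching the
// comment.
const _: () = assert!(DISCARD_SECTOR_ALIGNMENT as u64 * SECTOR_SIZE == 64 * 1024);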

#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
    #[error("failed to fsync the disk: {0}")]
    FsyncDisk(disk::Error),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];
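
// For example, the ID "MYDISK" is stored as b"MYDISK" followed by 14 NUL bytes.
// A minimal sketch of constructing one (illustrative only; `block_id_from_str`
// is not a helper in this file), assuming `s` is validated ASCII of at most
// ID_LEN bytes:
//
//     fn block_id_from_str(s: &str) -> BlockId {
//         let mut id = [0u8; ID_LEN];
//         id[..s.len()].copy_from_slice(s.as_bytes());
//         id
//     }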

/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A DiskState is owned by each worker's executor and cannot be shared by workers, thus
    /// `worker_shared_state` holds the state shared by workers in Arc.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}
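
// Locking discipline, as used throughout this file: request execution takes
// shared `read_lock()` on both the per-worker `DiskState` and the shared
// `WorkerSharedState`, so requests from all queues may proceed concurrently,
// while `resize()` takes the exclusive `lock()` on both so that it observes
// no in-flight I/O. Sketch of the two acquisition paths:
//
//     let state = disk_state.read_lock().await;                  // I/O path
//     let shared = state.worker_shared_state.read_lock().await;
//
//     let state = disk_state.lock().await;                       // resize path
//     let shared = state.worker_shared_state.lock().await;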

async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}
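
// For reference, a virtio-blk request occupies one descriptor chain with this
// layout (per the virtio spec):
//
//     +------------------------+-------------------+--------------+
//     | virtio_blk_req_header  | data (optional)   | status       |
//     | 16 bytes, driver-write | driver-write for  | 1 byte,      |
//     | (type, reserved,       | OUT, device-write | device-write |
//     |  sector)               | for IN            |              |
//     +------------------------+-------------------+--------------+
//
// `avail_desc.reader` spans the driver-written region (header, plus data for
// writes); `avail_desc.writer` spans the device-written region, which is why
// the final byte carved off above is exactly the status byte.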

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let _trace = cros_tracing::trace_event!(VirtioBlk, "process_one_chain");
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt();
}

// There is one async task running `handle_queue` per virtio queue in use. It receives
// notifications from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}
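
// The future-reuse trick above generalizes to any completion-based event
// source: create the future once, pin it, and swap a fresh one in with
// `set()` only after the old one resolves. A minimal sketch of the shape
// (illustrative only; `event` stands for any async event source):
//
//     let fut = event.next_val().fuse();
//     pin_mut!(fut);
//     loop {
//         futures::select! {
//             val = fut => {
//                 fut.set(event.next_val().fuse()); // re-arm exactly once
//                 // ... handle val ...
//             }
//         }
//     }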

async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: &RefCell<Option<Interrupt>>,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    if let Some(interrupt) = &*interrupt.borrow() {
                        interrupt.signal_config_changed();
                    }
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let disk_state = disk_state.lock().await;
    // Prevent other worker threads from doing IO while we resize.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fsync to guarantee that IO requests that started after we call
        // fsync will be committed eventually.
        *armed.borrow_mut() = false;

        disk_state
            .read_lock()
            .await
            .disk_image
            .fsync()
            .await
            .map_err(ControlError::FsyncDisk)?;
    }
}

enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued-up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. It's
// because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Process any requests to resample the irq value.
    let resample_future = std::future::pending::<anyhow::Result<()>>()
        .fuse()
        .left_future();
    pin_mut!(resample_future);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = resample_future => return r.context("failed to resample an irq value"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        if matches!(&*resample_future, futures::future::Either::Left(_)) {
                            resample_future.set(
                                async_utils::handle_irq_resample(ex, queue.interrupt().clone())
                                    .fuse()
                                    .right_future(),
                            );
                        }
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    resample_future.set(std::future::pending().fuse().left_future());
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }

                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        resample_future.set(std::future::pending().fuse().left_future());
                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given AsyncDisk.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor.unwrap_or_default();
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }
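
    // For reference, the virtio-blk feature bit positions are SEG_MAX=2, RO=5,
    // BLK_SIZE=6, FLUSH=9, MQ=12, DISCARD=13 and WRITE_ZEROES=14, so a default
    // writable, sparse, multi-queue device advertises
    //     (1 << 2) | (1 << 6) | (1 << 9) | (1 << 12) | (1 << 13) | (1 << 14) = 0x7244
    // in the device-specific range, the value checked by the `read_features`
    // test below.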

    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }
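
        // For example, with disk_size = 4096: check_range(3584, 512, 4096) is
        // Ok (the I/O ends exactly at the end of the disk), while
        // check_range(4096, 1, 4096) fails the size check and
        // check_range(u64::MAX, 1, 4096) fails via checked_add overflow; both
        // return ExecuteError::OutOfRange.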

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "in", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "out", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                #[allow(clippy::if_same_then_else)]
                let _trace = if req_type == VIRTIO_BLK_T_DISCARD {
                    cros_tracing::trace_event!(VirtioBlk, "discard")
                } else {
                    cros_tracing::trace_event!(VirtioBlk, "write_zeroes")
                };
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "flush");
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "get_id");
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }
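
    // For reference, the leading virtio_blk_config field offsets (per the
    // virtio spec) are: capacity at byte 0 (8 bytes, in 512-byte sectors),
    // size_max at 8, seg_max at 12, geometry at 16 and blk_size at 20; the
    // `read_size` and `read_block_size` tests below read offsets 0/4 and 20
    // accordingly.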

    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {:#}", e),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }

    pub fn start_queue(
        &mut self,
        idx: usize,
        queue: Queue,
        _mem: GuestMemory,
    ) -> anyhow::Result<()> {
        let (_, worker_tx) = self.start_worker(idx)?;
        worker_tx
            .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
            .expect("worker channel closed early");
        self.activated_queues.insert(idx);
        Ok(())
    }

    pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
        // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
        // simplify `virtio_sleep` and/or `reset` methods.
        let (_, worker_tx) = self
            .worker_threads
            .get(if self.worker_per_queue { &idx } else { &0 })
            .context("worker not found")?;
        let (response_tx, response_rx) = oneshot::channel();
        worker_tx
            .unbounded_send(WorkerCmd::StopQueue {
                index: idx,
                response_tx,
            })
            .expect("worker channel closed early");
        let queue = cros_async::block_on(async {
            response_rx
                .await
                .expect("response_rx closed early")
                .context("queue not found")
        })?;
        self.activated_queues.remove(&idx);
        Ok(queue)
    }
}

impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
            let (response_tx, response_rx) = oneshot::channel();
            worker_tx
                .unbounded_send(WorkerCmd::AbortQueues { response_tx })
                .expect("worker channel closed early");
            cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        if queues.is_empty() {
            return Ok(None); // Not activated.
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, _interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        Ok(serde_json::Value::Null)
    }

    fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
        anyhow::ensure!(
            data == serde_json::Value::Null,
            "unexpected snapshot data: should be null, got {}",
            data,
        );
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        self.boot_index
            .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
    }
}
1201 
1202 #[cfg(test)]
1203 mod tests {
1204     use std::fs::File;
1205     use std::mem::size_of_val;
1206     use std::sync::atomic::AtomicU64;
1207 
1208     use data_model::Le32;
1209     use data_model::Le64;
1210     use disk::SingleFileDisk;
1211     use hypervisor::ProtectionType;
1212     use tempfile::tempfile;
1213     use tempfile::TempDir;
1214     use vm_memory::GuestAddress;
1215 
1216     use super::*;
1217     use crate::suspendable_virtio_tests;
1218     use crate::virtio::base_features;
1219     use crate::virtio::descriptor_utils::create_descriptor_chain;
1220     use crate::virtio::descriptor_utils::DescriptorType;
1221     use crate::virtio::QueueConfig;
1222 
1223     #[test]
read_size()1224     fn read_size() {
1225         let f = tempfile().unwrap();
1226         f.set_len(0x1000).unwrap();
1227 
1228         let features = base_features(ProtectionType::Unprotected);
1229         let disk_option = DiskOption::default();
1230         let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1231         let mut num_sectors = [0u8; 4];
1232         b.read_config(0, &mut num_sectors);
1233         // size is 0x1000, so num_sectors is 8 (4096/512).
1234         assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
1235         let mut msw_sectors = [0u8; 4];
1236         b.read_config(4, &mut msw_sectors);
1237         // size is 0x1000, so msw_sectors is 0.
1238         assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
1239     }
1240 
1241     #[test]
read_block_size()1242     fn read_block_size() {
1243         let f = tempfile().unwrap();
1244         f.set_len(0x1000).unwrap();
1245 
1246         let features = base_features(ProtectionType::Unprotected);
1247         let disk_option = DiskOption {
1248             block_size: 4096,
1249             sparse: false,
1250             ..Default::default()
1251         };
1252         let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1253         let mut blk_size = [0u8; 4];
1254         b.read_config(20, &mut blk_size);
1255         // blk_size should be 4096 (0x1000).
1256         assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
1257     }
1258 
1259     #[test]
read_features()1260     fn read_features() {
1261         let tempdir = TempDir::new().unwrap();
1262         let mut path = tempdir.path().to_owned();
1263         path.push("disk_image");
1264 
1265         // Feature bits 0-23 and 50-127 are specific for the device type, but
1266         // at the moment crosvm only supports 64 bits of feature bits.
1267         const DEVICE_FEATURE_BITS: u64 = 0xffffff;
1268 
        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ
            assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // non-sparse writable device should set VIRTIO_BLK_F_FLUSH
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE
            // + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ, but not VIRTIO_BLK_F_DISCARD
            assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

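    // queue_max_sizes() reflects the queue-size and queue-count overrides
    // passed to BlockAsync::new; with no overrides it reports
    // DEFAULT_NUM_QUEUES queues of DEFAULT_QUEUE_SIZE entries each.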
    #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

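        // create_descriptor_chain lays the buffers out back to back starting
        // at 0x1000 (the final 0 argument adds no spacing between them), so
        // the status byte follows the header and the 512-byte data buffer.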
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: None,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

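    // VIRTIO_BLK_T_GET_ID returns the device's serial number; virtio-blk
    // defines the ID as 20 bytes, which is why the descriptor below carves
    // out exactly 20 writable bytes.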
    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100),  // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Some(*id),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte follows the header and the 20-byte serial buffer
        // (not a full 512-byte sector as in the read tests).
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

    #[test]
    fn reset_and_reactivate_single_worker() {
        reset_and_reactivate(false, None);
    }

    #[test]
    fn reset_and_reactivate_multiple_workers() {
        reset_and_reactivate(true, None);
    }

    #[test]
    #[cfg(windows)]
    fn reset_and_reactivate_overlapped_io() {
        reset_and_reactivate(
            false,
            Some(
                cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
            ),
        );
    }

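    // Activates a BlockAsync, resets it, and activates it again, checking at
    // each step which resources (disk image, control tube, worker threads)
    // the device is expected to hold.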
    fn reset_and_reactivate(
        enables_multiple_workers: bool,
        async_executor: Option<cros_async::ExecutorKind>,
    ) {
        // Create an empty disk image
        let f = tempfile::NamedTempFile::new().unwrap();
        f.as_file().set_len(0x1000).unwrap();
        // Close the file so that it is possible for the disk implementation to take exclusive
        // access when opening it.
        let path: tempfile::TempPath = f.into_temp_path();

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker
        // thread to fail immediately, which isn't what we want to test in this case.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // reset and assert resources are still not back (should be in the worker thread)
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // re-activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }

    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker affects the state shared by all workers.
        resize(true);
    }

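    // Sends DiskControlCommand::Resize over the control tube, then verifies
    // the new size is visible via disk_size, the underlying disk image, the
    // capacity field in the config space, and a config-changed interrupt.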
    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        assert_eq!(
            interrupt
                .get_interrupt_evt()
                // Wait a bit until the blk signals the interrupt
                .wait_timeout(Duration::from_millis(300)),
            Ok(base::EventWaitResult::Signaled),
            "interrupt should be signaled"
        );
        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

    struct BlockContext {}

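    // Used by suspendable_virtio_tests! to perturb device state so that a
    // snapshot taken before the change no longer matches the device.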
    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}