// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io;
use std::io::Write;
use std::mem::size_of;
#[cfg(windows)]
use std::num::NonZeroU32;
use std::rc::Rc;
use std::result;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use base::debug;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Result as SysResult;
use base::Timer;
use base::Tube;
use base::TubeError;
use base::WorkerThread;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::AsyncError;
use cros_async::AsyncTube;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::ExecutorKind;
use cros_async::TimerAsync;
use data_model::Le16;
use data_model::Le32;
use data_model::Le64;
use disk::AsyncDisk;
use disk::DiskFile;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use remain::sorted;
use thiserror::Error as ThisError;
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
use vm_control::DiskControlCommand;
use vm_control::DiskControlResult;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;

use crate::virtio::async_utils;
use crate::virtio::block::sys::*;
use crate::virtio::block::DiskOption;
use crate::virtio::copy_config;
use crate::virtio::device_constants::block::virtio_blk_config;
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
use crate::virtio::device_constants::block::virtio_blk_req_header;
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::PciAddress;

const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for the number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;

#[sorted]
#[derive(ThisError, Debug)]
enum ExecuteError {
    #[error("failed to copy ID string: {0}")]
    CopyId(io::Error),
    #[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
    DiscardWriteZeroes {
        ioerr: Option<disk::Error>,
        sector: u64,
        num_sectors: u32,
        flags: u32,
    },
    #[error("failed to flush: {0}")]
    Flush(disk::Error),
    #[error("not enough space in descriptor chain to write status")]
    MissingStatus,
    #[error("out of range")]
    OutOfRange,
    #[error("failed to read message: {0}")]
    Read(io::Error),
    #[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
    ReadIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("read only; request_type={request_type}")]
    ReadOnly { request_type: u32 },
    #[error("failed to receive command message: {0}")]
    ReceivingCommand(TubeError),
    #[error("failed to send command response: {0}")]
    SendingResponse(TubeError),
    #[error("couldn't reset the timer: {0}")]
    TimerReset(base::Error),
    #[error("unsupported ({0})")]
    Unsupported(u32),
    #[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
    WriteIo {
        length: usize,
        sector: u64,
        desc_error: disk::Error,
    },
    #[error("failed to write request status: {0}")]
    WriteStatus(io::Error),
}

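/// Severity at which an `ExecuteError` is logged; see `ExecuteError::log_level`.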
enum LogLevel {
    Debug,
    Error,
}

impl ExecuteError {
    fn status(&self) -> u8 {
        match self {
            ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
            ExecuteError::OutOfRange { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
            ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
            ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
        }
    }

    fn log_level(&self) -> LogLevel {
        match self {
            // Since there is no feature bit for the guest to detect support for
            // VIRTIO_BLK_T_GET_ID, the driver has to try executing the request to see if it works.
            ExecuteError::Unsupported(VIRTIO_BLK_T_GET_ID) => LogLevel::Debug,
            // Log disk I/O errors at debug level to avoid flooding the logs.
            ExecuteError::ReadIo { .. }
            | ExecuteError::WriteIo { .. }
            | ExecuteError::Flush { .. }
            | ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
            // Log all other failures as errors.
            _ => LogLevel::Error,
        }
    }
}

/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
    #[error("failed to fsync the disk: {0}")]
    FsyncDisk(disk::Error),
}

/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
type BlockId = [u8; ID_LEN];

/// Tracks the state of an asynchronous disk.
struct DiskState {
    disk_image: Box<dyn AsyncDisk>,
    read_only: bool,
    sparse: bool,
    id: Option<BlockId>,
    /// A `DiskState` is owned by each worker's executor and cannot be shared by workers, so
    /// `worker_shared_state` holds the state shared between workers in an `Arc`.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}

/// Disk state which can be modified by other worker threads.
struct WorkerSharedState {
    disk_size: Arc<AtomicU64>,
}

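/// Processes one block request from a descriptor chain: executes the request against the disk
/// and writes the status byte into the final byte of the chain's writable area. Returns the
/// length of the writable area (including the status byte).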
async fn process_one_request(
    avail_desc: &mut DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) -> result::Result<usize, ExecuteError> {
    let reader = &mut avail_desc.reader;
    let writer = &mut avail_desc.writer;

    // The last byte of the buffer is virtio_blk_req::status.
    // Split it into a separate Writer so that status_writer is the final byte and
    // the original writer is left with just the actual block I/O data.
    let available_bytes = writer.available_bytes();
    let status_offset = available_bytes
        .checked_sub(1)
        .ok_or(ExecuteError::MissingStatus)?;
    let mut status_writer = writer.split_at(status_offset);

    let status = match BlockAsync::execute_request(
        reader,
        writer,
        disk_state,
        flush_timer,
        flush_timer_armed,
    )
    .await
    {
        Ok(()) => VIRTIO_BLK_S_OK,
        Err(e) => {
            match e.log_level() {
                LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
                LogLevel::Error => error!("failed executing disk request: {:#}", e),
            }
            e.status()
        }
    };

    status_writer
        .write_all(&[status])
        .map_err(ExecuteError::WriteStatus)?;
    Ok(available_bytes)
}

/// Process one descriptor chain asynchronously.
async fn process_one_chain(
    queue: &RefCell<Queue>,
    mut avail_desc: DescriptorChain,
    disk_state: &AsyncRwLock<DiskState>,
    flush_timer: &RefCell<TimerAsync<Timer>>,
    flush_timer_armed: &RefCell<bool>,
) {
    let _trace = cros_tracing::trace_event!(VirtioBlk, "process_one_chain");
    let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
        .await
    {
        Ok(len) => len,
        Err(e) => {
            error!("block: failed to handle request: {:#}", e);
            0
        }
    };

    let mut queue = queue.borrow_mut();
    queue.add_used(avail_desc, len as u32);
    queue.trigger_interrupt();
}

// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}

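/// Handles control commands (currently only disk resize) arriving on `command_tube`, signaling
/// a config change interrupt after a successful command. Pends forever if no command tube was
/// provided.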
async fn handle_command_tube(
    command_tube: &Option<AsyncTube>,
    interrupt: &RefCell<Option<Interrupt>>,
    disk_state: Rc<AsyncRwLock<DiskState>>,
) -> Result<(), ExecuteError> {
    let command_tube = match command_tube {
        Some(c) => c,
        None => {
            futures::future::pending::<()>().await;
            return Ok(());
        }
    };
    loop {
        match command_tube.next().await {
            Ok(command) => {
                let resp = match command {
                    DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
                };

                let resp_clone = resp.clone();
                command_tube
                    .send(resp_clone)
                    .await
                    .map_err(ExecuteError::SendingResponse)?;
                if let DiskControlResult::Ok = resp {
                    if let Some(interrupt) = &*interrupt.borrow() {
                        interrupt.signal_config_changed();
                    }
                }
            }
            Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
        }
    }
}

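/// Resizes the disk image backing the device to `new_size` bytes and publishes the new size to
/// `WorkerSharedState::disk_size`. Resizing a read-only device fails with `EROFS`.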
async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
    // Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
    // the state while resizing.
    let disk_state = disk_state.lock().await;
    // Prevent any other worker threads from doing IO.
    let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
    let worker_shared_state = worker_shared_state.lock().await;

    if disk_state.read_only {
        error!("Attempted to resize read-only block device");
        return DiskControlResult::Err(SysError::new(libc::EROFS));
    }

    info!("Resizing block device to {} bytes", new_size);

    if let Err(e) = disk_state.disk_image.set_len(new_size) {
        error!("Resizing disk failed! {:#}", e);
        return DiskControlResult::Err(SysError::new(libc::EIO));
    }

    // Allocate new space if the disk image is not sparse.
    if !disk_state.sparse {
        if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
            error!("Allocating disk space after resize failed! {:#}", e);
            return DiskControlResult::Err(SysError::new(libc::EIO));
        }
    }

    if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
        worker_shared_state
            .disk_size
            .store(new_disk_size, Ordering::Release);
    }
    DiskControlResult::Ok
}

/// Periodically flushes the disk when the given timer fires.
async fn flush_disk(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    timer: TimerAsync<Timer>,
    armed: Rc<RefCell<bool>>,
) -> Result<(), ControlError> {
    loop {
        timer.wait().await.map_err(ControlError::FlushTimer)?;
        if !*armed.borrow() {
            continue;
        }

        // Reset armed before calling fsync to guarantee that IO requests that started after we
        // call fsync will be committed eventually.
        *armed.borrow_mut() = false;

        disk_state
            .read_lock()
            .await
            .disk_image
            .fsync()
            .await
            .map_err(ControlError::FsyncDisk)?;
    }
}

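/// Commands sent from `BlockAsync` to a worker thread to start, stop, or abort queue handlers.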
enum WorkerCmd {
    StartQueue {
        index: usize,
        queue: Queue,
    },
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}

// The main worker thread. Initializes the asynchronous worker tasks and passes them to the
// executor to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. This
// is because the state can be read from the virtqueue task while the control task is processing
// a resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Process any requests to resample the irq value.
    let resample_future = std::future::pending::<anyhow::Result<()>>()
        .fuse()
        .left_future();
    pin_mut!(resample_future);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = resample_future => return r.context("failed to resample an irq value"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        if matches!(&*resample_future, futures::future::Either::Left(_)) {
                            resample_future.set(
                                async_utils::handle_irq_resample(ex, queue.interrupt().clone())
                                    .fuse()
                                    .right_future(),
                            );
                        }
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    resample_future.set(std::future::pending().fuse().left_future());
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }
                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        resample_future.set(std::future::pending().fuse().left_future());
                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public because the field is used by the main crate to determine
    // boot order.
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    disk_size: Arc<AtomicU64>,
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    seg_max: u32,
    block_size: u32,
    id: Option<BlockId>,
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue.
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}

impl BlockAsync {
    /// Create a new virtio block device that operates on the given AsyncDisk.
    pub fn new(
        base_features: u64,
        disk_image: Box<dyn DiskFile>,
        disk_option: &DiskOption,
        control_tube: Option<Tube>,
        queue_size: Option<u16>,
        num_queues: Option<u16>,
    ) -> SysResult<BlockAsync> {
        let read_only = disk_option.read_only;
        let sparse = disk_option.sparse;
        let block_size = disk_option.block_size;
        let packed_queue = disk_option.packed_queue;
        let id = disk_option.id;
        let mut worker_per_queue = disk_option.multiple_workers;
        // Automatically disable multiple workers if the disk image can't be cloned.
        if worker_per_queue && disk_image.try_clone().is_err() {
            base::warn!("multiple workers requested, but not supported by disk image type");
            worker_per_queue = false;
        }
        let executor_kind = disk_option.async_executor.unwrap_or_default();
        let boot_index = disk_option.bootindex;
        #[cfg(windows)]
        let io_concurrency = disk_option.io_concurrency.get();

        if block_size % SECTOR_SIZE as u32 != 0 {
            error!(
                "Block size {} is not a multiple of {}.",
                block_size, SECTOR_SIZE,
            );
            return Err(SysError::new(libc::EINVAL));
        }
        let disk_size = disk_image.get_len()?;
        if disk_size % block_size as u64 != 0 {
            warn!(
                "Disk size {} is not a multiple of block size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, block_size,
            );
        }
        let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
        let multi_queue = match num_queues {
            0 => panic!("Number of queues cannot be zero for a block device"),
            1 => false,
            _ => true,
        };
        let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
        if !q_size.is_power_of_two() {
            error!("queue size {} is not a power of 2.", q_size);
            return Err(SysError::new(libc::EINVAL));
        }
        let queue_sizes = vec![q_size; num_queues as usize];

        let avail_features =
            Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);

        let seg_max = get_seg_max(q_size);

        let disk_size = Arc::new(AtomicU64::new(disk_size));
        let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
            disk_size: disk_size.clone(),
        }));

        Ok(BlockAsync {
            disk_image: Some(disk_image),
            disk_size,
            avail_features,
            read_only,
            sparse,
            seg_max,
            block_size,
            id,
            queue_sizes,
            worker_threads: BTreeMap::new(),
            shared_state,
            worker_per_queue,
            control_tube,
            executor_kind,
            activated_queues: BTreeSet::new(),
            boot_index,
            #[cfg(windows)]
            io_concurrency,
            pci_address: disk_option.pci_address,
        })
    }

    /// Returns the feature flags given the specified attributes.
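    ///
    /// For example, a writable, sparse, multi-queue disk advertises `VIRTIO_BLK_F_FLUSH`,
    /// `VIRTIO_BLK_F_DISCARD`, `VIRTIO_BLK_F_WRITE_ZEROES`, `VIRTIO_BLK_F_SEG_MAX`,
    /// `VIRTIO_BLK_F_BLK_SIZE`, and `VIRTIO_BLK_F_MQ` on top of `base_features`, while a
    /// read-only disk advertises `VIRTIO_BLK_F_RO` instead of the write-related features.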
    fn build_avail_features(
        base_features: u64,
        read_only: bool,
        sparse: bool,
        multi_queue: bool,
        packed_queue: bool,
    ) -> u64 {
        let mut avail_features = base_features;
        if read_only {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        } else {
            if sparse {
                avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
            }
            avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
            avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
        }
        avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
        avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
        if multi_queue {
            avail_features |= 1 << VIRTIO_BLK_F_MQ;
        }
        if packed_queue {
            avail_features |= 1 << VIRTIO_F_RING_PACKED;
        }
        avail_features
    }

    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "in", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                let _trace = cros_tracing::trace_event!(VirtioBlk, "out", offset, data_len);
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                #[allow(clippy::if_same_then_else)]
                let _trace = if req_type == VIRTIO_BLK_T_DISCARD {
                    cros_tracing::trace_event!(VirtioBlk, "discard")
                } else {
                    cros_tracing::trace_event!(VirtioBlk, "write_zeroes")
                };
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            VIRTIO_BLK_T_FLUSH => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "flush");
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            VIRTIO_BLK_T_GET_ID => {
                let _trace = cros_tracing::trace_event!(VirtioBlk, "get_id");
                if let Some(id) = disk_state.id {
                    writer.write_all(&id).map_err(ExecuteError::CopyId)?;
                } else {
                    return Err(ExecuteError::Unsupported(req_type));
                }
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }

    /// Builds and returns the config structure used to specify block features.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }

    /// Get the worker for a queue, starting it if necessary.
    // NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
    #[allow(clippy::map_entry)]
    fn start_worker(
        &mut self,
        idx: usize,
    ) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
        let key = if self.worker_per_queue { idx } else { 0 };
        if self.worker_threads.contains_key(&key) {
            return Ok(self.worker_threads.get(&key).unwrap());
        }

        let ex = self.create_executor();
        let control_tube = self.control_tube.take();
        let disk_image = if self.worker_per_queue {
            self.disk_image
                .as_ref()
                .context("Failed to ref a disk image")?
                .try_clone()
                .context("Failed to clone a disk image")?
        } else {
            self.disk_image
                .take()
                .context("Failed to take a disk image")?
        };
        let read_only = self.read_only;
        let sparse = self.sparse;
        let id = self.id;
        let worker_shared_state = self.shared_state.clone();

        let (worker_tx, worker_rx) = mpsc::unbounded();
        let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
            let async_control =
                control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

            let async_image = match disk_image.to_async_disk(&ex) {
                Ok(d) => d,
                Err(e) => panic!("Failed to create async disk {:#}", e),
            };

            let disk_state = Rc::new(AsyncRwLock::new(DiskState {
                disk_image: async_image,
                read_only,
                sparse,
                id,
                worker_shared_state,
            }));

            if let Err(err_string) = ex
                .run_until(async {
                    let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                    // Flush any in-memory disk image state to file.
                    if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                        error!("failed to flush disk image when stopping worker: {e:?}");
                    }
                    r
                })
                .expect("run_until failed")
            {
                error!("{:#}", err_string);
            }
        });
        match self.worker_threads.entry(key) {
            std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
            std::collections::btree_map::Entry::Vacant(e) => {
                Ok(e.insert((worker_thread, worker_tx)))
            }
        }
    }

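    /// Starts the queue at `idx` on its worker thread, spawning the worker first if necessary.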
    pub fn start_queue(
        &mut self,
        idx: usize,
        queue: Queue,
        _mem: GuestMemory,
    ) -> anyhow::Result<()> {
        let (_, worker_tx) = self.start_worker(idx)?;
        worker_tx
            .unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
            .expect("worker channel closed early");
        self.activated_queues.insert(idx);
        Ok(())
    }

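    /// Stops the queue at `idx`, blocking until the worker thread hands the `Queue` back.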
stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue>1064 pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
1065 // TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
1066 // simplify `virtio_sleep` and/or `reset` methods.
1067 let (_, worker_tx) = self
1068 .worker_threads
1069 .get(if self.worker_per_queue { &idx } else { &0 })
1070 .context("worker not found")?;
1071 let (response_tx, response_rx) = oneshot::channel();
1072 worker_tx
1073 .unbounded_send(WorkerCmd::StopQueue {
1074 index: idx,
1075 response_tx,
1076 })
1077 .expect("worker channel closed early");
1078 let queue = cros_async::block_on(async {
1079 response_rx
1080 .await
1081 .expect("response_rx closed early")
1082 .context("queue not found")
1083 })?;
1084 self.activated_queues.remove(&idx);
1085 Ok(queue)
1086 }
1087 }
1088
1089 impl VirtioDevice for BlockAsync {
keep_rds(&self) -> Vec<RawDescriptor>1090 fn keep_rds(&self) -> Vec<RawDescriptor> {
1091 let mut keep_rds = Vec::new();
1092
1093 if let Some(disk_image) = &self.disk_image {
1094 keep_rds.extend(disk_image.as_raw_descriptors());
1095 }
1096
1097 if let Some(control_tube) = &self.control_tube {
1098 keep_rds.push(control_tube.as_raw_descriptor());
1099 }
1100
1101 keep_rds
1102 }
1103
features(&self) -> u641104 fn features(&self) -> u64 {
1105 self.avail_features
1106 }
1107
device_type(&self) -> DeviceType1108 fn device_type(&self) -> DeviceType {
1109 DeviceType::Block
1110 }
1111
queue_max_sizes(&self) -> &[u16]1112 fn queue_max_sizes(&self) -> &[u16] {
1113 &self.queue_sizes
1114 }
1115
read_config(&self, offset: u64, data: &mut [u8])1116 fn read_config(&self, offset: u64, data: &mut [u8]) {
1117 let config_space = {
1118 let disk_size = self.disk_size.load(Ordering::Acquire);
1119 Self::build_config_space(
1120 disk_size,
1121 self.seg_max,
1122 self.block_size,
1123 self.queue_sizes.len() as u16,
1124 )
1125 };
1126 copy_config(data, 0, config_space.as_bytes(), offset);
1127 }
1128
activate( &mut self, mem: GuestMemory, _interrupt: Interrupt, queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>1129 fn activate(
1130 &mut self,
1131 mem: GuestMemory,
1132 _interrupt: Interrupt,
1133 queues: BTreeMap<usize, Queue>,
1134 ) -> anyhow::Result<()> {
1135 for (i, q) in queues {
1136 self.start_queue(i, q, mem.clone())?;
1137 }
1138 Ok(())
1139 }
1140
reset(&mut self) -> anyhow::Result<()>1141 fn reset(&mut self) -> anyhow::Result<()> {
1142 for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
1143 let (response_tx, response_rx) = oneshot::channel();
1144 worker_tx
1145 .unbounded_send(WorkerCmd::AbortQueues { response_tx })
1146 .expect("worker channel closed early");
1147 cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
1148 }
1149 self.activated_queues.clear();
1150 Ok(())
1151 }
1152
virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>>1153 fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
1154 // Reclaim the queues from workers.
1155 let mut queues = BTreeMap::new();
1156 for index in self.activated_queues.clone() {
1157 queues.insert(index, self.stop_queue(index)?);
1158 }
1159 if queues.is_empty() {
1160 return Ok(None); // Not activated.
1161 }
1162 Ok(Some(queues))
1163 }
1164
virtio_wake( &mut self, queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>, ) -> anyhow::Result<()>1165 fn virtio_wake(
1166 &mut self,
1167 queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
1168 ) -> anyhow::Result<()> {
1169 if let Some((mem, _interrupt, queues)) = queues_state {
1170 for (i, q) in queues {
1171 self.start_queue(i, q, mem.clone())?
1172 }
1173 }
1174 Ok(())
1175 }
1176
virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value>1177 fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
1178 // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
1179 // handled at a higher layer.
1180 Ok(serde_json::Value::Null)
1181 }
1182
virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>1183 fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
1184 anyhow::ensure!(
1185 data == serde_json::Value::Null,
1186 "unexpected snapshot data: should be null, got {}",
1187 data,
1188 );
1189 Ok(())
1190 }
1191
pci_address(&self) -> Option<PciAddress>1192 fn pci_address(&self) -> Option<PciAddress> {
1193 self.pci_address
1194 }
1195
bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)>1196 fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
1197 self.boot_index
1198 .map(|s| (format!("scsi@{}/disk@0,0", pci_slot).as_bytes().to_vec(), s))
1199 }
1200 }
1201
1202 #[cfg(test)]
1203 mod tests {
1204 use std::fs::File;
1205 use std::mem::size_of_val;
1206 use std::sync::atomic::AtomicU64;
1207
1208 use data_model::Le32;
1209 use data_model::Le64;
1210 use disk::SingleFileDisk;
1211 use hypervisor::ProtectionType;
1212 use tempfile::tempfile;
1213 use tempfile::TempDir;
1214 use vm_memory::GuestAddress;
1215
1216 use super::*;
1217 use crate::suspendable_virtio_tests;
1218 use crate::virtio::base_features;
1219 use crate::virtio::descriptor_utils::create_descriptor_chain;
1220 use crate::virtio::descriptor_utils::DescriptorType;
1221 use crate::virtio::QueueConfig;
1222
1223 #[test]
read_size()1224 fn read_size() {
1225 let f = tempfile().unwrap();
1226 f.set_len(0x1000).unwrap();
1227
1228 let features = base_features(ProtectionType::Unprotected);
1229 let disk_option = DiskOption::default();
1230 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1231 let mut num_sectors = [0u8; 4];
1232 b.read_config(0, &mut num_sectors);
1233 // size is 0x1000, so num_sectors is 8 (4096/512).
1234 assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
1235 let mut msw_sectors = [0u8; 4];
1236 b.read_config(4, &mut msw_sectors);
1237 // size is 0x1000, so msw_sectors is 0.
1238 assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
1239 }
1240
1241 #[test]
read_block_size()1242 fn read_block_size() {
1243 let f = tempfile().unwrap();
1244 f.set_len(0x1000).unwrap();
1245
1246 let features = base_features(ProtectionType::Unprotected);
1247 let disk_option = DiskOption {
1248 block_size: 4096,
1249 sparse: false,
1250 ..Default::default()
1251 };
1252 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1253 let mut blk_size = [0u8; 4];
1254 b.read_config(20, &mut blk_size);
1255 // blk_size should be 4096 (0x1000).
1256 assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
1257 }
1258
1259 #[test]
read_features()1260 fn read_features() {
1261 let tempdir = TempDir::new().unwrap();
1262 let mut path = tempdir.path().to_owned();
1263 path.push("disk_image");
1264
1265 // Feature bits 0-23 and 50-127 are specific for the device type, but
1266 // at the moment crosvm only supports 64 bits of feature bits.
1267 const DEVICE_FEATURE_BITS: u64 = 0xffffff;
1268
1269 // read-write block device
1270 {
1271 let f = File::create(&path).unwrap();
1272 let features = base_features(ProtectionType::Unprotected);
1273 let disk_option = DiskOption::default();
1274 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1275 // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
1276 // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
1277 // + VIRTIO_BLK_F_MQ
1278 assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
1279 }
1280
1281 // read-write block device, non-sparse
1282 {
1283 let f = File::create(&path).unwrap();
1284 let features = base_features(ProtectionType::Unprotected);
1285 let disk_option = DiskOption {
1286 sparse: false,
1287 ..Default::default()
1288 };
1289 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1290 // writable device should set VIRTIO_F_FLUSH + VIRTIO_BLK_F_RO
1291 // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
1292 assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
1293 }
1294
1295 // read-only block device
1296 {
1297 let f = File::create(&path).unwrap();
1298 let features = base_features(ProtectionType::Unprotected);
1299 let disk_option = DiskOption {
1300 read_only: true,
1301 ..Default::default()
1302 };
1303 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1304 // read-only device should set VIRTIO_BLK_F_RO
1305 // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
1306 assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
1307 }
1308 }
1309
1310 #[test]
check_pci_adress_configurability()1311 fn check_pci_adress_configurability() {
1312 let f = tempfile().unwrap();
1313
1314 let features = base_features(ProtectionType::Unprotected);
1315 let disk_option = DiskOption {
1316 pci_address: Some(PciAddress {
1317 bus: 0,
1318 dev: 1,
1319 func: 1,
1320 }),
1321 ..Default::default()
1322 };
1323 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1324
1325 assert_eq!(b.pci_address(), disk_option.pci_address);
1326 }
1327
1328 #[test]
check_runtime_blk_queue_configurability()1329 fn check_runtime_blk_queue_configurability() {
1330 let tempdir = TempDir::new().unwrap();
1331 let mut path = tempdir.path().to_owned();
1332 path.push("disk_image");
1333 let features = base_features(ProtectionType::Unprotected);
1334
1335 // Default case
1336 let f = File::create(&path).unwrap();
1337 let disk_option = DiskOption::default();
1338 let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
1339 assert_eq!(
1340 [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
1341 b.queue_max_sizes()
1342 );
1343
1344 // Single queue of size 128
1345 let f = File::create(&path).unwrap();
1346 let disk_option = DiskOption::default();
1347 let b = BlockAsync::new(
1348 features,
1349 Box::new(f),
1350 &disk_option,
1351 None,
1352 Some(128),
1353 Some(1),
1354 )
1355 .unwrap();
1356 assert_eq!([128; 1], b.queue_max_sizes());
1357 // Single queue device should not set VIRTIO_BLK_F_MQ
1358 assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
1359 }
1360
1361 #[test]
read_last_sector()1362 fn read_last_sector() {
1363 let ex = Executor::new().expect("creating an executor failed");
1364
1365 let f = tempfile().unwrap();
1366 let disk_size = 0x1000;
1367 f.set_len(disk_size).unwrap();
1368 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1369
1370 let mem = Rc::new(
1371 GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1372 .expect("Creating guest memory failed."),
1373 );
1374
1375 let req_hdr = virtio_blk_req_header {
1376 req_type: Le32::from(VIRTIO_BLK_T_IN),
1377 reserved: Le32::from(0),
1378 sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
1379 };
1380 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1381 .expect("writing req failed");
1382
1383 let mut avail_desc = create_descriptor_chain(
1384 &mem,
1385 GuestAddress(0x100), // Place descriptor chain at 0x100.
1386 GuestAddress(0x1000), // Describe buffer at 0x1000.
1387 vec![
1388 // Request header
1389 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1390 // I/O buffer (1 sector of data)
1391 (DescriptorType::Writable, 512),
1392 // Request status
1393 (DescriptorType::Writable, 1),
1394 ],
1395 0,
1396 )
1397 .expect("create_descriptor_chain failed");
1398
1399 let timer = Timer::new().expect("Failed to create a timer");
1400 let flush_timer = Rc::new(RefCell::new(
1401 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1402 ));
1403 let flush_timer_armed = Rc::new(RefCell::new(false));
1404
1405 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1406 disk_image: Box::new(af),
1407 read_only: false,
1408 sparse: true,
1409 id: None,
1410 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1411 disk_size: Arc::new(AtomicU64::new(disk_size)),
1412 })),
1413 }));
1414
1415 let fut = process_one_request(
1416 &mut avail_desc,
1417 &disk_state,
1418 &flush_timer,
1419 &flush_timer_armed,
1420 );
1421
1422 ex.run_until(fut)
1423 .expect("running executor failed")
1424 .expect("execute failed");
1425
1426 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
1427 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1428 assert_eq!(status, VIRTIO_BLK_S_OK);
1429 }
1430
1431 #[test]
read_beyond_last_sector()1432 fn read_beyond_last_sector() {
1433 let f = tempfile().unwrap();
1434 let disk_size = 0x1000;
1435 f.set_len(disk_size).unwrap();
1436 let mem = Rc::new(
1437 GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1438 .expect("Creating guest memory failed."),
1439 );
1440
1441 let req_hdr = virtio_blk_req_header {
1442 req_type: Le32::from(VIRTIO_BLK_T_IN),
1443 reserved: Le32::from(0),
1444 sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
1445 };
1446 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1447 .expect("writing req failed");
1448
1449 let mut avail_desc = create_descriptor_chain(
1450 &mem,
1451 GuestAddress(0x100), // Place descriptor chain at 0x100.
1452 GuestAddress(0x1000), // Describe buffer at 0x1000.
1453 vec![
1454 // Request header
1455 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1456 // I/O buffer (2 sectors of data - overlap the end of the disk).
1457 (DescriptorType::Writable, 512 * 2),
1458 // Request status
1459 (DescriptorType::Writable, 1),
1460 ],
1461 0,
1462 )
1463 .expect("create_descriptor_chain failed");
1464
1465 let ex = Executor::new().expect("creating an executor failed");
1466
1467 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1468 let timer = Timer::new().expect("Failed to create a timer");
1469 let flush_timer = Rc::new(RefCell::new(
1470 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1471 ));
1472 let flush_timer_armed = Rc::new(RefCell::new(false));
1473 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1474 disk_image: Box::new(af),
1475 read_only: false,
1476 sparse: true,
1477 id: None,
1478 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1479 disk_size: Arc::new(AtomicU64::new(disk_size)),
1480 })),
1481 }));
1482
1483 let fut = process_one_request(
1484 &mut avail_desc,
1485 &disk_state,
1486 &flush_timer,
1487 &flush_timer_armed,
1488 );
1489
1490 ex.run_until(fut)
1491 .expect("running executor failed")
1492 .expect("execute failed");
1493
1494 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
1495 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1496 assert_eq!(status, VIRTIO_BLK_S_IOERR);
1497 }
1498
1499 #[test]
get_id()1500 fn get_id() {
1501 let ex = Executor::new().expect("creating an executor failed");
1502
1503 let f = tempfile().unwrap();
1504 let disk_size = 0x1000;
1505 f.set_len(disk_size).unwrap();
1506
1507 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1508 .expect("Creating guest memory failed.");
1509
1510 let req_hdr = virtio_blk_req_header {
1511 req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
1512 reserved: Le32::from(0),
1513 sector: Le64::from(0),
1514 };
1515 mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
1516 .expect("writing req failed");
1517
1518 let mut avail_desc = create_descriptor_chain(
1519 &mem,
1520 GuestAddress(0x100), // Place descriptor chain at 0x100.
1521 GuestAddress(0x1000), // Describe buffer at 0x1000.
1522 vec![
1523 // Request header
1524 (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
1525 // I/O buffer (20 bytes for serial)
1526 (DescriptorType::Writable, 20),
1527 // Request status
1528 (DescriptorType::Writable, 1),
1529 ],
1530 0,
1531 )
1532 .expect("create_descriptor_chain failed");
1533
1534 let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
1535 let timer = Timer::new().expect("Failed to create a timer");
1536 let flush_timer = Rc::new(RefCell::new(
1537 TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
1538 ));
1539 let flush_timer_armed = Rc::new(RefCell::new(false));
1540
1541 let id = b"a20-byteserialnumber";
1542
1543 let disk_state = Rc::new(AsyncRwLock::new(DiskState {
1544 disk_image: Box::new(af),
1545 read_only: false,
1546 sparse: true,
1547 id: Some(*id),
1548 worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
1549 disk_size: Arc::new(AtomicU64::new(disk_size)),
1550 })),
1551 }));
1552
1553 let fut = process_one_request(
1554 &mut avail_desc,
1555 &disk_state,
1556 &flush_timer,
1557 &flush_timer_armed,
1558 );
1559
1560 ex.run_until(fut)
1561 .expect("running executor failed")
1562 .expect("execute failed");
1563
1564 let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
1565 let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
1566 assert_eq!(status, VIRTIO_BLK_S_OK);
1567
1568 let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
1569 let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
1570 assert_eq!(returned_id, *id);
1571 }
1572
1573 #[test]
reset_and_reactivate_single_worker()1574 fn reset_and_reactivate_single_worker() {
1575 reset_and_reactivate(false, None);
1576 }
1577
1578 #[test]
reset_and_reactivate_multiple_workers()1579 fn reset_and_reactivate_multiple_workers() {
1580 reset_and_reactivate(true, None);
1581 }
1582
1583 #[test]
1584 #[cfg(windows)]
reset_and_reactivate_overrlapped_io()1585 fn reset_and_reactivate_overrlapped_io() {
1586 reset_and_reactivate(
1587 false,
1588 Some(
1589 cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
1590 ),
1591 );
1592 }
1593
reset_and_reactivate( enables_multiple_workers: bool, async_executor: Option<cros_async::ExecutorKind>, )1594 fn reset_and_reactivate(
1595 enables_multiple_workers: bool,
1596 async_executor: Option<cros_async::ExecutorKind>,
1597 ) {
1598 // Create an empty disk image
1599 let f = tempfile::NamedTempFile::new().unwrap();
1600 f.as_file().set_len(0x1000).unwrap();
1601 // Close the file so that it is possible for the disk implementation to take exclusive
1602 // access when opening it.
1603 let path: tempfile::TempPath = f.into_temp_path();
1604
1605 // Create an empty guest memory
1606 let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
1607 .expect("Creating guest memory failed.");
1608
1609 // Create a control tube.
1610 // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker thread
1611 // will immediately fail, which isn't what we want to test in this case.
1612 let (_control_tube, control_tube_device) = Tube::pair().unwrap();
1613
        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // Reset, then assert that the resources have still not been returned to the device
        // (they should remain owned by the worker thread).
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        assert_eq!(b.id, Some(*b"Block serial number\0"));

        // re-activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }

    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    #[test]
    fn resize_with_multiple_workers() {
        // Test that a resize handled by one worker is reflected in the state shared by all
        // workers.
        resize(true);
    }

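    // Sends a `DiskControlCommand::Resize` over the control tube and verifies that the disk
    // size, the underlying image, the virtio config space, and the config-changed interrupt
    // all reflect the new size.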
    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        assert_eq!(
            interrupt
                .get_interrupt_evt()
                // Wait a bit until the blk signals the interrupt
                .wait_timeout(Duration::from_millis(300)),
            Ok(base::EventWaitResult::Signaled),
            "interrupt should be signaled"
        );
        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

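    // Checks that `BlockAsync::activate` spawns a single worker thread by default and one
    // worker per queue when `multiple_workers` is enabled.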
    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with a single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 1, "1 thread should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

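    // Fixtures for the `suspendable_virtio_tests!` invocation below: `create_device` builds a
    // `BlockAsync` backed by a temporary file, and `modify_device` perturbs the device state so
    // the snapshot comparison in those tests can detect a mismatch.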
    struct BlockContext {}

    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

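    // Instantiate the shared suspendable-device test suite for this block device with two
    // queues; only built for Linux/Android targets, per the cfg below.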
    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}