1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Virtio console device worker thread.
6
7 use std::collections::BTreeMap;
8 use std::collections::VecDeque;
9 use std::sync::mpsc;
10 use std::sync::Arc;
11
12 use anyhow::anyhow;
13 use anyhow::Context;
14 use base::error;
15 use base::Event;
16 use base::EventToken;
17 use base::WaitContext;
18 use base::WorkerThread;
19 use sync::Mutex;
20
21 use crate::virtio::console::control::process_control_receive_queue;
22 use crate::virtio::console::control::process_control_transmit_queue;
23 use crate::virtio::console::control::ControlMsgBytes;
24 use crate::virtio::console::input::process_receive_queue;
25 use crate::virtio::console::output::process_transmit_queue;
26 use crate::virtio::console::port::ConsolePort;
27 use crate::virtio::console::port::ConsolePortInfo;
28 use crate::virtio::Interrupt;
29 use crate::virtio::Queue;
30
// Virtqueue index layout used by this worker:
//
//   0: port 0 receiveq      1: port 0 transmitq
//   2: control receiveq     3: control transmitq
//   4: port 1 receiveq      5: port 1 transmitq
//
// Ports after port 1 continue the pattern in pairs (receiveq on the even index, transmitq on the
// odd index); see `receiveq_idx()` and `transmitq_idx()`.

/// Queue index of the port 0 receive queue.
const PORT0_RECEIVEQ_IDX: usize = 0;
/// Queue index of the port 0 transmit queue.
const PORT0_TRANSMITQ_IDX: usize = 1;
/// Queue index of the control receive queue.
const CONTROL_RECEIVEQ_IDX: usize = 2;
/// Queue index of the control transmit queue.
const CONTROL_TRANSMITQ_IDX: usize = 3;
/// Queue index of the port 1 receive queue; later ports follow at a stride of two.
const PORT1_RECEIVEQ_IDX: usize = 4;
/// Queue index of the port 1 transmit queue; later ports follow at a stride of two.
const PORT1_TRANSMITQ_IDX: usize = 5;
37
38 pub struct WorkerPort {
39 info: Option<ConsolePortInfo>,
40
41 in_avail_evt: Event,
42 input_buffer: Arc<Mutex<VecDeque<u8>>>,
43 output: Box<dyn std::io::Write + Send>,
44 }
45
46 impl WorkerPort {
from_console_port(port: &mut ConsolePort) -> WorkerPort47 pub fn from_console_port(port: &mut ConsolePort) -> WorkerPort {
48 let in_avail_evt = port.clone_in_avail_evt().unwrap();
49 let input_buffer = port.clone_input_buffer();
50 let output = port
51 .take_output()
52 .unwrap_or_else(|| Box::new(std::io::sink()));
53 let info = port.port_info().cloned();
54 WorkerPort {
55 info,
56 in_avail_evt,
57 input_buffer,
58 output,
59 }
60 }
61
62 /// Restore the state retrieved from `ConsolePort` by `WorkerPort::from_console_port()`.
into_console_port(self, console_port: &mut ConsolePort)63 pub fn into_console_port(self, console_port: &mut ConsolePort) {
64 console_port.restore_output(self.output);
65 }
66
is_console(&self) -> bool67 pub fn is_console(&self) -> bool {
68 self.info
69 .as_ref()
70 .map(|info| info.console)
71 .unwrap_or_default()
72 }
73
name(&self) -> Option<&str>74 pub fn name(&self) -> Option<&str> {
75 self.info.as_ref().and_then(ConsolePortInfo::name)
76 }
77 }
78
79 #[derive(EventToken)]
80 enum Token {
81 ReceiveQueueAvailable(u32),
82 TransmitQueueAvailable(u32),
83 InputAvailable(u32),
84 ControlReceiveQueueAvailable,
85 ControlTransmitQueueAvailable,
86 InterruptResample,
87 WorkerRequest,
88 Kill,
89 }
90
91 pub enum WorkerRequest {
92 StartQueue {
93 idx: usize,
94 queue: Queue,
95 response_sender: mpsc::SyncSender<anyhow::Result<()>>,
96 },
97 StopQueue {
98 idx: usize,
99 response_sender: mpsc::SyncSender<Option<Queue>>,
100 },
101 }
102
103 pub struct Worker {
104 wait_ctx: WaitContext<Token>,
105 interrupt: Interrupt,
106
107 // Currently running queues.
108 queues: BTreeMap<usize, Queue>,
109
110 // Console ports indexed by port ID. At least port 0 will exist, and other ports may be
111 // available if `VIRTIO_CONSOLE_F_MULTIPORT` is enabled.
112 ports: Vec<WorkerPort>,
113
114 // Device-to-driver messages to be received by the driver via the control receiveq.
115 pending_receive_control_msgs: VecDeque<ControlMsgBytes>,
116
117 worker_receiver: mpsc::Receiver<WorkerRequest>,
118 worker_event: Event,
119 }
120
121 impl Worker {
new( interrupt: Interrupt, ports: Vec<WorkerPort>, worker_receiver: mpsc::Receiver<WorkerRequest>, worker_event: Event, ) -> anyhow::Result<Self>122 pub fn new(
123 interrupt: Interrupt,
124 ports: Vec<WorkerPort>,
125 worker_receiver: mpsc::Receiver<WorkerRequest>,
126 worker_event: Event,
127 ) -> anyhow::Result<Self> {
128 let wait_ctx = WaitContext::new().context("WaitContext::new() failed")?;
129
130 wait_ctx.add(&worker_event, Token::WorkerRequest)?;
131
132 for (index, port) in ports.iter().enumerate() {
133 let port_id = index as u32;
134 wait_ctx.add(&port.in_avail_evt, Token::InputAvailable(port_id))?;
135 }
136
137 if let Some(resample_evt) = interrupt.get_resample_evt() {
138 wait_ctx.add(resample_evt, Token::InterruptResample)?;
139 }
140
141 Ok(Worker {
142 wait_ctx,
143 interrupt,
144 queues: BTreeMap::new(),
145 ports,
146 pending_receive_control_msgs: VecDeque::new(),
147 worker_receiver,
148 worker_event,
149 })
150 }
151
run(&mut self, kill_evt: &Event) -> anyhow::Result<()>152 pub fn run(&mut self, kill_evt: &Event) -> anyhow::Result<()> {
153 self.wait_ctx.add(kill_evt, Token::Kill)?;
154 let res = self.run_loop();
155 self.wait_ctx.delete(kill_evt)?;
156 res
157 }
158
run_loop(&mut self) -> anyhow::Result<()>159 fn run_loop(&mut self) -> anyhow::Result<()> {
160 let mut running = true;
161 while running {
162 let events = self.wait_ctx.wait()?;
163
164 for event in events.iter().filter(|e| e.is_readable) {
165 match event.token {
166 Token::TransmitQueueAvailable(port_id) => {
167 if let (Some(port), Some(transmitq)) = (
168 self.ports.get_mut(port_id as usize),
169 transmitq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx)),
170 ) {
171 transmitq
172 .event()
173 .wait()
174 .context("failed reading transmit queue Event")?;
175 process_transmit_queue(transmitq, &mut port.output);
176 }
177 }
178 Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
179 let port = self.ports.get_mut(port_id as usize);
180 let receiveq =
181 receiveq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx));
182
183 let event = if matches!(event.token, Token::ReceiveQueueAvailable(..)) {
184 receiveq.as_ref().map(|q| q.event())
185 } else {
186 port.as_ref().map(|p| &p.in_avail_evt)
187 };
188 if let Some(event) = event {
189 event.wait().context("failed to clear receive event")?;
190 }
191
192 if let (Some(port), Some(receiveq)) = (port, receiveq) {
193 let mut input_buffer = port.input_buffer.lock();
194 process_receive_queue(&mut input_buffer, receiveq);
195 }
196 }
197 Token::ControlReceiveQueueAvailable => {
198 if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
199 ctrl_receiveq
200 .event()
201 .wait()
202 .context("failed waiting on control event")?;
203 process_control_receive_queue(
204 ctrl_receiveq,
205 &mut self.pending_receive_control_msgs,
206 );
207 }
208 }
209 Token::ControlTransmitQueueAvailable => {
210 if let Some(ctrl_transmitq) = self.queues.get_mut(&CONTROL_TRANSMITQ_IDX) {
211 ctrl_transmitq
212 .event()
213 .wait()
214 .context("failed waiting on control event")?;
215 process_control_transmit_queue(
216 ctrl_transmitq,
217 &self.ports,
218 &mut self.pending_receive_control_msgs,
219 );
220 }
221
222 // Attempt to send any new replies if there is space in the receiveq.
223 if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
224 process_control_receive_queue(
225 ctrl_receiveq,
226 &mut self.pending_receive_control_msgs,
227 )
228 }
229 }
230 Token::InterruptResample => {
231 self.interrupt.interrupt_resample();
232 }
233 Token::WorkerRequest => {
234 self.worker_event.wait()?;
235 self.process_worker_requests();
236 }
237 Token::Kill => running = false,
238 }
239 }
240 }
241 Ok(())
242 }
243
process_worker_requests(&mut self)244 fn process_worker_requests(&mut self) {
245 while let Ok(request) = self.worker_receiver.try_recv() {
246 match request {
247 WorkerRequest::StartQueue {
248 idx,
249 queue,
250 response_sender,
251 } => {
252 let res = self.start_queue(idx, queue);
253 let _ = response_sender.send(res);
254 }
255 WorkerRequest::StopQueue {
256 idx,
257 response_sender,
258 } => {
259 let res = self.stop_queue(idx);
260 let _ = response_sender.send(res);
261 }
262 }
263 }
264 }
265
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>266 fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
267 if let Some(port_id) = receiveq_port_id(idx) {
268 self.wait_ctx
269 .add(queue.event(), Token::ReceiveQueueAvailable(port_id))?;
270 } else if let Some(port_id) = transmitq_port_id(idx) {
271 self.wait_ctx
272 .add(queue.event(), Token::TransmitQueueAvailable(port_id))?;
273 } else if idx == CONTROL_RECEIVEQ_IDX {
274 self.wait_ctx
275 .add(queue.event(), Token::ControlReceiveQueueAvailable)?;
276 } else if idx == CONTROL_TRANSMITQ_IDX {
277 self.wait_ctx
278 .add(queue.event(), Token::ControlTransmitQueueAvailable)?;
279 } else {
280 return Err(anyhow!("unhandled queue idx {idx}"));
281 }
282
283 let prev = self.queues.insert(idx, queue);
284 assert!(prev.is_none());
285 Ok(())
286 }
287
stop_queue(&mut self, idx: usize) -> Option<Queue>288 fn stop_queue(&mut self, idx: usize) -> Option<Queue> {
289 if let Some(queue) = self.queues.remove(&idx) {
290 let _ = self.wait_ctx.delete(queue.event());
291 Some(queue)
292 } else {
293 None
294 }
295 }
296 }
297
298 pub struct WorkerHandle {
299 worker_thread: WorkerThread<Vec<WorkerPort>>,
300 worker_sender: mpsc::Sender<WorkerRequest>,
301 worker_event: Event,
302 }
303
304 impl WorkerHandle {
new(interrupt: Interrupt, ports: Vec<WorkerPort>) -> anyhow::Result<Self>305 pub fn new(interrupt: Interrupt, ports: Vec<WorkerPort>) -> anyhow::Result<Self> {
306 let worker_event = Event::new().context("Event::new")?;
307 let worker_event_clone = worker_event.try_clone().context("Event::try_clone")?;
308 let (worker_sender, worker_receiver) = mpsc::channel();
309 let worker_thread = WorkerThread::start("v_console", move |kill_evt| {
310 let mut worker = Worker::new(interrupt, ports, worker_receiver, worker_event_clone)
311 .expect("console Worker::new() failed");
312 if let Err(e) = worker.run(&kill_evt) {
313 error!("console worker failed: {:#}", e);
314 }
315 worker.ports
316 });
317 Ok(WorkerHandle {
318 worker_thread,
319 worker_sender,
320 worker_event,
321 })
322 }
323
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>324 pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
325 let (response_sender, response_receiver) = mpsc::sync_channel(0);
326 self.worker_sender
327 .send(WorkerRequest::StartQueue {
328 idx,
329 queue,
330 response_sender,
331 })
332 .context("mpsc::Sender::send")?;
333 self.worker_event.signal().context("Event::signal")?;
334 response_receiver.recv().context("mpsc::Receiver::recv")?
335 }
336
stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>>337 pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
338 let (response_sender, response_receiver) = mpsc::sync_channel(0);
339 self.worker_sender
340 .send(WorkerRequest::StopQueue {
341 idx,
342 response_sender,
343 })
344 .context("mpsc::Sender::send")?;
345 self.worker_event.signal().context("Event::signal")?;
346 response_receiver.recv().context("mpsc::Receiver::recv")
347 }
348
stop(self) -> Vec<WorkerPort>349 pub fn stop(self) -> Vec<WorkerPort> {
350 self.worker_thread.stop()
351 }
352 }
353
receiveq_idx(port_id: u32) -> Option<usize>354 fn receiveq_idx(port_id: u32) -> Option<usize> {
355 if port_id == 0 {
356 Some(PORT0_RECEIVEQ_IDX)
357 } else {
358 PORT1_RECEIVEQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
359 }
360 }
361
transmitq_idx(port_id: u32) -> Option<usize>362 fn transmitq_idx(port_id: u32) -> Option<usize> {
363 if port_id == 0 {
364 Some(PORT0_TRANSMITQ_IDX)
365 } else {
366 PORT1_TRANSMITQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
367 }
368 }
369
receiveq_port_id(queue_idx: usize) -> Option<u32>370 fn receiveq_port_id(queue_idx: usize) -> Option<u32> {
371 if queue_idx == PORT0_RECEIVEQ_IDX {
372 Some(0)
373 } else if queue_idx >= PORT1_RECEIVEQ_IDX && (queue_idx & 1) == 0 {
374 ((queue_idx - PORT1_RECEIVEQ_IDX) / 2)
375 .checked_add(1)?
376 .try_into()
377 .ok()
378 } else {
379 None
380 }
381 }
382
transmitq_port_id(queue_idx: usize) -> Option<u32>383 fn transmitq_port_id(queue_idx: usize) -> Option<u32> {
384 if queue_idx == PORT0_TRANSMITQ_IDX {
385 Some(0)
386 } else if queue_idx >= PORT1_TRANSMITQ_IDX && (queue_idx & 1) == 1 {
387 ((queue_idx - PORT1_TRANSMITQ_IDX) / 2)
388 .checked_add(1)?
389 .try_into()
390 .ok()
391 } else {
392 None
393 }
394 }
395
#[cfg(test)]
mod tests {
    use super::*;

    // Each test walks a table of `(input, expected)` pairs.

    #[test]
    fn test_receiveq_idx() {
        let cases = [(0, Some(0)), (1, Some(4)), (2, Some(6)), (3, Some(8))];
        for (port_id, expected) in cases {
            assert_eq!(receiveq_idx(port_id), expected);
        }
    }

    #[test]
    fn test_transmitq_idx() {
        let cases = [(0, Some(1)), (1, Some(5)), (2, Some(7)), (3, Some(9))];
        for (port_id, expected) in cases {
            assert_eq!(transmitq_idx(port_id), expected);
        }
    }

    #[test]
    fn test_receiveq_port_id() {
        let cases = [
            (0, Some(0)),
            (1, None), // port0 transmitq
            (2, None), // ctrl receiveq
            (3, None), // ctrl transmitq
            (4, Some(1)),
            (5, None),
            (6, Some(2)),
            (7, None),
            (8, Some(3)),
            (9, None),
        ];
        for (queue_idx, expected) in cases {
            assert_eq!(receiveq_port_id(queue_idx), expected);
        }
    }

    #[test]
    fn test_transmitq_port_id() {
        let cases = [
            (0, None), // port0 receiveq
            (1, Some(0)),
            (2, None), // ctrl receiveq
            (3, None), // ctrl transmitq
            (4, None), // port1 receiveq
            (5, Some(1)),
            (6, None),
            (7, Some(2)),
            (8, None),
            (9, Some(3)),
        ];
        for (queue_idx, expected) in cases {
            assert_eq!(transmitq_port_id(queue_idx), expected);
        }
    }
}
444