xref: /aosp_15_r20/external/crosvm/devices/src/virtio/console/worker.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Virtio console device worker thread.
6 
7 use std::collections::BTreeMap;
8 use std::collections::VecDeque;
9 use std::sync::mpsc;
10 use std::sync::Arc;
11 
12 use anyhow::anyhow;
13 use anyhow::Context;
14 use base::error;
15 use base::Event;
16 use base::EventToken;
17 use base::WaitContext;
18 use base::WorkerThread;
19 use sync::Mutex;
20 
21 use crate::virtio::console::control::process_control_receive_queue;
22 use crate::virtio::console::control::process_control_transmit_queue;
23 use crate::virtio::console::control::ControlMsgBytes;
24 use crate::virtio::console::input::process_receive_queue;
25 use crate::virtio::console::output::process_transmit_queue;
26 use crate::virtio::console::port::ConsolePort;
27 use crate::virtio::console::port::ConsolePortInfo;
28 use crate::virtio::Interrupt;
29 use crate::virtio::Queue;
30 
// Virtqueue indices used by this worker.
//
// Port 0 and the control queues occupy the first four slots. Every additional port N (N >= 1)
// uses receiveq index `PORT1_RECEIVEQ_IDX + 2 * (N - 1)` and transmitq index
// `PORT1_TRANSMITQ_IDX + 2 * (N - 1)`; see `receiveq_idx()` and `transmitq_idx()`.
const PORT0_RECEIVEQ_IDX: usize = 0;
const PORT0_TRANSMITQ_IDX: usize = 1;
const CONTROL_RECEIVEQ_IDX: usize = 2;
const CONTROL_TRANSMITQ_IDX: usize = 3;
const PORT1_RECEIVEQ_IDX: usize = 4;
const PORT1_TRANSMITQ_IDX: usize = 5;
37 
/// Per-port state owned by the console worker while it is running.
///
/// Instances are built from a `ConsolePort` with `WorkerPort::from_console_port()`, and the
/// output is handed back with `WorkerPort::into_console_port()`.
pub struct WorkerPort {
    /// Port information cloned from the `ConsolePort`, if it provided any.
    info: Option<ConsolePortInfo>,

    /// Event registered with the worker's wait context as `Token::InputAvailable`.
    in_avail_evt: Event,
    /// Input byte buffer obtained from `ConsolePort::clone_input_buffer()`; it is locked and
    /// passed to `process_receive_queue` when the port's receive queue is serviced.
    input_buffer: Arc<Mutex<VecDeque<u8>>>,
    /// Writer passed to `process_transmit_queue` when the port's transmit queue is serviced.
    output: Box<dyn std::io::Write + Send>,
}
45 
46 impl WorkerPort {
from_console_port(port: &mut ConsolePort) -> WorkerPort47     pub fn from_console_port(port: &mut ConsolePort) -> WorkerPort {
48         let in_avail_evt = port.clone_in_avail_evt().unwrap();
49         let input_buffer = port.clone_input_buffer();
50         let output = port
51             .take_output()
52             .unwrap_or_else(|| Box::new(std::io::sink()));
53         let info = port.port_info().cloned();
54         WorkerPort {
55             info,
56             in_avail_evt,
57             input_buffer,
58             output,
59         }
60     }
61 
62     /// Restore the state retrieved from `ConsolePort` by `WorkerPort::from_console_port()`.
into_console_port(self, console_port: &mut ConsolePort)63     pub fn into_console_port(self, console_port: &mut ConsolePort) {
64         console_port.restore_output(self.output);
65     }
66 
is_console(&self) -> bool67     pub fn is_console(&self) -> bool {
68         self.info
69             .as_ref()
70             .map(|info| info.console)
71             .unwrap_or_default()
72     }
73 
name(&self) -> Option<&str>74     pub fn name(&self) -> Option<&str> {
75         self.info.as_ref().and_then(ConsolePortInfo::name)
76     }
77 }
78 
/// Identifies the event source that woke the worker's `WaitContext`.
///
/// The `u32` payload of the per-port variants is the port ID.
#[derive(EventToken)]
enum Token {
    /// The event of a port's receive queue was signaled.
    ReceiveQueueAvailable(u32),
    /// The event of a port's transmit queue was signaled.
    TransmitQueueAvailable(u32),
    /// A port's `in_avail_evt` was signaled.
    InputAvailable(u32),
    /// The event of the control receive queue was signaled.
    ControlReceiveQueueAvailable,
    /// The event of the control transmit queue was signaled.
    ControlTransmitQueueAvailable,
    /// The interrupt's resample event was signaled.
    InterruptResample,
    /// The `WorkerHandle` signaled that requests are waiting in the request channel.
    WorkerRequest,
    /// The worker thread's kill event was signaled; the run loop should exit.
    Kill,
}
90 
/// Request sent from a `WorkerHandle` to the running `Worker` over an mpsc channel.
///
/// Each request carries a `response_sender` on which the worker reports the outcome.
pub enum WorkerRequest {
    /// Start servicing `queue` as virtqueue index `idx`.
    StartQueue {
        /// Virtqueue index the queue should be registered under.
        idx: usize,
        /// The queue to service.
        queue: Queue,
        /// Receives the result of starting the queue.
        response_sender: mpsc::SyncSender<anyhow::Result<()>>,
    },
    /// Stop servicing virtqueue index `idx` and hand the queue back.
    StopQueue {
        /// Virtqueue index to stop.
        idx: usize,
        /// Receives the stopped queue, or `None` if no queue was running at `idx`.
        response_sender: mpsc::SyncSender<Option<Queue>>,
    },
}
102 
/// State of the console worker thread: the event loop, the running queues, and the ports.
pub struct Worker {
    /// Wait context holding every event source the run loop listens to.
    wait_ctx: WaitContext<Token>,
    /// Interrupt whose resample event (if any) is serviced by the run loop.
    interrupt: Interrupt,

    // Currently running queues, keyed by virtqueue index.
    queues: BTreeMap<usize, Queue>,

    // Console ports indexed by port ID. At least port 0 will exist, and other ports may be
    // available if `VIRTIO_CONSOLE_F_MULTIPORT` is enabled.
    ports: Vec<WorkerPort>,

    // Device-to-driver messages to be received by the driver via the control receiveq.
    pending_receive_control_msgs: VecDeque<ControlMsgBytes>,

    /// Receiving end of the channel on which `WorkerRequest`s arrive.
    worker_receiver: mpsc::Receiver<WorkerRequest>,
    /// Event signaled by the sender after a request has been queued on `worker_receiver`.
    worker_event: Event,
}
120 
121 impl Worker {
new( interrupt: Interrupt, ports: Vec<WorkerPort>, worker_receiver: mpsc::Receiver<WorkerRequest>, worker_event: Event, ) -> anyhow::Result<Self>122     pub fn new(
123         interrupt: Interrupt,
124         ports: Vec<WorkerPort>,
125         worker_receiver: mpsc::Receiver<WorkerRequest>,
126         worker_event: Event,
127     ) -> anyhow::Result<Self> {
128         let wait_ctx = WaitContext::new().context("WaitContext::new() failed")?;
129 
130         wait_ctx.add(&worker_event, Token::WorkerRequest)?;
131 
132         for (index, port) in ports.iter().enumerate() {
133             let port_id = index as u32;
134             wait_ctx.add(&port.in_avail_evt, Token::InputAvailable(port_id))?;
135         }
136 
137         if let Some(resample_evt) = interrupt.get_resample_evt() {
138             wait_ctx.add(resample_evt, Token::InterruptResample)?;
139         }
140 
141         Ok(Worker {
142             wait_ctx,
143             interrupt,
144             queues: BTreeMap::new(),
145             ports,
146             pending_receive_control_msgs: VecDeque::new(),
147             worker_receiver,
148             worker_event,
149         })
150     }
151 
run(&mut self, kill_evt: &Event) -> anyhow::Result<()>152     pub fn run(&mut self, kill_evt: &Event) -> anyhow::Result<()> {
153         self.wait_ctx.add(kill_evt, Token::Kill)?;
154         let res = self.run_loop();
155         self.wait_ctx.delete(kill_evt)?;
156         res
157     }
158 
run_loop(&mut self) -> anyhow::Result<()>159     fn run_loop(&mut self) -> anyhow::Result<()> {
160         let mut running = true;
161         while running {
162             let events = self.wait_ctx.wait()?;
163 
164             for event in events.iter().filter(|e| e.is_readable) {
165                 match event.token {
166                     Token::TransmitQueueAvailable(port_id) => {
167                         if let (Some(port), Some(transmitq)) = (
168                             self.ports.get_mut(port_id as usize),
169                             transmitq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx)),
170                         ) {
171                             transmitq
172                                 .event()
173                                 .wait()
174                                 .context("failed reading transmit queue Event")?;
175                             process_transmit_queue(transmitq, &mut port.output);
176                         }
177                     }
178                     Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
179                         let port = self.ports.get_mut(port_id as usize);
180                         let receiveq =
181                             receiveq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx));
182 
183                         let event = if matches!(event.token, Token::ReceiveQueueAvailable(..)) {
184                             receiveq.as_ref().map(|q| q.event())
185                         } else {
186                             port.as_ref().map(|p| &p.in_avail_evt)
187                         };
188                         if let Some(event) = event {
189                             event.wait().context("failed to clear receive event")?;
190                         }
191 
192                         if let (Some(port), Some(receiveq)) = (port, receiveq) {
193                             let mut input_buffer = port.input_buffer.lock();
194                             process_receive_queue(&mut input_buffer, receiveq);
195                         }
196                     }
197                     Token::ControlReceiveQueueAvailable => {
198                         if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
199                             ctrl_receiveq
200                                 .event()
201                                 .wait()
202                                 .context("failed waiting on control event")?;
203                             process_control_receive_queue(
204                                 ctrl_receiveq,
205                                 &mut self.pending_receive_control_msgs,
206                             );
207                         }
208                     }
209                     Token::ControlTransmitQueueAvailable => {
210                         if let Some(ctrl_transmitq) = self.queues.get_mut(&CONTROL_TRANSMITQ_IDX) {
211                             ctrl_transmitq
212                                 .event()
213                                 .wait()
214                                 .context("failed waiting on control event")?;
215                             process_control_transmit_queue(
216                                 ctrl_transmitq,
217                                 &self.ports,
218                                 &mut self.pending_receive_control_msgs,
219                             );
220                         }
221 
222                         // Attempt to send any new replies if there is space in the receiveq.
223                         if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
224                             process_control_receive_queue(
225                                 ctrl_receiveq,
226                                 &mut self.pending_receive_control_msgs,
227                             )
228                         }
229                     }
230                     Token::InterruptResample => {
231                         self.interrupt.interrupt_resample();
232                     }
233                     Token::WorkerRequest => {
234                         self.worker_event.wait()?;
235                         self.process_worker_requests();
236                     }
237                     Token::Kill => running = false,
238                 }
239             }
240         }
241         Ok(())
242     }
243 
process_worker_requests(&mut self)244     fn process_worker_requests(&mut self) {
245         while let Ok(request) = self.worker_receiver.try_recv() {
246             match request {
247                 WorkerRequest::StartQueue {
248                     idx,
249                     queue,
250                     response_sender,
251                 } => {
252                     let res = self.start_queue(idx, queue);
253                     let _ = response_sender.send(res);
254                 }
255                 WorkerRequest::StopQueue {
256                     idx,
257                     response_sender,
258                 } => {
259                     let res = self.stop_queue(idx);
260                     let _ = response_sender.send(res);
261                 }
262             }
263         }
264     }
265 
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>266     fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
267         if let Some(port_id) = receiveq_port_id(idx) {
268             self.wait_ctx
269                 .add(queue.event(), Token::ReceiveQueueAvailable(port_id))?;
270         } else if let Some(port_id) = transmitq_port_id(idx) {
271             self.wait_ctx
272                 .add(queue.event(), Token::TransmitQueueAvailable(port_id))?;
273         } else if idx == CONTROL_RECEIVEQ_IDX {
274             self.wait_ctx
275                 .add(queue.event(), Token::ControlReceiveQueueAvailable)?;
276         } else if idx == CONTROL_TRANSMITQ_IDX {
277             self.wait_ctx
278                 .add(queue.event(), Token::ControlTransmitQueueAvailable)?;
279         } else {
280             return Err(anyhow!("unhandled queue idx {idx}"));
281         }
282 
283         let prev = self.queues.insert(idx, queue);
284         assert!(prev.is_none());
285         Ok(())
286     }
287 
stop_queue(&mut self, idx: usize) -> Option<Queue>288     fn stop_queue(&mut self, idx: usize) -> Option<Queue> {
289         if let Some(queue) = self.queues.remove(&idx) {
290             let _ = self.wait_ctx.delete(queue.event());
291             Some(queue)
292         } else {
293             None
294         }
295     }
296 }
297 
/// Owner-side handle to a running console `Worker` thread.
pub struct WorkerHandle {
    /// The worker thread; stopping it yields the worker's ports.
    worker_thread: WorkerThread<Vec<WorkerPort>>,
    /// Sending end of the channel carrying `WorkerRequest`s to the worker.
    worker_sender: mpsc::Sender<WorkerRequest>,
    /// Event signaled after a request is sent so that the worker wakes up and handles it.
    worker_event: Event,
}
303 
304 impl WorkerHandle {
new(interrupt: Interrupt, ports: Vec<WorkerPort>) -> anyhow::Result<Self>305     pub fn new(interrupt: Interrupt, ports: Vec<WorkerPort>) -> anyhow::Result<Self> {
306         let worker_event = Event::new().context("Event::new")?;
307         let worker_event_clone = worker_event.try_clone().context("Event::try_clone")?;
308         let (worker_sender, worker_receiver) = mpsc::channel();
309         let worker_thread = WorkerThread::start("v_console", move |kill_evt| {
310             let mut worker = Worker::new(interrupt, ports, worker_receiver, worker_event_clone)
311                 .expect("console Worker::new() failed");
312             if let Err(e) = worker.run(&kill_evt) {
313                 error!("console worker failed: {:#}", e);
314             }
315             worker.ports
316         });
317         Ok(WorkerHandle {
318             worker_thread,
319             worker_sender,
320             worker_event,
321         })
322     }
323 
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>324     pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
325         let (response_sender, response_receiver) = mpsc::sync_channel(0);
326         self.worker_sender
327             .send(WorkerRequest::StartQueue {
328                 idx,
329                 queue,
330                 response_sender,
331             })
332             .context("mpsc::Sender::send")?;
333         self.worker_event.signal().context("Event::signal")?;
334         response_receiver.recv().context("mpsc::Receiver::recv")?
335     }
336 
stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>>337     pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
338         let (response_sender, response_receiver) = mpsc::sync_channel(0);
339         self.worker_sender
340             .send(WorkerRequest::StopQueue {
341                 idx,
342                 response_sender,
343             })
344             .context("mpsc::Sender::send")?;
345         self.worker_event.signal().context("Event::signal")?;
346         response_receiver.recv().context("mpsc::Receiver::recv")
347     }
348 
stop(self) -> Vec<WorkerPort>349     pub fn stop(self) -> Vec<WorkerPort> {
350         self.worker_thread.stop()
351     }
352 }
353 
receiveq_idx(port_id: u32) -> Option<usize>354 fn receiveq_idx(port_id: u32) -> Option<usize> {
355     if port_id == 0 {
356         Some(PORT0_RECEIVEQ_IDX)
357     } else {
358         PORT1_RECEIVEQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
359     }
360 }
361 
transmitq_idx(port_id: u32) -> Option<usize>362 fn transmitq_idx(port_id: u32) -> Option<usize> {
363     if port_id == 0 {
364         Some(PORT0_TRANSMITQ_IDX)
365     } else {
366         PORT1_TRANSMITQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
367     }
368 }
369 
receiveq_port_id(queue_idx: usize) -> Option<u32>370 fn receiveq_port_id(queue_idx: usize) -> Option<u32> {
371     if queue_idx == PORT0_RECEIVEQ_IDX {
372         Some(0)
373     } else if queue_idx >= PORT1_RECEIVEQ_IDX && (queue_idx & 1) == 0 {
374         ((queue_idx - PORT1_RECEIVEQ_IDX) / 2)
375             .checked_add(1)?
376             .try_into()
377             .ok()
378     } else {
379         None
380     }
381 }
382 
transmitq_port_id(queue_idx: usize) -> Option<u32>383 fn transmitq_port_id(queue_idx: usize) -> Option<u32> {
384     if queue_idx == PORT0_TRANSMITQ_IDX {
385         Some(0)
386     } else if queue_idx >= PORT1_TRANSMITQ_IDX && (queue_idx & 1) == 1 {
387         ((queue_idx - PORT1_TRANSMITQ_IDX) / 2)
388             .checked_add(1)?
389             .try_into()
390             .ok()
391     } else {
392         None
393     }
394 }
395 
#[cfg(test)]
mod tests {
    use super::*;

    /// Port IDs 0..=3 map to receive queue indices 0, 4, 6, 8.
    #[test]
    fn test_receiveq_idx() {
        let cases: [(u32, usize); 4] = [(0, 0), (1, 4), (2, 6), (3, 8)];
        for (port_id, queue_idx) in cases {
            assert_eq!(receiveq_idx(port_id), Some(queue_idx));
        }
    }

    /// Port IDs 0..=3 map to transmit queue indices 1, 5, 7, 9.
    #[test]
    fn test_transmitq_idx() {
        let cases: [(u32, usize); 4] = [(0, 1), (1, 5), (2, 7), (3, 9)];
        for (port_id, queue_idx) in cases {
            assert_eq!(transmitq_idx(port_id), Some(queue_idx));
        }
    }

    /// Only receive queue indices resolve to a port ID.
    #[test]
    fn test_receiveq_port_id() {
        let cases: [(usize, Option<u32>); 10] = [
            (0, Some(0)),
            (1, None), // port0 transmitq
            (2, None), // ctrl receiveq
            (3, None), // ctrl transmitq
            (4, Some(1)),
            (5, None),
            (6, Some(2)),
            (7, None),
            (8, Some(3)),
            (9, None),
        ];
        for (queue_idx, port_id) in cases {
            assert_eq!(receiveq_port_id(queue_idx), port_id);
        }
    }

    /// Only transmit queue indices resolve to a port ID.
    #[test]
    fn test_transmitq_port_id() {
        let cases: [(usize, Option<u32>); 10] = [
            (0, None), // port0 receiveq
            (1, Some(0)),
            (2, None), // ctrl receiveq
            (3, None), // ctrl transmitq
            (4, None), // port1 receiveq
            (5, Some(1)),
            (6, None),
            (7, Some(2)),
            (8, None),
            (9, Some(3)),
        ];
        for (queue_idx, port_id) in cases {
            assert_eq!(transmitq_port_id(queue_idx), port_id);
        }
    }
}
444