xref: /aosp_15_r20/external/crosvm/devices/src/virtio/vhost/user/device/wl.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::RefCell;
6 use std::collections::BTreeMap;
7 use std::path::PathBuf;
8 use std::rc::Rc;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13 
14 use anyhow::bail;
15 use anyhow::Context;
16 use argh::FromArgs;
17 use base::clone_descriptor;
18 use base::error;
19 use base::warn;
20 use base::RawDescriptor;
21 use base::SafeDescriptor;
22 use base::Tube;
23 use base::UnixSeqpacket;
24 use cros_async::AsyncWrapper;
25 use cros_async::EventAsync;
26 use cros_async::Executor;
27 use cros_async::IoSource;
28 use hypervisor::ProtectionType;
29 #[cfg(feature = "minigbm")]
30 use rutabaga_gfx::RutabagaGralloc;
31 #[cfg(feature = "minigbm")]
32 use rutabaga_gfx::RutabagaGrallocBackendFlags;
33 use vm_memory::GuestMemory;
34 use vmm_vhost::message::VhostUserProtocolFeatures;
35 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
36 
37 use crate::virtio::base_features;
38 use crate::virtio::device_constants::wl::NUM_QUEUES;
39 use crate::virtio::device_constants::wl::VIRTIO_WL_F_SEND_FENCES;
40 use crate::virtio::device_constants::wl::VIRTIO_WL_F_TRANS_FLAGS;
41 use crate::virtio::device_constants::wl::VIRTIO_WL_F_USE_SHMEM;
42 use crate::virtio::vhost::user::device::handler::Error as DeviceError;
43 use crate::virtio::vhost::user::device::handler::VhostBackendReqConnection;
44 use crate::virtio::vhost::user::device::handler::VhostBackendReqConnectionState;
45 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
46 use crate::virtio::vhost::user::device::handler::WorkerState;
47 use crate::virtio::vhost::user::device::BackendConnection;
48 use crate::virtio::wl;
49 use crate::virtio::Queue;
50 use crate::virtio::SharedMemoryRegion;
51 
run_out_queue( queue: Rc<RefCell<Queue>>, kick_evt: EventAsync, wlstate: Rc<RefCell<wl::WlState>>, )52 async fn run_out_queue(
53     queue: Rc<RefCell<Queue>>,
54     kick_evt: EventAsync,
55     wlstate: Rc<RefCell<wl::WlState>>,
56 ) {
57     loop {
58         if let Err(e) = kick_evt.next_val().await {
59             error!("Failed to read kick event for out queue: {}", e);
60             break;
61         }
62 
63         wl::process_out_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut());
64     }
65 }
66 
run_in_queue( queue: Rc<RefCell<Queue>>, kick_evt: EventAsync, wlstate: Rc<RefCell<wl::WlState>>, wlstate_ctx: IoSource<AsyncWrapper<SafeDescriptor>>, )67 async fn run_in_queue(
68     queue: Rc<RefCell<Queue>>,
69     kick_evt: EventAsync,
70     wlstate: Rc<RefCell<wl::WlState>>,
71     wlstate_ctx: IoSource<AsyncWrapper<SafeDescriptor>>,
72 ) {
73     loop {
74         if let Err(e) = wlstate_ctx.wait_readable().await {
75             error!(
76                 "Failed to wait for inner WaitContext to become readable: {}",
77                 e
78             );
79             break;
80         }
81 
82         if wl::process_in_queue(&mut queue.borrow_mut(), &mut wlstate.borrow_mut())
83             == Err(wl::DescriptorsExhausted)
84         {
85             if let Err(e) = kick_evt.next_val().await {
86                 error!("Failed to read kick event for in queue: {}", e);
87                 break;
88             }
89         }
90     }
91 }
92 
/// State of the vhost-user virtio-wl backend.
struct WlBackend {
    /// Executor used to build async kick events and to spawn and cancel the queue tasks.
    ex: Executor,
    /// Wayland socket paths keyed by name; taken (left as `None`) when the `WlState` is created.
    wayland_paths: Option<BTreeMap<String, PathBuf>>,
    /// Optional resource bridge tube; handed over to the `WlState` when it is created.
    resource_bridge: Option<Tube>,
    /// Set once the driver acknowledges `VIRTIO_WL_F_TRANS_FLAGS`.
    use_transition_flags: bool,
    /// Set once the driver acknowledges `VIRTIO_WL_F_SEND_FENCES`.
    use_send_vfd_v2: bool,
    /// Set once the driver acknowledges `VIRTIO_WL_F_USE_SHMEM`; required to start a queue.
    use_shmem: bool,
    /// Feature bits offered to the driver.
    features: u64,
    /// Union of all feature bits acknowledged so far.
    acked_features: u64,
    /// Shared Wayland state, created lazily by the first `start_queue` call.
    wlstate: Option<Rc<RefCell<wl::WlState>>>,
    /// Per-queue worker (task plus queue), indexed by queue index; `None` while not running.
    workers: [Option<WorkerState<Rc<RefCell<Queue>>, ()>>; NUM_QUEUES],
    /// Backend request connection state; the shmem mapper is taken from it when connected.
    backend_req_conn: VhostBackendReqConnectionState,
}
106 
107 impl WlBackend {
new( ex: &Executor, wayland_paths: BTreeMap<String, PathBuf>, resource_bridge: Option<Tube>, ) -> WlBackend108     fn new(
109         ex: &Executor,
110         wayland_paths: BTreeMap<String, PathBuf>,
111         resource_bridge: Option<Tube>,
112     ) -> WlBackend {
113         let features = base_features(ProtectionType::Unprotected)
114             | 1 << VIRTIO_WL_F_TRANS_FLAGS
115             | 1 << VIRTIO_WL_F_SEND_FENCES
116             | 1 << VIRTIO_WL_F_USE_SHMEM
117             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
118         WlBackend {
119             ex: ex.clone(),
120             wayland_paths: Some(wayland_paths),
121             resource_bridge,
122             use_transition_flags: false,
123             use_send_vfd_v2: false,
124             use_shmem: false,
125             features,
126             acked_features: 0,
127             wlstate: None,
128             workers: Default::default(),
129             backend_req_conn: VhostBackendReqConnectionState::NoConnection,
130         }
131     }
132 }
133 
134 impl VhostUserDevice for WlBackend {
max_queue_num(&self) -> usize135     fn max_queue_num(&self) -> usize {
136         NUM_QUEUES
137     }
138 
features(&self) -> u64139     fn features(&self) -> u64 {
140         self.features
141     }
142 
ack_features(&mut self, value: u64) -> anyhow::Result<()>143     fn ack_features(&mut self, value: u64) -> anyhow::Result<()> {
144         self.acked_features |= value;
145 
146         if value & (1 << VIRTIO_WL_F_TRANS_FLAGS) != 0 {
147             self.use_transition_flags = true;
148         }
149         if value & (1 << VIRTIO_WL_F_SEND_FENCES) != 0 {
150             self.use_send_vfd_v2 = true;
151         }
152         if value & (1 << VIRTIO_WL_F_USE_SHMEM) != 0 {
153             self.use_shmem = true;
154         }
155 
156         Ok(())
157     }
158 
protocol_features(&self) -> VhostUserProtocolFeatures159     fn protocol_features(&self) -> VhostUserProtocolFeatures {
160         VhostUserProtocolFeatures::BACKEND_REQ | VhostUserProtocolFeatures::SHARED_MEMORY_REGIONS
161     }
162 
read_config(&self, _offset: u64, _dst: &mut [u8])163     fn read_config(&self, _offset: u64, _dst: &mut [u8]) {}
164 
start_queue(&mut self, idx: usize, queue: Queue, _mem: GuestMemory) -> anyhow::Result<()>165     fn start_queue(&mut self, idx: usize, queue: Queue, _mem: GuestMemory) -> anyhow::Result<()> {
166         if self.workers[idx].is_some() {
167             warn!("Starting new queue handler without stopping old handler");
168             self.stop_queue(idx)?;
169         }
170 
171         let kick_evt = queue
172             .event()
173             .try_clone()
174             .context("failed to clone queue event")?;
175         let kick_evt = EventAsync::new(kick_evt, &self.ex)
176             .context("failed to create EventAsync for kick_evt")?;
177 
178         if !self.use_shmem {
179             bail!("Incompatible driver: vhost-user-wl requires shmem support");
180         }
181 
182         // We use this de-structuring let binding to separate borrows so that the compiler doesn't
183         // think we're borrowing all of `self` in the closure below.
184         let WlBackend {
185             ref mut wayland_paths,
186             ref mut resource_bridge,
187             ref use_transition_flags,
188             ref use_send_vfd_v2,
189             ..
190         } = self;
191 
192         #[cfg(feature = "minigbm")]
193         let gralloc = RutabagaGralloc::new(RutabagaGrallocBackendFlags::new())
194             .context("Failed to initailize gralloc")?;
195         let wlstate = match &self.wlstate {
196             None => {
197                 let mapper = {
198                     match &mut self.backend_req_conn {
199                         VhostBackendReqConnectionState::Connected(request) => {
200                             request.take_shmem_mapper()?
201                         }
202                         VhostBackendReqConnectionState::NoConnection => {
203                             bail!("No backend request connection found")
204                         }
205                     }
206                 };
207 
208                 let wlstate = Rc::new(RefCell::new(wl::WlState::new(
209                     wayland_paths.take().expect("WlState already initialized"),
210                     mapper,
211                     *use_transition_flags,
212                     *use_send_vfd_v2,
213                     resource_bridge.take(),
214                     #[cfg(feature = "minigbm")]
215                     gralloc,
216                     None, /* address_offset */
217                 )));
218                 self.wlstate = Some(wlstate.clone());
219                 wlstate
220             }
221             Some(state) => state.clone(),
222         };
223         let queue = Rc::new(RefCell::new(queue));
224         let queue_task = match idx {
225             0 => {
226                 let wlstate_ctx = clone_descriptor(wlstate.borrow().wait_ctx())
227                     .map(AsyncWrapper::new)
228                     .context("failed to clone inner WaitContext for WlState")
229                     .and_then(|ctx| {
230                         self.ex
231                             .async_from(ctx)
232                             .context("failed to create async WaitContext")
233                     })?;
234 
235                 self.ex
236                     .spawn_local(run_in_queue(queue.clone(), kick_evt, wlstate, wlstate_ctx))
237             }
238             1 => self
239                 .ex
240                 .spawn_local(run_out_queue(queue.clone(), kick_evt, wlstate)),
241             _ => bail!("attempted to start unknown queue: {}", idx),
242         };
243         self.workers[idx] = Some(WorkerState { queue_task, queue });
244         Ok(())
245     }
246 
stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue>247     fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
248         if let Some(worker) = self.workers.get_mut(idx).and_then(Option::take) {
249             // Wait for queue_task to be aborted.
250             let _ = self.ex.run_until(worker.queue_task.cancel());
251 
252             let queue = match Rc::try_unwrap(worker.queue) {
253                 Ok(queue_cell) => queue_cell.into_inner(),
254                 Err(_) => panic!("failed to recover queue from worker"),
255             };
256 
257             Ok(queue)
258         } else {
259             Err(anyhow::Error::new(DeviceError::WorkerNotFound))
260         }
261     }
262 
reset(&mut self)263     fn reset(&mut self) {
264         for worker in self.workers.iter_mut().filter_map(Option::take) {
265             let _ = self.ex.run_until(worker.queue_task.cancel());
266         }
267     }
268 
get_shared_memory_region(&self) -> Option<SharedMemoryRegion>269     fn get_shared_memory_region(&self) -> Option<SharedMemoryRegion> {
270         Some(SharedMemoryRegion {
271             id: wl::WL_SHMEM_ID,
272             length: wl::WL_SHMEM_SIZE,
273         })
274     }
275 
set_backend_req_connection(&mut self, conn: Arc<VhostBackendReqConnection>)276     fn set_backend_req_connection(&mut self, conn: Arc<VhostBackendReqConnection>) {
277         if let VhostBackendReqConnectionState::Connected(_) = &self.backend_req_conn {
278             warn!("connection already established. Overwriting");
279         }
280 
281         self.backend_req_conn = VhostBackendReqConnectionState::Connected(conn);
282     }
283 
enter_suspended_state(&mut self) -> anyhow::Result<()>284     fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
285         // No non-queue workers.
286         Ok(())
287     }
288 
snapshot(&mut self) -> anyhow::Result<serde_json::Value>289     fn snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
290         bail!("snapshot not implemented for vhost-user wl");
291     }
292 
restore(&mut self, _data: serde_json::Value) -> anyhow::Result<()>293     fn restore(&mut self, _data: serde_json::Value) -> anyhow::Result<()> {
294         bail!("snapshot not implemented for vhost-user wl");
295     }
296 }
297 
/// Parses a Wayland socket argument of the form `PATH[,name=NAME]`.
///
/// Returns `(name, path)`. `name` is the empty string when no `name=` option is given; when the
/// option is repeated, the last value wins.
///
/// # Errors
///
/// Returns a message describing the problem when the path is empty, when an option is not of
/// the form `kind=value`, or when the option kind is not `name`.
pub fn parse_wayland_sock(value: &str) -> Result<(String, PathBuf), String> {
    let mut components = value.split(',');
    // `split` always yields at least one item, so a missing path shows up as an empty first
    // component rather than as `None`.
    let path = match components.next() {
        Some(p) if !p.is_empty() => PathBuf::from(p),
        _ => return Err("missing socket path".to_string()),
    };

    let mut name = "";
    for component in components {
        let (kind, setting) = component.split_once('=').ok_or_else(|| {
            format!("option must be of the form `kind=value`: {}", component)
        })?;
        match kind {
            "name" => name = setting,
            _ => return Err(format!("unrecognized option: {}", kind)),
        }
    }

    Ok((name.to_string(), path))
}
319 
// Command-line options of the `wl` subcommand; consumed by `run_wl_device`.
// NOTE(review): argh presumably renders the `///` comments below as `--help` text, so they are
// user-facing strings rather than plain documentation — confirm before rewording them.
#[derive(FromArgs)]
#[argh(subcommand, name = "wl")]
/// Wayland device
pub struct Options {
    #[argh(option, arg_name = "PATH", hidden_help)]
    /// deprecated - please use --socket-path instead
    socket: Option<String>,
    #[argh(option, arg_name = "PATH")]
    /// path to the vhost-user socket to bind to.
    /// If this flag is set, --fd cannot be specified.
    socket_path: Option<String>,
    #[argh(option, arg_name = "FD")]
    /// file descriptor of a connected vhost-user socket.
    /// If this flag is set, --socket-path cannot be specified.
    fd: Option<RawDescriptor>,

    // Each entry is the `(name, path)` pair produced by `parse_wayland_sock`.
    #[argh(option, from_str_fn(parse_wayland_sock), arg_name = "PATH[,name=NAME]")]
    /// path to one or more Wayland sockets. The unnamed socket is used for
    /// displaying virtual screens while the named ones are used for IPC
    wayland_sock: Vec<(String, PathBuf)>,
    #[argh(option, arg_name = "PATH")]
    /// path to the GPU resource bridge
    resource_bridge: Option<String>,
}
344 
345 /// Starts a vhost-user wayland device.
346 /// Returns an error if the given `args` is invalid or the device fails to run.
run_wl_device(opts: Options) -> anyhow::Result<()>347 pub fn run_wl_device(opts: Options) -> anyhow::Result<()> {
348     let Options {
349         wayland_sock,
350         socket,
351         socket_path,
352         fd,
353         resource_bridge,
354     } = opts;
355 
356     let wayland_paths: BTreeMap<_, _> = wayland_sock.into_iter().collect();
357 
358     let resource_bridge = resource_bridge
359         .map(|p| -> anyhow::Result<Tube> {
360             let deadline = Instant::now() + Duration::from_secs(5);
361             loop {
362                 match UnixSeqpacket::connect(&p) {
363                     Ok(s) => return Ok(Tube::new_from_unix_seqpacket(s).unwrap()),
364                     Err(e) => {
365                         if Instant::now() < deadline {
366                             thread::sleep(Duration::from_millis(50));
367                         } else {
368                             return Err(anyhow::Error::new(e));
369                         }
370                     }
371                 }
372             }
373         })
374         .transpose()
375         .context("failed to connect to resource bridge socket")?;
376 
377     let ex = Executor::new().context("failed to create executor")?;
378 
379     let conn = BackendConnection::from_opts(socket.as_deref(), socket_path.as_deref(), fd)?;
380 
381     let backend = WlBackend::new(&ex, wayland_paths, resource_bridge);
382     // run_until() returns an Result<Result<..>> which the ? operator lets us flatten.
383     ex.run_until(conn.run_backend(backend, &ex))?
384 }
385