xref: /aosp_15_r20/external/crosvm/devices/src/virtio/vhost/user/device/net/sys/windows.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::sync::Arc;
6 
7 use anyhow::bail;
8 use anyhow::Context;
9 use argh::FromArgs;
10 use base::error;
11 use base::info;
12 use base::named_pipes::OverlappedWrapper;
13 use base::named_pipes::PipeConnection;
14 use base::warn;
15 use base::Event;
16 use base::RawDescriptor;
17 use broker_ipc::common_child_setup;
18 use broker_ipc::CommonChildStartupArgs;
19 use cros_async::EventAsync;
20 use cros_async::Executor;
21 use cros_async::IntoAsync;
22 use cros_async::IoSource;
23 use futures::channel::oneshot;
24 use futures::future::AbortHandle;
25 use futures::future::Abortable;
26 use futures::pin_mut;
27 use futures::select_biased;
28 use futures::FutureExt;
29 use hypervisor::ProtectionType;
30 #[cfg(feature = "slirp")]
31 use net_util::Slirp;
32 use net_util::TapT;
33 #[cfg(feature = "slirp")]
34 use serde::Deserialize;
35 #[cfg(feature = "slirp")]
36 use serde::Serialize;
37 use sync::Mutex;
38 use tube_transporter::TubeToken;
39 use virtio_sys::virtio_net;
40 use vm_memory::GuestMemory;
41 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
42 
43 use crate::virtio;
44 use crate::virtio::base_features;
45 use crate::virtio::net::process_rx;
46 use crate::virtio::net::NetError;
47 #[cfg(feature = "slirp")]
48 use crate::virtio::net::MAX_BUFFER_SIZE;
49 use crate::virtio::vhost::user::device::handler::sys::windows::read_from_tube_transporter;
50 use crate::virtio::vhost::user::device::handler::sys::windows::run_handler;
51 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
52 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
53 use crate::virtio::vhost::user::device::handler::WorkerState;
54 use crate::virtio::vhost::user::device::net::run_ctrl_queue;
55 use crate::virtio::vhost::user::device::net::run_tx_queue;
56 use crate::virtio::vhost::user::device::net::NetBackend;
57 use crate::virtio::vhost::user::device::net::NET_EXECUTOR;
58 use crate::virtio::Interrupt;
59 use crate::virtio::Queue;
60 
61 impl<T: 'static> NetBackend<T>
62 where
63     T: TapT + IntoAsync,
64 {
65     #[cfg(feature = "slirp")]
new_slirp( guest_pipe: PipeConnection, slirp_kill_event: Event, ) -> anyhow::Result<NetBackend<Slirp>>66     pub fn new_slirp(
67         guest_pipe: PipeConnection,
68         slirp_kill_event: Event,
69     ) -> anyhow::Result<NetBackend<Slirp>> {
70         let avail_features = base_features(ProtectionType::Unprotected)
71             | 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
72             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
73         let slirp = Slirp::new_for_multi_process(guest_pipe).map_err(NetError::SlirpCreateError)?;
74 
75         Ok(NetBackend::<Slirp> {
76             tap: slirp,
77             avail_features,
78             acked_features: 0,
79             mtu: 1500,
80             slirp_kill_event,
81             workers: Default::default(),
82         })
83     }
84 }
85 
run_rx_queue<T: TapT>( mut queue: Queue, mut tap: IoSource<T>, kick_evt: EventAsync, read_notifier: EventAsync, mut overlapped_wrapper: OverlappedWrapper, mut stop_rx: oneshot::Receiver<()>, ) -> Queue86 async fn run_rx_queue<T: TapT>(
87     mut queue: Queue,
88     mut tap: IoSource<T>,
89     kick_evt: EventAsync,
90     read_notifier: EventAsync,
91     mut overlapped_wrapper: OverlappedWrapper,
92     mut stop_rx: oneshot::Receiver<()>,
93 ) -> Queue {
94     let mut rx_buf = [0u8; MAX_BUFFER_SIZE];
95     let mut rx_count = 0;
96     let mut deferred_rx = false;
97 
98     // SAFETY: safe because rx_buf & overlapped_wrapper live until the
99     // overlapped operation completes and are not used in any other operations
100     // until that time.
101     unsafe {
102         tap.as_source_mut()
103             .read_overlapped(&mut rx_buf, &mut overlapped_wrapper)
104             .expect("read_overlapped failed")
105     };
106 
107     let read_notifier_future = read_notifier.next_val().fuse();
108     pin_mut!(read_notifier_future);
109     let kick_evt_future = kick_evt.next_val().fuse();
110     pin_mut!(kick_evt_future);
111 
112     loop {
113         // If we already have a packet from deferred RX, we don't need to wait for the slirp device.
114         if !deferred_rx {
115             select_biased! {
116                 read_notifier_res = read_notifier_future => {
117                     read_notifier_future.set(read_notifier.next_val().fuse());
118                     if let Err(e) = read_notifier_res {
119                         error!("Failed to wait for tap device to become readable: {}", e);
120                         break;
121                     }
122                 }
123                 _ = stop_rx => {
124                     break;
125                 }
126             }
127             if let Err(e) = read_notifier.next_val().await {
128                 error!("Failed to wait for tap device to become readable: {}", e);
129                 break;
130             }
131         }
132 
133         let needs_interrupt = process_rx(
134             &mut queue,
135             tap.as_source_mut(),
136             &mut rx_buf,
137             &mut deferred_rx,
138             &mut rx_count,
139             &mut overlapped_wrapper,
140         );
141         if needs_interrupt {
142             queue.trigger_interrupt();
143         }
144 
145         // There aren't any RX descriptors available for us to write packets to. Wait for the guest
146         // to consume some packets and make more descriptors available to us.
147         if deferred_rx {
148             select_biased! {
149                 kick = kick_evt_future => {
150                     kick_evt_future.set(kick_evt.next_val().fuse());
151                     if let Err(e) = kick {
152                         error!("Failed to read kick event for rx queue: {}", e);
153                         break;
154                     }
155                 }
156                 _ = stop_rx => {
157                     break;
158                 }
159             }
160         }
161     }
162 
163     queue
164 }
165 
166 /// Platform specific impl of VhostUserDevice::start_queue.
start_queue<T: 'static + IntoAsync + TapT>( backend: &mut NetBackend<T>, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>167 pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + IntoAsync + TapT>(
168     backend: &mut NetBackend<T>,
169     idx: usize,
170     queue: virtio::Queue,
171     _mem: GuestMemory,
172 ) -> anyhow::Result<()> {
173     if backend.workers.get(idx).is_some() {
174         warn!("Starting new queue handler without stopping old handler");
175         backend.stop_queue(idx);
176     }
177 
178     let overlapped_wrapper =
179         OverlappedWrapper::new(true).expect("Failed to create overlapped wrapper");
180 
181     super::super::NET_EXECUTOR.with(|ex| {
182         // Safe because the executor is initialized in main() below.
183         let ex = ex.get().expect("Executor not initialized");
184 
185         let kick_evt = queue
186             .event()
187             .try_clone()
188             .context("failed to clone queue event")?;
189         let kick_evt =
190             EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
191         let tap = backend
192             .tap
193             .try_clone()
194             .context("failed to clone tap device")?;
195         let worker_tuple = match idx {
196             0 => {
197                 let tap = ex
198                     .async_from(tap)
199                     .context("failed to create async tap device")?;
200                 let read_notifier = overlapped_wrapper
201                     .get_h_event_ref()
202                     .unwrap()
203                     .try_clone()
204                     .unwrap();
205                 let read_notifier = EventAsync::new_without_reset(read_notifier, ex)
206                     .context("failed to create async read notifier")?;
207 
208                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
209                 (
210                     ex.spawn_local(run_rx_queue(
211                         queue,
212                         tap,
213                         kick_evt,
214                         read_notifier,
215                         overlapped_wrapper,
216                         stop_rx,
217                     )),
218                     stop_tx,
219                 )
220             }
221             1 => {
222                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
223                 (
224                     ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
225                     stop_tx,
226                 )
227             }
228             2 => {
229                 let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
230                 (
231                     ex.spawn_local(run_ctrl_queue(
232                         queue,
233                         tap,
234                         kick_evt,
235                         backend.acked_features,
236                         1, /* vq_pairs */
237                         stop_rx,
238                     )),
239                     stop_tx,
240                 )
241             }
242             _ => bail!("attempted to start unknown queue: {}", idx),
243         };
244 
245         backend.workers[idx] = Some(worker_tuple);
246         Ok(())
247     })
248 }
249 
250 #[cfg(feature = "slirp")]
251 impl<T> Drop for NetBackend<T>
252 where
253     T: TapT + IntoAsync,
254 {
drop(&mut self)255     fn drop(&mut self) {
256         let _ = self.slirp_kill_event.signal();
257     }
258 }
259 
260 /// Config arguments passed through the bootstrap Tube from the broker to the Net backend
261 /// process.
262 #[cfg(feature = "slirp")]
263 #[derive(Serialize, Deserialize, Debug)]
264 pub struct NetBackendConfig {
265     pub guest_pipe: PipeConnection,
266     pub slirp_kill_event: Event,
267 }
268 
269 #[derive(FromArgs, Debug)]
270 #[argh(subcommand, name = "net", description = "")]
271 pub struct Options {
272     #[argh(
273         option,
274         description = "pipe handle end for Tube Transporter",
275         arg_name = "HANDLE"
276     )]
277     bootstrap: usize,
278 }
279 
280 #[cfg(all(windows, not(feature = "slirp")))]
281 compile_error!("vhost-user net device requires slirp feature on Windows.");
282 
283 #[cfg(feature = "slirp")]
start_device(opts: Options) -> anyhow::Result<()>284 pub fn start_device(opts: Options) -> anyhow::Result<()> {
285     // Get the Tubes from the TubeTransporter. Then get the "Config" from the bootstrap_tube
286     // which will contain slirp settings.
287     let raw_transport_tube = opts.bootstrap as RawDescriptor;
288 
289     let mut tubes = read_from_tube_transporter(raw_transport_tube).unwrap();
290 
291     let vhost_user_tube = tubes.get_tube(TubeToken::VhostUser).unwrap();
292     let bootstrap_tube = tubes.get_tube(TubeToken::Bootstrap).unwrap();
293 
294     let startup_args: CommonChildStartupArgs =
295         bootstrap_tube.recv::<CommonChildStartupArgs>().unwrap();
296     let _child_cleanup = common_child_setup(startup_args).unwrap();
297 
298     let net_backend_config = bootstrap_tube.recv::<NetBackendConfig>().unwrap();
299 
300     let exit_event = bootstrap_tube.recv::<Event>()?;
301 
302     // We only have one net device for now.
303     let dev = NetBackend::<net_util::Slirp>::new_slirp(
304         net_backend_config.guest_pipe,
305         net_backend_config.slirp_kill_event,
306     )
307     .unwrap();
308 
309     let handler = DeviceRequestHandler::new(dev);
310 
311     let ex = Executor::new().context("failed to create executor")?;
312 
313     NET_EXECUTOR.with(|net_ex| {
314         let _ = net_ex.set(ex.clone());
315     });
316 
317     // TODO(b/213170185): Uncomment once sandbox is upstreamed.
318     // if sandbox::is_sandbox_target() {
319     //     sandbox::TargetServices::get()
320     //         .expect("failed to get target services")
321     //         .unwrap()
322     //         .lower_token();
323     // }
324 
325     info!("vhost-user net device ready, starting run loop...");
326     if let Err(e) = ex.run_until(run_handler(
327         Box::new(handler),
328         vhost_user_tube,
329         exit_event,
330         &ex,
331     )) {
332         bail!("error occurred: {}", e);
333     }
334 
335     Ok(())
336 }
337