xref: /aosp_15_r20/external/crosvm/src/sys/windows/control_server.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2023 The ChromiumOS Authors
2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
5*bb4ee6a4SAndroid Build Coastguard Worker //! Implements the CrosVM control socket on Windows. Unlike on unix, this is a bit involved because
6*bb4ee6a4SAndroid Build Coastguard Worker //! we can't process the raw named pipe in line inside `run_control` (named pipes aren't directly
7*bb4ee6a4SAndroid Build Coastguard Worker //! waitable). In theory, AF_UNIX can be made waitable, but AF_UNIX is very slow, and we already
8*bb4ee6a4SAndroid Build Coastguard Worker //! have significant prior art for using named pipes in a waitable fashion (`base::StreamChannel`).
9*bb4ee6a4SAndroid Build Coastguard Worker 
10*bb4ee6a4SAndroid Build Coastguard Worker use std::io;
11*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc;
12*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
13*bb4ee6a4SAndroid Build Coastguard Worker 
14*bb4ee6a4SAndroid Build Coastguard Worker use base::named_pipes;
15*bb4ee6a4SAndroid Build Coastguard Worker use base::named_pipes::OverlappedWrapper;
16*bb4ee6a4SAndroid Build Coastguard Worker use base::named_pipes::PipeConnection;
17*bb4ee6a4SAndroid Build Coastguard Worker use base::BlockingMode;
18*bb4ee6a4SAndroid Build Coastguard Worker use base::Event;
19*bb4ee6a4SAndroid Build Coastguard Worker use base::EventExt;
20*bb4ee6a4SAndroid Build Coastguard Worker use base::EventToken;
21*bb4ee6a4SAndroid Build Coastguard Worker use base::FlushOnDropTube;
22*bb4ee6a4SAndroid Build Coastguard Worker use base::FramingMode;
23*bb4ee6a4SAndroid Build Coastguard Worker use base::ReadNotifier;
24*bb4ee6a4SAndroid Build Coastguard Worker use base::RecvTube;
25*bb4ee6a4SAndroid Build Coastguard Worker use base::SendTube;
26*bb4ee6a4SAndroid Build Coastguard Worker use base::StreamChannel;
27*bb4ee6a4SAndroid Build Coastguard Worker use base::Tube;
28*bb4ee6a4SAndroid Build Coastguard Worker use base::TubeError;
29*bb4ee6a4SAndroid Build Coastguard Worker use base::WaitContext;
30*bb4ee6a4SAndroid Build Coastguard Worker use base::WorkerThread;
31*bb4ee6a4SAndroid Build Coastguard Worker use libc::EIO;
32*bb4ee6a4SAndroid Build Coastguard Worker use log::error;
33*bb4ee6a4SAndroid Build Coastguard Worker use log::info;
34*bb4ee6a4SAndroid Build Coastguard Worker use log::warn;
35*bb4ee6a4SAndroid Build Coastguard Worker use sync::Mutex;
36*bb4ee6a4SAndroid Build Coastguard Worker use vm_control::VmRequest;
37*bb4ee6a4SAndroid Build Coastguard Worker use vm_control::VmResponse;
38*bb4ee6a4SAndroid Build Coastguard Worker use winapi::shared::winerror::ERROR_MORE_DATA;
39*bb4ee6a4SAndroid Build Coastguard Worker 
/// Windows named pipes don't fit in well with the control loop (`run_control`) the way sockets do
/// on unix, so this struct provides a compatibility layer (named pipe server) that functions very
/// similarly to how a socket server would on unix.
///
/// Terminology:
///     * The `ControlServer` is a socket server compatibility layer.
///     * The "control loop" is the VMM's main loop (`run_control`). It uses the `ControlServer` to
///       accept & service connections from clients that want to control the VMM (e.g. press the
///       power button, etc).
pub struct ControlServer {
    /// Thread running the listen loop. Stopping it yields the loop's result together with the
    /// `ClientWorker` it was driving, so the caller can shut that worker down as well.
    server_listener_worker: WorkerThread<(io::Result<()>, ClientWorker)>,
    /// Signaled when a client has connected and can be accepted without blocking.
    client_waiting: Event,
    /// Provides the accepted Tube every time a client connects.
    client_tube_channel: mpsc::Receiver<FlushOnDropTube>,
}
56*bb4ee6a4SAndroid Build Coastguard Worker 
/// Wait tokens used by the control loop -> client forwarding thread.
#[derive(EventToken)]
enum Token {
    /// The worker's exit event fired; the forwarding thread should stop.
    Exit,
    /// The Tube carrying responses from the control loop has data to read.
    Readable,
}
62*bb4ee6a4SAndroid Build Coastguard Worker 
63*bb4ee6a4SAndroid Build Coastguard Worker impl ControlServer {
64*bb4ee6a4SAndroid Build Coastguard Worker     /// Creates a named pipe server on `pipe_name` that forwards Tube messages between the connected
65*bb4ee6a4SAndroid Build Coastguard Worker     /// client on that pipe, and the Tube returned by `ControlServer::accept`.
new(pipe_name: &str) -> anyhow::Result<Self>66*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new(pipe_name: &str) -> anyhow::Result<Self> {
67*bb4ee6a4SAndroid Build Coastguard Worker         let client_pipe_read = named_pipes::create_server_pipe(
68*bb4ee6a4SAndroid Build Coastguard Worker             pipe_name,
69*bb4ee6a4SAndroid Build Coastguard Worker             &named_pipes::FramingMode::Message,
70*bb4ee6a4SAndroid Build Coastguard Worker             &named_pipes::BlockingMode::Wait,
71*bb4ee6a4SAndroid Build Coastguard Worker             /* timeout= */ 0,
72*bb4ee6a4SAndroid Build Coastguard Worker             /* buffer_size= */ 1024 * 1024,
73*bb4ee6a4SAndroid Build Coastguard Worker             /* overlapped= */ true,
74*bb4ee6a4SAndroid Build Coastguard Worker         )?;
75*bb4ee6a4SAndroid Build Coastguard Worker         let client_pipe_write = client_pipe_read.try_clone()?;
76*bb4ee6a4SAndroid Build Coastguard Worker         let mut client_worker = ClientWorker::new(client_pipe_write);
77*bb4ee6a4SAndroid Build Coastguard Worker         let client_waiting = Event::new_auto_reset()?;
78*bb4ee6a4SAndroid Build Coastguard Worker         let client_waiting_for_worker = client_waiting.try_clone()?;
79*bb4ee6a4SAndroid Build Coastguard Worker         let (client_tube_channel_send, client_tube_channel_recv) = mpsc::channel();
80*bb4ee6a4SAndroid Build Coastguard Worker 
81*bb4ee6a4SAndroid Build Coastguard Worker         Ok(Self {
82*bb4ee6a4SAndroid Build Coastguard Worker             server_listener_worker: WorkerThread::start("ctrl_srv_listen_loop", move |exit_evt| {
83*bb4ee6a4SAndroid Build Coastguard Worker                 let res = Self::server_listener_loop(
84*bb4ee6a4SAndroid Build Coastguard Worker                     exit_evt,
85*bb4ee6a4SAndroid Build Coastguard Worker                     &mut client_worker,
86*bb4ee6a4SAndroid Build Coastguard Worker                     client_waiting_for_worker,
87*bb4ee6a4SAndroid Build Coastguard Worker                     client_tube_channel_send,
88*bb4ee6a4SAndroid Build Coastguard Worker                     client_pipe_read,
89*bb4ee6a4SAndroid Build Coastguard Worker                 );
90*bb4ee6a4SAndroid Build Coastguard Worker                 if let Err(e) = res.as_ref() {
91*bb4ee6a4SAndroid Build Coastguard Worker                     error!("server_listener_worker failed with error: {:?}", e)
92*bb4ee6a4SAndroid Build Coastguard Worker                 }
93*bb4ee6a4SAndroid Build Coastguard Worker                 (res, client_worker)
94*bb4ee6a4SAndroid Build Coastguard Worker             }),
95*bb4ee6a4SAndroid Build Coastguard Worker             client_waiting,
96*bb4ee6a4SAndroid Build Coastguard Worker             client_tube_channel: client_tube_channel_recv,
97*bb4ee6a4SAndroid Build Coastguard Worker         })
98*bb4ee6a4SAndroid Build Coastguard Worker     }
99*bb4ee6a4SAndroid Build Coastguard Worker 
    /// Gets the client waiting event. If a client is waiting, [ControlServer::accept] can be called
    /// and will return a [base::Tube] without blocking.
    ///
    /// The event is auto-reset and is signaled by the listener thread each time it has connected
    /// a new client and queued that client's Tube.
    pub fn client_waiting(&self) -> &Event {
        &self.client_waiting
    }
105*bb4ee6a4SAndroid Build Coastguard Worker 
106*bb4ee6a4SAndroid Build Coastguard Worker     /// Accepts a connection (if one is waiting), returning a [base::Tube] connected to the client.
107*bb4ee6a4SAndroid Build Coastguard Worker     /// If [ControlServer::client_waiting] has not been signaled, this will block until a client
108*bb4ee6a4SAndroid Build Coastguard Worker     /// connects.
accept(&mut self) -> FlushOnDropTube109*bb4ee6a4SAndroid Build Coastguard Worker     pub fn accept(&mut self) -> FlushOnDropTube {
110*bb4ee6a4SAndroid Build Coastguard Worker         self.client_tube_channel
111*bb4ee6a4SAndroid Build Coastguard Worker             .recv()
112*bb4ee6a4SAndroid Build Coastguard Worker             .expect("client worker has done away")
113*bb4ee6a4SAndroid Build Coastguard Worker     }
114*bb4ee6a4SAndroid Build Coastguard Worker 
115*bb4ee6a4SAndroid Build Coastguard Worker     /// Shuts down the control server, disconnecting any connected clients.
shutdown(self) -> base::Result<()>116*bb4ee6a4SAndroid Build Coastguard Worker     pub fn shutdown(self) -> base::Result<()> {
117*bb4ee6a4SAndroid Build Coastguard Worker         let (listen_res, client_worker) = self.server_listener_worker.stop();
118*bb4ee6a4SAndroid Build Coastguard Worker         match listen_res {
119*bb4ee6a4SAndroid Build Coastguard Worker             Err(e) if e.kind() == io::ErrorKind::Interrupted => (),
120*bb4ee6a4SAndroid Build Coastguard Worker             Err(e) => return Err(base::Error::from(e)),
121*bb4ee6a4SAndroid Build Coastguard Worker             Ok(()) => (),
122*bb4ee6a4SAndroid Build Coastguard Worker         };
123*bb4ee6a4SAndroid Build Coastguard Worker         client_worker.shutdown()
124*bb4ee6a4SAndroid Build Coastguard Worker     }
125*bb4ee6a4SAndroid Build Coastguard Worker 
126*bb4ee6a4SAndroid Build Coastguard Worker     /// Listen loop for the control server. Handles waiting for new connections, creates the
127*bb4ee6a4SAndroid Build Coastguard Worker     /// forwarding thread for control loop -> client data, and forwards client -> control loop
128*bb4ee6a4SAndroid Build Coastguard Worker     /// data.
server_listener_loop( exit_evt: Event, client_worker: &mut ClientWorker, client_waiting: Event, client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, mut client_pipe_read: PipeConnection, ) -> io::Result<()>129*bb4ee6a4SAndroid Build Coastguard Worker     fn server_listener_loop(
130*bb4ee6a4SAndroid Build Coastguard Worker         exit_evt: Event,
131*bb4ee6a4SAndroid Build Coastguard Worker         client_worker: &mut ClientWorker,
132*bb4ee6a4SAndroid Build Coastguard Worker         client_waiting: Event,
133*bb4ee6a4SAndroid Build Coastguard Worker         client_tube_send_channel: mpsc::Sender<FlushOnDropTube>,
134*bb4ee6a4SAndroid Build Coastguard Worker         mut client_pipe_read: PipeConnection,
135*bb4ee6a4SAndroid Build Coastguard Worker     ) -> io::Result<()> {
136*bb4ee6a4SAndroid Build Coastguard Worker         loop {
137*bb4ee6a4SAndroid Build Coastguard Worker             info!("control server: started, waiting for clients.");
138*bb4ee6a4SAndroid Build Coastguard Worker             client_pipe_read.wait_for_client_connection_overlapped_blocking(&exit_evt)?;
139*bb4ee6a4SAndroid Build Coastguard Worker 
140*bb4ee6a4SAndroid Build Coastguard Worker             let mut read_overlapped = OverlappedWrapper::new(true)?;
141*bb4ee6a4SAndroid Build Coastguard Worker             let control_send = client_worker.connect_client(&client_tube_send_channel)?;
142*bb4ee6a4SAndroid Build Coastguard Worker             client_waiting.signal()?;
143*bb4ee6a4SAndroid Build Coastguard Worker             info!("control server: accepted client");
144*bb4ee6a4SAndroid Build Coastguard Worker 
145*bb4ee6a4SAndroid Build Coastguard Worker             loop {
146*bb4ee6a4SAndroid Build Coastguard Worker                 let recv_result = base::deserialize_and_recv::<VmRequest, _>(|buf| {
147*bb4ee6a4SAndroid Build Coastguard Worker                     client_pipe_read.read_overlapped_blocking(
148*bb4ee6a4SAndroid Build Coastguard Worker                         buf,
149*bb4ee6a4SAndroid Build Coastguard Worker                         &mut read_overlapped,
150*bb4ee6a4SAndroid Build Coastguard Worker                         &exit_evt,
151*bb4ee6a4SAndroid Build Coastguard Worker                     )?;
152*bb4ee6a4SAndroid Build Coastguard Worker                     Ok(buf.len())
153*bb4ee6a4SAndroid Build Coastguard Worker                 });
154*bb4ee6a4SAndroid Build Coastguard Worker 
155*bb4ee6a4SAndroid Build Coastguard Worker                 match recv_result {
156*bb4ee6a4SAndroid Build Coastguard Worker                     Ok(msg) => {
157*bb4ee6a4SAndroid Build Coastguard Worker                         control_send.send(&msg).map_err(|e| {
158*bb4ee6a4SAndroid Build Coastguard Worker                             error!("unexpected error in control server recv loop: {}", e);
159*bb4ee6a4SAndroid Build Coastguard Worker                             io::Error::new(io::ErrorKind::Other, e)
160*bb4ee6a4SAndroid Build Coastguard Worker                         })?;
161*bb4ee6a4SAndroid Build Coastguard Worker                     }
162*bb4ee6a4SAndroid Build Coastguard Worker                     Err(TubeError::Disconnected) => break,
163*bb4ee6a4SAndroid Build Coastguard Worker                     Err(e) => {
164*bb4ee6a4SAndroid Build Coastguard Worker                         error!("unexpected error in control server recv loop: {}", e);
165*bb4ee6a4SAndroid Build Coastguard Worker                         return Err(io::Error::new(io::ErrorKind::Other, e));
166*bb4ee6a4SAndroid Build Coastguard Worker                     }
167*bb4ee6a4SAndroid Build Coastguard Worker                 };
168*bb4ee6a4SAndroid Build Coastguard Worker             }
169*bb4ee6a4SAndroid Build Coastguard Worker             // Current client has disconnected. Now we can reuse the server pipe for a new client.
170*bb4ee6a4SAndroid Build Coastguard Worker             match client_pipe_read.disconnect_clients() {
171*bb4ee6a4SAndroid Build Coastguard Worker                 Ok(()) => (),
172*bb4ee6a4SAndroid Build Coastguard Worker                 // If the pipe is already broken/closed, we'll get an error about trying to close
173*bb4ee6a4SAndroid Build Coastguard Worker                 // a pipe that has already been closed. Discard that error.
174*bb4ee6a4SAndroid Build Coastguard Worker                 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (),
175*bb4ee6a4SAndroid Build Coastguard Worker                 Err(e) => return Err(e),
176*bb4ee6a4SAndroid Build Coastguard Worker             }
177*bb4ee6a4SAndroid Build Coastguard Worker             client_worker.stop_control_to_client_worker()?;
178*bb4ee6a4SAndroid Build Coastguard Worker             info!("control server: disconnected client");
179*bb4ee6a4SAndroid Build Coastguard Worker         }
180*bb4ee6a4SAndroid Build Coastguard Worker         unreachable!("loop exits by returning an error");
181*bb4ee6a4SAndroid Build Coastguard Worker     }
182*bb4ee6a4SAndroid Build Coastguard Worker }
183*bb4ee6a4SAndroid Build Coastguard Worker 
/// Handles connecting clients & forwarding data from the control loop -> client pipe.
struct ClientWorker {
    /// Forwarding thread for the currently connected client, if any. When stopped it returns its
    /// result together with the write pipe it was using.
    control_to_client_worker: Option<WorkerThread<(base::Result<()>, PipeConnection)>>,
    /// Write handle for the client pipe. `None` while the forwarding thread owns it.
    client_pipe_write: Option<PipeConnection>,
}
189*bb4ee6a4SAndroid Build Coastguard Worker 
190*bb4ee6a4SAndroid Build Coastguard Worker impl ClientWorker {
    /// Creates a worker that holds `client_pipe_write` and has no forwarding thread running yet.
    fn new(client_pipe_write: PipeConnection) -> Self {
        Self {
            control_to_client_worker: None,
            client_pipe_write: Some(client_pipe_write),
        }
    }
197*bb4ee6a4SAndroid Build Coastguard Worker 
connect_client( &mut self, client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, ) -> base::Result<SendTube>198*bb4ee6a4SAndroid Build Coastguard Worker     fn connect_client(
199*bb4ee6a4SAndroid Build Coastguard Worker         &mut self,
200*bb4ee6a4SAndroid Build Coastguard Worker         client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>,
201*bb4ee6a4SAndroid Build Coastguard Worker     ) -> base::Result<SendTube> {
202*bb4ee6a4SAndroid Build Coastguard Worker         // It is critical that the server end of the pipe is returned as the Tube in
203*bb4ee6a4SAndroid Build Coastguard Worker         // ControlServer::accept (tube_for_control_loop here). This way, we can ensure data is
204*bb4ee6a4SAndroid Build Coastguard Worker         // flushed before the pipe is dropped. In short, the order of Tubes returned by the pair
205*bb4ee6a4SAndroid Build Coastguard Worker         // matters.
206*bb4ee6a4SAndroid Build Coastguard Worker         let (tube_for_control_loop, tube_to_control_loop) = Tube::pair().map_err(|e| match e {
207*bb4ee6a4SAndroid Build Coastguard Worker             TubeError::Pair(io_err) => base::Error::from(io_err),
208*bb4ee6a4SAndroid Build Coastguard Worker             _ => base::Error::new(EIO),
209*bb4ee6a4SAndroid Build Coastguard Worker         })?;
210*bb4ee6a4SAndroid Build Coastguard Worker 
211*bb4ee6a4SAndroid Build Coastguard Worker         let (control_send, control_recv) =
212*bb4ee6a4SAndroid Build Coastguard Worker             Tube::split_to_send_recv(tube_to_control_loop).map_err(|e| match e {
213*bb4ee6a4SAndroid Build Coastguard Worker                 TubeError::Clone(io_err) => base::Error::from(io_err),
214*bb4ee6a4SAndroid Build Coastguard Worker                 _ => base::Error::new(EIO),
215*bb4ee6a4SAndroid Build Coastguard Worker             })?;
216*bb4ee6a4SAndroid Build Coastguard Worker 
217*bb4ee6a4SAndroid Build Coastguard Worker         let client_pipe_write = self.client_pipe_write.take().expect("loop already running");
218*bb4ee6a4SAndroid Build Coastguard Worker         self.control_to_client_worker = Some(WorkerThread::start(
219*bb4ee6a4SAndroid Build Coastguard Worker             "ctrl_srv_client_to_ctrl",
220*bb4ee6a4SAndroid Build Coastguard Worker             move |exit_evt| {
221*bb4ee6a4SAndroid Build Coastguard Worker                 let res =
222*bb4ee6a4SAndroid Build Coastguard Worker                     Self::control_to_client_worker(exit_evt, &client_pipe_write, control_recv);
223*bb4ee6a4SAndroid Build Coastguard Worker                 if let Err(e) = res.as_ref() {
224*bb4ee6a4SAndroid Build Coastguard Worker                     error!("control_to_client_worker exited with error: {:?}", res);
225*bb4ee6a4SAndroid Build Coastguard Worker                 }
226*bb4ee6a4SAndroid Build Coastguard Worker                 (res, client_pipe_write)
227*bb4ee6a4SAndroid Build Coastguard Worker             },
228*bb4ee6a4SAndroid Build Coastguard Worker         ));
229*bb4ee6a4SAndroid Build Coastguard Worker         client_tube_send_channel
230*bb4ee6a4SAndroid Build Coastguard Worker             .send(FlushOnDropTube::from(tube_for_control_loop))
231*bb4ee6a4SAndroid Build Coastguard Worker             .expect("control server has gone away");
232*bb4ee6a4SAndroid Build Coastguard Worker         Ok(control_send)
233*bb4ee6a4SAndroid Build Coastguard Worker     }
234*bb4ee6a4SAndroid Build Coastguard Worker 
stop_control_to_client_worker(&mut self) -> base::Result<()>235*bb4ee6a4SAndroid Build Coastguard Worker     fn stop_control_to_client_worker(&mut self) -> base::Result<()> {
236*bb4ee6a4SAndroid Build Coastguard Worker         let (res, pipe) = self
237*bb4ee6a4SAndroid Build Coastguard Worker             .control_to_client_worker
238*bb4ee6a4SAndroid Build Coastguard Worker             .take()
239*bb4ee6a4SAndroid Build Coastguard Worker             .expect("loop must be running")
240*bb4ee6a4SAndroid Build Coastguard Worker             .stop();
241*bb4ee6a4SAndroid Build Coastguard Worker         self.client_pipe_write = Some(pipe);
242*bb4ee6a4SAndroid Build Coastguard Worker         res
243*bb4ee6a4SAndroid Build Coastguard Worker     }
244*bb4ee6a4SAndroid Build Coastguard Worker 
shutdown(self) -> base::Result<()>245*bb4ee6a4SAndroid Build Coastguard Worker     fn shutdown(self) -> base::Result<()> {
246*bb4ee6a4SAndroid Build Coastguard Worker         if let Some(worker) = self.control_to_client_worker {
247*bb4ee6a4SAndroid Build Coastguard Worker             worker.stop().0
248*bb4ee6a4SAndroid Build Coastguard Worker         } else {
249*bb4ee6a4SAndroid Build Coastguard Worker             Ok(())
250*bb4ee6a4SAndroid Build Coastguard Worker         }
251*bb4ee6a4SAndroid Build Coastguard Worker     }
252*bb4ee6a4SAndroid Build Coastguard Worker 
253*bb4ee6a4SAndroid Build Coastguard Worker     /// Worker that forwards data from the control loop -> client pipe.
control_to_client_worker( exit_evt: Event, client_pipe_write: &PipeConnection, control_recv: RecvTube, ) -> base::Result<()>254*bb4ee6a4SAndroid Build Coastguard Worker     fn control_to_client_worker(
255*bb4ee6a4SAndroid Build Coastguard Worker         exit_evt: Event,
256*bb4ee6a4SAndroid Build Coastguard Worker         client_pipe_write: &PipeConnection,
257*bb4ee6a4SAndroid Build Coastguard Worker         control_recv: RecvTube,
258*bb4ee6a4SAndroid Build Coastguard Worker     ) -> base::Result<()> {
259*bb4ee6a4SAndroid Build Coastguard Worker         let wait_ctx = WaitContext::new()?;
260*bb4ee6a4SAndroid Build Coastguard Worker         wait_ctx.add(&exit_evt, Token::Exit)?;
261*bb4ee6a4SAndroid Build Coastguard Worker         wait_ctx.add(control_recv.get_read_notifier(), Token::Readable)?;
262*bb4ee6a4SAndroid Build Coastguard Worker 
263*bb4ee6a4SAndroid Build Coastguard Worker         'poll: loop {
264*bb4ee6a4SAndroid Build Coastguard Worker             let events = wait_ctx.wait()?;
265*bb4ee6a4SAndroid Build Coastguard Worker             for event in events {
266*bb4ee6a4SAndroid Build Coastguard Worker                 match event.token {
267*bb4ee6a4SAndroid Build Coastguard Worker                     Token::Exit => {
268*bb4ee6a4SAndroid Build Coastguard Worker                         break 'poll;
269*bb4ee6a4SAndroid Build Coastguard Worker                     }
270*bb4ee6a4SAndroid Build Coastguard Worker                     Token::Readable => {
271*bb4ee6a4SAndroid Build Coastguard Worker                         let msg = match control_recv.recv::<VmResponse>() {
272*bb4ee6a4SAndroid Build Coastguard Worker                             Ok(msg) => Ok(msg),
273*bb4ee6a4SAndroid Build Coastguard Worker                             Err(TubeError::Disconnected) => {
274*bb4ee6a4SAndroid Build Coastguard Worker                                 return Ok(());
275*bb4ee6a4SAndroid Build Coastguard Worker                             }
276*bb4ee6a4SAndroid Build Coastguard Worker                             Err(TubeError::Recv(e)) => Err(base::Error::from(e)),
277*bb4ee6a4SAndroid Build Coastguard Worker                             Err(tube_error) => {
278*bb4ee6a4SAndroid Build Coastguard Worker                                 error!(
279*bb4ee6a4SAndroid Build Coastguard Worker                                     "unexpected error in control server recv loop: {}",
280*bb4ee6a4SAndroid Build Coastguard Worker                                     tube_error
281*bb4ee6a4SAndroid Build Coastguard Worker                                 );
282*bb4ee6a4SAndroid Build Coastguard Worker                                 Err(base::Error::new(EIO))
283*bb4ee6a4SAndroid Build Coastguard Worker                             }
284*bb4ee6a4SAndroid Build Coastguard Worker                         }?;
285*bb4ee6a4SAndroid Build Coastguard Worker                         base::serialize_and_send(|buf| client_pipe_write.write(buf), &msg, None)
286*bb4ee6a4SAndroid Build Coastguard Worker                             .map_err(|e| match e {
287*bb4ee6a4SAndroid Build Coastguard Worker                                 TubeError::Send(e) => base::Error::from(e),
288*bb4ee6a4SAndroid Build Coastguard Worker                                 tube_error => {
289*bb4ee6a4SAndroid Build Coastguard Worker                                     error!(
290*bb4ee6a4SAndroid Build Coastguard Worker                                         "unexpected error in control server recv loop: {}",
291*bb4ee6a4SAndroid Build Coastguard Worker                                         tube_error
292*bb4ee6a4SAndroid Build Coastguard Worker                                     );
293*bb4ee6a4SAndroid Build Coastguard Worker                                     base::Error::new(EIO)
294*bb4ee6a4SAndroid Build Coastguard Worker                                 }
295*bb4ee6a4SAndroid Build Coastguard Worker                             })?;
296*bb4ee6a4SAndroid Build Coastguard Worker                     }
297*bb4ee6a4SAndroid Build Coastguard Worker                 }
298*bb4ee6a4SAndroid Build Coastguard Worker             }
299*bb4ee6a4SAndroid Build Coastguard Worker         }
300*bb4ee6a4SAndroid Build Coastguard Worker         Ok(())
301*bb4ee6a4SAndroid Build Coastguard Worker     }
302*bb4ee6a4SAndroid Build Coastguard Worker }
303*bb4ee6a4SAndroid Build Coastguard Worker 
304*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)]
305*bb4ee6a4SAndroid Build Coastguard Worker mod tests {
306*bb4ee6a4SAndroid Build Coastguard Worker     use std::thread;
307*bb4ee6a4SAndroid Build Coastguard Worker     use std::time::Duration;
308*bb4ee6a4SAndroid Build Coastguard Worker 
309*bb4ee6a4SAndroid Build Coastguard Worker     use base::PipeTube;
310*bb4ee6a4SAndroid Build Coastguard Worker     use rand::Rng;
311*bb4ee6a4SAndroid Build Coastguard Worker 
312*bb4ee6a4SAndroid Build Coastguard Worker     use super::*;
313*bb4ee6a4SAndroid Build Coastguard Worker 
generate_pipe_name() -> String314*bb4ee6a4SAndroid Build Coastguard Worker     fn generate_pipe_name() -> String {
315*bb4ee6a4SAndroid Build Coastguard Worker         format!(
316*bb4ee6a4SAndroid Build Coastguard Worker             r"\\.\pipe\test-ipc-pipe-name.rand{}",
317*bb4ee6a4SAndroid Build Coastguard Worker             rand::thread_rng().gen::<u64>(),
318*bb4ee6a4SAndroid Build Coastguard Worker         )
319*bb4ee6a4SAndroid Build Coastguard Worker     }
320*bb4ee6a4SAndroid Build Coastguard Worker 
321*bb4ee6a4SAndroid Build Coastguard Worker     #[track_caller]
create_client(pipe_name: &str) -> PipeTube322*bb4ee6a4SAndroid Build Coastguard Worker     fn create_client(pipe_name: &str) -> PipeTube {
323*bb4ee6a4SAndroid Build Coastguard Worker         let mut last_error: Option<io::Error> = None;
324*bb4ee6a4SAndroid Build Coastguard Worker         for _ in 0..5 {
325*bb4ee6a4SAndroid Build Coastguard Worker             match named_pipes::create_client_pipe(
326*bb4ee6a4SAndroid Build Coastguard Worker                 pipe_name,
327*bb4ee6a4SAndroid Build Coastguard Worker                 &named_pipes::FramingMode::Message,
328*bb4ee6a4SAndroid Build Coastguard Worker                 &named_pipes::BlockingMode::Wait,
329*bb4ee6a4SAndroid Build Coastguard Worker                 /* overlapped= */ false,
330*bb4ee6a4SAndroid Build Coastguard Worker             ) {
331*bb4ee6a4SAndroid Build Coastguard Worker                 Ok(pipe) => return PipeTube::from(pipe, None),
332*bb4ee6a4SAndroid Build Coastguard Worker                 Err(e) => {
333*bb4ee6a4SAndroid Build Coastguard Worker                     last_error = Some(e);
334*bb4ee6a4SAndroid Build Coastguard Worker                     println!("failed client connection");
335*bb4ee6a4SAndroid Build Coastguard Worker                     thread::sleep(Duration::from_millis(100))
336*bb4ee6a4SAndroid Build Coastguard Worker                 }
337*bb4ee6a4SAndroid Build Coastguard Worker             }
338*bb4ee6a4SAndroid Build Coastguard Worker         }
339*bb4ee6a4SAndroid Build Coastguard Worker         panic!(
340*bb4ee6a4SAndroid Build Coastguard Worker             "failed to connect to control server: {:?}",
341*bb4ee6a4SAndroid Build Coastguard Worker             last_error.unwrap()
342*bb4ee6a4SAndroid Build Coastguard Worker         )
343*bb4ee6a4SAndroid Build Coastguard Worker     }
344*bb4ee6a4SAndroid Build Coastguard Worker 
345*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
test_smoke()346*bb4ee6a4SAndroid Build Coastguard Worker     fn test_smoke() {
347*bb4ee6a4SAndroid Build Coastguard Worker         // There are several threads, so run many iterations to exercise any possible race
348*bb4ee6a4SAndroid Build Coastguard Worker         // conditions.
349*bb4ee6a4SAndroid Build Coastguard Worker         for i in 0..100 {
350*bb4ee6a4SAndroid Build Coastguard Worker             println!("starting iteration {}", i);
351*bb4ee6a4SAndroid Build Coastguard Worker             let pipe_name = generate_pipe_name();
352*bb4ee6a4SAndroid Build Coastguard Worker 
353*bb4ee6a4SAndroid Build Coastguard Worker             let mut control_server = ControlServer::new(&pipe_name).unwrap();
354*bb4ee6a4SAndroid Build Coastguard Worker             let fake_control_loop = base::thread::spawn_with_timeout(move || {
355*bb4ee6a4SAndroid Build Coastguard Worker                 // First client.
356*bb4ee6a4SAndroid Build Coastguard Worker                 {
357*bb4ee6a4SAndroid Build Coastguard Worker                     println!("server: starting client 1");
358*bb4ee6a4SAndroid Build Coastguard Worker                     control_server.client_waiting().wait().unwrap();
359*bb4ee6a4SAndroid Build Coastguard Worker                     let client1 = control_server.accept();
360*bb4ee6a4SAndroid Build Coastguard Worker                     let req: VmRequest = client1.0.recv().unwrap();
361*bb4ee6a4SAndroid Build Coastguard Worker                     assert!(matches!(req, VmRequest::Powerbtn));
362*bb4ee6a4SAndroid Build Coastguard Worker                     client1.0.send(&VmResponse::Ok).unwrap();
363*bb4ee6a4SAndroid Build Coastguard Worker                 }
364*bb4ee6a4SAndroid Build Coastguard Worker                 println!("server: finished client 1");
365*bb4ee6a4SAndroid Build Coastguard Worker 
366*bb4ee6a4SAndroid Build Coastguard Worker                 // Second client.
367*bb4ee6a4SAndroid Build Coastguard Worker                 {
368*bb4ee6a4SAndroid Build Coastguard Worker                     println!("server: starting client 2");
369*bb4ee6a4SAndroid Build Coastguard Worker                     control_server.client_waiting().wait().unwrap();
370*bb4ee6a4SAndroid Build Coastguard Worker                     let client2 = control_server.accept();
371*bb4ee6a4SAndroid Build Coastguard Worker                     let req: VmRequest = client2.0.recv().unwrap();
372*bb4ee6a4SAndroid Build Coastguard Worker                     assert!(matches!(req, VmRequest::Exit));
373*bb4ee6a4SAndroid Build Coastguard Worker                     client2
374*bb4ee6a4SAndroid Build Coastguard Worker                         .0
375*bb4ee6a4SAndroid Build Coastguard Worker                         .send(&VmResponse::ErrString("err".to_owned()))
376*bb4ee6a4SAndroid Build Coastguard Worker                         .unwrap();
377*bb4ee6a4SAndroid Build Coastguard Worker                 }
378*bb4ee6a4SAndroid Build Coastguard Worker                 println!("server: finished client 2");
379*bb4ee6a4SAndroid Build Coastguard Worker                 control_server
380*bb4ee6a4SAndroid Build Coastguard Worker             });
381*bb4ee6a4SAndroid Build Coastguard Worker 
382*bb4ee6a4SAndroid Build Coastguard Worker             {
383*bb4ee6a4SAndroid Build Coastguard Worker                 println!("client: starting client 1");
384*bb4ee6a4SAndroid Build Coastguard Worker                 let client1 = create_client(&pipe_name);
385*bb4ee6a4SAndroid Build Coastguard Worker                 client1.send(&VmRequest::Powerbtn).unwrap();
386*bb4ee6a4SAndroid Build Coastguard Worker                 assert!(matches!(client1.recv().unwrap(), VmResponse::Ok));
387*bb4ee6a4SAndroid Build Coastguard Worker                 println!("client: finished client 1");
388*bb4ee6a4SAndroid Build Coastguard Worker             }
389*bb4ee6a4SAndroid Build Coastguard Worker 
390*bb4ee6a4SAndroid Build Coastguard Worker             {
391*bb4ee6a4SAndroid Build Coastguard Worker                 println!("client: starting client 2");
392*bb4ee6a4SAndroid Build Coastguard Worker                 let client2 = create_client(&pipe_name);
393*bb4ee6a4SAndroid Build Coastguard Worker                 client2.send(&VmRequest::Exit).unwrap();
394*bb4ee6a4SAndroid Build Coastguard Worker                 let resp = VmResponse::ErrString("err".to_owned());
395*bb4ee6a4SAndroid Build Coastguard Worker                 assert!(matches!(client2.recv::<VmResponse>().unwrap(), resp,));
396*bb4ee6a4SAndroid Build Coastguard Worker                 println!("client: finished client 2");
397*bb4ee6a4SAndroid Build Coastguard Worker             }
398*bb4ee6a4SAndroid Build Coastguard Worker 
399*bb4ee6a4SAndroid Build Coastguard Worker             let control_server = fake_control_loop.try_join(Duration::from_secs(2)).unwrap();
400*bb4ee6a4SAndroid Build Coastguard Worker             control_server.shutdown().unwrap();
401*bb4ee6a4SAndroid Build Coastguard Worker             println!("completed iteration {}", i);
402*bb4ee6a4SAndroid Build Coastguard Worker         }
403*bb4ee6a4SAndroid Build Coastguard Worker     }
404*bb4ee6a4SAndroid Build Coastguard Worker }
405