1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13
14 use log::warn;
15 use serde::de::DeserializeOwned;
16 use serde::Deserialize;
17 use serde::Serialize;
18 use serde::Serializer;
19 use winapi::shared::winerror::ERROR_MORE_DATA;
20 use zerocopy::AsBytes;
21 use zerocopy::FromBytes;
22 use zerocopy::FromZeroes;
23
24 use crate::descriptor::AsRawDescriptor;
25 use crate::descriptor::FromRawDescriptor;
26 use crate::descriptor::SafeDescriptor;
27 use crate::descriptor_reflection::deserialize_with_descriptors;
28 use crate::descriptor_reflection::SerializeDescriptors;
29 use crate::tube::Error;
30 use crate::tube::RecvTube;
31 use crate::tube::Result;
32 use crate::tube::SendTube;
33 use crate::BlockingMode;
34 use crate::CloseNotifier;
35 use crate::Event;
36 use crate::EventToken;
37 use crate::FramingMode;
38 use crate::PipeConnection;
39 use crate::RawDescriptor;
40 use crate::ReadNotifier;
41 use crate::StreamChannel;
42
/// Bidirectional tube that supports both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
///     1. B's target_pid must be set to the current PID *before* serialization. There is a
///        serialization hook that sets it to the current PID automatically if target_pid is unset.
///     2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
///     1. A's target_pid must be set to B's pid, manually.
///     2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need to
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
    // Underlying channel that every send/recv on this Tube goes through.
    socket: StreamChannel,

    // PID of the process holding the other end, if known. When unset at serialization time, a
    // default PID is written instead (see `Tube` comment header for details).
    #[serde(serialize_with = "set_tube_pid_on_serialize")]
    target_pid: Option<u32>,
}
69
70 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
71 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,72 fn set_tube_pid_on_serialize<S>(
73 existing_pid_value: &Option<u32>,
74 serializer: S,
75 ) -> std::result::Result<S::Ok, S::Error>
76 where
77 S: Serializer,
78 {
79 match existing_pid_value {
80 Some(pid) => serializer.serialize_u32(*pid),
81 None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
82 }
83 }
84
/// Fixed-size header written at the front of every serde-based Tube packet.
///
/// NOTE(review): both fields are `usize`, so the header's byte layout follows the sender's pointer
/// width — confirm both ends of a Tube are always built for the same architecture.
#[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
#[repr(C)]
struct MsgHeader {
    // Length in bytes of the JSON-encoded message that follows the header.
    msg_json_size: usize,
    // Length in bytes of the JSON-encoded descriptor list that follows the message; zero when the
    // message carries no descriptors.
    descriptor_json_size: usize,
}
91
// Process-wide tube used to delegate handle duplication. `None` until
// `set_duplicate_handle_tube` is called; consulted by `duplicate_handle` when a target PID is set.
static DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
// Process-wide alias PID. `None` until `set_alias_pid` is called; used as the default serialized
// `target_pid` of a Tube whose own `target_pid` is unset.
static ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
94
95 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)96 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
97 DH_TUBE.lock().replace(dh_tube);
98 }
99
100 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)101 pub fn set_alias_pid(alias_pid: u32) {
102 ALIAS_PID.lock().replace(alias_pid);
103 }
104
105 impl Tube {
106 /// Create a pair of connected tubes. Request is sent in one direction while response is
107 /// received in the other direction.
108 /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>109 pub fn pair() -> Result<(Tube, Tube)> {
110 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
111 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
112
113 Ok((Tube::new(socket1), Tube::new(socket2)))
114 }
115
116 /// Create a pair of connected tubes with the specified buffer size.
117 /// Request is sent in one direction while response is received in the other direction.
118 /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>119 pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
120 let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
121 BlockingMode::Blocking,
122 FramingMode::Message,
123 buffer_size,
124 )
125 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
126 let tube1 = Tube::new(socket1);
127 let tube2 = Tube::new(socket2);
128 Ok((tube1, tube2))
129 }
130
131 // Create a new `Tube`.
new(socket: StreamChannel) -> Tube132 pub fn new(socket: StreamChannel) -> Tube {
133 Tube {
134 socket,
135 target_pid: None,
136 }
137 }
138
try_clone(&self) -> Result<Self>139 pub(crate) fn try_clone(&self) -> Result<Self> {
140 Ok(Tube {
141 socket: self.socket.try_clone().map_err(Error::Clone)?,
142 target_pid: self.target_pid,
143 })
144 }
145
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>146 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
147 let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
148 let size_header = bytes.len();
149
150 let mut data_packet =
151 Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
152 data_packet
153 .write(&size_header.to_le_bytes())
154 .map_err(Error::from_send_io_buf_error)?;
155 data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
156 self.socket
157 .write_immutable(&data_packet.into_inner())
158 .map_err(Error::from_send_error)?;
159
160 Ok(())
161 }
162
recv_proto<M: protobuf::Message>(&self) -> Result<M>163 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
164 let mut header_bytes = [0u8; mem::size_of::<usize>()];
165 perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
166 .map_err(Error::from_recv_io_error)?;
167 let size_header = usize::from_le_bytes(header_bytes);
168
169 let mut proto_bytes = vec![0u8; size_header];
170 perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
171 .map_err(Error::from_recv_io_error)?;
172 protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
173 }
174
send<T: Serialize>(&self, msg: &T) -> Result<()>175 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
176 serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
177 }
178
recv<T: DeserializeOwned>(&self) -> Result<T>179 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
180 deserialize_and_recv(|buf| (&self.socket).read(buf))
181 }
182
183 /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
184 /// documentation to ensure you have a server pipe before calling.
185 #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>186 pub fn flush_blocking(&mut self) -> Result<()> {
187 self.socket.flush_blocking().map_err(Error::Flush)
188 }
189
190 /// For Tubes that span processes, this method must be used to set the PID of the other end
191 /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)192 pub fn set_target_pid(&mut self, target_pid: u32) {
193 self.target_pid = Some(target_pid);
194 }
195
196 /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>197 pub fn target_pid(&self) -> Option<u32> {
198 self.target_pid
199 }
200
201 /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>202 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
203 unimplemented!("To be removed/refactored upstream.");
204 }
205
206 /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>207 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
208 unimplemented!("To be removed/refactored upstream.");
209 }
210
get_read_notifier_event(&self) -> &Event211 pub fn get_read_notifier_event(&self) -> &Event {
212 self.socket.get_read_notifier_event()
213 }
214
get_close_notifier_event(&self) -> &Event215 pub fn get_close_notifier_event(&self) -> &Event {
216 self.socket.get_close_notifier_event()
217 }
218 }
219
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>220 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
221 write_fn: F,
222 msg: &T,
223 target_pid: Option<u32>,
224 ) -> Result<()> {
225 let msg_serialize = SerializeDescriptors::new(&msg);
226 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
227 let msg_descriptors = msg_serialize.into_descriptors();
228
229 let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
230 for desc in msg_descriptors {
231 // Safe because these handles are guaranteed to be valid. Details:
232 // 1. They come from base::descriptor_reflection::with_as_descriptor.
233 // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
234 // SafeDescriptor).
235 // 3. The owning object is borrowed by msg until sending is complete.
236 duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
237 }
238
239 let descriptor_json = if duped_descriptors.is_empty() {
240 None
241 } else {
242 Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
243 };
244
245 let header = MsgHeader {
246 msg_json_size: msg_json.len(),
247 descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
248 };
249
250 let mut data_packet = Cursor::new(Vec::with_capacity(
251 header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
252 ));
253 data_packet
254 .write(header.as_bytes())
255 .map_err(Error::SendIoBuf)?;
256 data_packet
257 .write(msg_json.as_slice())
258 .map_err(Error::SendIoBuf)?;
259 if let Some(descriptor_json) = descriptor_json {
260 data_packet
261 .write(descriptor_json.as_slice())
262 .map_err(Error::SendIoBuf)?;
263 }
264
265 // Multiple writers (producers) are safe because each write is atomic.
266 let data_bytes = data_packet.into_inner();
267
268 write_fn(&data_bytes).map_err(Error::from_send_error)?;
269 Ok(())
270 }
271
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>272 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
273 match target_pid {
274 Some(pid) => match &*DH_TUBE.lock() {
275 Some(tube) => tube.request_duplicate_handle(pid, desc),
276 None => {
277 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
278 }
279 },
280 None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
281 }
282 }
283
284 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
285 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. We
286 /// use this to ignore errors when reading the message header, which has the lengths we need to
287 /// allocate our buffers for the remainder of the message.
288 /// * We filled the supplied buffer.
perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>( read_fn: &mut F, buf: &mut [u8], ) -> io::Result<usize>289 fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
290 read_fn: &mut F,
291 buf: &mut [u8],
292 ) -> io::Result<usize> {
293 let bytes_read = match read_fn(buf) {
294 Ok(s) => Ok(s),
295 Err(e)
296 if e.raw_os_error()
297 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
298 {
299 Ok(buf.len())
300 }
301 Err(e) => Err(e),
302 }?;
303
304 if bytes_read != buf.len() {
305 Err(io::Error::new(
306 io::ErrorKind::UnexpectedEof,
307 format!(
308 "failed to fill whole buffer, expected {} got {}",
309 buf.len(),
310 bytes_read
311 ),
312 ))
313 } else {
314 Ok(bytes_read)
315 }
316 }
317
318 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
319 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>( mut read_fn: F, ) -> Result<T>320 pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
321 mut read_fn: F,
322 ) -> Result<T> {
323 let mut header = MsgHeader::default();
324 perform_read(&mut read_fn, header.as_bytes_mut()).map_err(Error::from_recv_io_error)?;
325
326 let mut msg_json = vec![0u8; header.msg_json_size];
327 perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
328
329 if msg_json.is_empty() {
330 // This means we got a message header, but there is no json body (due to a zero size in
331 // the header). This should never happen because it means the receiver is getting no
332 // data whatsoever from the sender.
333 return Err(Error::RecvUnexpectedEmptyBody);
334 }
335
336 let descriptor_usizes: Vec<usize> = if header.descriptor_json_size > 0 {
337 let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
338 perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
339 .map_err(Error::from_recv_io_error)?;
340 serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?
341 } else {
342 Vec::new()
343 };
344
345 let msg_descriptors = descriptor_usizes.into_iter().map(|item| {
346 // SAFETY: the usizes are RawDescriptors that were duplicated and converted to usize in the
347 // send method.
348 unsafe { SafeDescriptor::from_raw_descriptor(item as RawDescriptor) }
349 });
350
351 deserialize_with_descriptors(|| serde_json::from_slice(&msg_json), msg_descriptors)
352 .map_err(Error::Json)
353 }
354
// Event token with a single `SocketReady` variant.
// NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still
// used (e.g. by tests) before keeping it.
#[derive(EventToken, Eq, PartialEq, Copy, Clone)]
enum Token {
    SocketReady,
}
359
/// A `Tube`'s raw descriptor is the raw descriptor of its underlying channel.
impl AsRawDescriptor for Tube {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.socket.as_raw_descriptor()
    }
}
365
/// Exposes the same value as `AsRawDescriptor`, through the standard Windows handle trait.
impl AsRawHandle for Tube {
    fn as_raw_handle(&self) -> RawHandle {
        // Relies on `RawDescriptor` and `RawHandle` being the same type in this build.
        self.as_raw_descriptor()
    }
}
371
/// Read readiness for a `Tube` is reported by the read notifier of its underlying channel.
impl ReadNotifier for Tube {
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        self.socket.get_read_notifier()
    }
}
377
/// Closure of a `Tube` is reported by the close notifier of its underlying channel.
impl CloseNotifier for Tube {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.socket.get_close_notifier()
    }
}
383
/// Forwards to the value wrapped by `SendTube` (declared in `crate::tube`; presumably a `Tube`).
impl AsRawDescriptor for SendTube {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
389
/// Forwards to the value wrapped by `RecvTube` (declared in `crate::tube`; presumably a `Tube`).
impl AsRawDescriptor for RecvTube {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
395
/// Forwards close notification to the value wrapped by `SendTube`.
impl CloseNotifier for SendTube {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_close_notifier()
    }
}
401
/// Forwards close notification to the value wrapped by `RecvTube`.
impl CloseNotifier for RecvTube {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_close_notifier()
    }
}
407
/// A request to duplicate a handle to a target process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
    /// Alias PID identifying the process that should receive the duplicated handle.
    pub target_alias_pid: u32,
    /// The handle to duplicate, carried as a `usize` so it can be serialized.
    pub handle: usize,
}
414
/// Contains a duplicated handle or None if an error occurred.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
    /// The duplicated handle as a `usize`, or `None` on failure.
    pub handle: Option<usize>,
}
420
/// Wrapper for a tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
///
/// Requests are sent as `DuplicateHandleRequest` and answered with `DuplicateHandleResponse`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
425
426 impl DuplicateHandleTube {
new(tube: Tube) -> Self427 pub fn new(tube: Tube) -> Self {
428 Self(tube)
429 }
430
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>431 pub fn request_duplicate_handle(
432 &self,
433 target_alias_pid: u32,
434 handle: RawHandle,
435 ) -> Result<RawHandle> {
436 let req = DuplicateHandleRequest {
437 target_alias_pid,
438 handle: handle as usize,
439 };
440 self.0.send(&req)?;
441 let res: DuplicateHandleResponse = self.0.recv()?;
442 res.handle
443 .map(|h| h as RawHandle)
444 .ok_or(Error::BrokerDupDescriptor)
445 }
446 }
447
/// Wrapper for Tube used for sending and receiving protos. The main use case is to send a message
/// without the serialization bloat caused by `serde-json`.
#[derive(Serialize, Deserialize)]
pub struct ProtoTube(Tube);
452
453 impl ProtoTube {
pair() -> Result<(ProtoTube, ProtoTube)>454 pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
455 Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
456 }
457
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>458 pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
459 Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
460 }
461
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>462 pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
463 self.0.send_proto(msg)
464 }
465
recv_proto<M: protobuf::Message>(&self) -> Result<M>466 pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
467 self.0.recv_proto()
468 }
469 }
470
/// Read readiness for a `ProtoTube` is reported by the wrapped `Tube`.
impl ReadNotifier for ProtoTube {
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_read_notifier()
    }
}
476
/// A `ProtoTube`'s raw descriptor is that of the wrapped `Tube`.
impl AsRawDescriptor for ProtoTube {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
482
/// A wrapper around a named pipe that uses Tube serialization.
///
/// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
/// blocking messages.
pub struct PipeTube {
    // The pipe that packets are written to and read from.
    pipe: PipeConnection,

    // PID handed to descriptor duplication when sending; `None` duplicates descriptors without a
    // target PID.
    target_pid: Option<u32>,
}
493
494 impl PipeTube {
from(pipe: PipeConnection, target_pid: Option<u32>) -> Self495 pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
496 Self { pipe, target_pid }
497 }
498
send<T: Serialize>(&self, msg: &T) -> Result<()>499 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
500 serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
501 }
502
recv<T: DeserializeOwned>(&self) -> Result<T>503 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
504 deserialize_and_recv(|buf| {
505 // SAFETY:
506 // 1. We are reading bytes, so no matter what data is on the pipe, it is representable
507 // as bytes.
508 // 2. A read is quantized in bytes, so no partial reads are possible.
509 unsafe { self.pipe.read(buf) }
510 })
511 }
512 }
513
/// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
/// that a flush of the Tube is attempted before it is dropped; a failed flush is only logged.
pub struct FlushOnDropTube(pub Tube);
517
impl FlushOnDropTube {
    /// Wraps `tube`. The caller is responsible for `tube` being the server end of a pipe, since
    /// nothing here checks it.
    pub fn from(tube: Tube) -> Self {
        Self(tube)
    }
}
523
524 impl Drop for FlushOnDropTube {
drop(&mut self)525 fn drop(&mut self) {
526 if let Err(e) = self.0.flush_blocking() {
527 warn!("failed to flush Tube: {}", e)
528 }
529 }
530 }
531
532 impl Error {
map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error533 fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
534 if e.kind() == io::ErrorKind::UnexpectedEof || e.kind() == io::ErrorKind::BrokenPipe {
535 Error::Disconnected
536 } else {
537 err_ctor(e)
538 }
539 }
540
from_recv_io_error(e: io::Error) -> Error541 fn from_recv_io_error(e: io::Error) -> Error {
542 Self::map_io_error(e, Error::Recv)
543 }
544
from_send_error(e: io::Error) -> Error545 fn from_send_error(e: io::Error) -> Error {
546 Self::map_io_error(e, Error::Send)
547 }
548
from_send_io_buf_error(e: io::Error) -> Error549 fn from_send_io_buf_error(e: io::Error) -> Error {
550 Self::map_io_error(e, Error::SendIoBuf)
551 }
552 }
553