xref: /aosp_15_r20/external/crosvm/base/src/sys/unix/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::os::unix::prelude::AsRawFd;
6 use std::os::unix::prelude::RawFd;
7 use std::time::Duration;
8 
9 use serde::de::DeserializeOwned;
10 use serde::Deserialize;
11 use serde::Serialize;
12 
13 use crate::descriptor::AsRawDescriptor;
14 use crate::descriptor_reflection::deserialize_with_descriptors;
15 use crate::descriptor_reflection::SerializeDescriptors;
16 use crate::handle_eintr;
17 use crate::tube::Error;
18 use crate::tube::RecvTube;
19 use crate::tube::Result;
20 use crate::tube::SendTube;
21 use crate::BlockingMode;
22 use crate::FramingMode;
23 use crate::RawDescriptor;
24 use crate::ReadNotifier;
25 use crate::ScmSocket;
26 use crate::StreamChannel;
27 use crate::UnixSeqpacket;
28 use crate::SCM_SOCKET_MAX_FD_COUNT;
29 
// Default cap on the number of file descriptors carried by a single message; it is the limit
// `Tube::send` and `Tube::recv` pass to their `*_with_max_fds` counterparts.
// The value matches the inline buffer size of CmsgBuffer.
const TUBE_MAX_FDS: usize = 32;
32 
33 /// Bidirectional tube that support both send and recv.
34 #[derive(Serialize, Deserialize)]
35 pub struct Tube {
36     socket: ScmSocket<StreamChannel>,
37 }
38 
39 impl Tube {
40     /// Create a pair of connected tubes. Request is sent in one direction while response is in the
41     /// other direction.
pair() -> Result<(Tube, Tube)>42     pub fn pair() -> Result<(Tube, Tube)> {
43         let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
44             .map_err(|errno| Error::Pair(std::io::Error::from(errno)))?;
45         let tube1 = Tube::new(socket1)?;
46         let tube2 = Tube::new(socket2)?;
47         Ok((tube1, tube2))
48     }
49 
50     /// Create a new `Tube` from a `StreamChannel`.
51     /// The StreamChannel must use FramingMode::Message (meaning, must use a SOCK_SEQPACKET as the
52     /// underlying socket type), otherwise, this method returns an error.
new(socket: StreamChannel) -> Result<Tube>53     pub fn new(socket: StreamChannel) -> Result<Tube> {
54         match socket.get_framing_mode() {
55             FramingMode::Message => Ok(Tube {
56                 socket: socket.try_into().map_err(Error::DupDescriptor)?,
57             }),
58             FramingMode::Byte => Err(Error::InvalidFramingMode),
59         }
60     }
61 
62     /// Create a new `Tube` from a UnixSeqpacket. The StreamChannel is implicitly constructed to
63     /// have the right FramingMode by being constructed from a UnixSeqpacket.
new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<Tube>64     pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<Tube> {
65         Ok(Tube {
66             socket: StreamChannel::from_unix_seqpacket(sock)
67                 .try_into()
68                 .map_err(Error::DupDescriptor)?,
69         })
70     }
71 
72     /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a
73     /// directional Tube pair instead.
74     #[deprecated]
try_clone(&self) -> Result<Self>75     pub fn try_clone(&self) -> Result<Self> {
76         self.socket
77             .inner()
78             .try_clone()
79             .map(Tube::new)
80             .map_err(Error::Clone)?
81     }
82 
83     /// Sends a message via a Tube.
84     /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`.
85     /// If you want to send more descriptors, use `send_with_max_fds` instead.
send<T: Serialize>(&self, msg: &T) -> Result<()>86     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
87         self.send_with_max_fds(msg, TUBE_MAX_FDS)
88     }
89 
90     /// Sends a message with at most `max_fds` file descriptors via a Tube.
91     /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253).
send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()>92     pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> {
93         if max_fds > SCM_SOCKET_MAX_FD_COUNT {
94             return Err(Error::SendTooManyFds);
95         }
96         let msg_serialize = SerializeDescriptors::new(&msg);
97         let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
98         let msg_descriptors = msg_serialize.into_descriptors();
99 
100         if msg_descriptors.len() > max_fds {
101             return Err(Error::SendTooManyFds);
102         }
103 
104         handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors))
105             .map_err(Error::Send)?;
106         Ok(())
107     }
108 
109     /// Recieves a message from a Tube.
110     /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use
111     /// `recv_with_max_fds` instead.
recv<T: DeserializeOwned>(&self) -> Result<T>112     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
113         self.recv_with_max_fds(TUBE_MAX_FDS)
114     }
115 
116     /// Recieves a message with at most `max_fds` file descriptors from a Tube.
recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T>117     pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> {
118         if max_fds > SCM_SOCKET_MAX_FD_COUNT {
119             return Err(Error::RecvTooManyFds);
120         }
121 
122         // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube
123         // is readable, then a call to `Tube::recv` will not block (which ought to be true since we
124         // use SOCK_SEQPACKET and a single recvmsg call currently).
125 
126         let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?;
127         // This buffer is the right size, as the size received in peek_size() represents the size
128         // of only the message itself and not the file descriptors. The descriptors are stored
129         // separately in msghdr::msg_control.
130         let mut msg_json = vec![0u8; msg_size];
131 
132         let (msg_json_size, msg_descriptors) =
133             handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds))
134                 .map_err(Error::Recv)?;
135 
136         if msg_json_size == 0 {
137             return Err(Error::Disconnected);
138         }
139 
140         deserialize_with_descriptors(
141             || serde_json::from_slice(&msg_json[0..msg_json_size]),
142             msg_descriptors,
143         )
144         .map_err(Error::Json)
145     }
146 
set_send_timeout(&self, timeout: Option<Duration>) -> Result<()>147     pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
148         self.socket
149             .inner()
150             .set_write_timeout(timeout)
151             .map_err(Error::SetSendTimeout)
152     }
153 
set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()>154     pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
155         self.socket
156             .inner()
157             .set_read_timeout(timeout)
158             .map_err(Error::SetRecvTimeout)
159     }
160 
161     #[cfg(feature = "proto_tube")]
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>162     fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
163         let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
164         let no_fds: [RawFd; 0] = [];
165 
166         handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?;
167 
168         Ok(())
169     }
170 
171     #[cfg(feature = "proto_tube")]
recv_proto<M: protobuf::Message>(&self) -> Result<M>172     fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
173         let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?;
174         let mut msg_bytes = vec![0u8; msg_size];
175 
176         let (msg_bytes_size, _) =
177             handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS))
178                 .map_err(Error::Recv)?;
179 
180         if msg_bytes_size == 0 {
181             return Err(Error::Disconnected);
182         }
183 
184         protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto)
185     }
186 }
187 
188 impl AsRawDescriptor for Tube {
as_raw_descriptor(&self) -> RawDescriptor189     fn as_raw_descriptor(&self) -> RawDescriptor {
190         self.socket.as_raw_descriptor()
191     }
192 }
193 
194 impl AsRawFd for Tube {
as_raw_fd(&self) -> RawFd195     fn as_raw_fd(&self) -> RawFd {
196         self.socket.inner().as_raw_fd()
197     }
198 }
199 
200 impl ReadNotifier for Tube {
get_read_notifier(&self) -> &dyn AsRawDescriptor201     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
202         &self.socket
203     }
204 }
205 
206 impl AsRawDescriptor for SendTube {
as_raw_descriptor(&self) -> RawDescriptor207     fn as_raw_descriptor(&self) -> RawDescriptor {
208         self.0.as_raw_descriptor()
209     }
210 }
211 
212 impl AsRawDescriptor for RecvTube {
as_raw_descriptor(&self) -> RawDescriptor213     fn as_raw_descriptor(&self) -> RawDescriptor {
214         self.0.as_raw_descriptor()
215     }
216 }
217 
/// Wrapper around `Tube` for sending and receiving protos, which avoids the extra overhead of
/// serializing through serde_json.
///
/// Since protos should be standalone objects, sending file descriptors is not supported the way
/// it is for a normal `Tube`.
#[cfg(feature = "proto_tube")]
pub struct ProtoTube(Tube);
223 
#[cfg(feature = "proto_tube")]
impl ProtoTube {
    /// Creates a pair of connected `ProtoTube`s by wrapping both ends of `Tube::pair`.
    pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
        let (first, second) = Tube::pair()?;
        Ok((ProtoTube(first), ProtoTube(second)))
    }

    /// Sends the protobuf message `msg` over the wrapped tube.
    pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
        let tube = &self.0;
        tube.send_proto(msg)
    }

    /// Receives a protobuf message of type `M` from the wrapped tube.
    pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
        let tube = &self.0;
        tube.recv_proto()
    }

    /// Builds a `ProtoTube` on top of a `UnixSeqpacket` via `Tube::new_from_unix_seqpacket`.
    pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<ProtoTube> {
        Tube::new_from_unix_seqpacket(sock).map(ProtoTube)
    }
}
242 
#[cfg(all(feature = "proto_tube", test))]
#[allow(unused_variables)]
mod tests {
    // This proto is not the subject of the test; an existing message type is simply needed to
    // exercise the ProtoTube.
    use protos::cdisk_spec::ComponentDisk;

    use super::*;

    /// A proto sent through one end of a `ProtoTube` pair arrives unchanged at the other end.
    #[test]
    fn tube_serializes_and_deserializes() {
        let (sender, receiver) = ProtoTube::pair().unwrap();
        let sent = ComponentDisk {
            file_path: "/some/cool/path".to_string(),
            offset: 99,
            ..ComponentDisk::new()
        };

        sender.send_proto(&sent).unwrap();
        let received: ComponentDisk = receiver.recv_proto().unwrap();

        assert!(sent == received);
    }
}
267