1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::os::unix::prelude::AsRawFd; 6 use std::os::unix::prelude::RawFd; 7 use std::time::Duration; 8 9 use serde::de::DeserializeOwned; 10 use serde::Deserialize; 11 use serde::Serialize; 12 13 use crate::descriptor::AsRawDescriptor; 14 use crate::descriptor_reflection::deserialize_with_descriptors; 15 use crate::descriptor_reflection::SerializeDescriptors; 16 use crate::handle_eintr; 17 use crate::tube::Error; 18 use crate::tube::RecvTube; 19 use crate::tube::Result; 20 use crate::tube::SendTube; 21 use crate::BlockingMode; 22 use crate::FramingMode; 23 use crate::RawDescriptor; 24 use crate::ReadNotifier; 25 use crate::ScmSocket; 26 use crate::StreamChannel; 27 use crate::UnixSeqpacket; 28 use crate::SCM_SOCKET_MAX_FD_COUNT; 29 30 // This size matches the inline buffer size of CmsgBuffer. 31 const TUBE_MAX_FDS: usize = 32; 32 33 /// Bidirectional tube that support both send and recv. 34 #[derive(Serialize, Deserialize)] 35 pub struct Tube { 36 socket: ScmSocket<StreamChannel>, 37 } 38 39 impl Tube { 40 /// Create a pair of connected tubes. Request is sent in one direction while response is in the 41 /// other direction. pair() -> Result<(Tube, Tube)>42 pub fn pair() -> Result<(Tube, Tube)> { 43 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) 44 .map_err(|errno| Error::Pair(std::io::Error::from(errno)))?; 45 let tube1 = Tube::new(socket1)?; 46 let tube2 = Tube::new(socket2)?; 47 Ok((tube1, tube2)) 48 } 49 50 /// Create a new `Tube` from a `StreamChannel`. 51 /// The StreamChannel must use FramingMode::Message (meaning, must use a SOCK_SEQPACKET as the 52 /// underlying socket type), otherwise, this method returns an error. new(socket: StreamChannel) -> Result<Tube>53 pub fn new(socket: StreamChannel) -> Result<Tube> { 54 match socket.get_framing_mode() { 55 FramingMode::Message => Ok(Tube { 56 socket: socket.try_into().map_err(Error::DupDescriptor)?, 57 }), 58 FramingMode::Byte => Err(Error::InvalidFramingMode), 59 } 60 } 61 62 /// Create a new `Tube` from a UnixSeqpacket. The StreamChannel is implicitly constructed to 63 /// have the right FramingMode by being constructed from a UnixSeqpacket. new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<Tube>64 pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<Tube> { 65 Ok(Tube { 66 socket: StreamChannel::from_unix_seqpacket(sock) 67 .try_into() 68 .map_err(Error::DupDescriptor)?, 69 }) 70 } 71 72 /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a 73 /// directional Tube pair instead. 74 #[deprecated] try_clone(&self) -> Result<Self>75 pub fn try_clone(&self) -> Result<Self> { 76 self.socket 77 .inner() 78 .try_clone() 79 .map(Tube::new) 80 .map_err(Error::Clone)? 81 } 82 83 /// Sends a message via a Tube. 84 /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`. 85 /// If you want to send more descriptors, use `send_with_max_fds` instead. send<T: Serialize>(&self, msg: &T) -> Result<()>86 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { 87 self.send_with_max_fds(msg, TUBE_MAX_FDS) 88 } 89 90 /// Sends a message with at most `max_fds` file descriptors via a Tube. 91 /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253). send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()>92 pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> { 93 if max_fds > SCM_SOCKET_MAX_FD_COUNT { 94 return Err(Error::SendTooManyFds); 95 } 96 let msg_serialize = SerializeDescriptors::new(&msg); 97 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; 98 let msg_descriptors = msg_serialize.into_descriptors(); 99 100 if msg_descriptors.len() > max_fds { 101 return Err(Error::SendTooManyFds); 102 } 103 104 handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors)) 105 .map_err(Error::Send)?; 106 Ok(()) 107 } 108 109 /// Recieves a message from a Tube. 110 /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use 111 /// `recv_with_max_fds` instead. recv<T: DeserializeOwned>(&self) -> Result<T>112 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { 113 self.recv_with_max_fds(TUBE_MAX_FDS) 114 } 115 116 /// Recieves a message with at most `max_fds` file descriptors from a Tube. recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T>117 pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> { 118 if max_fds > SCM_SOCKET_MAX_FD_COUNT { 119 return Err(Error::RecvTooManyFds); 120 } 121 122 // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube 123 // is readable, then a call to `Tube::recv` will not block (which ought to be true since we 124 // use SOCK_SEQPACKET and a single recvmsg call currently). 125 126 let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?; 127 // This buffer is the right size, as the size received in peek_size() represents the size 128 // of only the message itself and not the file descriptors. The descriptors are stored 129 // separately in msghdr::msg_control. 130 let mut msg_json = vec![0u8; msg_size]; 131 132 let (msg_json_size, msg_descriptors) = 133 handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds)) 134 .map_err(Error::Recv)?; 135 136 if msg_json_size == 0 { 137 return Err(Error::Disconnected); 138 } 139 140 deserialize_with_descriptors( 141 || serde_json::from_slice(&msg_json[0..msg_json_size]), 142 msg_descriptors, 143 ) 144 .map_err(Error::Json) 145 } 146 set_send_timeout(&self, timeout: Option<Duration>) -> Result<()>147 pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> { 148 self.socket 149 .inner() 150 .set_write_timeout(timeout) 151 .map_err(Error::SetSendTimeout) 152 } 153 set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()>154 pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> { 155 self.socket 156 .inner() 157 .set_read_timeout(timeout) 158 .map_err(Error::SetRecvTimeout) 159 } 160 161 #[cfg(feature = "proto_tube")] send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>162 fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> { 163 let bytes = msg.write_to_bytes().map_err(Error::Proto)?; 164 let no_fds: [RawFd; 0] = []; 165 166 handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?; 167 168 Ok(()) 169 } 170 171 #[cfg(feature = "proto_tube")] recv_proto<M: protobuf::Message>(&self) -> Result<M>172 fn recv_proto<M: protobuf::Message>(&self) -> Result<M> { 173 let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?; 174 let mut msg_bytes = vec![0u8; msg_size]; 175 176 let (msg_bytes_size, _) = 177 handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS)) 178 .map_err(Error::Recv)?; 179 180 if msg_bytes_size == 0 { 181 return Err(Error::Disconnected); 182 } 183 184 protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto) 185 } 186 } 187 188 impl AsRawDescriptor for Tube { as_raw_descriptor(&self) -> RawDescriptor189 fn as_raw_descriptor(&self) -> RawDescriptor { 190 self.socket.as_raw_descriptor() 191 } 192 } 193 194 impl AsRawFd for Tube { as_raw_fd(&self) -> RawFd195 fn as_raw_fd(&self) -> RawFd { 196 self.socket.inner().as_raw_fd() 197 } 198 } 199 200 impl ReadNotifier for Tube { get_read_notifier(&self) -> &dyn AsRawDescriptor201 fn get_read_notifier(&self) -> &dyn AsRawDescriptor { 202 &self.socket 203 } 204 } 205 206 impl AsRawDescriptor for SendTube { as_raw_descriptor(&self) -> RawDescriptor207 fn as_raw_descriptor(&self) -> RawDescriptor { 208 self.0.as_raw_descriptor() 209 } 210 } 211 212 impl AsRawDescriptor for RecvTube { as_raw_descriptor(&self) -> RawDescriptor213 fn as_raw_descriptor(&self) -> RawDescriptor { 214 self.0.as_raw_descriptor() 215 } 216 } 217 218 /// Wrapper for Tube used for sending and receiving protos - avoids extra overhead of serialization 219 /// via serde_json. Since protos should be standalone objects we do not support sending of file 220 /// descriptors as a normal Tube would. 221 #[cfg(feature = "proto_tube")] 222 pub struct ProtoTube(Tube); 223 224 #[cfg(feature = "proto_tube")] 225 impl ProtoTube { pair() -> Result<(ProtoTube, ProtoTube)>226 pub fn pair() -> Result<(ProtoTube, ProtoTube)> { 227 Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2))) 228 } 229 send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>230 pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> { 231 self.0.send_proto(msg) 232 } 233 recv_proto<M: protobuf::Message>(&self) -> Result<M>234 pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> { 235 self.0.recv_proto() 236 } 237 new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<ProtoTube>238 pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result<ProtoTube> { 239 Ok(ProtoTube(Tube::new_from_unix_seqpacket(sock)?)) 240 } 241 } 242 243 #[cfg(all(feature = "proto_tube", test))] 244 #[allow(unused_variables)] 245 mod tests { 246 // not testing this proto specifically, just need an existing one to test the ProtoTube. 247 use protos::cdisk_spec::ComponentDisk; 248 249 use super::*; 250 251 #[test] tube_serializes_and_deserializes()252 fn tube_serializes_and_deserializes() { 253 let (pt1, pt2) = ProtoTube::pair().unwrap(); 254 let proto = ComponentDisk { 255 file_path: "/some/cool/path".to_string(), 256 offset: 99, 257 ..ComponentDisk::new() 258 }; 259 260 pt1.send_proto(&proto).unwrap(); 261 262 let recv_proto = pt2.recv_proto().unwrap(); 263 264 assert!(proto.eq(&recv_proto)); 265 } 266 } 267