1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io; 6 use std::time::Duration; 7 8 use remain::sorted; 9 use serde::de::DeserializeOwned; 10 use serde::Deserialize; 11 use serde::Serialize; 12 use thiserror::Error as ThisError; 13 14 pub use crate::sys::tube::*; 15 16 impl Tube { 17 /// Given a Tube end, creates two new ends, one each for sending and receiving. split_to_send_recv(self) -> Result<(SendTube, RecvTube)>18 pub fn split_to_send_recv(self) -> Result<(SendTube, RecvTube)> { 19 // Safe because receiving isn't allowd on this end. 20 #[allow(deprecated)] 21 let send_end = self.try_clone()?; 22 23 Ok((SendTube(send_end), RecvTube(self))) 24 } 25 26 /// Creates a Send/Recv pair of Tubes. directional_pair() -> Result<(SendTube, RecvTube)>27 pub fn directional_pair() -> Result<(SendTube, RecvTube)> { 28 let (t1, t2) = Self::pair()?; 29 Ok((SendTube(t1), RecvTube(t2))) 30 } 31 try_clone_send_tube(&self) -> Result<SendTube>32 pub fn try_clone_send_tube(&self) -> Result<SendTube> { 33 // Safe because receiving is only allowed on original Tube. 34 #[allow(deprecated)] 35 let send_end = self.try_clone()?; 36 Ok(SendTube(send_end)) 37 } 38 } 39 40 use crate::AsRawDescriptor; 41 use crate::ReadNotifier; 42 43 #[derive(Serialize, Deserialize)] 44 #[serde(transparent)] 45 /// A Tube end which can only send messages. Cloneable. 46 pub struct SendTube(pub(crate) Tube); 47 48 #[allow(dead_code)] 49 impl SendTube { 50 /// TODO(b/145998747, b/184398671): this method should be removed. set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>51 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> { 52 unimplemented!("To be removed/refactored upstream."); 53 } 54 send<T: Serialize>(&self, msg: &T) -> Result<()>55 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { 56 self.0.send(msg) 57 } 58 try_clone(&self) -> Result<Self>59 pub fn try_clone(&self) -> Result<Self> { 60 Ok(SendTube( 61 #[allow(deprecated)] 62 self.0.try_clone()?, 63 )) 64 } 65 66 /// Never call this function, it is for use by cros_async to provide 67 /// directional wrapper types only. Using it in any other context may 68 /// violate concurrency assumptions. (Type splitting across crates has put 69 /// us in a situation where we can't use Rust privacy to enforce this.) 70 #[deprecated] into_tube(self) -> Tube71 pub fn into_tube(self) -> Tube { 72 self.0 73 } 74 } 75 76 #[derive(Serialize, Deserialize)] 77 #[serde(transparent)] 78 /// A Tube end which can only recv messages. 79 pub struct RecvTube(pub(crate) Tube); 80 81 #[allow(dead_code)] 82 impl RecvTube { recv<T: DeserializeOwned>(&self) -> Result<T>83 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { 84 self.0.recv() 85 } 86 87 /// TODO(b/145998747, b/184398671): this method should be removed. set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>88 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> { 89 unimplemented!("To be removed/refactored upstream."); 90 } 91 92 /// Never call this function, it is for use by cros_async to provide 93 /// directional wrapper types only. Using it in any other context may 94 /// violate concurrency assumptions. (Type splitting across crates has put 95 /// us in a situation where we can't use Rust privacy to enforce this.) 96 #[deprecated] into_tube(self) -> Tube97 pub fn into_tube(self) -> Tube { 98 self.0 99 } 100 } 101 102 impl ReadNotifier for RecvTube { get_read_notifier(&self) -> &dyn AsRawDescriptor103 fn get_read_notifier(&self) -> &dyn AsRawDescriptor { 104 self.0.get_read_notifier() 105 } 106 } 107 108 #[sorted] 109 #[derive(ThisError, Debug)] 110 pub enum Error { 111 #[cfg(windows)] 112 #[error("attempt to duplicate descriptor via broker failed")] 113 BrokerDupDescriptor, 114 #[error("failed to clone transport: {0}")] 115 Clone(io::Error), 116 #[error("tube was disconnected")] 117 Disconnected, 118 #[error("failed to duplicate descriptor: {0}")] 119 DupDescriptor(io::Error), 120 #[cfg(windows)] 121 #[error("failed to flush named pipe: {0}")] 122 Flush(io::Error), 123 #[cfg(unix)] 124 #[error("byte framing mode is not supported")] 125 InvalidFramingMode, 126 #[error("failed to serialize/deserialize json from packet: {0}")] 127 Json(serde_json::Error), 128 #[error("cancelled a queued async operation")] 129 OperationCancelled, 130 #[error("failed to crate tube pair: {0}")] 131 Pair(io::Error), 132 #[cfg(any(windows, feature = "proto_tube"))] 133 #[error("encountered protobuf error: {0}")] 134 Proto(protobuf::Error), 135 #[error("failed to receive packet: {0}")] 136 Recv(io::Error), 137 #[error("attempted to receive too many file descriptors")] 138 RecvTooManyFds, 139 #[error("Received a message with a zero sized body. This should not happen.")] 140 RecvUnexpectedEmptyBody, 141 #[error("failed to send packet: {0}")] 142 Send(io::Error), 143 #[error("failed to write packet to intermediate buffer: {0}")] 144 SendIoBuf(io::Error), 145 #[error("attempted to send too many file descriptors")] 146 SendTooManyFds, 147 #[error("failed to set recv timeout: {0}")] 148 SetRecvTimeout(io::Error), 149 #[error("failed to set send timeout: {0}")] 150 SetSendTimeout(io::Error), 151 } 152 153 pub type Result<T> = std::result::Result<T, Error>; 154