xref: /aosp_15_r20/external/crosvm/base/src/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::time::Duration;
7 
8 use remain::sorted;
9 use serde::de::DeserializeOwned;
10 use serde::Deserialize;
11 use serde::Serialize;
12 use thiserror::Error as ThisError;
13 
14 pub use crate::sys::tube::*;
15 
16 impl Tube {
17     /// Given a Tube end, creates two new ends, one each for sending and receiving.
split_to_send_recv(self) -> Result<(SendTube, RecvTube)>18     pub fn split_to_send_recv(self) -> Result<(SendTube, RecvTube)> {
19         // Safe because receiving isn't allowd on this end.
20         #[allow(deprecated)]
21         let send_end = self.try_clone()?;
22 
23         Ok((SendTube(send_end), RecvTube(self)))
24     }
25 
26     /// Creates a Send/Recv pair of Tubes.
directional_pair() -> Result<(SendTube, RecvTube)>27     pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
28         let (t1, t2) = Self::pair()?;
29         Ok((SendTube(t1), RecvTube(t2)))
30     }
31 
try_clone_send_tube(&self) -> Result<SendTube>32     pub fn try_clone_send_tube(&self) -> Result<SendTube> {
33         // Safe because receiving is only allowed on original Tube.
34         #[allow(deprecated)]
35         let send_end = self.try_clone()?;
36         Ok(SendTube(send_end))
37     }
38 }
39 
40 use crate::AsRawDescriptor;
41 use crate::ReadNotifier;
42 
43 #[derive(Serialize, Deserialize)]
44 #[serde(transparent)]
45 /// A Tube end which can only send messages. Cloneable.
46 pub struct SendTube(pub(crate) Tube);
47 
48 #[allow(dead_code)]
49 impl SendTube {
50     /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>51     pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
52         unimplemented!("To be removed/refactored upstream.");
53     }
54 
send<T: Serialize>(&self, msg: &T) -> Result<()>55     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
56         self.0.send(msg)
57     }
58 
try_clone(&self) -> Result<Self>59     pub fn try_clone(&self) -> Result<Self> {
60         Ok(SendTube(
61             #[allow(deprecated)]
62             self.0.try_clone()?,
63         ))
64     }
65 
66     /// Never call this function, it is for use by cros_async to provide
67     /// directional wrapper types only. Using it in any other context may
68     /// violate concurrency assumptions. (Type splitting across crates has put
69     /// us in a situation where we can't use Rust privacy to enforce this.)
70     #[deprecated]
into_tube(self) -> Tube71     pub fn into_tube(self) -> Tube {
72         self.0
73     }
74 }
75 
76 #[derive(Serialize, Deserialize)]
77 #[serde(transparent)]
78 /// A Tube end which can only recv messages.
79 pub struct RecvTube(pub(crate) Tube);
80 
81 #[allow(dead_code)]
82 impl RecvTube {
recv<T: DeserializeOwned>(&self) -> Result<T>83     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
84         self.0.recv()
85     }
86 
87     /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>88     pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
89         unimplemented!("To be removed/refactored upstream.");
90     }
91 
92     /// Never call this function, it is for use by cros_async to provide
93     /// directional wrapper types only. Using it in any other context may
94     /// violate concurrency assumptions. (Type splitting across crates has put
95     /// us in a situation where we can't use Rust privacy to enforce this.)
96     #[deprecated]
into_tube(self) -> Tube97     pub fn into_tube(self) -> Tube {
98         self.0
99     }
100 }
101 
102 impl ReadNotifier for RecvTube {
get_read_notifier(&self) -> &dyn AsRawDescriptor103     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
104         self.0.get_read_notifier()
105     }
106 }
107 
108 #[sorted]
109 #[derive(ThisError, Debug)]
110 pub enum Error {
111     #[cfg(windows)]
112     #[error("attempt to duplicate descriptor via broker failed")]
113     BrokerDupDescriptor,
114     #[error("failed to clone transport: {0}")]
115     Clone(io::Error),
116     #[error("tube was disconnected")]
117     Disconnected,
118     #[error("failed to duplicate descriptor: {0}")]
119     DupDescriptor(io::Error),
120     #[cfg(windows)]
121     #[error("failed to flush named pipe: {0}")]
122     Flush(io::Error),
123     #[cfg(unix)]
124     #[error("byte framing mode is not supported")]
125     InvalidFramingMode,
126     #[error("failed to serialize/deserialize json from packet: {0}")]
127     Json(serde_json::Error),
128     #[error("cancelled a queued async operation")]
129     OperationCancelled,
130     #[error("failed to crate tube pair: {0}")]
131     Pair(io::Error),
132     #[cfg(any(windows, feature = "proto_tube"))]
133     #[error("encountered protobuf error: {0}")]
134     Proto(protobuf::Error),
135     #[error("failed to receive packet: {0}")]
136     Recv(io::Error),
137     #[error("attempted to receive too many file descriptors")]
138     RecvTooManyFds,
139     #[error("Received a message with a zero sized body. This should not happen.")]
140     RecvUnexpectedEmptyBody,
141     #[error("failed to send packet: {0}")]
142     Send(io::Error),
143     #[error("failed to write packet to intermediate buffer: {0}")]
144     SendIoBuf(io::Error),
145     #[error("attempted to send too many file descriptors")]
146     SendTooManyFds,
147     #[error("failed to set recv timeout: {0}")]
148     SetRecvTimeout(io::Error),
149     #[error("failed to set send timeout: {0}")]
150     SetSendTimeout(io::Error),
151 }
152 
153 pub type Result<T> = std::result::Result<T, Error>;
154