xref: /aosp_15_r20/external/crosvm/base/src/sys/windows/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::io::Cursor;
7 use std::io::Read;
8 use std::io::Write;
9 use std::mem;
10 use std::os::windows::io::AsRawHandle;
11 use std::os::windows::io::RawHandle;
12 use std::time::Duration;
13 
14 use log::warn;
15 use serde::de::DeserializeOwned;
16 use serde::Deserialize;
17 use serde::Serialize;
18 use serde::Serializer;
19 use winapi::shared::winerror::ERROR_MORE_DATA;
20 use zerocopy::AsBytes;
21 use zerocopy::FromBytes;
22 use zerocopy::FromZeroes;
23 
24 use crate::descriptor::AsRawDescriptor;
25 use crate::descriptor::FromRawDescriptor;
26 use crate::descriptor::SafeDescriptor;
27 use crate::descriptor_reflection::deserialize_with_descriptors;
28 use crate::descriptor_reflection::SerializeDescriptors;
29 use crate::tube::Error;
30 use crate::tube::RecvTube;
31 use crate::tube::Result;
32 use crate::tube::SendTube;
33 use crate::BlockingMode;
34 use crate::CloseNotifier;
35 use crate::Event;
36 use crate::EventToken;
37 use crate::FramingMode;
38 use crate::PipeConnection;
39 use crate::RawDescriptor;
40 use crate::ReadNotifier;
41 use crate::StreamChannel;
42 
/// Bidirectional tube that supports both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
///     1. B's target_pid must be set to the current PID *before* serialization. There is a
///        serialization hook that fills it in automatically if target_pid is unset (using the
///        alias PID registered via `set_alias_pid` when there is one, otherwise the current PID).
///     2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
///     1. A's target_pid must be set to B's pid, manually.
///     2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need to
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
    // Channel that carries every packet sent or received through this tube.
    socket: StreamChannel,

    // PID handed to `duplicate_handle` when a message carrying descriptors is sent.
    // Default target_pid to current PID on serialization (see `Tube` comment header for details).
    #[serde(serialize_with = "set_tube_pid_on_serialize")]
    target_pid: Option<u32>,
}
69 
70 /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
71 /// default it to the current process, because the other end will be in the current process.
set_tube_pid_on_serialize<S>( existing_pid_value: &Option<u32>, serializer: S, ) -> std::result::Result<S::Ok, S::Error> where S: Serializer,72 fn set_tube_pid_on_serialize<S>(
73     existing_pid_value: &Option<u32>,
74     serializer: S,
75 ) -> std::result::Result<S::Ok, S::Error>
76 where
77     S: Serializer,
78 {
79     match existing_pid_value {
80         Some(pid) => serializer.serialize_u32(*pid),
81         None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
82     }
83 }
84 
/// Fixed-size header placed at the front of every serde-based Tube packet.
///
/// The header is copied to and from the packet as raw bytes (`as_bytes` / `as_bytes_mut`), so
/// its wire form is the `repr(C)` layout of two `usize` values.
#[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
#[repr(C)]
struct MsgHeader {
    // Length in bytes of the JSON-encoded message that follows the header.
    msg_json_size: usize,
    // Length in bytes of the JSON-encoded descriptor list that follows the message; zero when
    // the message carries no descriptors.
    descriptor_json_size: usize,
}
91 
// Process-wide tube to which handle duplication is delegated when a target PID is known.
// `None` until `set_duplicate_handle_tube` is called.
static DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
// Process-wide alias PID. When set, it is serialized in place of the current PID for tubes that
// have no explicit target_pid. `None` until `set_alias_pid` is called.
static ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
94 
95 /// Set a tube to delegate duplicate handle calls.
set_duplicate_handle_tube(dh_tube: DuplicateHandleTube)96 pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
97     DH_TUBE.lock().replace(dh_tube);
98 }
99 
100 /// Set alias pid for use with a DuplicateHandleTube.
set_alias_pid(alias_pid: u32)101 pub fn set_alias_pid(alias_pid: u32) {
102     ALIAS_PID.lock().replace(alias_pid);
103 }
104 
105 impl Tube {
106     /// Create a pair of connected tubes. Request is sent in one direction while response is
107     /// received in the other direction.
108     /// The result is in the form (server, client).
pair() -> Result<(Tube, Tube)>109     pub fn pair() -> Result<(Tube, Tube)> {
110         let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
111             .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
112 
113         Ok((Tube::new(socket1), Tube::new(socket2)))
114     }
115 
116     /// Create a pair of connected tubes with the specified buffer size.
117     /// Request is sent in one direction while response is received in the other direction.
118     /// The result is in the form (server, client).
pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)>119     pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
120         let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
121             BlockingMode::Blocking,
122             FramingMode::Message,
123             buffer_size,
124         )
125         .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
126         let tube1 = Tube::new(socket1);
127         let tube2 = Tube::new(socket2);
128         Ok((tube1, tube2))
129     }
130 
131     // Create a new `Tube`.
new(socket: StreamChannel) -> Tube132     pub fn new(socket: StreamChannel) -> Tube {
133         Tube {
134             socket,
135             target_pid: None,
136         }
137     }
138 
try_clone(&self) -> Result<Self>139     pub(crate) fn try_clone(&self) -> Result<Self> {
140         Ok(Tube {
141             socket: self.socket.try_clone().map_err(Error::Clone)?,
142             target_pid: self.target_pid,
143         })
144     }
145 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>146     fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
147         let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
148         let size_header = bytes.len();
149 
150         let mut data_packet =
151             Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
152         data_packet
153             .write(&size_header.to_le_bytes())
154             .map_err(Error::from_send_io_buf_error)?;
155         data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
156         self.socket
157             .write_immutable(&data_packet.into_inner())
158             .map_err(Error::from_send_error)?;
159 
160         Ok(())
161     }
162 
recv_proto<M: protobuf::Message>(&self) -> Result<M>163     fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
164         let mut header_bytes = [0u8; mem::size_of::<usize>()];
165         perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
166             .map_err(Error::from_recv_io_error)?;
167         let size_header = usize::from_le_bytes(header_bytes);
168 
169         let mut proto_bytes = vec![0u8; size_header];
170         perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
171             .map_err(Error::from_recv_io_error)?;
172         protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
173     }
174 
send<T: Serialize>(&self, msg: &T) -> Result<()>175     pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
176         serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
177     }
178 
recv<T: DeserializeOwned>(&self) -> Result<T>179     pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
180         deserialize_and_recv(|buf| (&self.socket).read(buf))
181     }
182 
183     /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
184     /// documentation to ensure you have a server pipe before calling.
185     #[cfg(windows)]
flush_blocking(&mut self) -> Result<()>186     pub fn flush_blocking(&mut self) -> Result<()> {
187         self.socket.flush_blocking().map_err(Error::Flush)
188     }
189 
190     /// For Tubes that span processes, this method must be used to set the PID of the other end
191     /// of the Tube, otherwise sending handles to the other end won't work.
set_target_pid(&mut self, target_pid: u32)192     pub fn set_target_pid(&mut self, target_pid: u32) {
193         self.target_pid = Some(target_pid);
194     }
195 
196     /// Returns the PID of the process at the other end of the Tube, if any is set.
target_pid(&self) -> Option<u32>197     pub fn target_pid(&self) -> Option<u32> {
198         self.target_pid
199     }
200 
201     /// TODO(b/145998747, b/184398671): this method should be removed.
set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()>202     pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
203         unimplemented!("To be removed/refactored upstream.");
204     }
205 
206     /// TODO(b/145998747, b/184398671): this method should be removed.
set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()>207     pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
208         unimplemented!("To be removed/refactored upstream.");
209     }
210 
get_read_notifier_event(&self) -> &Event211     pub fn get_read_notifier_event(&self) -> &Event {
212         self.socket.get_read_notifier_event()
213     }
214 
get_close_notifier_event(&self) -> &Event215     pub fn get_close_notifier_event(&self) -> &Event {
216         self.socket.get_close_notifier_event()
217     }
218 }
219 
serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( write_fn: F, msg: &T, target_pid: Option<u32>, ) -> Result<()>220 pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
221     write_fn: F,
222     msg: &T,
223     target_pid: Option<u32>,
224 ) -> Result<()> {
225     let msg_serialize = SerializeDescriptors::new(&msg);
226     let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
227     let msg_descriptors = msg_serialize.into_descriptors();
228 
229     let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
230     for desc in msg_descriptors {
231         // Safe because these handles are guaranteed to be valid. Details:
232         // 1. They come from base::descriptor_reflection::with_as_descriptor.
233         // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
234         //    SafeDescriptor).
235         // 3. The owning object is borrowed by msg until sending is complete.
236         duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
237     }
238 
239     let descriptor_json = if duped_descriptors.is_empty() {
240         None
241     } else {
242         Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
243     };
244 
245     let header = MsgHeader {
246         msg_json_size: msg_json.len(),
247         descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
248     };
249 
250     let mut data_packet = Cursor::new(Vec::with_capacity(
251         header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
252     ));
253     data_packet
254         .write(header.as_bytes())
255         .map_err(Error::SendIoBuf)?;
256     data_packet
257         .write(msg_json.as_slice())
258         .map_err(Error::SendIoBuf)?;
259     if let Some(descriptor_json) = descriptor_json {
260         data_packet
261             .write(descriptor_json.as_slice())
262             .map_err(Error::SendIoBuf)?;
263     }
264 
265     // Multiple writers (producers) are safe because each write is atomic.
266     let data_bytes = data_packet.into_inner();
267 
268     write_fn(&data_bytes).map_err(Error::from_send_error)?;
269     Ok(())
270 }
271 
duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle>272 fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
273     match target_pid {
274         Some(pid) => match &*DH_TUBE.lock() {
275             Some(tube) => tube.request_duplicate_handle(pid, desc),
276             None => {
277                 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
278             }
279         },
280         None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
281     }
282 }
283 
284 /// Reads a part of a Tube packet asserting that it was correctly read. This means:
285 /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. We
286 ///   use this to ignore errors when reading the message header, which has the lengths we need to
287 ///   allocate our buffers for the remainder of the message.
288 /// * We filled the supplied buffer.
perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>( read_fn: &mut F, buf: &mut [u8], ) -> io::Result<usize>289 fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
290     read_fn: &mut F,
291     buf: &mut [u8],
292 ) -> io::Result<usize> {
293     let bytes_read = match read_fn(buf) {
294         Ok(s) => Ok(s),
295         Err(e)
296             if e.raw_os_error()
297                 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
298         {
299             Ok(buf.len())
300         }
301         Err(e) => Err(e),
302     }?;
303 
304     if bytes_read != buf.len() {
305         Err(io::Error::new(
306             io::ErrorKind::UnexpectedEof,
307             format!(
308                 "failed to fill whole buffer, expected {} got {}",
309                 buf.len(),
310                 bytes_read
311             ),
312         ))
313     } else {
314         Ok(bytes_read)
315     }
316 }
317 
318 /// Deserializes a Tube packet by calling the supplied read function. This function MUST
319 /// assert that the buffer was filled.
deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>( mut read_fn: F, ) -> Result<T>320 pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
321     mut read_fn: F,
322 ) -> Result<T> {
323     let mut header = MsgHeader::default();
324     perform_read(&mut read_fn, header.as_bytes_mut()).map_err(Error::from_recv_io_error)?;
325 
326     let mut msg_json = vec![0u8; header.msg_json_size];
327     perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
328 
329     if msg_json.is_empty() {
330         // This means we got a message header, but there is no json body (due to a zero size in
331         // the header). This should never happen because it means the receiver is getting no
332         // data whatsoever from the sender.
333         return Err(Error::RecvUnexpectedEmptyBody);
334     }
335 
336     let descriptor_usizes: Vec<usize> = if header.descriptor_json_size > 0 {
337         let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
338         perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
339             .map_err(Error::from_recv_io_error)?;
340         serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?
341     } else {
342         Vec::new()
343     };
344 
345     let msg_descriptors = descriptor_usizes.into_iter().map(|item| {
346         // SAFETY: the usizes are RawDescriptors that were duplicated and converted to usize in the
347         // send method.
348         unsafe { SafeDescriptor::from_raw_descriptor(item as RawDescriptor) }
349     });
350 
351     deserialize_with_descriptors(|| serde_json::from_slice(&msg_json), msg_descriptors)
352         .map_err(Error::Json)
353 }
354 
/// Token type (derives `EventToken`) with a single variant for the tube's socket.
///
/// NOTE(review): nothing in the visible part of this file refers to this type -- confirm it is
/// still needed.
#[derive(EventToken, Eq, PartialEq, Copy, Clone)]
enum Token {
    SocketReady,
}
359 
impl AsRawDescriptor for Tube {
    /// Returns the raw descriptor of the underlying channel.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.socket.as_raw_descriptor()
    }
}
365 
impl AsRawHandle for Tube {
    /// Returns the same value as `as_raw_descriptor`.
    fn as_raw_handle(&self) -> RawHandle {
        self.as_raw_descriptor()
    }
}
371 
impl ReadNotifier for Tube {
    /// Returns the read notifier of the underlying channel.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        self.socket.get_read_notifier()
    }
}
377 
impl CloseNotifier for Tube {
    /// Returns the close notifier of the underlying channel.
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.socket.get_close_notifier()
    }
}
383 
impl AsRawDescriptor for SendTube {
    /// Returns the raw descriptor of the value wrapped in field `0`.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
389 
impl AsRawDescriptor for RecvTube {
    /// Returns the raw descriptor of the value wrapped in field `0`.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
395 
impl CloseNotifier for SendTube {
    /// Returns the close notifier of the value wrapped in field `0`.
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_close_notifier()
    }
}
401 
impl CloseNotifier for RecvTube {
    /// Returns the close notifier of the value wrapped in field `0`.
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_close_notifier()
    }
}
407 
/// A request to duplicate a handle to a target process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
    /// PID (or alias PID) identifying the process the handle should be duplicated into.
    pub target_alias_pid: u32,
    /// The handle to duplicate, stored as its raw value cast to `usize`.
    pub handle: usize,
}
414 
/// Contains a duplicated handle or None if an error occurred.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
    /// Raw value of the duplicated handle as `usize`, or `None` on failure.
    pub handle: Option<usize>,
}
420 
/// Wrapper for a `Tube` which is used to delegate DuplicateHandle function calls to
/// the broker process.
///
/// Each call sends a `DuplicateHandleRequest` and then waits for a `DuplicateHandleResponse`
/// on the wrapped tube.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
425 
426 impl DuplicateHandleTube {
new(tube: Tube) -> Self427     pub fn new(tube: Tube) -> Self {
428         Self(tube)
429     }
430 
request_duplicate_handle( &self, target_alias_pid: u32, handle: RawHandle, ) -> Result<RawHandle>431     pub fn request_duplicate_handle(
432         &self,
433         target_alias_pid: u32,
434         handle: RawHandle,
435     ) -> Result<RawHandle> {
436         let req = DuplicateHandleRequest {
437             target_alias_pid,
438             handle: handle as usize,
439         };
440         self.0.send(&req)?;
441         let res: DuplicateHandleResponse = self.0.recv()?;
442         res.handle
443             .map(|h| h as RawHandle)
444             .ok_or(Error::BrokerDupDescriptor)
445     }
446 }
447 
/// Wrapper for Tube used for sending and receiving protos. The main use case is to send a
/// message without the serialization bloat caused by `serde-json`.
#[derive(Serialize, Deserialize)]
pub struct ProtoTube(Tube);
452 
453 impl ProtoTube {
pair() -> Result<(ProtoTube, ProtoTube)>454     pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
455         Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
456     }
457 
pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)>458     pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
459         Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
460     }
461 
send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()>462     pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
463         self.0.send_proto(msg)
464     }
465 
recv_proto<M: protobuf::Message>(&self) -> Result<M>466     pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
467         self.0.recv_proto()
468     }
469 }
470 
impl ReadNotifier for ProtoTube {
    /// Returns the read notifier of the wrapped `Tube`.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        self.0.get_read_notifier()
    }
}
476 
impl AsRawDescriptor for ProtoTube {
    /// Returns the raw descriptor of the wrapped `Tube`.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.0.as_raw_descriptor()
    }
}
482 
/// A wrapper around a named pipe that uses Tube serialization.
///
/// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
/// blocking messages.
pub struct PipeTube {
    // Pipe that every packet is written to and read from.
    pipe: PipeConnection,

    // PID handed to `duplicate_handle` when a message carrying descriptors is sent. Unlike
    // `Tube`, this type has no serialization hook that fills the value in.
    target_pid: Option<u32>,
}
493 
impl PipeTube {
    /// Builds a `PipeTube` from an existing pipe connection and an optional target PID.
    pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
        Self { pipe, target_pid }
    }

    /// Serializes `msg` and writes it to the pipe. Descriptors contained in `msg` are
    /// duplicated using this tube's target PID (see `serialize_and_send`).
    pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
        serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
    }

    /// Reads one packet from the pipe and deserializes it into `T`
    /// (see `deserialize_and_recv`).
    pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
        deserialize_and_recv(|buf| {
            // SAFETY:
            // 1. We are reading bytes, so no matter what data is on the pipe, it is representable
            //    as bytes.
            // 2. A read is quantized in bytes, so no partial reads are possible.
            unsafe { self.pipe.read(buf) }
        })
    }
}
513 
/// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
/// that a flush of the Tube is attempted before it is dropped; a failed flush is only logged.
pub struct FlushOnDropTube(pub Tube);
517 
impl FlushOnDropTube {
    /// Wraps `tube`; the tube is flushed when the returned value is dropped.
    pub fn from(tube: Tube) -> Self {
        Self(tube)
    }
}
523 
524 impl Drop for FlushOnDropTube {
drop(&mut self)525     fn drop(&mut self) {
526         if let Err(e) = self.0.flush_blocking() {
527             warn!("failed to flush Tube: {}", e)
528         }
529     }
530 }
531 
532 impl Error {
map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error533     fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
534         if e.kind() == io::ErrorKind::UnexpectedEof || e.kind() == io::ErrorKind::BrokenPipe {
535             Error::Disconnected
536         } else {
537             err_ctor(e)
538         }
539     }
540 
from_recv_io_error(e: io::Error) -> Error541     fn from_recv_io_error(e: io::Error) -> Error {
542         Self::map_io_error(e, Error::Recv)
543     }
544 
from_send_error(e: io::Error) -> Error545     fn from_send_error(e: io::Error) -> Error {
546         Self::map_io_error(e, Error::Send)
547     }
548 
from_send_io_buf_error(e: io::Error) -> Error549     fn from_send_io_buf_error(e: io::Error) -> Error {
550         Self::map_io_error(e, Error::SendIoBuf)
551     }
552 }
553