xref: /aosp_15_r20/external/crosvm/base/src/sys/windows/named_pipes.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17 
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_BROKEN_PIPE;
29 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
30 use winapi::shared::winerror::ERROR_IO_PENDING;
31 use winapi::shared::winerror::ERROR_MORE_DATA;
32 use winapi::shared::winerror::ERROR_NO_DATA;
33 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
34 use winapi::um::errhandlingapi::GetLastError;
35 use winapi::um::fileapi::FlushFileBuffers;
36 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
37 use winapi::um::ioapiset::CancelIoEx;
38 use winapi::um::ioapiset::GetOverlappedResult;
39 use winapi::um::minwinbase::OVERLAPPED;
40 use winapi::um::namedpipeapi::ConnectNamedPipe;
41 use winapi::um::namedpipeapi::DisconnectNamedPipe;
42 use winapi::um::namedpipeapi::GetNamedPipeInfo;
43 use winapi::um::namedpipeapi::PeekNamedPipe;
44 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
45 use winapi::um::winbase::CreateNamedPipeA;
46 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
47 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
48 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
49 use winapi::um::winbase::PIPE_NOWAIT;
50 use winapi::um::winbase::PIPE_READMODE_BYTE;
51 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
52 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
53 use winapi::um::winbase::PIPE_TYPE_BYTE;
54 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
55 use winapi::um::winbase::PIPE_WAIT;
56 use winapi::um::winbase::SECURITY_IDENTIFICATION;
57 
58 use super::RawDescriptor;
59 use crate::descriptor::AsRawDescriptor;
60 use crate::descriptor::FromRawDescriptor;
61 use crate::descriptor::IntoRawDescriptor;
62 use crate::descriptor::SafeDescriptor;
63 use crate::Event;
64 use crate::EventToken;
65 use crate::WaitContext;
66 
67 /// The default buffer size for all named pipes in the system. If this size is too small, writers
68 /// on named pipes that expect not to block *can* block until the reading side empties the buffer.
69 ///
70 /// The general rule is this should be *at least* as big as the largest message, otherwise
71 /// unexpected blocking behavior can result; for example, if too small, this can interact badly with
72 /// crate::windows::StreamChannel, which expects to be able to make a complete write before
73 /// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
74 /// is too small:
75 ///     * The writer can't complete its write and release the lock because the buffer is too small.
76 ///     * The reader can't start reading because the lock is held by the writer, so it can't relieve
77 ///       buffer pressure. Note that for message pipes, the reader couldn't do anything to help
78 ///       anyway, because a message mode pipe should NOT have a partial read (which is what we would
79 ///       need to relieve pressure).
80 ///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
81 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
82 
83 static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
84 
85 /// Represents one end of a named pipe
86 #[derive(Serialize, Deserialize, Debug)]
87 pub struct PipeConnection {
88     handle: SafeDescriptor,
89     framing_mode: FramingMode,
90     blocking_mode: BlockingMode,
91 }
92 
93 /// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
94 ///
95 /// Defined as a separate type so that we can mark it as `Send` and `Sync`.
96 pub struct BoxedOverlapped(pub Box<OVERLAPPED>);
97 
98 // SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
99 // pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
100 unsafe impl Send for BoxedOverlapped {}
101 
102 // SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
103 unsafe impl Sync for BoxedOverlapped {}
104 
105 /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
106 /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
107 pub struct OverlappedWrapper {
108     overlapped: BoxedOverlapped,
109     // This field prevents the event handle from being dropped too early and allows callers to
110     // be notified when a read or write overlapped operation has completed.
111     h_event: Option<Event>,
112     in_use: bool,
113 }
114 
115 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>116     pub fn get_h_event_ref(&self) -> Option<&Event> {
117         self.h_event.as_ref()
118     }
119 
120     /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
121     /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
122     /// returned must not be dropped.
123     ///
124     /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
125     /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
126     /// handle will be signaled when the operation is complete. In other words, you can use
127     /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
128     /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>129     pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
130         let mut overlapped = OVERLAPPED::default();
131         let h_event = if include_event {
132             Some(Event::new()?)
133         } else {
134             None
135         };
136 
137         overlapped.hEvent = if let Some(event) = h_event.as_ref() {
138             event.as_raw_descriptor()
139         } else {
140             0 as RawDescriptor
141         };
142 
143         Ok(OverlappedWrapper {
144             overlapped: BoxedOverlapped(Box::new(overlapped)),
145             h_event,
146             in_use: false,
147         })
148     }
149 }
150 
151 pub trait WriteOverlapped {
152     /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
153     /// If successful, the write operation will complete asynchronously, and
154     /// `write_result()` should be called to get the result.
155     ///
156     /// # Safety
157     /// `buf` and `overlapped_wrapper` will be in use for the duration of
158     /// the overlapped operation. These must not be reused and must live until
159     /// after `write_result()` has been called.
write_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>160     unsafe fn write_overlapped(
161         &mut self,
162         buf: &mut [u8],
163         overlapped_wrapper: &mut OverlappedWrapper,
164     ) -> io::Result<()>;
165 
166     /// Gets the result of the overlapped write operation. Must only be called
167     /// after issuing an overlapped write operation using `write_overlapped`. The
168     /// same `overlapped_wrapper` must be provided.
write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>169     fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
170 
171     /// Tries to get the result of the overlapped write operation. Must only be
172     /// called once, and only after issuing an overlapped write operation using
173     /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
174     ///
175     /// An error indicates that the operation hasn't completed yet and
176     /// `write_result` or `try_write_result` should be called again.
try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>177     fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
178         -> io::Result<usize>;
179 }
180 
181 pub trait ReadOverlapped {
182     /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
183     /// If successful, the read operation will complete asynchronously, and
184     /// `read_result()` should be called to get the result.
185     ///
186     /// # Safety
187     /// `buf` and `overlapped_wrapper` will be in use for the duration of
188     /// the overlapped operation. These must not be reused and must live until
189     /// after `read_result()` has been called.
read_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>190     unsafe fn read_overlapped(
191         &mut self,
192         buf: &mut [u8],
193         overlapped_wrapper: &mut OverlappedWrapper,
194     ) -> io::Result<()>;
195 
196     /// Gets the result of the overlapped read operation. Must only be called
197     /// once, and only after issuing an overlapped read operation using
198     /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>199     fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
200 
201     /// Tries to get the result of the overlapped read operation. Must only be called
202     /// after issuing an overlapped read operation using `read_overlapped`. The
203     /// same `overlapped_wrapper` must be provided.
204     ///
205     /// An error indicates that the operation hasn't completed yet and
206     /// `read_result` or `try_read_result` should be called again.
try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>207     fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
208 }
209 
210 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
211 pub enum FramingMode {
212     Byte,
213     Message,
214 }
215 
216 impl FramingMode {
to_readmode(self) -> DWORD217     fn to_readmode(self) -> DWORD {
218         match self {
219             FramingMode::Message => PIPE_READMODE_MESSAGE,
220             FramingMode::Byte => PIPE_READMODE_BYTE,
221         }
222     }
223 
to_pipetype(self) -> DWORD224     fn to_pipetype(self) -> DWORD {
225         match self {
226             FramingMode::Message => PIPE_TYPE_MESSAGE,
227             FramingMode::Byte => PIPE_TYPE_BYTE,
228         }
229     }
230 }
231 
232 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
233 pub enum BlockingMode {
234     /// Calls to read() block until data is received
235     Wait,
236     /// Calls to read() return immediately even if there is nothing read with error code 232
237     /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
238     ///
239     /// NOTE: This mode is discouraged by the Windows API documentation.
240     NoWait,
241 }
242 
243 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD244     fn from(blocking_mode: &BlockingMode) -> DWORD {
245         match blocking_mode {
246             BlockingMode::Wait => PIPE_WAIT,
247             BlockingMode::NoWait => PIPE_NOWAIT,
248         }
249     }
250 }
251 
252 /// Sets the handle state for a named pipe in a rust friendly way.
253 /// SAFETY:
254 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>255 unsafe fn set_named_pipe_handle_state(
256     pipe_handle: RawDescriptor,
257     client_mode: &mut DWORD,
258 ) -> Result<()> {
259     // Safe when the pipe handle is open. Safety also requires checking the return value, which we
260     // do below.
261     let success_flag = SetNamedPipeHandleState(
262         /* hNamedPipe= */ pipe_handle,
263         /* lpMode= */ client_mode,
264         /* lpMaxCollectionCount= */ ptr::null_mut(),
265         /* lpCollectDataTimeout= */ ptr::null_mut(),
266     );
267     if success_flag == 0 {
268         Err(io::Error::last_os_error())
269     } else {
270         Ok(())
271     }
272 }
273 
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>274 pub fn pair(
275     framing_mode: &FramingMode,
276     blocking_mode: &BlockingMode,
277     timeout: u64,
278 ) -> Result<(PipeConnection, PipeConnection)> {
279     pair_with_buffer_size(
280         framing_mode,
281         blocking_mode,
282         timeout,
283         DEFAULT_BUFFER_SIZE,
284         false,
285     )
286 }
287 
288 /// Creates a pair of handles connected to either end of a duplex named pipe.
289 ///
290 /// The pipe created will have a semi-random name and a default set of security options that
291 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
292 /// remote clients, allow only a single server instance, and prevent impersonation by the server
293 /// end of the pipe.
294 ///
295 /// # Arguments
296 ///
297 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
298 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
299 ///   fewer bytes than were sent in a message from the other end of the pipe.
300 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
301 ///   return immediately if there is nothing available (NoWait).
302 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
303 ///   zero will create sockets with the system default timeout.
304 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
305 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
306 ///   writes that don't fit in the buffer.
307 /// # Return value
308 ///
309 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
310 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>311 pub fn pair_with_buffer_size(
312     framing_mode: &FramingMode,
313     blocking_mode: &BlockingMode,
314     timeout: u64,
315     buffer_size: usize,
316     overlapped: bool,
317 ) -> Result<(PipeConnection, PipeConnection)> {
318     // Give the pipe a unique name to avoid accidental collisions
319     let pipe_name = format!(
320         r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
321         process::id(),
322         NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
323         rand::thread_rng().gen::<u32>(),
324     );
325 
326     let server_end = create_server_pipe(
327         &pipe_name,
328         framing_mode,
329         blocking_mode,
330         timeout,
331         buffer_size,
332         overlapped,
333     )?;
334 
335     // Open the named pipe we just created as the client
336     let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
337 
338     // Accept the client's connection
339     // Not sure if this is strictly needed but I'm doing it just in case.
340     // We expect at this point that the client will already be connected,
341     // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
342     // It's also OK if we get a return code of success.
343     server_end.wait_for_client_connection()?;
344 
345     Ok((server_end, client_end))
346 }
347 
348 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
349 /// settings.
350 ///
351 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
352 ///
353 /// # Arguments
354 ///
355 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
356 ///   `\\.\pipe\<some-name>`.
357 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
358 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
359 ///   fewer bytes than were sent in a message from the other end of the pipe.
360 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
361 ///   return immediately if there is nothing available (NoWait).
362 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
363 ///   zero will create sockets with the system default timeout.
364 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
365 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
366 ///   writes that don't fit in the buffer.
367 /// * `overlapped`    - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>368 pub fn create_server_pipe(
369     pipe_name: &str,
370     framing_mode: &FramingMode,
371     blocking_mode: &BlockingMode,
372     timeout: u64,
373     buffer_size: usize,
374     overlapped: bool,
375 ) -> Result<PipeConnection> {
376     let c_pipe_name = CString::new(pipe_name).unwrap();
377 
378     let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
379     if overlapped {
380         open_mode_flags |= FILE_FLAG_OVERLAPPED
381     }
382 
383     // This sets flags so there will be an error if >1 instance (server end)
384     // of this pipe name is opened because we expect exactly one.
385     // SAFETY:
386     // Safe because security attributes are valid, pipe_name is valid C string,
387     // and we're checking the return code
388     let server_handle = unsafe {
389         CreateNamedPipeA(
390             c_pipe_name.as_ptr(),
391             /* dwOpenMode= */
392             open_mode_flags,
393             /* dwPipeMode= */
394             framing_mode.to_pipetype()
395                 | framing_mode.to_readmode()
396                 | DWORD::from(blocking_mode)
397                 | PIPE_REJECT_REMOTE_CLIENTS,
398             /* nMaxInstances= */ 1,
399             /* nOutBufferSize= */ buffer_size as DWORD,
400             /* nInBufferSize= */ buffer_size as DWORD,
401             /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
402             /* lpSecurityAttributes= */
403             SecurityAttributes::new_with_security_descriptor(
404                 SelfRelativeSecurityDescriptor::get_singleton(),
405                 /* inherit= */ true,
406             )
407             .as_mut(),
408         )
409     };
410 
411     if server_handle == INVALID_HANDLE_VALUE {
412         Err(io::Error::last_os_error())
413     } else {
414         // SAFETY: Safe because server_handle is valid.
415         unsafe {
416             Ok(PipeConnection {
417                 handle: SafeDescriptor::from_raw_descriptor(server_handle),
418                 framing_mode: *framing_mode,
419                 blocking_mode: *blocking_mode,
420             })
421         }
422     }
423 }
424 
425 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
426 /// settings.
427 ///
428 /// The pipe will be set to prevent impersonation of the client by the server process.
429 ///
430 /// # Arguments
431 ///
432 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
433 ///   `\\.\pipe\<some-name>`.
434 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
435 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
436 ///   fewer bytes than were sent in a message from the other end of the pipe.
437 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
438 ///   return immediately if there is nothing available (NoWait).
439 /// * `overlapped`    - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>440 pub fn create_client_pipe(
441     pipe_name: &str,
442     framing_mode: &FramingMode,
443     blocking_mode: &BlockingMode,
444     overlapped: bool,
445 ) -> Result<PipeConnection> {
446     let client_handle = OpenOptions::new()
447         .read(true)
448         .write(true)
449         .create(true)
450         .security_qos_flags(SECURITY_IDENTIFICATION)
451         .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
452         .open(pipe_name)?
453         .into_raw_descriptor();
454 
455     let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
456 
457     // SAFETY:
458     // Safe because client_handle's open() call did not return an error.
459     unsafe {
460         set_named_pipe_handle_state(client_handle, &mut client_mode)?;
461     }
462 
463     Ok(PipeConnection {
464         // SAFETY:
465         // Safe because client_handle is valid
466         handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
467         framing_mode: *framing_mode,
468         blocking_mode: *blocking_mode,
469     })
470 }
471 
472 // This is used to mark types which can be appropriately sent through the
473 // generic helper functions write_to_pipe and read_from_pipe.
474 pub trait PipeSendable {
475     // Default values used to fill in new empty indexes when resizing a buffer to
476     // a larger size.
default() -> Self477     fn default() -> Self;
478 }
479 impl PipeSendable for u8 {
default() -> Self480     fn default() -> Self {
481         0
482     }
483 }
484 impl PipeSendable for RawDescriptor {
default() -> Self485     fn default() -> Self {
486         ptr::null_mut()
487     }
488 }
489 
490 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>491     pub fn try_clone(&self) -> Result<PipeConnection> {
492         let copy_handle = self.handle.try_clone()?;
493         Ok(PipeConnection {
494             handle: copy_handle,
495             framing_mode: self.framing_mode,
496             blocking_mode: self.blocking_mode,
497         })
498     }
499 
500     /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
501     /// blocking modes.
502     ///
503     /// # Safety
504     /// 1. rd is valid and ownership is transferred to this function when it is called.
505     ///
506     /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
507     /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection508     pub unsafe fn from_raw_descriptor(
509         rd: RawDescriptor,
510         framing_mode: FramingMode,
511         blocking_mode: BlockingMode,
512     ) -> PipeConnection {
513         PipeConnection {
514             handle: SafeDescriptor::from_raw_descriptor(rd),
515             framing_mode,
516             blocking_mode,
517         }
518     }
519 
520     /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
521     /// Returns the number of bytes (not values) read.
522     ///
523     /// # Safety
524     ///
525     /// This is safe only when the following conditions hold:
526     ///     1. The data on the other end of the pipe is a valid binary representation of data for
527     ///     type T, and
528     ///     2. The number of bytes read is a multiple of the size of T; this must be checked by
529     ///     the caller.
530     /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
531     /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>532     pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
533         PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
534     }
535 
536     /// Similar to `PipeConnection::read` except it also allows:
537     ///     1. The same end of the named pipe to read and write at the same time in different
538     ///        threads.
539     ///     2. Asynchronous read and write (read and write won't block).
540     ///
541     /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
542     /// (can be created with `OverlappedWrapper::new`) will be passed into
543     /// `ReadFile`. That event will be triggered when the read operation is complete.
544     ///
545     /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
546     /// also help with waiting until the read operation is complete.
547     ///
548     /// # Safety
549     ///
550     /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
551     /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>552     pub unsafe fn read_overlapped<T: PipeSendable>(
553         &mut self,
554         buf: &mut [T],
555         overlapped_wrapper: &mut OverlappedWrapper,
556     ) -> Result<()> {
557         if overlapped_wrapper.in_use {
558             return Err(std::io::Error::new(
559                 std::io::ErrorKind::InvalidInput,
560                 "Overlapped struct already in use",
561             ));
562         }
563         overlapped_wrapper.in_use = true;
564 
565         PipeConnection::read_internal(
566             &self.handle,
567             self.blocking_mode,
568             buf,
569             Some(&mut overlapped_wrapper.overlapped.0),
570         )?;
571         Ok(())
572     }
573 
574     /// Helper for `read_overlapped` and `read`
575     ///
576     /// # Safety
577     /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>578     unsafe fn read_internal<T: PipeSendable>(
579         handle: &SafeDescriptor,
580         blocking_mode: BlockingMode,
581         buf: &mut [T],
582         overlapped: Option<&mut OVERLAPPED>,
583     ) -> Result<usize> {
584         let res = crate::windows::read_file(
585             handle,
586             buf.as_mut_ptr() as *mut u8,
587             mem::size_of_val(buf),
588             overlapped,
589         );
590         match res {
591             Ok(bytes_read) => Ok(bytes_read),
592             // Treat a closed pipe like an EOF.
593             // We check the raw error because `ErrorKind::BrokenPipe` is ambiguous on Windows.
594             Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),
595             Err(e)
596                 if blocking_mode == BlockingMode::NoWait
597                     && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
598             {
599                 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
600                 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
601                 // correct. For further details see:
602                 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
603                 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
604                 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
605             }
606             Err(e) => Err(e),
607         }
608     }
609 
610     /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
611     /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>612     pub fn read_overlapped_blocking<T: PipeSendable>(
613         &mut self,
614         buf: &mut [T],
615         overlapped_wrapper: &mut OverlappedWrapper,
616         exit_event: &Event,
617     ) -> Result<()> {
618         // SAFETY:
619         // Safe because we are providing a valid buffer slice and also providing a valid
620         // overlapped struct.
621         match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
622             // More data isn't necessarily an error as long as we've filled the provided buffer,
623             // as is checked later in this function.
624             Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(()),
625             Err(e) => Err(e),
626             Ok(()) => Ok(()),
627         }?;
628 
629         #[derive(EventToken)]
630         enum Token {
631             ReadOverlapped,
632             Exit,
633         }
634 
635         let wait_ctx = WaitContext::build_with(&[
636             (
637                 overlapped_wrapper.get_h_event_ref().unwrap(),
638                 Token::ReadOverlapped,
639             ),
640             (exit_event, Token::Exit),
641         ])?;
642 
643         let events = wait_ctx.wait()?;
644         for event in events {
645             match event.token {
646                 Token::ReadOverlapped => {
647                     let size_read_in_bytes =
648                         self.get_overlapped_result(overlapped_wrapper)? as usize;
649 
650                     // If this error shows, most likely the overlapped named pipe was set up
651                     // incorrectly.
652                     if size_read_in_bytes != buf.len() {
653                         return Err(std::io::Error::new(
654                             std::io::ErrorKind::UnexpectedEof,
655                             "Short read",
656                         ));
657                     }
658                 }
659                 Token::Exit => {
660                     return Err(std::io::Error::new(
661                         std::io::ErrorKind::Interrupted,
662                         "IO canceled on exit request",
663                     ));
664                 }
665             }
666         }
667 
668         Ok(())
669     }
670 
671     /// Gets the size in bytes of data in the pipe.
672     ///
673     /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
674     /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>675     pub fn get_available_byte_count(&self) -> io::Result<u32> {
676         let mut total_bytes_avail: DWORD = 0;
677 
678         // SAFETY:
679         // Safe because the underlying pipe handle is guaranteed to be open, and the output values
680         // live at valid memory locations.
681         fail_if_zero!(unsafe {
682             PeekNamedPipe(
683                 self.as_raw_descriptor(),
684                 ptr::null_mut(),
685                 0,
686                 ptr::null_mut(),
687                 &mut total_bytes_avail,
688                 ptr::null_mut(),
689             )
690         });
691 
692         Ok(total_bytes_avail)
693     }
694 
695     /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
696     /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>697     pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
698         // SAFETY: overlapped is None so this is safe.
699         unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
700     }
701 
702     /// Similar to `PipeConnection::write` except it also allows:
703     ///     1. The same end of the named pipe to read and write at the same time in different
704     ///        threads.
705     ///     2. Asynchronous read and write (read and write won't block).
706     ///
707     /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
708     /// (can be created with `OverlappedWrapper::new`) will be passed into
709     /// `WriteFile`. That event will be triggered when the write operation is complete.
710     ///
711     /// In order to get how many bytes were written, call `get_overlapped_result`. That function
712     /// will also help with waiting until the write operation is complete. The pipe must be
713     /// opened in overlapped otherwise there may be unexpected behavior.
714     ///
715     /// # Safety
716     /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>717     pub unsafe fn write_overlapped<T: PipeSendable>(
718         &mut self,
719         buf: &[T],
720         overlapped_wrapper: &mut OverlappedWrapper,
721     ) -> Result<()> {
722         if overlapped_wrapper.in_use {
723             return Err(std::io::Error::new(
724                 std::io::ErrorKind::InvalidInput,
725                 "Overlapped struct already in use",
726             ));
727         }
728         overlapped_wrapper.in_use = true;
729 
730         PipeConnection::write_internal(
731             &self.handle,
732             buf,
733             Some(&mut overlapped_wrapper.overlapped.0),
734         )?;
735         Ok(())
736     }
737 
738     /// Helper for `write_overlapped` and `write`.
739     ///
740     /// # Safety
741     /// * Safe if overlapped is None.
742     /// * Safe if overlapped is Some and:
743     ///   + buf lives until the overlapped operation is complete.
744     ///   + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>745     unsafe fn write_internal<T: PipeSendable>(
746         handle: &SafeDescriptor,
747         buf: &[T],
748         overlapped: Option<&mut OVERLAPPED>,
749     ) -> Result<usize> {
750         // SAFETY:
751         // Safe because buf points to memory valid until the write completes and we pass a valid
752         // length for that memory.
753         unsafe {
754             crate::windows::write_file(
755                 handle,
756                 buf.as_ptr() as *const u8,
757                 mem::size_of_val(buf),
758                 overlapped,
759             )
760         }
761     }
762 
763     /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>764     pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
765         let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
766         self.blocking_mode = *blocking_mode;
767 
768         // SAFETY:
769         // Safe because the pipe has not been closed (it is managed by this object).
770         unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
771     }
772 
773     /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>774     pub fn wait_for_client_connection(&self) -> Result<()> {
775         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
776         self.wait_for_client_connection_internal(
777             &mut overlapped_wrapper,
778             /* should_block = */ true,
779         )
780     }
781 
782     /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>783     pub fn wait_for_client_connection_overlapped_blocking(
784         &mut self,
785         exit_event: &Event,
786     ) -> Result<()> {
787         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
788         self.wait_for_client_connection_internal(
789             &mut overlapped_wrapper,
790             /* should_block = */ false,
791         )?;
792 
793         #[derive(EventToken)]
794         enum Token {
795             Connected,
796             Exit,
797         }
798 
799         let wait_ctx = WaitContext::build_with(&[
800             (
801                 overlapped_wrapper.get_h_event_ref().unwrap(),
802                 Token::Connected,
803             ),
804             (exit_event, Token::Exit),
805         ])?;
806 
807         let events = wait_ctx.wait()?;
808         if let Some(event) = events.into_iter().next() {
809             return match event.token {
810                 Token::Connected => Ok(()),
811                 Token::Exit => {
812                     // We must cancel IO here because it is unsafe to free the overlapped wrapper
813                     // while the IO operation is active.
814                     self.cancel_io()?;
815 
816                     Err(std::io::Error::new(
817                         std::io::ErrorKind::Interrupted,
818                         "IO canceled on exit request",
819                     ))
820                 }
821             };
822         }
823         unreachable!("wait cannot return Ok with zero events");
824     }
825 
826     /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
827     /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>828     pub fn wait_for_client_connection_overlapped(
829         &self,
830         overlapped_wrapper: &mut OverlappedWrapper,
831     ) -> Result<()> {
832         self.wait_for_client_connection_internal(
833             overlapped_wrapper,
834             /* should_block = */ false,
835         )
836     }
837 
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>838     fn wait_for_client_connection_internal(
839         &self,
840         overlapped_wrapper: &mut OverlappedWrapper,
841         should_block: bool,
842     ) -> Result<()> {
843         // SAFETY:
844         // Safe because the handle is valid and we're checking the return
845         // code according to the documentation
846         //
847         // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
848         //      overlapped_wrapper must live until the overlapped operation is complete; however,
849         //      if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
850         //      could be freed while the operation is still running.
851         unsafe {
852             let success_flag = ConnectNamedPipe(
853                 self.as_raw_descriptor(),
854                 // Note: The overlapped structure is only used if the pipe was opened in
855                 // OVERLAPPED mode, but is necessary in that case.
856                 &mut *overlapped_wrapper.overlapped.0,
857             );
858             if success_flag == 0 {
859                 return match GetLastError() {
860                     ERROR_PIPE_CONNECTED => {
861                         if !should_block {
862                             // If async, make sure the event is signalled to indicate the client
863                             // is ready.
864                             overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
865                         }
866 
867                         Ok(())
868                     }
869                     ERROR_IO_PENDING => {
870                         if should_block {
871                             overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
872                         }
873                         Ok(())
874                     }
875                     err => Err(io::Error::from_raw_os_error(err as i32)),
876                 };
877             }
878         }
879         Ok(())
880     }
881 
882     /// Used for overlapped read and write operations.
883     ///
884     /// This will block until the ReadFile or WriteFile operation that also took in
885     /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
886     /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
887     /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>888     pub fn get_overlapped_result(
889         &mut self,
890         overlapped_wrapper: &mut OverlappedWrapper,
891     ) -> io::Result<u32> {
892         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
893         overlapped_wrapper.in_use = false;
894         res
895     }
896 
897     /// Used for overlapped read and write operations.
898     ///
899     /// This will return immediately, regardless of the completion status of the
900     /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
901     /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
902     /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
903     /// that were read or written, if completed.  If the operation hasn't
904     /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
905     /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>906     pub fn try_get_overlapped_result(
907         &mut self,
908         overlapped_wrapper: &mut OverlappedWrapper,
909     ) -> io::Result<u32> {
910         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
911         match res {
912             Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
913                 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
914             }
915             _ => {
916                 overlapped_wrapper.in_use = false;
917                 res
918             }
919         }
920     }
921 
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>922     fn get_overlapped_result_internal(
923         &mut self,
924         overlapped_wrapper: &mut OverlappedWrapper,
925         wait: bool,
926     ) -> io::Result<u32> {
927         if !overlapped_wrapper.in_use {
928             return Err(std::io::Error::new(
929                 std::io::ErrorKind::InvalidInput,
930                 "Overlapped struct is not in use",
931             ));
932         }
933         let mut size_transferred = 0;
934         // SAFETY:
935         // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
936         // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
937         fail_if_zero!(unsafe {
938             GetOverlappedResult(
939                 self.handle.as_raw_descriptor(),
940                 &mut *overlapped_wrapper.overlapped.0,
941                 &mut size_transferred,
942                 if wait { TRUE } else { FALSE },
943             )
944         });
945 
946         Ok(size_transferred)
947     }
948 
949     /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
950     /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>951     pub fn cancel_io(&mut self) -> Result<()> {
952         fail_if_zero!(
953             // SAFETY: descriptor is valid and the return value is checked.
954             unsafe {
955                 CancelIoEx(
956                     self.handle.as_raw_descriptor(),
957                     /* lpOverlapped= */ std::ptr::null_mut(),
958                 )
959             }
960         );
961 
962         Ok(())
963     }
964 
965     /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode966     pub fn get_framing_mode(&self) -> FramingMode {
967         self.framing_mode
968     }
969 
970     /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>971     pub fn get_info(&self) -> Result<NamedPipeInfo> {
972         let mut flags: u32 = 0;
973         let mut incoming_buffer_size: u32 = 0;
974         let mut outgoing_buffer_size: u32 = 0;
975         let mut max_instances: u32 = 0;
976         // SAFETY: all pointers are valid
977         fail_if_zero!(unsafe {
978             GetNamedPipeInfo(
979                 self.as_raw_descriptor(),
980                 &mut flags,
981                 &mut outgoing_buffer_size,
982                 &mut incoming_buffer_size,
983                 &mut max_instances,
984             )
985         });
986 
987         Ok(NamedPipeInfo {
988             outgoing_buffer_size,
989             incoming_buffer_size,
990             max_instances,
991             flags,
992         })
993     }
994 
995     /// For a server pipe, flush the pipe contents. This will
996     /// block until the pipe is cleared by the client. Only
997     /// call this if you are sure the client is reading the
998     /// data!
flush_data_blocking(&self) -> Result<()>999     pub fn flush_data_blocking(&self) -> Result<()> {
1000         // SAFETY:
1001         // Safe because the only buffers interacted with are
1002         // outside of Rust memory
1003         fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1004         Ok(())
1005     }
1006 
1007     /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1008     pub fn disconnect_clients(&self) -> Result<()> {
1009         // SAFETY:
1010         // Safe because we own the handle passed in and know it will remain valid for the duration
1011         // of the call. Discarded buffers are not managed by rust.
1012         fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1013         Ok(())
1014     }
1015 }
1016 
1017 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1018     fn as_raw_descriptor(&self) -> RawDescriptor {
1019         self.handle.as_raw_descriptor()
1020     }
1021 }
1022 
1023 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1024     fn into_raw_descriptor(self) -> RawDescriptor {
1025         self.handle.into_raw_descriptor()
1026     }
1027 }
1028 
1029 // SAFETY: Send safety is ensured by inner fields.
1030 unsafe impl Send for PipeConnection {}
1031 // SAFETY: Sync safety is ensured by inner fields.
1032 unsafe impl Sync for PipeConnection {}
1033 
1034 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>1035     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1036         // SAFETY:
1037         // This is safe because PipeConnection::read is always safe for u8
1038         unsafe { PipeConnection::read(self, buf) }
1039     }
1040 }
1041 
1042 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>1043     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1044         PipeConnection::write(self, buf)
1045     }
1046 
flush(&mut self) -> io::Result<()>1047     fn flush(&mut self) -> io::Result<()> {
1048         Ok(())
1049     }
1050 }
1051 
1052 /// A simple data struct representing
1053 /// metadata about a NamedPipe.
1054 #[derive(Debug, PartialEq, Eq)]
1055 pub struct NamedPipeInfo {
1056     pub outgoing_buffer_size: u32,
1057     pub incoming_buffer_size: u32,
1058     pub max_instances: u32,
1059     pub flags: u32,
1060 }
1061 
1062 /// This is a wrapper around PipeConnection. This allows a read and a write operations
1063 /// to run in parallel but not multiple reads or writes in parallel.
1064 ///
1065 /// Reason: The message from/to service are two-parts - a fixed size header that
1066 /// contains the size of the actual message. By allowing only one write at a time
1067 /// we ensure that the variable size message is written/read right after writing/reading
1068 /// fixed size header. For example it avoid sending or receiving in messages in order like
1069 /// H1, H2, M1, M2
1070 ///   - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
1071 ///     sent by another event loop.
1072 ///
1073 /// Do not expose direct access to reader or writer pipes.
1074 ///
1075 /// The struct is clone-able so that different event loops can talk to the other end.
1076 #[derive(Clone)]
1077 pub struct MultiPartMessagePipe {
1078     // Lock protected pipe to receive messages.
1079     reader: Arc<Mutex<PipeConnection>>,
1080     // Lock protected pipe to send messages.
1081     writer: Arc<Mutex<PipeConnection>>,
1082     // Whether this end is created as server or client. The variable helps to
1083     // decide if something meanigful should be done when `wait_for_connection` is called.
1084     is_server: bool,
1085     // Always true if pipe is created as client.
1086     // Defaults to false on server. Updated to true on calling `wait_for_connection`
1087     // after a client connects.
1088     is_connected: Arc<AtomicBool>,
1089 }
1090 
1091 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1092     fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1093         Ok(Self {
1094             reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1095             writer: Arc::new(Mutex::new(pipe)),
1096             is_server,
1097             is_connected: Arc::new(AtomicBool::new(false)),
1098         })
1099     }
1100 
1101     /// Create server side of MutiPartMessagePipe.
1102     /// # Safety
1103     /// `pipe` must be a server named pipe.
1104     #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1105     pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1106         Self::create_from_pipe(pipe, true)
1107     }
1108 
1109     /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1110     pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1111         let pipe = create_client_pipe(
1112             &format!(r"\\.\pipe\{}", pipe_name),
1113             &FramingMode::Message,
1114             &BlockingMode::Wait,
1115             /* overlapped= */ true,
1116         )?;
1117         Self::create_from_pipe(pipe, false)
1118     }
1119 
1120     /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1121     pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1122         let pipe = create_server_pipe(
1123             &format!(r"\\.\pipe\{}", pipe_name,),
1124             &FramingMode::Message,
1125             &BlockingMode::Wait,
1126             0,
1127             1024 * 1024,
1128             true,
1129         )?;
1130         // SAFETY: `pipe` is a server named pipe.
1131         unsafe { Self::create_from_server_pipe(pipe) }
1132     }
1133 
1134     /// If the struct is created as a server then waits for client connection to arrive.
1135     /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1136     pub fn wait_for_connection(&self) -> Result<()> {
1137         if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1138             self.reader.lock().wait_for_client_connection()?;
1139             self.is_connected.store(true, Ordering::Relaxed);
1140         }
1141         Ok(())
1142     }
1143 
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1144     fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1145         pipe: &mut PipeConnection,
1146         buf: &[T],
1147         overlapped_wrapper: &mut OverlappedWrapper,
1148     ) -> Result<()> {
1149         // Safety:
1150         // `buf` and `overlapped_wrapper` will be in use for the duration of
1151         // the overlapped operation. These must not be reused and must live until
1152         // after `get_overlapped_result()` has been called which is done right
1153         // after this call.
1154         unsafe {
1155             pipe.write_overlapped(buf, overlapped_wrapper)?;
1156         }
1157 
1158         let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1159 
1160         if size_written_in_bytes as usize != buf.len() {
1161             return Err(std::io::Error::new(
1162                 std::io::ErrorKind::UnexpectedEof,
1163                 format!(
1164                     "Short write expected:{} found:{}",
1165                     size_written_in_bytes,
1166                     buf.len(),
1167                 ),
1168             ));
1169         }
1170         Ok(())
1171     }
1172     /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1173     pub fn write_overlapped_blocking_message<T: PipeSendable>(
1174         &self,
1175         header: &[T],
1176         message: &[T],
1177         overlapped_wrapper: &mut OverlappedWrapper,
1178     ) -> Result<()> {
1179         let mut writer = self.writer.lock();
1180         Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1181         Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1182     }
1183 
1184     /// Reads a variable size message and returns the message on success.
1185     /// The size of the message is expected to proceed the message in
1186     /// the form of `header_size` message.
1187     ///
1188     /// `parse_message_size` lets caller parse the header to extract
1189     /// message size.
1190     ///
1191     /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1192     pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1193         &self,
1194         header_size: usize,
1195         parse_message_size: F,
1196         overlapped_wrapper: &mut OverlappedWrapper,
1197         exit_event: &Event,
1198     ) -> Result<Vec<u8>> {
1199         let mut pipe = self.reader.lock();
1200         let mut header = vec![0; header_size];
1201         header.resize_with(header_size, Default::default);
1202         pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1203         let message_size = parse_message_size(&header);
1204         if message_size == 0 {
1205             return Ok(vec![]);
1206         }
1207         let mut buf = vec![];
1208         buf.resize_with(message_size, Default::default);
1209         pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1210         Ok(buf)
1211     }
1212 
1213     /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1214     /// named pipe.
1215     ///
1216     /// Otherwise, [`None`] is returned and the struct is dropped.
1217     ///
1218     /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1219     /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1220     /// possible that all of them will result in [`None`]. This is Due to Rust version
1221     /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1222     /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1223     ///
1224     /// If the underlying named pipe is a server named pipe, this method allows the caller to
1225     /// terminate the connection by first flushing the named pipe then disconnecting the clients
1226     /// idiomatically per
1227     /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1228     pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1229         let piper = Arc::clone(&self.reader);
1230         drop(self);
1231         Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1232     }
1233 }
1234 
1235 impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1236     type Error = std::io::Error;
try_from(pipe: PipeConnection) -> Result<Self>1237     fn try_from(pipe: PipeConnection) -> Result<Self> {
1238         Self::create_from_pipe(pipe, false)
1239     }
1240 }
1241 
1242 #[cfg(test)]
1243 mod tests {
1244     use std::mem::size_of;
1245     use std::thread::JoinHandle;
1246     use std::time::Duration;
1247 
1248     use super::*;
1249 
1250     #[test]
duplex_pipe_stream()1251     fn duplex_pipe_stream() {
1252         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1253 
1254         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1255         // SAFETY: trivially safe with pipe created and return value checked.
1256         unsafe {
1257             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1258                 println!("{}", dir);
1259 
1260                 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
1261 
1262                 // Smaller than what we sent so we get multiple chunks
1263                 let mut recv_buffer: [u8; 4] = [0; 4];
1264 
1265                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1266                 assert_eq!(size, 4);
1267                 assert_eq!(recv_buffer, [75, 77, 54, 82]);
1268 
1269                 size = receiver.read(&mut recv_buffer).unwrap();
1270                 assert_eq!(size, 2);
1271                 assert_eq!(recv_buffer[0..2], [76, 65]);
1272             }
1273         }
1274     }
1275 
1276     #[test]
available_byte_count_byte_mode()1277     fn available_byte_count_byte_mode() {
1278         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1279         p1.write(&[1, 23, 45]).unwrap();
1280         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1281 
1282         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1283         // yield the same value.
1284         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1285     }
1286 
1287     #[test]
available_byte_count_message_mode()1288     fn available_byte_count_message_mode() {
1289         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1290         p1.write(&[1, 23, 45]).unwrap();
1291         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1292 
1293         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1294         // yield the same value.
1295         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1296     }
1297 
1298     #[test]
available_byte_count_message_mode_multiple_messages()1299     fn available_byte_count_message_mode_multiple_messages() {
1300         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1301         p1.write(&[1, 2, 3]).unwrap();
1302         p1.write(&[4, 5]).unwrap();
1303         assert_eq!(p2.get_available_byte_count().unwrap(), 5);
1304     }
1305 
1306     #[test]
duplex_pipe_message()1307     fn duplex_pipe_message() {
1308         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1309 
1310         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1311         // SAFETY: trivially safe with pipe created and return value checked.
1312         unsafe {
1313             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1314                 println!("{}", dir);
1315 
1316                 // Send 2 messages so that we can check that message framing works
1317                 sender.write(&[1, 23, 45]).unwrap();
1318                 sender.write(&[67, 89, 10]).unwrap();
1319 
1320                 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
1321 
1322                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1323                 assert_eq!(size, 3);
1324                 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
1325 
1326                 size = receiver.read(&mut recv_buffer).unwrap();
1327                 assert_eq!(size, 3);
1328                 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
1329             }
1330         }
1331     }
1332 
1333     #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)1334     fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
1335         let mut recv_buffer: [u8; 1] = [0; 1];
1336 
1337         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1338         // SAFETY: trivially safe with PipeConnection created and return value checked.
1339         unsafe {
1340             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1341                 println!("{}", dir);
1342                 sender.write(&[1]).unwrap();
1343                 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
1344                 assert_eq!(
1345                     receiver.read(&mut recv_buffer).unwrap_err().kind(),
1346                     std::io::ErrorKind::WouldBlock
1347                 );
1348             }
1349         }
1350     }
1351 
1352     #[test]
duplex_nowait()1353     fn duplex_nowait() {
1354         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
1355         duplex_nowait_helper(&p1, &p2);
1356     }
1357 
1358     #[test]
duplex_nowait_set_after_creation()1359     fn duplex_nowait_set_after_creation() {
1360         // Tests non blocking setting after pipe creation
1361         let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1362         p1.set_blocking(&BlockingMode::NoWait)
1363             .expect("Failed to set blocking mode on pipe p1");
1364         p2.set_blocking(&BlockingMode::NoWait)
1365             .expect("Failed to set blocking mode on pipe p2");
1366         duplex_nowait_helper(&p1, &p2);
1367     }
1368 
1369     #[test]
duplex_overlapped()1370     fn duplex_overlapped() {
1371         let pipe_name = generate_pipe_name();
1372 
1373         let mut p1 = create_server_pipe(
1374             &pipe_name,
1375             &FramingMode::Message,
1376             &BlockingMode::Wait,
1377             /* timeout= */ 0,
1378             /* buffer_size= */ 1000,
1379             /* overlapped= */ true,
1380         )
1381         .unwrap();
1382 
1383         let mut p2 = create_client_pipe(
1384             &pipe_name,
1385             &FramingMode::Message,
1386             &BlockingMode::Wait,
1387             /* overlapped= */ true,
1388         )
1389         .unwrap();
1390 
1391         // SAFETY:
1392         // Safe because `read_overlapped` can be called since overlapped struct is created.
1393         unsafe {
1394             let mut p1_overlapped_wrapper =
1395                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1396             p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
1397                 .unwrap();
1398             let size = p1
1399                 .get_overlapped_result(&mut p1_overlapped_wrapper)
1400                 .unwrap();
1401             assert_eq!(size, 6);
1402 
1403             let mut recv_buffer: [u8; 6] = [0; 6];
1404 
1405             let mut p2_overlapped_wrapper =
1406                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1407             p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
1408                 .unwrap();
1409             let size = p2
1410                 .get_overlapped_result(&mut p2_overlapped_wrapper)
1411                 .unwrap();
1412             assert_eq!(size, 6);
1413             assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
1414         }
1415     }
1416 
1417     #[test]
duplex_overlapped_test_in_use()1418     fn duplex_overlapped_test_in_use() {
1419         let pipe_name = generate_pipe_name();
1420 
1421         let mut p1 = create_server_pipe(
1422             &pipe_name,
1423             &FramingMode::Message,
1424             &BlockingMode::Wait,
1425             /* timeout= */ 0,
1426             /* buffer_size= */ 1000,
1427             /* overlapped= */ true,
1428         )
1429         .unwrap();
1430 
1431         let mut p2 = create_client_pipe(
1432             &pipe_name,
1433             &FramingMode::Message,
1434             &BlockingMode::Wait,
1435             /* overlapped= */ true,
1436         )
1437         .unwrap();
1438         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1439 
1440         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1441         assert!(res.is_err());
1442 
1443         let data = vec![75, 77, 54, 82, 76, 65];
1444         // SAFETY: safe because: data & overlapped wrapper live until the
1445         // operation is verified completed below.
1446         let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
1447         assert!(res.is_ok());
1448 
1449         let res =
1450             // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1451             // will error out.
1452             unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
1453         assert!(res.is_err());
1454 
1455         let mut recv_buffer: [u8; 6] = [0; 6];
1456         // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1457         // will error out.
1458         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1459         assert!(res.is_err());
1460 
1461         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1462         assert!(res.is_ok());
1463 
1464         let mut recv_buffer: [u8; 6] = [0; 6];
1465         // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
1466         // operation is verified completed below.
1467         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1468         assert!(res.is_ok());
1469         let res = p2.get_overlapped_result(&mut overlapped_wrapper);
1470         assert!(res.is_ok());
1471     }
1472 
generate_pipe_name() -> String1473     fn generate_pipe_name() -> String {
1474         format!(
1475             r"\\.\pipe\test-ipc-pipe-name.rand{}",
1476             rand::thread_rng().gen::<u64>(),
1477         )
1478     }
1479 
send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()>1480     fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
1481         let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
1482         std::thread::spawn(move || {
1483             let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1484             let exit_event = Event::new().unwrap();
1485             for _i in 0..msg_count {
1486                 let message = *messages
1487                     .get(rand::thread_rng().gen::<usize>() % messages.len())
1488                     .unwrap();
1489                 pipe.write_overlapped_blocking_message(
1490                     &message.len().to_be_bytes(),
1491                     message.as_bytes(),
1492                     &mut overlapped_wrapper,
1493                 )
1494                 .unwrap();
1495             }
1496             for _i in 0..msg_count {
1497                 let message = pipe
1498                     .read_overlapped_blocking_message(
1499                         size_of::<usize>(),
1500                         |bytes: &[u8]| {
1501                             assert_eq!(bytes.len(), size_of::<usize>());
1502                             usize::from_be_bytes(
1503                                 bytes.try_into().expect("failed to get array from slice"),
1504                             )
1505                         },
1506                         &mut overlapped_wrapper,
1507                         &exit_event,
1508                     )
1509                     .unwrap();
1510                 assert_eq!(
1511                     *messages.get(message.len() - 1).unwrap(),
1512                     std::str::from_utf8(&message).unwrap(),
1513                 );
1514             }
1515         })
1516     }
1517 
1518     #[test]
multipart_message_smoke_test()1519     fn multipart_message_smoke_test() {
1520         let pipe_name = generate_pipe_name();
1521         let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
1522         let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
1523         let handles = [
1524             send_receive_msgs(server.clone(), 100),
1525             send_receive_msgs(client.clone(), 100),
1526             send_receive_msgs(server, 100),
1527             send_receive_msgs(client, 100),
1528         ];
1529         for h in handles {
1530             h.join().unwrap();
1531         }
1532     }
1533 
1534     #[test]
multipart_message_into_inner_pipe()1535     fn multipart_message_into_inner_pipe() {
1536         let pipe_name = generate_pipe_name();
1537         let mut pipe = create_server_pipe(
1538             &format!(r"\\.\pipe\{}", pipe_name),
1539             &FramingMode::Message,
1540             &BlockingMode::Wait,
1541             0,
1542             1024 * 1024,
1543             true,
1544         )
1545         .expect("should create the server pipe with success");
1546         let server1 = {
1547             let pipe = pipe
1548                 .try_clone()
1549                 .expect("should duplicate the named pipe with success");
1550             // SAFETY: `pipe` is a server named pipe.
1551             unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
1552                 .expect("should create the multipart message pipe with success")
1553         };
1554         let server2 = server1.clone();
1555         assert!(
1556             server2.into_inner_pipe().is_none(),
1557             "not the last reference, should be None"
1558         );
1559         let inner_pipe = server1
1560             .into_inner_pipe()
1561             .expect("the last reference, should return the underlying pipe");
1562         // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
1563         // that API to compare if 2 handles are the same.
1564         pipe.set_blocking(&BlockingMode::NoWait)
1565             .expect("should set the blocking mode on the original pipe with success");
1566         assert_eq!(
1567             pipe.get_info()
1568                 .expect("should get the pipe information on the original pipe successfully"),
1569             inner_pipe
1570                 .get_info()
1571                 .expect("should get the pipe information on the inner pipe successfully")
1572         );
1573         pipe.set_blocking(&BlockingMode::Wait)
1574             .expect("should set the blocking mode on the original pipe with success");
1575         assert_eq!(
1576             pipe.get_info()
1577                 .expect("should get the pipe information on the original pipe successfully"),
1578             inner_pipe
1579                 .get_info()
1580                 .expect("should get the pipe information on the inner pipe successfully")
1581         );
1582     }
1583 
1584     #[test]
test_wait_for_connection_blocking()1585     fn test_wait_for_connection_blocking() {
1586         let pipe_name = generate_pipe_name();
1587 
1588         let mut server_pipe = create_server_pipe(
1589             &pipe_name,
1590             &FramingMode::Message,
1591             &BlockingMode::Wait,
1592             /* timeout= */ 0,
1593             /* buffer_size= */ 1000,
1594             /* overlapped= */ true,
1595         )
1596         .unwrap();
1597 
1598         let server = crate::thread::spawn_with_timeout(move || {
1599             let exit_event = Event::new().unwrap();
1600             server_pipe
1601                 .wait_for_client_connection_overlapped_blocking(&exit_event)
1602                 .unwrap();
1603         });
1604 
1605         let _client = create_client_pipe(
1606             &pipe_name,
1607             &FramingMode::Message,
1608             &BlockingMode::Wait,
1609             /* overlapped= */ true,
1610         )
1611         .unwrap();
1612         server.try_join(Duration::from_secs(10)).unwrap();
1613     }
1614 
1615     #[test]
test_wait_for_connection_blocking_exit_triggered()1616     fn test_wait_for_connection_blocking_exit_triggered() {
1617         let pipe_name = generate_pipe_name();
1618 
1619         let mut server_pipe = create_server_pipe(
1620             &pipe_name,
1621             &FramingMode::Message,
1622             &BlockingMode::Wait,
1623             /* timeout= */ 0,
1624             /* buffer_size= */ 1000,
1625             /* overlapped= */ true,
1626         )
1627         .unwrap();
1628 
1629         let exit_event = Event::new().unwrap();
1630         let exit_event_for_server = exit_event.try_clone().unwrap();
1631         let server = crate::thread::spawn_with_timeout(move || {
1632             assert!(server_pipe
1633                 .wait_for_client_connection_overlapped_blocking(&exit_event_for_server)
1634                 .is_err());
1635         });
1636         exit_event.signal().unwrap();
1637         server.try_join(Duration::from_secs(10)).unwrap();
1638     }
1639 
1640     #[test]
std_io_read_eof()1641     fn std_io_read_eof() {
1642         let (mut w, mut r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1643         std::io::Write::write(&mut w, &[1, 2, 3]).unwrap();
1644         std::mem::drop(w);
1645 
1646         let mut buffer: [u8; 4] = [0; 4];
1647         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 3);
1648         assert_eq!(buffer, [1, 2, 3, 0]);
1649         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
1650         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
1651     }
1652 
1653     #[test]
std_io_write_eof()1654     fn std_io_write_eof() {
1655         let (mut w, r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1656         std::mem::drop(r);
1657         let result = std::io::Write::write(&mut w, &[1, 2, 3]);
1658         // Not required to return BrokenPipe here, something like Ok(0) is also acceptable.
1659         assert!(
1660             result.is_err()
1661                 && result.as_ref().unwrap_err().kind() == std::io::ErrorKind::BrokenPipe,
1662             "expected Err(BrokenPipe), got {result:?}"
1663         );
1664     }
1665 }
1666