1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_BROKEN_PIPE;
29 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
30 use winapi::shared::winerror::ERROR_IO_PENDING;
31 use winapi::shared::winerror::ERROR_MORE_DATA;
32 use winapi::shared::winerror::ERROR_NO_DATA;
33 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
34 use winapi::um::errhandlingapi::GetLastError;
35 use winapi::um::fileapi::FlushFileBuffers;
36 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
37 use winapi::um::ioapiset::CancelIoEx;
38 use winapi::um::ioapiset::GetOverlappedResult;
39 use winapi::um::minwinbase::OVERLAPPED;
40 use winapi::um::namedpipeapi::ConnectNamedPipe;
41 use winapi::um::namedpipeapi::DisconnectNamedPipe;
42 use winapi::um::namedpipeapi::GetNamedPipeInfo;
43 use winapi::um::namedpipeapi::PeekNamedPipe;
44 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
45 use winapi::um::winbase::CreateNamedPipeA;
46 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
47 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
48 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
49 use winapi::um::winbase::PIPE_NOWAIT;
50 use winapi::um::winbase::PIPE_READMODE_BYTE;
51 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
52 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
53 use winapi::um::winbase::PIPE_TYPE_BYTE;
54 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
55 use winapi::um::winbase::PIPE_WAIT;
56 use winapi::um::winbase::SECURITY_IDENTIFICATION;
57
58 use super::RawDescriptor;
59 use crate::descriptor::AsRawDescriptor;
60 use crate::descriptor::FromRawDescriptor;
61 use crate::descriptor::IntoRawDescriptor;
62 use crate::descriptor::SafeDescriptor;
63 use crate::Event;
64 use crate::EventToken;
65 use crate::WaitContext;
66
/// The default buffer size for all named pipes in the system. If this size is too small, writers
/// on named pipes that expect not to block *can* block until the reading side empties the buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
/// crate::windows::StreamChannel, which expects to be able to make a complete write before
/// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
/// is too small:
/// * The writer can't complete its write and release the lock because the buffer is too small.
/// * The reader can't start reading because the lock is held by the writer, so it can't relieve
///   buffer pressure. Note that for message pipes, the reader couldn't do anything to help
///   anyway, because a message mode pipe should NOT have a partial read (which is what we would
///   need to relieve pressure).
/// * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// Process-wide counter, starting at 1, that `pair_with_buffer_size` increments and embeds in
/// each generated pipe name so that successive pipes created by this process get distinct names.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
84
/// Represents one end of a named pipe.
///
/// Besides the handle itself, the connection remembers the framing and blocking modes it was
/// created with so that later mode changes and reads can take them into account.
#[derive(Serialize, Deserialize, Debug)]
pub struct PipeConnection {
    /// Owned handle to this end of the pipe.
    handle: SafeDescriptor,
    /// Whether this end reads a byte stream or framed messages.
    framing_mode: FramingMode,
    /// Whether reads on this end wait for data or return immediately.
    blocking_mode: BlockingMode,
}
92
/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
///
/// Defined as a separate type so that we can mark it as `Send` and `Sync`; the boxed struct is
/// exposed directly as the public tuple field.
pub struct BoxedOverlapped(pub Box<OVERLAPPED>);

// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
unsafe impl Send for BoxedOverlapped {}

// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
unsafe impl Sync for BoxedOverlapped {}
104
/// Wraps the `OVERLAPPED` structure. Also keeps track of whether the `OVERLAPPED` is being used by
/// a `ReadFile` or `WriteFile` operation and holds onto the event object so it doesn't get dropped.
pub struct OverlappedWrapper {
    /// Heap-allocated `OVERLAPPED` handed to the overlapped I/O calls.
    overlapped: BoxedOverlapped,
    // This field prevents the event handle from being dropped too early and allows callers to
    // be notified when a read or write overlapped operation has completed.
    h_event: Option<Event>,
    /// Set when an overlapped read or write is issued with this wrapper; a second operation is
    /// rejected with `InvalidInput` while it is set.
    in_use: bool,
}
114
115 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>116 pub fn get_h_event_ref(&self) -> Option<&Event> {
117 self.h_event.as_ref()
118 }
119
120 /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
121 /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
122 /// returned must not be dropped.
123 ///
124 /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
125 /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
126 /// handle will be signaled when the operation is complete. In other words, you can use
127 /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
128 /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>129 pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
130 let mut overlapped = OVERLAPPED::default();
131 let h_event = if include_event {
132 Some(Event::new()?)
133 } else {
134 None
135 };
136
137 overlapped.hEvent = if let Some(event) = h_event.as_ref() {
138 event.as_raw_descriptor()
139 } else {
140 0 as RawDescriptor
141 };
142
143 Ok(OverlappedWrapper {
144 overlapped: BoxedOverlapped(Box::new(overlapped)),
145 h_event,
146 in_use: false,
147 })
148 }
149 }
150
/// Overlapped (asynchronous) write operations, split into issuing the write and collecting its
/// result.
pub trait WriteOverlapped {
    /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
    /// If successful, the write operation will complete asynchronously, and
    /// `write_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `write_result()` has been called.
    unsafe fn write_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped write operation. Must only be called
    /// after issuing an overlapped write operation using `write_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// Returns the number of bytes written.
    fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped write operation. Must only be
    /// called once, and only after issuing an overlapped write operation using
    /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `write_result` or `try_write_result` should be called again.
    ///
    /// NOTE(review): "called once" above conflicts with "should be called again" here;
    /// presumably "once" means once per *successful* result -- confirm against implementors.
    fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
        -> io::Result<usize>;
}
180
/// Overlapped (asynchronous) read operations, split into issuing the read and collecting its
/// result.
pub trait ReadOverlapped {
    /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
    /// If successful, the read operation will complete asynchronously, and
    /// `read_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `read_result()` has been called.
    unsafe fn read_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped read operation. Must only be called
    /// once, and only after issuing an overlapped read operation using
    /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// Returns the number of bytes read.
    ///
    /// NOTE(review): `try_read_result` says `read_result` may be "called again" after an
    /// incomplete attempt; presumably "once" means once per completed operation -- confirm
    /// against implementors.
    fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped read operation. Must only be called
    /// after issuing an overlapped read operation using `read_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `read_result` or `try_read_result` should be called again.
    fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
}
209
/// How data travelling through a named pipe is framed.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum FramingMode {
    /// The pipe is a simple byte stream (`PIPE_TYPE_BYTE` / `PIPE_READMODE_BYTE`).
    Byte,
    /// The pipe carries framed messages (`PIPE_TYPE_MESSAGE` / `PIPE_READMODE_MESSAGE`).
    Message,
}
215
216 impl FramingMode {
to_readmode(self) -> DWORD217 fn to_readmode(self) -> DWORD {
218 match self {
219 FramingMode::Message => PIPE_READMODE_MESSAGE,
220 FramingMode::Byte => PIPE_READMODE_BYTE,
221 }
222 }
223
to_pipetype(self) -> DWORD224 fn to_pipetype(self) -> DWORD {
225 match self {
226 FramingMode::Message => PIPE_TYPE_MESSAGE,
227 FramingMode::Byte => PIPE_TYPE_BYTE,
228 }
229 }
230 }
231
/// Whether operations on a pipe wait for data; converts to the `PIPE_WAIT` / `PIPE_NOWAIT` flags.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
pub enum BlockingMode {
    /// Calls to read() block until data is received
    Wait,
    /// Calls to read() return immediately even if there is nothing read with error code 232
    /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
    ///
    /// NOTE: This mode is discouraged by the Windows API documentation.
    NoWait,
}
242
243 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD244 fn from(blocking_mode: &BlockingMode) -> DWORD {
245 match blocking_mode {
246 BlockingMode::Wait => PIPE_WAIT,
247 BlockingMode::NoWait => PIPE_NOWAIT,
248 }
249 }
250 }
251
252 /// Sets the handle state for a named pipe in a rust friendly way.
253 /// SAFETY:
254 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>255 unsafe fn set_named_pipe_handle_state(
256 pipe_handle: RawDescriptor,
257 client_mode: &mut DWORD,
258 ) -> Result<()> {
259 // Safe when the pipe handle is open. Safety also requires checking the return value, which we
260 // do below.
261 let success_flag = SetNamedPipeHandleState(
262 /* hNamedPipe= */ pipe_handle,
263 /* lpMode= */ client_mode,
264 /* lpMaxCollectionCount= */ ptr::null_mut(),
265 /* lpCollectDataTimeout= */ ptr::null_mut(),
266 );
267 if success_flag == 0 {
268 Err(io::Error::last_os_error())
269 } else {
270 Ok(())
271 }
272 }
273
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>274 pub fn pair(
275 framing_mode: &FramingMode,
276 blocking_mode: &BlockingMode,
277 timeout: u64,
278 ) -> Result<(PipeConnection, PipeConnection)> {
279 pair_with_buffer_size(
280 framing_mode,
281 blocking_mode,
282 timeout,
283 DEFAULT_BUFFER_SIZE,
284 false,
285 )
286 }
287
288 /// Creates a pair of handles connected to either end of a duplex named pipe.
289 ///
290 /// The pipe created will have a semi-random name and a default set of security options that
291 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
292 /// remote clients, allow only a single server instance, and prevent impersonation by the server
293 /// end of the pipe.
294 ///
295 /// # Arguments
296 ///
297 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
298 /// automatically framed sequence of messages (Message). In message mode it's an error to read
299 /// fewer bytes than were sent in a message from the other end of the pipe.
300 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
301 /// return immediately if there is nothing available (NoWait).
302 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
303 /// zero will create sockets with the system default timeout.
304 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
305 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
306 /// writes that don't fit in the buffer.
307 /// # Return value
308 ///
309 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
310 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>311 pub fn pair_with_buffer_size(
312 framing_mode: &FramingMode,
313 blocking_mode: &BlockingMode,
314 timeout: u64,
315 buffer_size: usize,
316 overlapped: bool,
317 ) -> Result<(PipeConnection, PipeConnection)> {
318 // Give the pipe a unique name to avoid accidental collisions
319 let pipe_name = format!(
320 r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
321 process::id(),
322 NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
323 rand::thread_rng().gen::<u32>(),
324 );
325
326 let server_end = create_server_pipe(
327 &pipe_name,
328 framing_mode,
329 blocking_mode,
330 timeout,
331 buffer_size,
332 overlapped,
333 )?;
334
335 // Open the named pipe we just created as the client
336 let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
337
338 // Accept the client's connection
339 // Not sure if this is strictly needed but I'm doing it just in case.
340 // We expect at this point that the client will already be connected,
341 // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
342 // It's also OK if we get a return code of success.
343 server_end.wait_for_client_connection()?;
344
345 Ok((server_end, client_end))
346 }
347
348 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
349 /// settings.
350 ///
351 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
352 ///
353 /// # Arguments
354 ///
355 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
356 /// `\\.\pipe\<some-name>`.
357 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
358 /// automatically framed sequence of messages (Message). In message mode it's an error to read
359 /// fewer bytes than were sent in a message from the other end of the pipe.
360 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
361 /// return immediately if there is nothing available (NoWait).
362 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
363 /// zero will create sockets with the system default timeout.
364 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
365 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
366 /// writes that don't fit in the buffer.
367 /// * `overlapped` - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>368 pub fn create_server_pipe(
369 pipe_name: &str,
370 framing_mode: &FramingMode,
371 blocking_mode: &BlockingMode,
372 timeout: u64,
373 buffer_size: usize,
374 overlapped: bool,
375 ) -> Result<PipeConnection> {
376 let c_pipe_name = CString::new(pipe_name).unwrap();
377
378 let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
379 if overlapped {
380 open_mode_flags |= FILE_FLAG_OVERLAPPED
381 }
382
383 // This sets flags so there will be an error if >1 instance (server end)
384 // of this pipe name is opened because we expect exactly one.
385 // SAFETY:
386 // Safe because security attributes are valid, pipe_name is valid C string,
387 // and we're checking the return code
388 let server_handle = unsafe {
389 CreateNamedPipeA(
390 c_pipe_name.as_ptr(),
391 /* dwOpenMode= */
392 open_mode_flags,
393 /* dwPipeMode= */
394 framing_mode.to_pipetype()
395 | framing_mode.to_readmode()
396 | DWORD::from(blocking_mode)
397 | PIPE_REJECT_REMOTE_CLIENTS,
398 /* nMaxInstances= */ 1,
399 /* nOutBufferSize= */ buffer_size as DWORD,
400 /* nInBufferSize= */ buffer_size as DWORD,
401 /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
402 /* lpSecurityAttributes= */
403 SecurityAttributes::new_with_security_descriptor(
404 SelfRelativeSecurityDescriptor::get_singleton(),
405 /* inherit= */ true,
406 )
407 .as_mut(),
408 )
409 };
410
411 if server_handle == INVALID_HANDLE_VALUE {
412 Err(io::Error::last_os_error())
413 } else {
414 // SAFETY: Safe because server_handle is valid.
415 unsafe {
416 Ok(PipeConnection {
417 handle: SafeDescriptor::from_raw_descriptor(server_handle),
418 framing_mode: *framing_mode,
419 blocking_mode: *blocking_mode,
420 })
421 }
422 }
423 }
424
425 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
426 /// settings.
427 ///
428 /// The pipe will be set to prevent impersonation of the client by the server process.
429 ///
430 /// # Arguments
431 ///
432 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
433 /// `\\.\pipe\<some-name>`.
434 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
435 /// automatically framed sequence of messages (Message). In message mode it's an error to read
436 /// fewer bytes than were sent in a message from the other end of the pipe.
437 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
438 /// return immediately if there is nothing available (NoWait).
439 /// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>440 pub fn create_client_pipe(
441 pipe_name: &str,
442 framing_mode: &FramingMode,
443 blocking_mode: &BlockingMode,
444 overlapped: bool,
445 ) -> Result<PipeConnection> {
446 let client_handle = OpenOptions::new()
447 .read(true)
448 .write(true)
449 .create(true)
450 .security_qos_flags(SECURITY_IDENTIFICATION)
451 .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
452 .open(pipe_name)?
453 .into_raw_descriptor();
454
455 let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
456
457 // SAFETY:
458 // Safe because client_handle's open() call did not return an error.
459 unsafe {
460 set_named_pipe_handle_state(client_handle, &mut client_mode)?;
461 }
462
463 Ok(PipeConnection {
464 // SAFETY:
465 // Safe because client_handle is valid
466 handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
467 framing_mode: *framing_mode,
468 blocking_mode: *blocking_mode,
469 })
470 }
471
/// Marks types which can be appropriately sent through the pipe read/write helpers
/// (`PipeConnection::read`, `PipeConnection::write` and their overlapped variants).
pub trait PipeSendable {
    /// Default value used to fill in new empty indexes when resizing a buffer to
    /// a larger size.
    fn default() -> Self;
}
/// Bytes default to zero.
impl PipeSendable for u8 {
    fn default() -> Self {
        0
    }
}
/// Raw descriptors default to the null handle.
impl PipeSendable for RawDescriptor {
    fn default() -> Self {
        ptr::null_mut()
    }
}
489
490 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>491 pub fn try_clone(&self) -> Result<PipeConnection> {
492 let copy_handle = self.handle.try_clone()?;
493 Ok(PipeConnection {
494 handle: copy_handle,
495 framing_mode: self.framing_mode,
496 blocking_mode: self.blocking_mode,
497 })
498 }
499
500 /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
501 /// blocking modes.
502 ///
503 /// # Safety
504 /// 1. rd is valid and ownership is transferred to this function when it is called.
505 ///
506 /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
507 /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection508 pub unsafe fn from_raw_descriptor(
509 rd: RawDescriptor,
510 framing_mode: FramingMode,
511 blocking_mode: BlockingMode,
512 ) -> PipeConnection {
513 PipeConnection {
514 handle: SafeDescriptor::from_raw_descriptor(rd),
515 framing_mode,
516 blocking_mode,
517 }
518 }
519
520 /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
521 /// Returns the number of bytes (not values) read.
522 ///
523 /// # Safety
524 ///
525 /// This is safe only when the following conditions hold:
526 /// 1. The data on the other end of the pipe is a valid binary representation of data for
527 /// type T, and
528 /// 2. The number of bytes read is a multiple of the size of T; this must be checked by
529 /// the caller.
530 /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
531 /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>532 pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
533 PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
534 }
535
536 /// Similar to `PipeConnection::read` except it also allows:
537 /// 1. The same end of the named pipe to read and write at the same time in different
538 /// threads.
539 /// 2. Asynchronous read and write (read and write won't block).
540 ///
541 /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
542 /// (can be created with `OverlappedWrapper::new`) will be passed into
543 /// `ReadFile`. That event will be triggered when the read operation is complete.
544 ///
545 /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
546 /// also help with waiting until the read operation is complete.
547 ///
548 /// # Safety
549 ///
550 /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
551 /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>552 pub unsafe fn read_overlapped<T: PipeSendable>(
553 &mut self,
554 buf: &mut [T],
555 overlapped_wrapper: &mut OverlappedWrapper,
556 ) -> Result<()> {
557 if overlapped_wrapper.in_use {
558 return Err(std::io::Error::new(
559 std::io::ErrorKind::InvalidInput,
560 "Overlapped struct already in use",
561 ));
562 }
563 overlapped_wrapper.in_use = true;
564
565 PipeConnection::read_internal(
566 &self.handle,
567 self.blocking_mode,
568 buf,
569 Some(&mut overlapped_wrapper.overlapped.0),
570 )?;
571 Ok(())
572 }
573
574 /// Helper for `read_overlapped` and `read`
575 ///
576 /// # Safety
577 /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>578 unsafe fn read_internal<T: PipeSendable>(
579 handle: &SafeDescriptor,
580 blocking_mode: BlockingMode,
581 buf: &mut [T],
582 overlapped: Option<&mut OVERLAPPED>,
583 ) -> Result<usize> {
584 let res = crate::windows::read_file(
585 handle,
586 buf.as_mut_ptr() as *mut u8,
587 mem::size_of_val(buf),
588 overlapped,
589 );
590 match res {
591 Ok(bytes_read) => Ok(bytes_read),
592 // Treat a closed pipe like an EOF.
593 // We check the raw error because `ErrorKind::BrokenPipe` is ambiguous on Windows.
594 Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),
595 Err(e)
596 if blocking_mode == BlockingMode::NoWait
597 && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
598 {
599 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
600 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
601 // correct. For further details see:
602 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
603 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
604 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
605 }
606 Err(e) => Err(e),
607 }
608 }
609
610 /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
611 /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>612 pub fn read_overlapped_blocking<T: PipeSendable>(
613 &mut self,
614 buf: &mut [T],
615 overlapped_wrapper: &mut OverlappedWrapper,
616 exit_event: &Event,
617 ) -> Result<()> {
618 // SAFETY:
619 // Safe because we are providing a valid buffer slice and also providing a valid
620 // overlapped struct.
621 match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
622 // More data isn't necessarily an error as long as we've filled the provided buffer,
623 // as is checked later in this function.
624 Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(()),
625 Err(e) => Err(e),
626 Ok(()) => Ok(()),
627 }?;
628
629 #[derive(EventToken)]
630 enum Token {
631 ReadOverlapped,
632 Exit,
633 }
634
635 let wait_ctx = WaitContext::build_with(&[
636 (
637 overlapped_wrapper.get_h_event_ref().unwrap(),
638 Token::ReadOverlapped,
639 ),
640 (exit_event, Token::Exit),
641 ])?;
642
643 let events = wait_ctx.wait()?;
644 for event in events {
645 match event.token {
646 Token::ReadOverlapped => {
647 let size_read_in_bytes =
648 self.get_overlapped_result(overlapped_wrapper)? as usize;
649
650 // If this error shows, most likely the overlapped named pipe was set up
651 // incorrectly.
652 if size_read_in_bytes != buf.len() {
653 return Err(std::io::Error::new(
654 std::io::ErrorKind::UnexpectedEof,
655 "Short read",
656 ));
657 }
658 }
659 Token::Exit => {
660 return Err(std::io::Error::new(
661 std::io::ErrorKind::Interrupted,
662 "IO canceled on exit request",
663 ));
664 }
665 }
666 }
667
668 Ok(())
669 }
670
671 /// Gets the size in bytes of data in the pipe.
672 ///
673 /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
674 /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>675 pub fn get_available_byte_count(&self) -> io::Result<u32> {
676 let mut total_bytes_avail: DWORD = 0;
677
678 // SAFETY:
679 // Safe because the underlying pipe handle is guaranteed to be open, and the output values
680 // live at valid memory locations.
681 fail_if_zero!(unsafe {
682 PeekNamedPipe(
683 self.as_raw_descriptor(),
684 ptr::null_mut(),
685 0,
686 ptr::null_mut(),
687 &mut total_bytes_avail,
688 ptr::null_mut(),
689 )
690 });
691
692 Ok(total_bytes_avail)
693 }
694
695 /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
696 /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>697 pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
698 // SAFETY: overlapped is None so this is safe.
699 unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
700 }
701
702 /// Similar to `PipeConnection::write` except it also allows:
703 /// 1. The same end of the named pipe to read and write at the same time in different
704 /// threads.
705 /// 2. Asynchronous read and write (read and write won't block).
706 ///
707 /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
708 /// (can be created with `OverlappedWrapper::new`) will be passed into
709 /// `WriteFile`. That event will be triggered when the write operation is complete.
710 ///
711 /// In order to get how many bytes were written, call `get_overlapped_result`. That function
712 /// will also help with waiting until the write operation is complete. The pipe must be
713 /// opened in overlapped otherwise there may be unexpected behavior.
714 ///
715 /// # Safety
716 /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>717 pub unsafe fn write_overlapped<T: PipeSendable>(
718 &mut self,
719 buf: &[T],
720 overlapped_wrapper: &mut OverlappedWrapper,
721 ) -> Result<()> {
722 if overlapped_wrapper.in_use {
723 return Err(std::io::Error::new(
724 std::io::ErrorKind::InvalidInput,
725 "Overlapped struct already in use",
726 ));
727 }
728 overlapped_wrapper.in_use = true;
729
730 PipeConnection::write_internal(
731 &self.handle,
732 buf,
733 Some(&mut overlapped_wrapper.overlapped.0),
734 )?;
735 Ok(())
736 }
737
738 /// Helper for `write_overlapped` and `write`.
739 ///
740 /// # Safety
741 /// * Safe if overlapped is None.
742 /// * Safe if overlapped is Some and:
743 /// + buf lives until the overlapped operation is complete.
744 /// + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>745 unsafe fn write_internal<T: PipeSendable>(
746 handle: &SafeDescriptor,
747 buf: &[T],
748 overlapped: Option<&mut OVERLAPPED>,
749 ) -> Result<usize> {
750 // SAFETY:
751 // Safe because buf points to memory valid until the write completes and we pass a valid
752 // length for that memory.
753 unsafe {
754 crate::windows::write_file(
755 handle,
756 buf.as_ptr() as *const u8,
757 mem::size_of_val(buf),
758 overlapped,
759 )
760 }
761 }
762
763 /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>764 pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
765 let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
766 self.blocking_mode = *blocking_mode;
767
768 // SAFETY:
769 // Safe because the pipe has not been closed (it is managed by this object).
770 unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
771 }
772
773 /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>774 pub fn wait_for_client_connection(&self) -> Result<()> {
775 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
776 self.wait_for_client_connection_internal(
777 &mut overlapped_wrapper,
778 /* should_block = */ true,
779 )
780 }
781
782 /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>783 pub fn wait_for_client_connection_overlapped_blocking(
784 &mut self,
785 exit_event: &Event,
786 ) -> Result<()> {
787 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
788 self.wait_for_client_connection_internal(
789 &mut overlapped_wrapper,
790 /* should_block = */ false,
791 )?;
792
793 #[derive(EventToken)]
794 enum Token {
795 Connected,
796 Exit,
797 }
798
799 let wait_ctx = WaitContext::build_with(&[
800 (
801 overlapped_wrapper.get_h_event_ref().unwrap(),
802 Token::Connected,
803 ),
804 (exit_event, Token::Exit),
805 ])?;
806
807 let events = wait_ctx.wait()?;
808 if let Some(event) = events.into_iter().next() {
809 return match event.token {
810 Token::Connected => Ok(()),
811 Token::Exit => {
812 // We must cancel IO here because it is unsafe to free the overlapped wrapper
813 // while the IO operation is active.
814 self.cancel_io()?;
815
816 Err(std::io::Error::new(
817 std::io::ErrorKind::Interrupted,
818 "IO canceled on exit request",
819 ))
820 }
821 };
822 }
823 unreachable!("wait cannot return Ok with zero events");
824 }
825
826 /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
827 /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>828 pub fn wait_for_client_connection_overlapped(
829 &self,
830 overlapped_wrapper: &mut OverlappedWrapper,
831 ) -> Result<()> {
832 self.wait_for_client_connection_internal(
833 overlapped_wrapper,
834 /* should_block = */ false,
835 )
836 }
837
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>838 fn wait_for_client_connection_internal(
839 &self,
840 overlapped_wrapper: &mut OverlappedWrapper,
841 should_block: bool,
842 ) -> Result<()> {
843 // SAFETY:
844 // Safe because the handle is valid and we're checking the return
845 // code according to the documentation
846 //
847 // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
848 // overlapped_wrapper must live until the overlapped operation is complete; however,
849 // if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
850 // could be freed while the operation is still running.
851 unsafe {
852 let success_flag = ConnectNamedPipe(
853 self.as_raw_descriptor(),
854 // Note: The overlapped structure is only used if the pipe was opened in
855 // OVERLAPPED mode, but is necessary in that case.
856 &mut *overlapped_wrapper.overlapped.0,
857 );
858 if success_flag == 0 {
859 return match GetLastError() {
860 ERROR_PIPE_CONNECTED => {
861 if !should_block {
862 // If async, make sure the event is signalled to indicate the client
863 // is ready.
864 overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
865 }
866
867 Ok(())
868 }
869 ERROR_IO_PENDING => {
870 if should_block {
871 overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
872 }
873 Ok(())
874 }
875 err => Err(io::Error::from_raw_os_error(err as i32)),
876 };
877 }
878 }
879 Ok(())
880 }
881
882 /// Used for overlapped read and write operations.
883 ///
884 /// This will block until the ReadFile or WriteFile operation that also took in
885 /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
886 /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
887 /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>888 pub fn get_overlapped_result(
889 &mut self,
890 overlapped_wrapper: &mut OverlappedWrapper,
891 ) -> io::Result<u32> {
892 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
893 overlapped_wrapper.in_use = false;
894 res
895 }
896
897 /// Used for overlapped read and write operations.
898 ///
899 /// This will return immediately, regardless of the completion status of the
900 /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
901 /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
902 /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
903 /// that were read or written, if completed. If the operation hasn't
904 /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
905 /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>906 pub fn try_get_overlapped_result(
907 &mut self,
908 overlapped_wrapper: &mut OverlappedWrapper,
909 ) -> io::Result<u32> {
910 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
911 match res {
912 Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
913 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
914 }
915 _ => {
916 overlapped_wrapper.in_use = false;
917 res
918 }
919 }
920 }
921
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>922 fn get_overlapped_result_internal(
923 &mut self,
924 overlapped_wrapper: &mut OverlappedWrapper,
925 wait: bool,
926 ) -> io::Result<u32> {
927 if !overlapped_wrapper.in_use {
928 return Err(std::io::Error::new(
929 std::io::ErrorKind::InvalidInput,
930 "Overlapped struct is not in use",
931 ));
932 }
933 let mut size_transferred = 0;
934 // SAFETY:
935 // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
936 // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
937 fail_if_zero!(unsafe {
938 GetOverlappedResult(
939 self.handle.as_raw_descriptor(),
940 &mut *overlapped_wrapper.overlapped.0,
941 &mut size_transferred,
942 if wait { TRUE } else { FALSE },
943 )
944 });
945
946 Ok(size_transferred)
947 }
948
949 /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
950 /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>951 pub fn cancel_io(&mut self) -> Result<()> {
952 fail_if_zero!(
953 // SAFETY: descriptor is valid and the return value is checked.
954 unsafe {
955 CancelIoEx(
956 self.handle.as_raw_descriptor(),
957 /* lpOverlapped= */ std::ptr::null_mut(),
958 )
959 }
960 );
961
962 Ok(())
963 }
964
965 /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode966 pub fn get_framing_mode(&self) -> FramingMode {
967 self.framing_mode
968 }
969
970 /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>971 pub fn get_info(&self) -> Result<NamedPipeInfo> {
972 let mut flags: u32 = 0;
973 let mut incoming_buffer_size: u32 = 0;
974 let mut outgoing_buffer_size: u32 = 0;
975 let mut max_instances: u32 = 0;
976 // SAFETY: all pointers are valid
977 fail_if_zero!(unsafe {
978 GetNamedPipeInfo(
979 self.as_raw_descriptor(),
980 &mut flags,
981 &mut outgoing_buffer_size,
982 &mut incoming_buffer_size,
983 &mut max_instances,
984 )
985 });
986
987 Ok(NamedPipeInfo {
988 outgoing_buffer_size,
989 incoming_buffer_size,
990 max_instances,
991 flags,
992 })
993 }
994
995 /// For a server pipe, flush the pipe contents. This will
996 /// block until the pipe is cleared by the client. Only
997 /// call this if you are sure the client is reading the
998 /// data!
flush_data_blocking(&self) -> Result<()>999 pub fn flush_data_blocking(&self) -> Result<()> {
1000 // SAFETY:
1001 // Safe because the only buffers interacted with are
1002 // outside of Rust memory
1003 fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1004 Ok(())
1005 }
1006
1007 /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1008 pub fn disconnect_clients(&self) -> Result<()> {
1009 // SAFETY:
1010 // Safe because we own the handle passed in and know it will remain valid for the duration
1011 // of the call. Discarded buffers are not managed by rust.
1012 fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1013 Ok(())
1014 }
1015 }
1016
1017 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1018 fn as_raw_descriptor(&self) -> RawDescriptor {
1019 self.handle.as_raw_descriptor()
1020 }
1021 }
1022
1023 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1024 fn into_raw_descriptor(self) -> RawDescriptor {
1025 self.handle.into_raw_descriptor()
1026 }
1027 }
1028
// SAFETY: Send safety is ensured by inner fields.
// NOTE(review): the field types of `PipeConnection` are declared outside this section; confirm
// that each of them may be moved to another thread.
unsafe impl Send for PipeConnection {}
// SAFETY: Sync safety is ensured by inner fields.
// NOTE(review): as above, confirm that shared references to every field may be used from
// several threads at once.
unsafe impl Sync for PipeConnection {}
1033
1034 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>1035 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1036 // SAFETY:
1037 // This is safe because PipeConnection::read is always safe for u8
1038 unsafe { PipeConnection::read(self, buf) }
1039 }
1040 }
1041
1042 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>1043 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1044 PipeConnection::write(self, buf)
1045 }
1046
flush(&mut self) -> io::Result<()>1047 fn flush(&mut self) -> io::Result<()> {
1048 Ok(())
1049 }
1050 }
1051
/// A simple data struct representing metadata about a NamedPipe.
///
/// `PipeConnection::get_info` fills every field from the values reported by
/// `GetNamedPipeInfo`.
#[derive(Debug, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the outgoing buffer, as reported by `GetNamedPipeInfo`.
    pub outgoing_buffer_size: u32,
    /// Size of the incoming buffer, as reported by `GetNamedPipeInfo`.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances, as reported by `GetNamedPipeInfo`.
    pub max_instances: u32,
    /// Raw flags value, as reported by `GetNamedPipeInfo`.
    pub flags: u32,
}
1061
/// This is a wrapper around PipeConnection. It allows one read and one write operation to run
/// in parallel, but not multiple reads or multiple writes in parallel.
///
/// Reason: a message from/to the service has two parts - a fixed size header that contains
/// the size of the actual message, followed by the message itself. By allowing only one write
/// at a time we ensure that the variable size message is written/read right after
/// writing/reading the fixed size header. For example, it avoids sending or receiving messages
/// in an order like
/// H1, H2, M1, M2
///   - where header H1 and its message M1 are sent by one event loop and H2 and its message M2
///     are sent by another event loop.
///
/// Do not expose direct access to reader or writer pipes.
///
/// The struct is clone-able so that different event loops can talk to the other end.
#[derive(Clone)]
pub struct MultiPartMessagePipe {
    // Lock protected pipe to receive messages.
    reader: Arc<Mutex<PipeConnection>>,
    // Lock protected pipe to send messages.
    writer: Arc<Mutex<PipeConnection>>,
    // Whether this end is created as server or client. The variable helps to
    // decide if something meaningful should be done when `wait_for_connection` is called.
    is_server: bool,
    // Connection flag used by `wait_for_connection` on server ends: it starts out false and is
    // updated to true once a client has connected. It is not consulted for client ends.
    is_connected: Arc<AtomicBool>,
}
1090
1091 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1092 fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1093 Ok(Self {
1094 reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1095 writer: Arc::new(Mutex::new(pipe)),
1096 is_server,
1097 is_connected: Arc::new(AtomicBool::new(false)),
1098 })
1099 }
1100
1101 /// Create server side of MutiPartMessagePipe.
1102 /// # Safety
1103 /// `pipe` must be a server named pipe.
1104 #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1105 pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1106 Self::create_from_pipe(pipe, true)
1107 }
1108
1109 /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1110 pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1111 let pipe = create_client_pipe(
1112 &format!(r"\\.\pipe\{}", pipe_name),
1113 &FramingMode::Message,
1114 &BlockingMode::Wait,
1115 /* overlapped= */ true,
1116 )?;
1117 Self::create_from_pipe(pipe, false)
1118 }
1119
1120 /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1121 pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1122 let pipe = create_server_pipe(
1123 &format!(r"\\.\pipe\{}", pipe_name,),
1124 &FramingMode::Message,
1125 &BlockingMode::Wait,
1126 0,
1127 1024 * 1024,
1128 true,
1129 )?;
1130 // SAFETY: `pipe` is a server named pipe.
1131 unsafe { Self::create_from_server_pipe(pipe) }
1132 }
1133
1134 /// If the struct is created as a server then waits for client connection to arrive.
1135 /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1136 pub fn wait_for_connection(&self) -> Result<()> {
1137 if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1138 self.reader.lock().wait_for_client_connection()?;
1139 self.is_connected.store(true, Ordering::Relaxed);
1140 }
1141 Ok(())
1142 }
1143
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1144 fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1145 pipe: &mut PipeConnection,
1146 buf: &[T],
1147 overlapped_wrapper: &mut OverlappedWrapper,
1148 ) -> Result<()> {
1149 // Safety:
1150 // `buf` and `overlapped_wrapper` will be in use for the duration of
1151 // the overlapped operation. These must not be reused and must live until
1152 // after `get_overlapped_result()` has been called which is done right
1153 // after this call.
1154 unsafe {
1155 pipe.write_overlapped(buf, overlapped_wrapper)?;
1156 }
1157
1158 let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1159
1160 if size_written_in_bytes as usize != buf.len() {
1161 return Err(std::io::Error::new(
1162 std::io::ErrorKind::UnexpectedEof,
1163 format!(
1164 "Short write expected:{} found:{}",
1165 size_written_in_bytes,
1166 buf.len(),
1167 ),
1168 ));
1169 }
1170 Ok(())
1171 }
1172 /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1173 pub fn write_overlapped_blocking_message<T: PipeSendable>(
1174 &self,
1175 header: &[T],
1176 message: &[T],
1177 overlapped_wrapper: &mut OverlappedWrapper,
1178 ) -> Result<()> {
1179 let mut writer = self.writer.lock();
1180 Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1181 Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1182 }
1183
1184 /// Reads a variable size message and returns the message on success.
1185 /// The size of the message is expected to proceed the message in
1186 /// the form of `header_size` message.
1187 ///
1188 /// `parse_message_size` lets caller parse the header to extract
1189 /// message size.
1190 ///
1191 /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1192 pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1193 &self,
1194 header_size: usize,
1195 parse_message_size: F,
1196 overlapped_wrapper: &mut OverlappedWrapper,
1197 exit_event: &Event,
1198 ) -> Result<Vec<u8>> {
1199 let mut pipe = self.reader.lock();
1200 let mut header = vec![0; header_size];
1201 header.resize_with(header_size, Default::default);
1202 pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1203 let message_size = parse_message_size(&header);
1204 if message_size == 0 {
1205 return Ok(vec![]);
1206 }
1207 let mut buf = vec![];
1208 buf.resize_with(message_size, Default::default);
1209 pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1210 Ok(buf)
1211 }
1212
1213 /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1214 /// named pipe.
1215 ///
1216 /// Otherwise, [`None`] is returned and the struct is dropped.
1217 ///
1218 /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1219 /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1220 /// possible that all of them will result in [`None`]. This is Due to Rust version
1221 /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1222 /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1223 ///
1224 /// If the underlying named pipe is a server named pipe, this method allows the caller to
1225 /// terminate the connection by first flushing the named pipe then disconnecting the clients
1226 /// idiomatically per
1227 /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1228 pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1229 let piper = Arc::clone(&self.reader);
1230 drop(self);
1231 Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1232 }
1233 }
1234
1235 impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1236 type Error = std::io::Error;
try_from(pipe: PipeConnection) -> Result<Self>1237 fn try_from(pipe: PipeConnection) -> Result<Self> {
1238 Self::create_from_pipe(pipe, false)
1239 }
1240 }
1241
#[cfg(test)]
mod tests {
    use std::mem::size_of;
    use std::thread::JoinHandle;
    use std::time::Duration;

    use super::*;

    /// Produces a randomized pipe name so tests do not collide with each other.
    fn generate_pipe_name() -> String {
        format!(
            r"\\.\pipe\test-ipc-pipe-name.rand{}",
            rand::thread_rng().gen::<u64>(),
        )
    }

    /// Creates the server end of an overlapped, message-framed, blocking pipe with a 1000 byte
    /// buffer. Panics if creation fails.
    fn overlapped_message_server(pipe_name: &str) -> PipeConnection {
        create_server_pipe(
            pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1000,
            /* overlapped= */ true,
        )
        .unwrap()
    }

    /// Creates the client end of an overlapped, message-framed, blocking pipe. Panics if
    /// creation fails.
    fn overlapped_message_client(pipe_name: &str) -> PipeConnection {
        create_client_pipe(
            pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* overlapped= */ true,
        )
        .unwrap()
    }

    #[test]
    fn duplex_pipe_stream() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
            println!("{}", dir);

            sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();

            // Smaller than what we sent so we get multiple chunks
            let mut recv_buffer: [u8; 4] = [0; 4];

            // SAFETY: trivially safe with pipe created and return value checked.
            let first_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
            assert_eq!(first_len, 4);
            assert_eq!(recv_buffer, [75, 77, 54, 82]);

            // SAFETY: trivially safe with pipe created and return value checked.
            let second_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
            assert_eq!(second_len, 2);
            assert_eq!(recv_buffer[0..2], [76, 65]);
        }
    }

    #[test]
    fn available_byte_count_byte_mode() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 23, 45]).unwrap();

        // PeekNamedPipe should NOT touch the data in the pipe, so asking twice must yield the
        // same value both times.
        assert_eq!(p2.get_available_byte_count().unwrap(), 3);
        assert_eq!(p2.get_available_byte_count().unwrap(), 3);
    }

    #[test]
    fn available_byte_count_message_mode() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 23, 45]).unwrap();

        // PeekNamedPipe should NOT touch the data in the pipe, so asking twice must yield the
        // same value both times.
        assert_eq!(p2.get_available_byte_count().unwrap(), 3);
        assert_eq!(p2.get_available_byte_count().unwrap(), 3);
    }

    #[test]
    fn available_byte_count_message_mode_multiple_messages() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 2, 3]).unwrap();
        p1.write(&[4, 5]).unwrap();
        assert_eq!(p2.get_available_byte_count().unwrap(), 5);
    }

    #[test]
    fn duplex_pipe_message() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
            println!("{}", dir);

            // Send 2 messages so that we can check that message framing works
            sender.write(&[1, 23, 45]).unwrap();
            sender.write(&[67, 89, 10]).unwrap();

            let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages

            // SAFETY: trivially safe with pipe created and return value checked.
            let first_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
            assert_eq!(first_len, 3);
            assert_eq!(recv_buffer[0..3], [1, 23, 45]);

            // SAFETY: trivially safe with pipe created and return value checked.
            let second_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
            assert_eq!(second_len, 3);
            assert_eq!(recv_buffer[0..3], [67, 89, 10]);
        }
    }

    /// Sends one byte in each direction and checks that a second read reports `WouldBlock`.
    fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
        let mut recv_buffer: [u8; 1] = [0; 1];

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
            println!("{}", dir);
            sender.write(&[1]).unwrap();

            // SAFETY: trivially safe with PipeConnection created and return value checked.
            let first_read = unsafe { receiver.read(&mut recv_buffer) };
            assert_eq!(first_read.unwrap(), 1); // Should succeed!

            // SAFETY: trivially safe with PipeConnection created and return value checked.
            let second_read = unsafe { receiver.read(&mut recv_buffer) };
            assert_eq!(
                second_read.unwrap_err().kind(),
                std::io::ErrorKind::WouldBlock
            );
        }
    }

    #[test]
    fn duplex_nowait() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
        duplex_nowait_helper(&p1, &p2);
    }

    #[test]
    fn duplex_nowait_set_after_creation() {
        // Tests non blocking setting after pipe creation
        let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        p1.set_blocking(&BlockingMode::NoWait)
            .expect("Failed to set blocking mode on pipe p1");
        p2.set_blocking(&BlockingMode::NoWait)
            .expect("Failed to set blocking mode on pipe p2");
        duplex_nowait_helper(&p1, &p2);
    }

    #[test]
    fn duplex_overlapped() {
        let pipe_name = generate_pipe_name();
        let mut p1 = overlapped_message_server(&pipe_name);
        let mut p2 = overlapped_message_client(&pipe_name);

        let mut p1_overlapped_wrapper =
            OverlappedWrapper::new(/* include_event= */ true).unwrap();
        // SAFETY:
        // Safe because the buffer and the overlapped struct live until the operation is
        // verified completed right below.
        unsafe {
            p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
                .unwrap();
        }
        let written = p1
            .get_overlapped_result(&mut p1_overlapped_wrapper)
            .unwrap();
        assert_eq!(written, 6);

        let mut recv_buffer: [u8; 6] = [0; 6];
        let mut p2_overlapped_wrapper =
            OverlappedWrapper::new(/* include_event= */ true).unwrap();
        // SAFETY:
        // Safe because `read_overlapped` can be called since overlapped struct is created, and
        // both it and the buffer live until the operation is verified completed right below.
        unsafe {
            p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
                .unwrap();
        }
        let received = p2
            .get_overlapped_result(&mut p2_overlapped_wrapper)
            .unwrap();
        assert_eq!(received, 6);
        assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
    }

    #[test]
    fn duplex_overlapped_test_in_use() {
        let pipe_name = generate_pipe_name();
        let mut p1 = overlapped_message_server(&pipe_name);
        let mut p2 = overlapped_message_client(&pipe_name);
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();

        // Nothing has been started yet, so there is no result to fetch.
        let idle_result = p1.get_overlapped_result(&mut overlapped_wrapper);
        assert!(idle_result.is_err());

        let data = vec![75, 77, 54, 82, 76, 65];
        // SAFETY: safe because: data & overlapped wrapper live until the
        // operation is verified completed below.
        let first_write = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
        assert!(first_write.is_ok());

        let reused_write =
            // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
            // will error out.
            unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
        assert!(reused_write.is_err());

        let mut recv_buffer: [u8; 6] = [0; 6];
        // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
        // will error out.
        let reused_read = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
        assert!(reused_read.is_err());

        let write_result = p1.get_overlapped_result(&mut overlapped_wrapper);
        assert!(write_result.is_ok());

        let mut recv_buffer: [u8; 6] = [0; 6];
        // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
        // operation is verified completed below.
        let fresh_read = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
        assert!(fresh_read.is_ok());
        let read_result = p2.get_overlapped_result(&mut overlapped_wrapper);
        assert!(read_result.is_ok());
    }

    /// Spawns a thread that first sends `msg_count` randomly chosen messages over `pipe` and
    /// then reads `msg_count` messages back, checking each against the known message table.
    fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
        let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
        std::thread::spawn(move || {
            let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
            let exit_event = Event::new().unwrap();

            for _ in 0..msg_count {
                let pick = rand::thread_rng().gen::<usize>() % messages.len();
                let message = messages[pick];
                pipe.write_overlapped_blocking_message(
                    &message.len().to_be_bytes(),
                    message.as_bytes(),
                    &mut overlapped_wrapper,
                )
                .unwrap();
            }

            for _ in 0..msg_count {
                let parse_len = |bytes: &[u8]| {
                    assert_eq!(bytes.len(), size_of::<usize>());
                    usize::from_be_bytes(bytes.try_into().expect("failed to get array from slice"))
                };
                let message = pipe
                    .read_overlapped_blocking_message(
                        size_of::<usize>(),
                        parse_len,
                        &mut overlapped_wrapper,
                        &exit_event,
                    )
                    .unwrap();
                // Every message in the table has a distinct length, so the length identifies it.
                assert_eq!(
                    messages[message.len() - 1],
                    std::str::from_utf8(&message).unwrap(),
                );
            }
        })
    }

    #[test]
    fn multipart_message_smoke_test() {
        let pipe_name = generate_pipe_name();
        let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
        let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
        let handles = [
            send_receive_msgs(server.clone(), 100),
            send_receive_msgs(client.clone(), 100),
            send_receive_msgs(server, 100),
            send_receive_msgs(client, 100),
        ];
        for worker in handles {
            worker.join().unwrap();
        }
    }

    #[test]
    fn multipart_message_into_inner_pipe() {
        let pipe_name = generate_pipe_name();
        let mut pipe = create_server_pipe(
            &format!(r"\\.\pipe\{}", pipe_name),
            &FramingMode::Message,
            &BlockingMode::Wait,
            0,
            1024 * 1024,
            true,
        )
        .expect("should create the server pipe with success");
        let server1 = {
            let pipe = pipe
                .try_clone()
                .expect("should duplicate the named pipe with success");
            // SAFETY: `pipe` is a server named pipe.
            unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
                .expect("should create the multipart message pipe with success")
        };
        let server2 = server1.clone();
        assert!(
            server2.into_inner_pipe().is_none(),
            "not the last reference, should be None"
        );
        let inner_pipe = server1
            .into_inner_pipe()
            .expect("the last reference, should return the underlying pipe");
        // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
        // that API to compare if 2 handles are the same. Instead, change the mode through the
        // original pipe and check that both report the same pipe information each time.
        for mode in [BlockingMode::NoWait, BlockingMode::Wait] {
            pipe.set_blocking(&mode)
                .expect("should set the blocking mode on the original pipe with success");
            assert_eq!(
                pipe.get_info()
                    .expect("should get the pipe information on the original pipe successfully"),
                inner_pipe
                    .get_info()
                    .expect("should get the pipe information on the inner pipe successfully")
            );
        }
    }

    #[test]
    fn test_wait_for_connection_blocking() {
        let pipe_name = generate_pipe_name();
        let mut server_pipe = overlapped_message_server(&pipe_name);

        let server = crate::thread::spawn_with_timeout(move || {
            let exit_event = Event::new().unwrap();
            server_pipe
                .wait_for_client_connection_overlapped_blocking(&exit_event)
                .unwrap();
        });

        let _client = overlapped_message_client(&pipe_name);
        server.try_join(Duration::from_secs(10)).unwrap();
    }

    #[test]
    fn test_wait_for_connection_blocking_exit_triggered() {
        let pipe_name = generate_pipe_name();
        let mut server_pipe = overlapped_message_server(&pipe_name);

        let exit_event = Event::new().unwrap();
        let exit_event_for_server = exit_event.try_clone().unwrap();
        let server = crate::thread::spawn_with_timeout(move || {
            let wait_result =
                server_pipe.wait_for_client_connection_overlapped_blocking(&exit_event_for_server);
            assert!(wait_result.is_err());
        });
        exit_event.signal().unwrap();
        server.try_join(Duration::from_secs(10)).unwrap();
    }

    #[test]
    fn std_io_read_eof() {
        let (mut w, mut r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        std::io::Write::write(&mut w, &[1, 2, 3]).unwrap();
        drop(w);

        let mut buffer: [u8; 4] = [0; 4];
        assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 3);
        assert_eq!(buffer, [1, 2, 3, 0]);
        // Once the writer is gone, every further read reports end of file.
        assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
        assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
    }

    #[test]
    fn std_io_write_eof() {
        let (mut w, r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        drop(r);
        let result = std::io::Write::write(&mut w, &[1, 2, 3]);
        // Not required to return BrokenPipe here, something like Ok(0) is also acceptable.
        let is_broken_pipe =
            matches!(&result, Err(err) if err.kind() == std::io::ErrorKind::BrokenPipe);
        assert!(is_broken_pipe, "expected Err(BrokenPipe), got {result:?}");
    }
}
1666