1 //! Unix pipe.
2 //!
3 //! See the [`new`] function for documentation.
4
5 use std::io;
6 use std::os::fd::RawFd;
7
new_raw() -> io::Result<[RawFd; 2]>8 pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9 let mut fds: [RawFd; 2] = [-1, -1];
10
11 #[cfg(any(
12 target_os = "android",
13 target_os = "dragonfly",
14 target_os = "freebsd",
15 target_os = "fuchsia",
16 target_os = "hurd",
17 target_os = "linux",
18 target_os = "netbsd",
19 target_os = "openbsd",
20 target_os = "illumos",
21 target_os = "redox",
22 target_os = "solaris",
23 target_os = "vita",
24 ))]
25 unsafe {
26 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
27 return Err(io::Error::last_os_error());
28 }
29 }
30
31 #[cfg(any(
32 target_os = "aix",
33 target_os = "haiku",
34 target_os = "ios",
35 target_os = "macos",
36 target_os = "tvos",
37 target_os = "visionos",
38 target_os = "watchos",
39 target_os = "espidf",
40 target_os = "nto",
41 ))]
42 unsafe {
43 // For platforms that don't have `pipe2(2)` we need to manually set the
44 // correct flags on the file descriptor.
45 if libc::pipe(fds.as_mut_ptr()) != 0 {
46 return Err(io::Error::last_os_error());
47 }
48
49 for fd in &fds {
50 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
51 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
52 {
53 let err = io::Error::last_os_error();
54 // Don't leak file descriptors. Can't handle closing error though.
55 let _ = libc::close(fds[0]);
56 let _ = libc::close(fds[1]);
57 return Err(err);
58 }
59 }
60 }
61
62 Ok(fds)
63 }
64
65 cfg_os_ext! {
66 use std::fs::File;
67 use std::io::{IoSlice, IoSliceMut, Read, Write};
68 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
69 use std::process::{ChildStderr, ChildStdin, ChildStdout};
70
71 use crate::io_source::IoSource;
72 use crate::{event, Interest, Registry, Token};
73
74 /// Create a new non-blocking Unix pipe.
75 ///
76 /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
77 /// inter-process or thread communication channel.
78 ///
79 /// This channel may be created before forking the process and then one end used
80 /// in each process, e.g. the parent process has the sending end to send command
81 /// to the child process.
82 ///
83 /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
84 ///
85 /// # Events
86 ///
87 /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
88 /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
89 /// written to the `Sender` the `Receiver` will receive an [readable event].
90 ///
91 /// In addition to those events, events will also be generated if the other side
92 /// is dropped. To check if the `Sender` is dropped you'll need to check
93 /// [`is_read_closed`] on events for the `Receiver`, if it returns true the
94 /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
95 /// returns true the `Receiver` was dropped. Also see the second example below.
96 ///
97 /// [`WRITABLE`]: Interest::WRITABLE
98 /// [writable events]: event::Event::is_writable
99 /// [`READABLE`]: Interest::READABLE
100 /// [readable event]: event::Event::is_readable
101 /// [`is_read_closed`]: event::Event::is_read_closed
102 /// [`is_write_closed`]: event::Event::is_write_closed
103 ///
104 /// # Deregistering
105 ///
106 /// Both `Sender` and `Receiver` will deregister themselves when dropped,
107 /// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
108 ///
109 /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
110 ///
111 /// # Examples
112 ///
113 /// Simple example that writes data into the sending end and read it from the
114 /// receiving end.
115 ///
116 /// ```
117 /// use std::io::{self, Read, Write};
118 ///
119 /// use mio::{Poll, Events, Interest, Token};
120 /// use mio::unix::pipe;
121 ///
122 /// // Unique tokens for the two ends of the channel.
123 /// const PIPE_RECV: Token = Token(0);
124 /// const PIPE_SEND: Token = Token(1);
125 ///
126 /// # fn main() -> io::Result<()> {
127 /// // Create our `Poll` instance and the `Events` container.
128 /// let mut poll = Poll::new()?;
129 /// let mut events = Events::with_capacity(8);
130 ///
131 /// // Create a new pipe.
132 /// let (mut sender, mut receiver) = pipe::new()?;
133 ///
134 /// // Register both ends of the channel.
135 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
136 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
137 ///
138 /// const MSG: &[u8; 11] = b"Hello world";
139 ///
140 /// loop {
141 /// poll.poll(&mut events, None)?;
142 ///
143 /// for event in events.iter() {
144 /// match event.token() {
145 /// PIPE_SEND => sender.write(MSG)
146 /// .and_then(|n| if n != MSG.len() {
147 /// // We'll consider a short write an error in this
148 /// // example. NOTE: we can't use `write_all` with
149 /// // non-blocking I/O.
150 /// Err(io::ErrorKind::WriteZero.into())
151 /// } else {
152 /// Ok(())
153 /// })?,
154 /// PIPE_RECV => {
155 /// let mut buf = [0; 11];
156 /// let n = receiver.read(&mut buf)?;
157 /// println!("received: {:?}", &buf[0..n]);
158 /// assert_eq!(n, MSG.len());
159 /// assert_eq!(&buf, &*MSG);
160 /// return Ok(());
161 /// },
162 /// _ => unreachable!(),
163 /// }
164 /// }
165 /// }
166 /// # }
167 /// ```
168 ///
169 /// Example that receives an event once the `Sender` is dropped.
170 ///
171 /// ```
172 /// # use std::io;
173 /// #
174 /// # use mio::{Poll, Events, Interest, Token};
175 /// # use mio::unix::pipe;
176 /// #
177 /// # const PIPE_RECV: Token = Token(0);
178 /// # const PIPE_SEND: Token = Token(1);
179 /// #
180 /// # fn main() -> io::Result<()> {
181 /// // Same setup as in the example above.
182 /// let mut poll = Poll::new()?;
183 /// let mut events = Events::with_capacity(8);
184 ///
185 /// let (mut sender, mut receiver) = pipe::new()?;
186 ///
187 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
188 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
189 ///
190 /// // Drop the sender.
191 /// drop(sender);
192 ///
193 /// poll.poll(&mut events, None)?;
194 ///
195 /// for event in events.iter() {
196 /// match event.token() {
197 /// PIPE_RECV if event.is_read_closed() => {
198 /// // Detected that the sender was dropped.
199 /// println!("Sender dropped!");
200 /// return Ok(());
201 /// },
202 /// _ => unreachable!(),
203 /// }
204 /// }
205 /// # unreachable!();
206 /// # }
207 /// ```
208 pub fn new() -> io::Result<(Sender, Receiver)> {
209 let fds = new_raw()?;
210 // SAFETY: `new_raw` initialised the `fds` above.
211 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
212 let w = unsafe { Sender::from_raw_fd(fds[1]) };
213 Ok((w, r))
214 }
215
216 /// Sending end of an Unix pipe.
217 ///
218 /// See [`new`] for documentation, including examples.
219 #[derive(Debug)]
220 pub struct Sender {
221 inner: IoSource<File>,
222 }
223
224 impl Sender {
225 /// Set the `Sender` into or out of non-blocking mode.
226 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
227 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
228 }
229
230 /// Execute an I/O operation ensuring that the socket receives more events
231 /// if it hits a [`WouldBlock`] error.
232 ///
233 /// # Notes
234 ///
235 /// This method is required to be called for **all** I/O operations to
236 /// ensure the user will receive events once the socket is ready again after
237 /// returning a [`WouldBlock`] error.
238 ///
239 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// # use std::error::Error;
245 /// #
246 /// # fn main() -> Result<(), Box<dyn Error>> {
247 /// use std::io;
248 /// use std::os::fd::AsRawFd;
249 /// use mio::unix::pipe;
250 ///
251 /// let (sender, receiver) = pipe::new()?;
252 ///
253 /// // Wait until the sender is writable...
254 ///
255 /// // Write to the sender using a direct libc call, of course the
256 /// // `io::Write` implementation would be easier to use.
257 /// let buf = b"hello";
258 /// let n = sender.try_io(|| {
259 /// let buf_ptr = &buf as *const _ as *const _;
260 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
261 /// if res != -1 {
262 /// Ok(res as usize)
263 /// } else {
264 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
265 /// // should return `WouldBlock` error.
266 /// Err(io::Error::last_os_error())
267 /// }
268 /// })?;
269 /// eprintln!("write {} bytes", n);
270 ///
271 /// // Wait until the receiver is readable...
272 ///
273 /// // Read from the receiver using a direct libc call, of course the
274 /// // `io::Read` implementation would be easier to use.
275 /// let mut buf = [0; 512];
276 /// let n = receiver.try_io(|| {
277 /// let buf_ptr = &mut buf as *mut _ as *mut _;
278 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
279 /// if res != -1 {
280 /// Ok(res as usize)
281 /// } else {
282 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
283 /// // should return `WouldBlock` error.
284 /// Err(io::Error::last_os_error())
285 /// }
286 /// })?;
287 /// eprintln!("read {} bytes", n);
288 /// # Ok(())
289 /// # }
290 /// ```
291 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
292 where
293 F: FnOnce() -> io::Result<T>,
294 {
295 self.inner.do_io(|_| f())
296 }
297 }
298
299 impl event::Source for Sender {
300 fn register(
301 &mut self,
302 registry: &Registry,
303 token: Token,
304 interests: Interest,
305 ) -> io::Result<()> {
306 self.inner.register(registry, token, interests)
307 }
308
309 fn reregister(
310 &mut self,
311 registry: &Registry,
312 token: Token,
313 interests: Interest,
314 ) -> io::Result<()> {
315 self.inner.reregister(registry, token, interests)
316 }
317
318 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
319 self.inner.deregister(registry)
320 }
321 }
322
323 impl Write for Sender {
324 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
325 self.inner.do_io(|mut sender| sender.write(buf))
326 }
327
328 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
329 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
330 }
331
332 fn flush(&mut self) -> io::Result<()> {
333 self.inner.do_io(|mut sender| sender.flush())
334 }
335 }
336
337 impl Write for &Sender {
338 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339 self.inner.do_io(|mut sender| sender.write(buf))
340 }
341
342 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
343 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
344 }
345
346 fn flush(&mut self) -> io::Result<()> {
347 self.inner.do_io(|mut sender| sender.flush())
348 }
349 }
350
351 /// # Notes
352 ///
353 /// The underlying pipe is **not** set to non-blocking.
354 impl From<ChildStdin> for Sender {
355 fn from(stdin: ChildStdin) -> Sender {
356 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
357 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
358 }
359 }
360
361 impl FromRawFd for Sender {
362 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
363 Sender {
364 inner: IoSource::new(File::from_raw_fd(fd)),
365 }
366 }
367 }
368
369 impl AsRawFd for Sender {
370 fn as_raw_fd(&self) -> RawFd {
371 self.inner.as_raw_fd()
372 }
373 }
374
375 impl IntoRawFd for Sender {
376 fn into_raw_fd(self) -> RawFd {
377 self.inner.into_inner().into_raw_fd()
378 }
379 }
380
381 impl From<Sender> for OwnedFd {
382 fn from(sender: Sender) -> Self {
383 sender.inner.into_inner().into()
384 }
385 }
386
387 impl AsFd for Sender {
388 fn as_fd(&self) -> BorrowedFd<'_> {
389 self.inner.as_fd()
390 }
391 }
392
393 impl From<OwnedFd> for Sender {
394 fn from(fd: OwnedFd) -> Self {
395 Sender {
396 inner: IoSource::new(File::from(fd)),
397 }
398 }
399 }
400
401 /// Receiving end of an Unix pipe.
402 ///
403 /// See [`new`] for documentation, including examples.
404 #[derive(Debug)]
405 pub struct Receiver {
406 inner: IoSource<File>,
407 }
408
409 impl Receiver {
410 /// Set the `Receiver` into or out of non-blocking mode.
411 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
412 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
413 }
414
415 /// Execute an I/O operation ensuring that the socket receives more events
416 /// if it hits a [`WouldBlock`] error.
417 ///
418 /// # Notes
419 ///
420 /// This method is required to be called for **all** I/O operations to
421 /// ensure the user will receive events once the socket is ready again after
422 /// returning a [`WouldBlock`] error.
423 ///
424 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// # use std::error::Error;
430 /// #
431 /// # fn main() -> Result<(), Box<dyn Error>> {
432 /// use std::io;
433 /// use std::os::fd::AsRawFd;
434 /// use mio::unix::pipe;
435 ///
436 /// let (sender, receiver) = pipe::new()?;
437 ///
438 /// // Wait until the sender is writable...
439 ///
440 /// // Write to the sender using a direct libc call, of course the
441 /// // `io::Write` implementation would be easier to use.
442 /// let buf = b"hello";
443 /// let n = sender.try_io(|| {
444 /// let buf_ptr = &buf as *const _ as *const _;
445 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
446 /// if res != -1 {
447 /// Ok(res as usize)
448 /// } else {
449 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
450 /// // should return `WouldBlock` error.
451 /// Err(io::Error::last_os_error())
452 /// }
453 /// })?;
454 /// eprintln!("write {} bytes", n);
455 ///
456 /// // Wait until the receiver is readable...
457 ///
458 /// // Read from the receiver using a direct libc call, of course the
459 /// // `io::Read` implementation would be easier to use.
460 /// let mut buf = [0; 512];
461 /// let n = receiver.try_io(|| {
462 /// let buf_ptr = &mut buf as *mut _ as *mut _;
463 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
464 /// if res != -1 {
465 /// Ok(res as usize)
466 /// } else {
467 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
468 /// // should return `WouldBlock` error.
469 /// Err(io::Error::last_os_error())
470 /// }
471 /// })?;
472 /// eprintln!("read {} bytes", n);
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
477 where
478 F: FnOnce() -> io::Result<T>,
479 {
480 self.inner.do_io(|_| f())
481 }
482 }
483
484 impl event::Source for Receiver {
485 fn register(
486 &mut self,
487 registry: &Registry,
488 token: Token,
489 interests: Interest,
490 ) -> io::Result<()> {
491 self.inner.register(registry, token, interests)
492 }
493
494 fn reregister(
495 &mut self,
496 registry: &Registry,
497 token: Token,
498 interests: Interest,
499 ) -> io::Result<()> {
500 self.inner.reregister(registry, token, interests)
501 }
502
503 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
504 self.inner.deregister(registry)
505 }
506 }
507
508 impl Read for Receiver {
509 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
510 self.inner.do_io(|mut sender| sender.read(buf))
511 }
512
513 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
514 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
515 }
516 }
517
518 impl Read for &Receiver {
519 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
520 self.inner.do_io(|mut sender| sender.read(buf))
521 }
522
523 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
524 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
525 }
526 }
527
528 /// # Notes
529 ///
530 /// The underlying pipe is **not** set to non-blocking.
531 impl From<ChildStdout> for Receiver {
532 fn from(stdout: ChildStdout) -> Receiver {
533 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
534 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
535 }
536 }
537
538 /// # Notes
539 ///
540 /// The underlying pipe is **not** set to non-blocking.
541 impl From<ChildStderr> for Receiver {
542 fn from(stderr: ChildStderr) -> Receiver {
543 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
544 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
545 }
546 }
547
548 impl IntoRawFd for Receiver {
549 fn into_raw_fd(self) -> RawFd {
550 self.inner.into_inner().into_raw_fd()
551 }
552 }
553
554 impl AsRawFd for Receiver {
555 fn as_raw_fd(&self) -> RawFd {
556 self.inner.as_raw_fd()
557 }
558 }
559
560 impl FromRawFd for Receiver {
561 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
562 Receiver {
563 inner: IoSource::new(File::from_raw_fd(fd)),
564 }
565 }
566 }
567
568 impl From<Receiver> for OwnedFd {
569 fn from(receiver: Receiver) -> Self {
570 receiver.inner.into_inner().into()
571 }
572 }
573
574 impl AsFd for Receiver {
575 fn as_fd(&self) -> BorrowedFd<'_> {
576 self.inner.as_fd()
577 }
578 }
579
580 impl From<OwnedFd> for Receiver {
581 fn from(fd: OwnedFd) -> Self {
582 Receiver {
583 inner: IoSource::new(File::from(fd)),
584 }
585 }
586 }
587
588 #[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
589 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
590 let value = nonblocking as libc::c_int;
591 if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
592 Err(io::Error::last_os_error())
593 } else {
594 Ok(())
595 }
596 }
597
598 #[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
599 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
600 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
601 if flags < 0 {
602 return Err(io::Error::last_os_error());
603 }
604
605 let nflags = if nonblocking {
606 flags | libc::O_NONBLOCK
607 } else {
608 flags & !libc::O_NONBLOCK
609 };
610
611 if flags != nflags {
612 if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
613 return Err(io::Error::last_os_error());
614 }
615 }
616
617 Ok(())
618 }
619 } // `cfg_os_ext!`.
620