1 //! Unix pipe.
2 //!
3 //! See the [`new`] function for documentation.
4 
5 use std::io;
6 use std::os::fd::RawFd;
7 
new_raw() -> io::Result<[RawFd; 2]>8 pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9     let mut fds: [RawFd; 2] = [-1, -1];
10 
11     #[cfg(any(
12         target_os = "android",
13         target_os = "dragonfly",
14         target_os = "freebsd",
15         target_os = "fuchsia",
16         target_os = "hurd",
17         target_os = "linux",
18         target_os = "netbsd",
19         target_os = "openbsd",
20         target_os = "illumos",
21         target_os = "redox",
22         target_os = "solaris",
23         target_os = "vita",
24     ))]
25     unsafe {
26         if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
27             return Err(io::Error::last_os_error());
28         }
29     }
30 
31     #[cfg(any(
32         target_os = "aix",
33         target_os = "haiku",
34         target_os = "ios",
35         target_os = "macos",
36         target_os = "tvos",
37         target_os = "visionos",
38         target_os = "watchos",
39         target_os = "espidf",
40         target_os = "nto",
41     ))]
42     unsafe {
43         // For platforms that don't have `pipe2(2)` we need to manually set the
44         // correct flags on the file descriptor.
45         if libc::pipe(fds.as_mut_ptr()) != 0 {
46             return Err(io::Error::last_os_error());
47         }
48 
49         for fd in &fds {
50             if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
51                 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
52             {
53                 let err = io::Error::last_os_error();
54                 // Don't leak file descriptors. Can't handle closing error though.
55                 let _ = libc::close(fds[0]);
56                 let _ = libc::close(fds[1]);
57                 return Err(err);
58             }
59         }
60     }
61 
62     Ok(fds)
63 }
64 
65 cfg_os_ext! {
66 use std::fs::File;
67 use std::io::{IoSlice, IoSliceMut, Read, Write};
68 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
69 use std::process::{ChildStderr, ChildStdin, ChildStdout};
70 
71 use crate::io_source::IoSource;
72 use crate::{event, Interest, Registry, Token};
73 
74 /// Create a new non-blocking Unix pipe.
75 ///
76 /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
77 /// inter-process or thread communication channel.
78 ///
79 /// This channel may be created before forking the process and then one end used
80 /// in each process, e.g. the parent process has the sending end to send command
81 /// to the child process.
82 ///
83 /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
84 ///
85 /// # Events
86 ///
87 /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
88 /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
89 /// written to the `Sender` the `Receiver` will receive an [readable event].
90 ///
91 /// In addition to those events, events will also be generated if the other side
92 /// is dropped. To check if the `Sender` is dropped you'll need to check
93 /// [`is_read_closed`] on events for the `Receiver`, if it returns true the
94 /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
95 /// returns true the `Receiver` was dropped. Also see the second example below.
96 ///
97 /// [`WRITABLE`]: Interest::WRITABLE
98 /// [writable events]: event::Event::is_writable
99 /// [`READABLE`]: Interest::READABLE
100 /// [readable event]: event::Event::is_readable
101 /// [`is_read_closed`]: event::Event::is_read_closed
102 /// [`is_write_closed`]: event::Event::is_write_closed
103 ///
104 /// # Deregistering
105 ///
106 /// Both `Sender` and `Receiver` will deregister themselves when dropped,
107 /// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
108 ///
109 /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
110 ///
111 /// # Examples
112 ///
113 /// Simple example that writes data into the sending end and read it from the
114 /// receiving end.
115 ///
116 /// ```
117 /// use std::io::{self, Read, Write};
118 ///
119 /// use mio::{Poll, Events, Interest, Token};
120 /// use mio::unix::pipe;
121 ///
122 /// // Unique tokens for the two ends of the channel.
123 /// const PIPE_RECV: Token = Token(0);
124 /// const PIPE_SEND: Token = Token(1);
125 ///
126 /// # fn main() -> io::Result<()> {
127 /// // Create our `Poll` instance and the `Events` container.
128 /// let mut poll = Poll::new()?;
129 /// let mut events = Events::with_capacity(8);
130 ///
131 /// // Create a new pipe.
132 /// let (mut sender, mut receiver) = pipe::new()?;
133 ///
134 /// // Register both ends of the channel.
135 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
136 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
137 ///
138 /// const MSG: &[u8; 11] = b"Hello world";
139 ///
140 /// loop {
141 ///     poll.poll(&mut events, None)?;
142 ///
143 ///     for event in events.iter() {
144 ///         match event.token() {
145 ///             PIPE_SEND => sender.write(MSG)
146 ///                 .and_then(|n| if n != MSG.len() {
147 ///                         // We'll consider a short write an error in this
148 ///                         // example. NOTE: we can't use `write_all` with
149 ///                         // non-blocking I/O.
150 ///                         Err(io::ErrorKind::WriteZero.into())
151 ///                     } else {
152 ///                         Ok(())
153 ///                     })?,
154 ///             PIPE_RECV => {
155 ///                 let mut buf = [0; 11];
156 ///                 let n = receiver.read(&mut buf)?;
157 ///                 println!("received: {:?}", &buf[0..n]);
158 ///                 assert_eq!(n, MSG.len());
159 ///                 assert_eq!(&buf, &*MSG);
160 ///                 return Ok(());
161 ///             },
162 ///             _ => unreachable!(),
163 ///         }
164 ///     }
165 /// }
166 /// # }
167 /// ```
168 ///
169 /// Example that receives an event once the `Sender` is dropped.
170 ///
171 /// ```
172 /// # use std::io;
173 /// #
174 /// # use mio::{Poll, Events, Interest, Token};
175 /// # use mio::unix::pipe;
176 /// #
177 /// # const PIPE_RECV: Token = Token(0);
178 /// # const PIPE_SEND: Token = Token(1);
179 /// #
180 /// # fn main() -> io::Result<()> {
181 /// // Same setup as in the example above.
182 /// let mut poll = Poll::new()?;
183 /// let mut events = Events::with_capacity(8);
184 ///
185 /// let (mut sender, mut receiver) = pipe::new()?;
186 ///
187 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
188 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
189 ///
190 /// // Drop the sender.
191 /// drop(sender);
192 ///
193 /// poll.poll(&mut events, None)?;
194 ///
195 /// for event in events.iter() {
196 ///     match event.token() {
197 ///         PIPE_RECV if event.is_read_closed() => {
198 ///             // Detected that the sender was dropped.
199 ///             println!("Sender dropped!");
200 ///             return Ok(());
201 ///         },
202 ///         _ => unreachable!(),
203 ///     }
204 /// }
205 /// # unreachable!();
206 /// # }
207 /// ```
208 pub fn new() -> io::Result<(Sender, Receiver)> {
209     let fds = new_raw()?;
210     // SAFETY: `new_raw` initialised the `fds` above.
211     let r = unsafe { Receiver::from_raw_fd(fds[0]) };
212     let w = unsafe { Sender::from_raw_fd(fds[1]) };
213     Ok((w, r))
214 }
215 
216 /// Sending end of an Unix pipe.
217 ///
218 /// See [`new`] for documentation, including examples.
219 #[derive(Debug)]
220 pub struct Sender {
221     inner: IoSource<File>,
222 }
223 
224 impl Sender {
225     /// Set the `Sender` into or out of non-blocking mode.
226     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
227         set_nonblocking(self.inner.as_raw_fd(), nonblocking)
228     }
229 
230     /// Execute an I/O operation ensuring that the socket receives more events
231     /// if it hits a [`WouldBlock`] error.
232     ///
233     /// # Notes
234     ///
235     /// This method is required to be called for **all** I/O operations to
236     /// ensure the user will receive events once the socket is ready again after
237     /// returning a [`WouldBlock`] error.
238     ///
239     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
240     ///
241     /// # Examples
242     ///
243     /// ```
244     /// # use std::error::Error;
245     /// #
246     /// # fn main() -> Result<(), Box<dyn Error>> {
247     /// use std::io;
248     /// use std::os::fd::AsRawFd;
249     /// use mio::unix::pipe;
250     ///
251     /// let (sender, receiver) = pipe::new()?;
252     ///
253     /// // Wait until the sender is writable...
254     ///
255     /// // Write to the sender using a direct libc call, of course the
256     /// // `io::Write` implementation would be easier to use.
257     /// let buf = b"hello";
258     /// let n = sender.try_io(|| {
259     ///     let buf_ptr = &buf as *const _ as *const _;
260     ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
261     ///     if res != -1 {
262     ///         Ok(res as usize)
263     ///     } else {
264     ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
265     ///         // should return `WouldBlock` error.
266     ///         Err(io::Error::last_os_error())
267     ///     }
268     /// })?;
269     /// eprintln!("write {} bytes", n);
270     ///
271     /// // Wait until the receiver is readable...
272     ///
273     /// // Read from the receiver using a direct libc call, of course the
274     /// // `io::Read` implementation would be easier to use.
275     /// let mut buf = [0; 512];
276     /// let n = receiver.try_io(|| {
277     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
278     ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
279     ///     if res != -1 {
280     ///         Ok(res as usize)
281     ///     } else {
282     ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
283     ///         // should return `WouldBlock` error.
284     ///         Err(io::Error::last_os_error())
285     ///     }
286     /// })?;
287     /// eprintln!("read {} bytes", n);
288     /// # Ok(())
289     /// # }
290     /// ```
291     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
292     where
293         F: FnOnce() -> io::Result<T>,
294     {
295         self.inner.do_io(|_| f())
296     }
297 }
298 
299 impl event::Source for Sender {
300     fn register(
301         &mut self,
302         registry: &Registry,
303         token: Token,
304         interests: Interest,
305     ) -> io::Result<()> {
306         self.inner.register(registry, token, interests)
307     }
308 
309     fn reregister(
310         &mut self,
311         registry: &Registry,
312         token: Token,
313         interests: Interest,
314     ) -> io::Result<()> {
315         self.inner.reregister(registry, token, interests)
316     }
317 
318     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
319         self.inner.deregister(registry)
320     }
321 }
322 
323 impl Write for Sender {
324     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
325         self.inner.do_io(|mut sender| sender.write(buf))
326     }
327 
328     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
329         self.inner.do_io(|mut sender| sender.write_vectored(bufs))
330     }
331 
332     fn flush(&mut self) -> io::Result<()> {
333         self.inner.do_io(|mut sender| sender.flush())
334     }
335 }
336 
337 impl Write for &Sender {
338     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339         self.inner.do_io(|mut sender| sender.write(buf))
340     }
341 
342     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
343         self.inner.do_io(|mut sender| sender.write_vectored(bufs))
344     }
345 
346     fn flush(&mut self) -> io::Result<()> {
347         self.inner.do_io(|mut sender| sender.flush())
348     }
349 }
350 
351 /// # Notes
352 ///
353 /// The underlying pipe is **not** set to non-blocking.
354 impl From<ChildStdin> for Sender {
355     fn from(stdin: ChildStdin) -> Sender {
356         // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
357         unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
358     }
359 }
360 
361 impl FromRawFd for Sender {
362     unsafe fn from_raw_fd(fd: RawFd) -> Sender {
363         Sender {
364             inner: IoSource::new(File::from_raw_fd(fd)),
365         }
366     }
367 }
368 
369 impl AsRawFd for Sender {
370     fn as_raw_fd(&self) -> RawFd {
371         self.inner.as_raw_fd()
372     }
373 }
374 
375 impl IntoRawFd for Sender {
376     fn into_raw_fd(self) -> RawFd {
377         self.inner.into_inner().into_raw_fd()
378     }
379 }
380 
381 impl From<Sender> for OwnedFd {
382     fn from(sender: Sender) -> Self {
383         sender.inner.into_inner().into()
384     }
385 }
386 
387 impl AsFd for Sender {
388     fn as_fd(&self) -> BorrowedFd<'_> {
389         self.inner.as_fd()
390     }
391 }
392 
393 impl From<OwnedFd> for Sender {
394     fn from(fd: OwnedFd) -> Self {
395         Sender {
396             inner: IoSource::new(File::from(fd)),
397         }
398     }
399 }
400 
401 /// Receiving end of an Unix pipe.
402 ///
403 /// See [`new`] for documentation, including examples.
404 #[derive(Debug)]
405 pub struct Receiver {
406     inner: IoSource<File>,
407 }
408 
409 impl Receiver {
410     /// Set the `Receiver` into or out of non-blocking mode.
411     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
412         set_nonblocking(self.inner.as_raw_fd(), nonblocking)
413     }
414 
415     /// Execute an I/O operation ensuring that the socket receives more events
416     /// if it hits a [`WouldBlock`] error.
417     ///
418     /// # Notes
419     ///
420     /// This method is required to be called for **all** I/O operations to
421     /// ensure the user will receive events once the socket is ready again after
422     /// returning a [`WouldBlock`] error.
423     ///
424     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
425     ///
426     /// # Examples
427     ///
428     /// ```
429     /// # use std::error::Error;
430     /// #
431     /// # fn main() -> Result<(), Box<dyn Error>> {
432     /// use std::io;
433     /// use std::os::fd::AsRawFd;
434     /// use mio::unix::pipe;
435     ///
436     /// let (sender, receiver) = pipe::new()?;
437     ///
438     /// // Wait until the sender is writable...
439     ///
440     /// // Write to the sender using a direct libc call, of course the
441     /// // `io::Write` implementation would be easier to use.
442     /// let buf = b"hello";
443     /// let n = sender.try_io(|| {
444     ///     let buf_ptr = &buf as *const _ as *const _;
445     ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
446     ///     if res != -1 {
447     ///         Ok(res as usize)
448     ///     } else {
449     ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
450     ///         // should return `WouldBlock` error.
451     ///         Err(io::Error::last_os_error())
452     ///     }
453     /// })?;
454     /// eprintln!("write {} bytes", n);
455     ///
456     /// // Wait until the receiver is readable...
457     ///
458     /// // Read from the receiver using a direct libc call, of course the
459     /// // `io::Read` implementation would be easier to use.
460     /// let mut buf = [0; 512];
461     /// let n = receiver.try_io(|| {
462     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
463     ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
464     ///     if res != -1 {
465     ///         Ok(res as usize)
466     ///     } else {
467     ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
468     ///         // should return `WouldBlock` error.
469     ///         Err(io::Error::last_os_error())
470     ///     }
471     /// })?;
472     /// eprintln!("read {} bytes", n);
473     /// # Ok(())
474     /// # }
475     /// ```
476     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
477     where
478         F: FnOnce() -> io::Result<T>,
479     {
480         self.inner.do_io(|_| f())
481     }
482 }
483 
484 impl event::Source for Receiver {
485     fn register(
486         &mut self,
487         registry: &Registry,
488         token: Token,
489         interests: Interest,
490     ) -> io::Result<()> {
491         self.inner.register(registry, token, interests)
492     }
493 
494     fn reregister(
495         &mut self,
496         registry: &Registry,
497         token: Token,
498         interests: Interest,
499     ) -> io::Result<()> {
500         self.inner.reregister(registry, token, interests)
501     }
502 
503     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
504         self.inner.deregister(registry)
505     }
506 }
507 
508 impl Read for Receiver {
509     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
510         self.inner.do_io(|mut sender| sender.read(buf))
511     }
512 
513     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
514         self.inner.do_io(|mut sender| sender.read_vectored(bufs))
515     }
516 }
517 
518 impl Read for &Receiver {
519     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
520         self.inner.do_io(|mut sender| sender.read(buf))
521     }
522 
523     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
524         self.inner.do_io(|mut sender| sender.read_vectored(bufs))
525     }
526 }
527 
528 /// # Notes
529 ///
530 /// The underlying pipe is **not** set to non-blocking.
531 impl From<ChildStdout> for Receiver {
532     fn from(stdout: ChildStdout) -> Receiver {
533         // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
534         unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
535     }
536 }
537 
538 /// # Notes
539 ///
540 /// The underlying pipe is **not** set to non-blocking.
541 impl From<ChildStderr> for Receiver {
542     fn from(stderr: ChildStderr) -> Receiver {
543         // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
544         unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
545     }
546 }
547 
548 impl IntoRawFd for Receiver {
549     fn into_raw_fd(self) -> RawFd {
550         self.inner.into_inner().into_raw_fd()
551     }
552 }
553 
554 impl AsRawFd for Receiver {
555     fn as_raw_fd(&self) -> RawFd {
556         self.inner.as_raw_fd()
557     }
558 }
559 
560 impl FromRawFd for Receiver {
561     unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
562         Receiver {
563             inner: IoSource::new(File::from_raw_fd(fd)),
564         }
565     }
566 }
567 
568 impl From<Receiver> for OwnedFd {
569     fn from(receiver: Receiver) -> Self {
570         receiver.inner.into_inner().into()
571     }
572 }
573 
574 impl AsFd for Receiver {
575     fn as_fd(&self) -> BorrowedFd<'_> {
576         self.inner.as_fd()
577     }
578 }
579 
580 impl From<OwnedFd> for Receiver {
581     fn from(fd: OwnedFd) -> Self {
582         Receiver {
583             inner: IoSource::new(File::from(fd)),
584         }
585     }
586 }
587 
588 #[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
589 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
590     let value = nonblocking as libc::c_int;
591     if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
592         Err(io::Error::last_os_error())
593     } else {
594         Ok(())
595     }
596 }
597 
598 #[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
599 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
600     let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
601     if flags < 0 {
602         return Err(io::Error::last_os_error());
603     }
604 
605     let nflags = if nonblocking {
606         flags | libc::O_NONBLOCK
607     } else {
608         flags & !libc::O_NONBLOCK
609     };
610 
611     if flags != nflags {
612         if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
613             return Err(io::Error::last_os_error());
614         }
615     }
616 
617     Ok(())
618 }
619 } // `cfg_os_ext!`.
620