1 use std::fmt;
2 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
3 use std::net::Shutdown;
4 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
5 use std::os::unix::net::{self, SocketAddr};
6 use std::path::Path;
7 
8 use crate::io_source::IoSource;
9 use crate::{event, sys, Interest, Registry, Token};
10 
/// A non-blocking Unix stream socket.
///
/// This is a thin wrapper that stores a standard library
/// `net::UnixStream` inside an `IoSource`. All socket operations are
/// forwarded to that wrapped value.
///
/// Instances are created with `connect`, `connect_addr`, `pair` or
/// `from_std`. When wrapping an existing stream with `from_std`, it is left
/// up to the caller to put the socket into non-blocking mode.
pub struct UnixStream {
    // The wrapped standard library stream, held inside Mio's `IoSource`.
    inner: IoSource<net::UnixStream>,
}
15 
16 impl UnixStream {
17     /// Connects to the socket named by `path`.
18     ///
19     /// This may return a `WouldBlock` in which case the socket connection
20     /// cannot be completed immediately. Usually it means the backlog is full.
connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream>21     pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
22         let addr = SocketAddr::from_pathname(path)?;
23         UnixStream::connect_addr(&addr)
24     }
25 
26     /// Connects to the socket named by `address`.
27     ///
28     /// This may return a `WouldBlock` in which case the socket connection
29     /// cannot be completed immediately. Usually it means the backlog is full.
connect_addr(address: &SocketAddr) -> io::Result<UnixStream>30     pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> {
31         sys::uds::stream::connect_addr(address).map(UnixStream::from_std)
32     }
33 
34     /// Creates a new `UnixStream` from a standard `net::UnixStream`.
35     ///
36     /// This function is intended to be used to wrap a Unix stream from the
37     /// standard library in the Mio equivalent. The conversion assumes nothing
38     /// about the underlying stream; it is left up to the user to set it in
39     /// non-blocking mode.
40     ///
41     /// # Note
42     ///
43     /// The Unix stream here will not have `connect` called on it, so it
44     /// should already be connected via some other means (be it manually, or
45     /// the standard library).
from_std(stream: net::UnixStream) -> UnixStream46     pub fn from_std(stream: net::UnixStream) -> UnixStream {
47         UnixStream {
48             inner: IoSource::new(stream),
49         }
50     }
51 
52     /// Creates an unnamed pair of connected sockets.
53     ///
54     /// Returns two `UnixStream`s which are connected to each other.
pair() -> io::Result<(UnixStream, UnixStream)>55     pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
56         sys::uds::stream::pair().map(|(stream1, stream2)| {
57             (UnixStream::from_std(stream1), UnixStream::from_std(stream2))
58         })
59     }
60 
61     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>62     pub fn local_addr(&self) -> io::Result<SocketAddr> {
63         self.inner.local_addr()
64     }
65 
66     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>67     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
68         self.inner.peer_addr()
69     }
70 
71     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>72     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
73         self.inner.take_error()
74     }
75 
76     /// Shuts down the read, write, or both halves of this connection.
77     ///
78     /// This function will cause all pending and future I/O calls on the
79     /// specified portions to immediately return with an appropriate value
80     /// (see the documentation of `Shutdown`).
shutdown(&self, how: Shutdown) -> io::Result<()>81     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
82         self.inner.shutdown(how)
83     }
84 
85     /// Execute an I/O operation ensuring that the socket receives more events
86     /// if it hits a [`WouldBlock`] error.
87     ///
88     /// # Notes
89     ///
90     /// This method is required to be called for **all** I/O operations to
91     /// ensure the user will receive events once the socket is ready again after
92     /// returning a [`WouldBlock`] error.
93     ///
94     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
95     ///
96     /// # Examples
97     ///
98     /// ```
99     /// # use std::error::Error;
100     /// #
101     /// # fn main() -> Result<(), Box<dyn Error>> {
102     /// use std::io;
103     /// use std::os::fd::AsRawFd;
104     /// use mio::net::UnixStream;
105     ///
106     /// let (stream1, stream2) = UnixStream::pair()?;
107     ///
108     /// // Wait until the stream is writable...
109     ///
110     /// // Write to the stream using a direct libc call, of course the
111     /// // `io::Write` implementation would be easier to use.
112     /// let buf = b"hello";
113     /// let n = stream1.try_io(|| {
114     ///     let buf_ptr = &buf as *const _ as *const _;
115     ///     let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
116     ///     if res != -1 {
117     ///         Ok(res as usize)
118     ///     } else {
119     ///         // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
120     ///         // should return `WouldBlock` error.
121     ///         Err(io::Error::last_os_error())
122     ///     }
123     /// })?;
124     /// eprintln!("write {} bytes", n);
125     ///
126     /// // Wait until the stream is readable...
127     ///
128     /// // Read from the stream using a direct libc call, of course the
129     /// // `io::Read` implementation would be easier to use.
130     /// let mut buf = [0; 512];
131     /// let n = stream2.try_io(|| {
132     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
133     ///     let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
134     ///     if res != -1 {
135     ///         Ok(res as usize)
136     ///     } else {
137     ///         // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
138     ///         // should return `WouldBlock` error.
139     ///         Err(io::Error::last_os_error())
140     ///     }
141     /// })?;
142     /// eprintln!("read {} bytes", n);
143     /// # Ok(())
144     /// # }
145     /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,146     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
147     where
148         F: FnOnce() -> io::Result<T>,
149     {
150         self.inner.do_io(|_| f())
151     }
152 }
153 
154 impl Read for UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>155     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
156         self.inner.do_io(|mut inner| inner.read(buf))
157     }
158 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>159     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
160         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
161     }
162 }
163 
164 impl<'a> Read for &'a UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>165     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166         self.inner.do_io(|mut inner| inner.read(buf))
167     }
168 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>169     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
170         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
171     }
172 }
173 
174 impl Write for UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>175     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176         self.inner.do_io(|mut inner| inner.write(buf))
177     }
178 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>179     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
180         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
181     }
182 
flush(&mut self) -> io::Result<()>183     fn flush(&mut self) -> io::Result<()> {
184         self.inner.do_io(|mut inner| inner.flush())
185     }
186 }
187 
188 impl<'a> Write for &'a UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>189     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
190         self.inner.do_io(|mut inner| inner.write(buf))
191     }
192 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>193     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
194         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
195     }
196 
flush(&mut self) -> io::Result<()>197     fn flush(&mut self) -> io::Result<()> {
198         self.inner.do_io(|mut inner| inner.flush())
199     }
200 }
201 
202 impl event::Source for UnixStream {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>203     fn register(
204         &mut self,
205         registry: &Registry,
206         token: Token,
207         interests: Interest,
208     ) -> io::Result<()> {
209         self.inner.register(registry, token, interests)
210     }
211 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>212     fn reregister(
213         &mut self,
214         registry: &Registry,
215         token: Token,
216         interests: Interest,
217     ) -> io::Result<()> {
218         self.inner.reregister(registry, token, interests)
219     }
220 
deregister(&mut self, registry: &Registry) -> io::Result<()>221     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
222         self.inner.deregister(registry)
223     }
224 }
225 
226 impl fmt::Debug for UnixStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result227     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228         self.inner.fmt(f)
229     }
230 }
231 
232 impl IntoRawFd for UnixStream {
into_raw_fd(self) -> RawFd233     fn into_raw_fd(self) -> RawFd {
234         self.inner.into_inner().into_raw_fd()
235     }
236 }
237 
238 impl AsRawFd for UnixStream {
as_raw_fd(&self) -> RawFd239     fn as_raw_fd(&self) -> RawFd {
240         self.inner.as_raw_fd()
241     }
242 }
243 
244 impl FromRawFd for UnixStream {
245     /// Converts a `RawFd` to a `UnixStream`.
246     ///
247     /// # Notes
248     ///
249     /// The caller is responsible for ensuring that the socket is in
250     /// non-blocking mode.
from_raw_fd(fd: RawFd) -> UnixStream251     unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
252         UnixStream::from_std(FromRawFd::from_raw_fd(fd))
253     }
254 }
255 
256 impl From<UnixStream> for net::UnixStream {
from(stream: UnixStream) -> Self257     fn from(stream: UnixStream) -> Self {
258         // Safety: This is safe since we are extracting the raw fd from a well-constructed
259         // mio::net::uds::UnixStream which ensures that we actually pass in a valid file
260         // descriptor/socket
261         unsafe { net::UnixStream::from_raw_fd(stream.into_raw_fd()) }
262     }
263 }
264 
265 impl From<UnixStream> for OwnedFd {
from(unix_stream: UnixStream) -> Self266     fn from(unix_stream: UnixStream) -> Self {
267         unix_stream.inner.into_inner().into()
268     }
269 }
270 
271 impl AsFd for UnixStream {
as_fd(&self) -> BorrowedFd<'_>272     fn as_fd(&self) -> BorrowedFd<'_> {
273         self.inner.as_fd()
274     }
275 }
276 
277 impl From<OwnedFd> for UnixStream {
from(fd: OwnedFd) -> Self278     fn from(fd: OwnedFd) -> Self {
279         UnixStream::from_std(From::from(fd))
280     }
281 }
282