1 use std::fmt; 2 use std::io::{self, IoSlice, IoSliceMut, Read, Write}; 3 use std::net::Shutdown; 4 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; 5 use std::os::unix::net::{self, SocketAddr}; 6 use std::path::Path; 7 8 use crate::io_source::IoSource; 9 use crate::{event, sys, Interest, Registry, Token}; 10 11 /// A non-blocking Unix stream socket. 12 pub struct UnixStream { 13 inner: IoSource<net::UnixStream>, 14 } 15 16 impl UnixStream { 17 /// Connects to the socket named by `path`. 18 /// 19 /// This may return a `WouldBlock` in which case the socket connection 20 /// cannot be completed immediately. Usually it means the backlog is full. connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream>21 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { 22 let addr = SocketAddr::from_pathname(path)?; 23 UnixStream::connect_addr(&addr) 24 } 25 26 /// Connects to the socket named by `address`. 27 /// 28 /// This may return a `WouldBlock` in which case the socket connection 29 /// cannot be completed immediately. Usually it means the backlog is full. connect_addr(address: &SocketAddr) -> io::Result<UnixStream>30 pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> { 31 sys::uds::stream::connect_addr(address).map(UnixStream::from_std) 32 } 33 34 /// Creates a new `UnixStream` from a standard `net::UnixStream`. 35 /// 36 /// This function is intended to be used to wrap a Unix stream from the 37 /// standard library in the Mio equivalent. The conversion assumes nothing 38 /// about the underlying stream; it is left up to the user to set it in 39 /// non-blocking mode. 40 /// 41 /// # Note 42 /// 43 /// The Unix stream here will not have `connect` called on it, so it 44 /// should already be connected via some other means (be it manually, or 45 /// the standard library). from_std(stream: net::UnixStream) -> UnixStream46 pub fn from_std(stream: net::UnixStream) -> UnixStream { 47 UnixStream { 48 inner: IoSource::new(stream), 49 } 50 } 51 52 /// Creates an unnamed pair of connected sockets. 53 /// 54 /// Returns two `UnixStream`s which are connected to each other. pair() -> io::Result<(UnixStream, UnixStream)>55 pub fn pair() -> io::Result<(UnixStream, UnixStream)> { 56 sys::uds::stream::pair().map(|(stream1, stream2)| { 57 (UnixStream::from_std(stream1), UnixStream::from_std(stream2)) 58 }) 59 } 60 61 /// Returns the socket address of the local half of this connection. local_addr(&self) -> io::Result<SocketAddr>62 pub fn local_addr(&self) -> io::Result<SocketAddr> { 63 self.inner.local_addr() 64 } 65 66 /// Returns the socket address of the remote half of this connection. peer_addr(&self) -> io::Result<SocketAddr>67 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 68 self.inner.peer_addr() 69 } 70 71 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>72 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 73 self.inner.take_error() 74 } 75 76 /// Shuts down the read, write, or both halves of this connection. 77 /// 78 /// This function will cause all pending and future I/O calls on the 79 /// specified portions to immediately return with an appropriate value 80 /// (see the documentation of `Shutdown`). shutdown(&self, how: Shutdown) -> io::Result<()>81 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 82 self.inner.shutdown(how) 83 } 84 85 /// Execute an I/O operation ensuring that the socket receives more events 86 /// if it hits a [`WouldBlock`] error. 87 /// 88 /// # Notes 89 /// 90 /// This method is required to be called for **all** I/O operations to 91 /// ensure the user will receive events once the socket is ready again after 92 /// returning a [`WouldBlock`] error. 93 /// 94 /// [`WouldBlock`]: io::ErrorKind::WouldBlock 95 /// 96 /// # Examples 97 /// 98 /// ``` 99 /// # use std::error::Error; 100 /// # 101 /// # fn main() -> Result<(), Box<dyn Error>> { 102 /// use std::io; 103 /// use std::os::fd::AsRawFd; 104 /// use mio::net::UnixStream; 105 /// 106 /// let (stream1, stream2) = UnixStream::pair()?; 107 /// 108 /// // Wait until the stream is writable... 109 /// 110 /// // Write to the stream using a direct libc call, of course the 111 /// // `io::Write` implementation would be easier to use. 112 /// let buf = b"hello"; 113 /// let n = stream1.try_io(|| { 114 /// let buf_ptr = &buf as *const _ as *const _; 115 /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) }; 116 /// if res != -1 { 117 /// Ok(res as usize) 118 /// } else { 119 /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure 120 /// // should return `WouldBlock` error. 121 /// Err(io::Error::last_os_error()) 122 /// } 123 /// })?; 124 /// eprintln!("write {} bytes", n); 125 /// 126 /// // Wait until the stream is readable... 127 /// 128 /// // Read from the stream using a direct libc call, of course the 129 /// // `io::Read` implementation would be easier to use. 130 /// let mut buf = [0; 512]; 131 /// let n = stream2.try_io(|| { 132 /// let buf_ptr = &mut buf as *mut _ as *mut _; 133 /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) }; 134 /// if res != -1 { 135 /// Ok(res as usize) 136 /// } else { 137 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure 138 /// // should return `WouldBlock` error. 139 /// Err(io::Error::last_os_error()) 140 /// } 141 /// })?; 142 /// eprintln!("read {} bytes", n); 143 /// # Ok(()) 144 /// # } 145 /// ``` try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,146 pub fn try_io<F, T>(&self, f: F) -> io::Result<T> 147 where 148 F: FnOnce() -> io::Result<T>, 149 { 150 self.inner.do_io(|_| f()) 151 } 152 } 153 154 impl Read for UnixStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>155 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 156 self.inner.do_io(|mut inner| inner.read(buf)) 157 } 158 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>159 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 160 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 161 } 162 } 163 164 impl<'a> Read for &'a UnixStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>165 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 166 self.inner.do_io(|mut inner| inner.read(buf)) 167 } 168 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>169 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 170 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 171 } 172 } 173 174 impl Write for UnixStream { write(&mut self, buf: &[u8]) -> io::Result<usize>175 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 176 self.inner.do_io(|mut inner| inner.write(buf)) 177 } 178 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>179 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 180 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 181 } 182 flush(&mut self) -> io::Result<()>183 fn flush(&mut self) -> io::Result<()> { 184 self.inner.do_io(|mut inner| inner.flush()) 185 } 186 } 187 188 impl<'a> Write for &'a UnixStream { write(&mut self, buf: &[u8]) -> io::Result<usize>189 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 190 self.inner.do_io(|mut inner| inner.write(buf)) 191 } 192 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>193 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 194 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 195 } 196 flush(&mut self) -> io::Result<()>197 fn flush(&mut self) -> io::Result<()> { 198 self.inner.do_io(|mut inner| inner.flush()) 199 } 200 } 201 202 impl event::Source for UnixStream { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>203 fn register( 204 &mut self, 205 registry: &Registry, 206 token: Token, 207 interests: Interest, 208 ) -> io::Result<()> { 209 self.inner.register(registry, token, interests) 210 } 211 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>212 fn reregister( 213 &mut self, 214 registry: &Registry, 215 token: Token, 216 interests: Interest, 217 ) -> io::Result<()> { 218 self.inner.reregister(registry, token, interests) 219 } 220 deregister(&mut self, registry: &Registry) -> io::Result<()>221 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 222 self.inner.deregister(registry) 223 } 224 } 225 226 impl fmt::Debug for UnixStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 228 self.inner.fmt(f) 229 } 230 } 231 232 impl IntoRawFd for UnixStream { into_raw_fd(self) -> RawFd233 fn into_raw_fd(self) -> RawFd { 234 self.inner.into_inner().into_raw_fd() 235 } 236 } 237 238 impl AsRawFd for UnixStream { as_raw_fd(&self) -> RawFd239 fn as_raw_fd(&self) -> RawFd { 240 self.inner.as_raw_fd() 241 } 242 } 243 244 impl FromRawFd for UnixStream { 245 /// Converts a `RawFd` to a `UnixStream`. 246 /// 247 /// # Notes 248 /// 249 /// The caller is responsible for ensuring that the socket is in 250 /// non-blocking mode. from_raw_fd(fd: RawFd) -> UnixStream251 unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { 252 UnixStream::from_std(FromRawFd::from_raw_fd(fd)) 253 } 254 } 255 256 impl From<UnixStream> for net::UnixStream { from(stream: UnixStream) -> Self257 fn from(stream: UnixStream) -> Self { 258 // Safety: This is safe since we are extracting the raw fd from a well-constructed 259 // mio::net::uds::UnixStream which ensures that we actually pass in a valid file 260 // descriptor/socket 261 unsafe { net::UnixStream::from_raw_fd(stream.into_raw_fd()) } 262 } 263 } 264 265 impl From<UnixStream> for OwnedFd { from(unix_stream: UnixStream) -> Self266 fn from(unix_stream: UnixStream) -> Self { 267 unix_stream.inner.into_inner().into() 268 } 269 } 270 271 impl AsFd for UnixStream { as_fd(&self) -> BorrowedFd<'_>272 fn as_fd(&self) -> BorrowedFd<'_> { 273 self.inner.as_fd() 274 } 275 } 276 277 impl From<OwnedFd> for UnixStream { from(fd: OwnedFd) -> Self278 fn from(fd: OwnedFd) -> Self { 279 UnixStream::from_std(From::from(fd)) 280 } 281 } 282