1 use crate::io::interest::Interest; 2 use crate::runtime::io::Registration; 3 use crate::runtime::scheduler; 4 5 use mio::event::Source; 6 use std::fmt; 7 use std::io; 8 use std::ops::Deref; 9 use std::panic::{RefUnwindSafe, UnwindSafe}; 10 use std::task::ready; 11 12 cfg_io_driver! { 13 /// Associates an I/O resource that implements the [`std::io::Read`] and/or 14 /// [`std::io::Write`] traits with the reactor that drives it. 15 /// 16 /// `PollEvented` uses [`Registration`] internally to take a type that 17 /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or 18 /// [`std::io::Write`] and associate it with a reactor that will drive it. 19 /// 20 /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be 21 /// used from within the future's execution model. As such, the 22 /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] 23 /// implementations using the underlying I/O resource as well as readiness 24 /// events provided by the reactor. 25 /// 26 /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is 27 /// `Sync`), the caller must ensure that there are at most two tasks that 28 /// use a `PollEvented` instance concurrently. One for reading and one for 29 /// writing. While violating this requirement is "safe" from a Rust memory 30 /// model point of view, it will result in unexpected behavior in the form 31 /// of lost notifications and tasks hanging. 32 /// 33 /// ## Readiness events 34 /// 35 /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, 36 /// this type also supports access to the underlying readiness event stream. 37 /// While similar in function to what [`Registration`] provides, the 38 /// semantics are a bit different. 39 /// 40 /// Two functions are provided to access the readiness events: 41 /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the 42 /// current readiness state of the `PollEvented` instance. If 43 /// [`poll_read_ready`] indicates read readiness, immediately calling 44 /// [`poll_read_ready`] again will also indicate read readiness. 45 /// 46 /// When the operation is attempted and is unable to succeed due to the I/O 47 /// resource not being ready, the caller must call [`clear_readiness`]. 48 /// This clears the readiness state until a new readiness event is received. 49 /// 50 /// This allows the caller to implement additional functions. For example, 51 /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and 52 /// [`clear_readiness`]. 53 /// 54 /// ## Platform-specific events 55 /// 56 /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. 57 /// These events are included as part of the read readiness event stream. The 58 /// write readiness event stream is only for `Ready::writable()` events. 59 /// 60 /// [`AsyncRead`]: crate::io::AsyncRead 61 /// [`AsyncWrite`]: crate::io::AsyncWrite 62 /// [`TcpListener`]: crate::net::TcpListener 63 /// [`clear_readiness`]: Registration::clear_readiness 64 /// [`poll_read_ready`]: Registration::poll_read_ready 65 /// [`poll_write_ready`]: Registration::poll_write_ready 66 pub(crate) struct PollEvented<E: Source> { 67 io: Option<E>, 68 registration: Registration, 69 } 70 } 71 72 // ===== impl PollEvented ===== 73 74 impl<E: Source> PollEvented<E> { 75 /// Creates a new `PollEvented` associated with the default reactor. 76 /// 77 /// The returned `PollEvented` has readable and writable interests. For more control, use 78 /// [`Self::new_with_interest`]. 79 /// 80 /// # Panics 81 /// 82 /// This function panics if thread-local runtime is not set. 83 /// 84 /// The runtime is usually set implicitly when this function is called 85 /// from a future driven by a tokio runtime, otherwise runtime can be set 86 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 87 #[track_caller] 88 #[cfg_attr(feature = "signal", allow(unused))] new(io: E) -> io::Result<Self>89 pub(crate) fn new(io: E) -> io::Result<Self> { 90 PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) 91 } 92 93 /// Creates a new `PollEvented` associated with the default reactor, for 94 /// specific `Interest` state. `new_with_interest` should be used over `new` 95 /// when you need control over the readiness state, such as when a file 96 /// descriptor only allows reads. This does not add `hup` or `error` so if 97 /// you are interested in those states, you will need to add them to the 98 /// readiness state passed to this function. 99 /// 100 /// # Panics 101 /// 102 /// This function panics if thread-local runtime is not set. 103 /// 104 /// The runtime is usually set implicitly when this function is called from 105 /// a future driven by a tokio runtime, otherwise runtime can be set 106 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) 107 /// function. 108 #[track_caller] 109 #[cfg_attr(feature = "signal", allow(unused))] new_with_interest(io: E, interest: Interest) -> io::Result<Self>110 pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { 111 Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) 112 } 113 114 #[track_caller] new_with_interest_and_handle( mut io: E, interest: Interest, handle: scheduler::Handle, ) -> io::Result<Self>115 pub(crate) fn new_with_interest_and_handle( 116 mut io: E, 117 interest: Interest, 118 handle: scheduler::Handle, 119 ) -> io::Result<Self> { 120 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; 121 Ok(Self { 122 io: Some(io), 123 registration, 124 }) 125 } 126 127 /// Returns a reference to the registration. 128 #[cfg(feature = "net")] registration(&self) -> &Registration129 pub(crate) fn registration(&self) -> &Registration { 130 &self.registration 131 } 132 133 /// Deregisters the inner io from the registration and returns a Result containing the inner io. 134 #[cfg(any(feature = "net", feature = "process"))] into_inner(mut self) -> io::Result<E>135 pub(crate) fn into_inner(mut self) -> io::Result<E> { 136 let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. 137 self.registration.deregister(&mut inner)?; 138 Ok(inner) 139 } 140 141 #[cfg(all(feature = "process", target_os = "linux"))] poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>142 pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 143 self.registration 144 .poll_read_ready(cx) 145 .map_err(io::Error::from) 146 .map_ok(|_| ()) 147 } 148 149 /// Re-register under new runtime with `interest`. 150 #[cfg(all(feature = "process", target_os = "linux"))] reregister(&mut self, interest: Interest) -> io::Result<()>151 pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> { 152 let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here. 153 let _ = self.registration.deregister(io); 154 self.registration = 155 Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?; 156 157 Ok(()) 158 } 159 } 160 161 feature! { 162 #![any(feature = "net", all(unix, feature = "process"))] 163 164 use crate::io::ReadBuf; 165 use std::task::{Context, Poll}; 166 167 impl<E: Source> PollEvented<E> { 168 // Safety: The caller must ensure that `E` can read into uninitialized memory 169 pub(crate) unsafe fn poll_read<'a>( 170 &'a self, 171 cx: &mut Context<'_>, 172 buf: &mut ReadBuf<'_>, 173 ) -> Poll<io::Result<()>> 174 where 175 &'a E: io::Read + 'a, 176 { 177 use std::io::Read; 178 179 loop { 180 let evt = ready!(self.registration.poll_read_ready(cx))?; 181 182 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); 183 184 // used only when the cfgs below apply 185 #[allow(unused_variables)] 186 let len = b.len(); 187 188 match self.io.as_ref().unwrap().read(b) { 189 Ok(n) => { 190 // When mio is using the epoll or kqueue selector, reading a partially full 191 // buffer is sufficient to show that the socket buffer has been drained. 192 // 193 // This optimization does not work for level-triggered selectors such as 194 // windows or when poll is used. 195 // 196 // Read more: 197 // https://github.com/tokio-rs/tokio/issues/5866 198 #[cfg(all( 199 not(mio_unsupported_force_poll_poll), 200 any( 201 // epoll 202 target_os = "android", 203 target_os = "illumos", 204 target_os = "linux", 205 target_os = "redox", 206 // kqueue 207 target_os = "dragonfly", 208 target_os = "freebsd", 209 target_os = "ios", 210 target_os = "macos", 211 target_os = "netbsd", 212 target_os = "openbsd", 213 target_os = "tvos", 214 target_os = "visionos", 215 target_os = "watchos", 216 ) 217 ))] 218 if 0 < n && n < len { 219 self.registration.clear_readiness(evt); 220 } 221 222 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 223 // buffer. 224 buf.assume_init(n); 225 buf.advance(n); 226 return Poll::Ready(Ok(())); 227 }, 228 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 229 self.registration.clear_readiness(evt); 230 } 231 Err(e) => return Poll::Ready(Err(e)), 232 } 233 } 234 } 235 236 pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> 237 where 238 &'a E: io::Write + 'a, 239 { 240 use std::io::Write; 241 242 loop { 243 let evt = ready!(self.registration.poll_write_ready(cx))?; 244 245 match self.io.as_ref().unwrap().write(buf) { 246 Ok(n) => { 247 // if we write only part of our buffer, this is sufficient on unix to show 248 // that the socket buffer is full. Unfortunately this assumption 249 // fails for level-triggered selectors (like on Windows or poll even for 250 // UNIX): https://github.com/tokio-rs/tokio/issues/5866 251 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) { 252 self.registration.clear_readiness(evt); 253 } 254 255 return Poll::Ready(Ok(n)); 256 }, 257 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 258 self.registration.clear_readiness(evt); 259 } 260 Err(e) => return Poll::Ready(Err(e)), 261 } 262 } 263 } 264 265 #[cfg(any(feature = "net", feature = "process"))] 266 pub(crate) fn poll_write_vectored<'a>( 267 &'a self, 268 cx: &mut Context<'_>, 269 bufs: &[io::IoSlice<'_>], 270 ) -> Poll<io::Result<usize>> 271 where 272 &'a E: io::Write + 'a, 273 { 274 use std::io::Write; 275 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) 276 } 277 } 278 } 279 280 impl<E: Source> UnwindSafe for PollEvented<E> {} 281 282 impl<E: Source> RefUnwindSafe for PollEvented<E> {} 283 284 impl<E: Source> Deref for PollEvented<E> { 285 type Target = E; 286 deref(&self) -> &E287 fn deref(&self) -> &E { 288 self.io.as_ref().unwrap() 289 } 290 } 291 292 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 294 f.debug_struct("PollEvented").field("io", &self.io).finish() 295 } 296 } 297 298 impl<E: Source> Drop for PollEvented<E> { drop(&mut self)299 fn drop(&mut self) { 300 if let Some(mut io) = self.io.take() { 301 // Ignore errors 302 let _ = self.registration.deregister(&mut io); 303 } 304 } 305 } 306