1 use crate::io::interest::Interest;
2 use crate::runtime::io::Registration;
3 use crate::runtime::scheduler;
4 
5 use mio::event::Source;
6 use std::fmt;
7 use std::io;
8 use std::ops::Deref;
9 use std::panic::{RefUnwindSafe, UnwindSafe};
10 use std::task::ready;
11 
12 cfg_io_driver! {
13     /// Associates an I/O resource that implements the [`std::io::Read`] and/or
14     /// [`std::io::Write`] traits with the reactor that drives it.
15     ///
16     /// `PollEvented` uses [`Registration`] internally to take a type that
17     /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
18     /// [`std::io::Write`] and associate it with a reactor that will drive it.
19     ///
20     /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
21     /// used from within the future's execution model. As such, the
22     /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
23     /// implementations using the underlying I/O resource as well as readiness
24     /// events provided by the reactor.
25     ///
26     /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
27     /// `Sync`), the caller must ensure that there are at most two tasks that
28     /// use a `PollEvented` instance concurrently. One for reading and one for
29     /// writing. While violating this requirement is "safe" from a Rust memory
30     /// model point of view, it will result in unexpected behavior in the form
31     /// of lost notifications and tasks hanging.
32     ///
33     /// ## Readiness events
34     ///
35     /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
36     /// this type also supports access to the underlying readiness event stream.
37     /// While similar in function to what [`Registration`] provides, the
38     /// semantics are a bit different.
39     ///
40     /// Two functions are provided to access the readiness events:
41     /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
42     /// current readiness state of the `PollEvented` instance. If
43     /// [`poll_read_ready`] indicates read readiness, immediately calling
44     /// [`poll_read_ready`] again will also indicate read readiness.
45     ///
46     /// When the operation is attempted and is unable to succeed due to the I/O
47     /// resource not being ready, the caller must call [`clear_readiness`].
48     /// This clears the readiness state until a new readiness event is received.
49     ///
50     /// This allows the caller to implement additional functions. For example,
51     /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
52     /// [`clear_readiness`].
53     ///
54     /// ## Platform-specific events
55     ///
56     /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
57     /// These events are included as part of the read readiness event stream. The
58     /// write readiness event stream is only for `Ready::writable()` events.
59     ///
60     /// [`AsyncRead`]: crate::io::AsyncRead
61     /// [`AsyncWrite`]: crate::io::AsyncWrite
62     /// [`TcpListener`]: crate::net::TcpListener
63     /// [`clear_readiness`]: Registration::clear_readiness
64     /// [`poll_read_ready`]: Registration::poll_read_ready
65     /// [`poll_write_ready`]: Registration::poll_write_ready
66     pub(crate) struct PollEvented<E: Source> {
67         io: Option<E>,
68         registration: Registration,
69     }
70 }
71 
72 // ===== impl PollEvented =====
73 
74 impl<E: Source> PollEvented<E> {
75     /// Creates a new `PollEvented` associated with the default reactor.
76     ///
77     /// The returned `PollEvented` has readable and writable interests. For more control, use
78     /// [`Self::new_with_interest`].
79     ///
80     /// # Panics
81     ///
82     /// This function panics if thread-local runtime is not set.
83     ///
84     /// The runtime is usually set implicitly when this function is called
85     /// from a future driven by a tokio runtime, otherwise runtime can be set
86     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
87     #[track_caller]
88     #[cfg_attr(feature = "signal", allow(unused))]
new(io: E) -> io::Result<Self>89     pub(crate) fn new(io: E) -> io::Result<Self> {
90         PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
91     }
92 
93     /// Creates a new `PollEvented` associated with the default reactor, for
94     /// specific `Interest` state. `new_with_interest` should be used over `new`
95     /// when you need control over the readiness state, such as when a file
96     /// descriptor only allows reads. This does not add `hup` or `error` so if
97     /// you are interested in those states, you will need to add them to the
98     /// readiness state passed to this function.
99     ///
100     /// # Panics
101     ///
102     /// This function panics if thread-local runtime is not set.
103     ///
104     /// The runtime is usually set implicitly when this function is called from
105     /// a future driven by a tokio runtime, otherwise runtime can be set
106     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
107     /// function.
108     #[track_caller]
109     #[cfg_attr(feature = "signal", allow(unused))]
new_with_interest(io: E, interest: Interest) -> io::Result<Self>110     pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
111         Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
112     }
113 
114     #[track_caller]
new_with_interest_and_handle( mut io: E, interest: Interest, handle: scheduler::Handle, ) -> io::Result<Self>115     pub(crate) fn new_with_interest_and_handle(
116         mut io: E,
117         interest: Interest,
118         handle: scheduler::Handle,
119     ) -> io::Result<Self> {
120         let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
121         Ok(Self {
122             io: Some(io),
123             registration,
124         })
125     }
126 
127     /// Returns a reference to the registration.
128     #[cfg(feature = "net")]
registration(&self) -> &Registration129     pub(crate) fn registration(&self) -> &Registration {
130         &self.registration
131     }
132 
133     /// Deregisters the inner io from the registration and returns a Result containing the inner io.
134     #[cfg(any(feature = "net", feature = "process"))]
into_inner(mut self) -> io::Result<E>135     pub(crate) fn into_inner(mut self) -> io::Result<E> {
136         let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
137         self.registration.deregister(&mut inner)?;
138         Ok(inner)
139     }
140 
141     #[cfg(all(feature = "process", target_os = "linux"))]
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>142     pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143         self.registration
144             .poll_read_ready(cx)
145             .map_err(io::Error::from)
146             .map_ok(|_| ())
147     }
148 
149     /// Re-register under new runtime with `interest`.
150     #[cfg(all(feature = "process", target_os = "linux"))]
reregister(&mut self, interest: Interest) -> io::Result<()>151     pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> {
152         let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here.
153         let _ = self.registration.deregister(io);
154         self.registration =
155             Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?;
156 
157         Ok(())
158     }
159 }
160 
161 feature! {
162     #![any(feature = "net", all(unix, feature = "process"))]
163 
164     use crate::io::ReadBuf;
165     use std::task::{Context, Poll};
166 
167     impl<E: Source> PollEvented<E> {
168         // Safety: The caller must ensure that `E` can read into uninitialized memory
169         pub(crate) unsafe fn poll_read<'a>(
170             &'a self,
171             cx: &mut Context<'_>,
172             buf: &mut ReadBuf<'_>,
173         ) -> Poll<io::Result<()>>
174         where
175             &'a E: io::Read + 'a,
176         {
177             use std::io::Read;
178 
179             loop {
180                 let evt = ready!(self.registration.poll_read_ready(cx))?;
181 
182                 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
183 
184                 // used only when the cfgs below apply
185                 #[allow(unused_variables)]
186                 let len = b.len();
187 
188                 match self.io.as_ref().unwrap().read(b) {
189                     Ok(n) => {
190                         // When mio is using the epoll or kqueue selector, reading a partially full
191                         // buffer is sufficient to show that the socket buffer has been drained.
192                         //
193                         // This optimization does not work for level-triggered selectors such as
194                         // windows or when poll is used.
195                         //
196                         // Read more:
197                         // https://github.com/tokio-rs/tokio/issues/5866
198                         #[cfg(all(
199                             not(mio_unsupported_force_poll_poll),
200                             any(
201                                 // epoll
202                                 target_os = "android",
203                                 target_os = "illumos",
204                                 target_os = "linux",
205                                 target_os = "redox",
206                                 // kqueue
207                                 target_os = "dragonfly",
208                                 target_os = "freebsd",
209                                 target_os = "ios",
210                                 target_os = "macos",
211                                 target_os = "netbsd",
212                                 target_os = "openbsd",
213                                 target_os = "tvos",
214                                 target_os = "visionos",
215                                 target_os = "watchos",
216                             )
217                         ))]
218                         if 0 < n && n < len {
219                             self.registration.clear_readiness(evt);
220                         }
221 
222                         // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
223                         // buffer.
224                         buf.assume_init(n);
225                         buf.advance(n);
226                         return Poll::Ready(Ok(()));
227                     },
228                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
229                         self.registration.clear_readiness(evt);
230                     }
231                     Err(e) => return Poll::Ready(Err(e)),
232                 }
233             }
234         }
235 
236         pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
237         where
238             &'a E: io::Write + 'a,
239         {
240             use std::io::Write;
241 
242             loop {
243                 let evt = ready!(self.registration.poll_write_ready(cx))?;
244 
245                 match self.io.as_ref().unwrap().write(buf) {
246                     Ok(n) => {
247                         // if we write only part of our buffer, this is sufficient on unix to show
248                         // that the socket buffer is full.  Unfortunately this assumption
249                         // fails for level-triggered selectors (like on Windows or poll even for
250                         // UNIX): https://github.com/tokio-rs/tokio/issues/5866
251                         if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) {
252                             self.registration.clear_readiness(evt);
253                         }
254 
255                         return Poll::Ready(Ok(n));
256                     },
257                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
258                         self.registration.clear_readiness(evt);
259                     }
260                     Err(e) => return Poll::Ready(Err(e)),
261                 }
262             }
263         }
264 
265         #[cfg(any(feature = "net", feature = "process"))]
266         pub(crate) fn poll_write_vectored<'a>(
267             &'a self,
268             cx: &mut Context<'_>,
269             bufs: &[io::IoSlice<'_>],
270         ) -> Poll<io::Result<usize>>
271         where
272             &'a E: io::Write + 'a,
273         {
274             use std::io::Write;
275             self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
276         }
277     }
278 }
279 
280 impl<E: Source> UnwindSafe for PollEvented<E> {}
281 
282 impl<E: Source> RefUnwindSafe for PollEvented<E> {}
283 
284 impl<E: Source> Deref for PollEvented<E> {
285     type Target = E;
286 
deref(&self) -> &E287     fn deref(&self) -> &E {
288         self.io.as_ref().unwrap()
289     }
290 }
291 
292 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result293     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294         f.debug_struct("PollEvented").field("io", &self.io).finish()
295     }
296 }
297 
298 impl<E: Source> Drop for PollEvented<E> {
drop(&mut self)299     fn drop(&mut self) {
300         if let Some(mut io) = self.io.take() {
301             // Ignore errors
302             let _ = self.registration.deregister(&mut io);
303         }
304     }
305 }
306