1 //! # Notes
2 //!
//! The current implementation is somewhat limited. The `Waker` is not
//! implemented, as at the time of writing there is no way to wake up a thread
//! that is inside a call to `poll_oneoff`.
6 //!
//! Furthermore the (re/de)register functions also don't work while concurrently
//! polling as both registering and polling require a lock on the
//! `subscriptions`.
10 //!
//! Finally `Selector::try_clone`, required by `Registry::try_clone`, is
//! implemented by sharing the `subscriptions` between clones through an `Arc`.
13 //!
14 //! In summary, this only (barely) works using a single thread.
15 
16 use std::cmp::min;
17 use std::io;
18 #[cfg(all(feature = "net", debug_assertions))]
19 use std::sync::atomic::{AtomicUsize, Ordering};
20 use std::sync::{Arc, Mutex};
21 use std::time::Duration;
22 
23 #[cfg(feature = "net")]
24 use crate::{Interest, Token};
25 
26 cfg_net! {
27     pub(crate) mod tcp {
28         use std::io;
29         use std::net::{self, SocketAddr};
30 
31         pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
32             let (stream, addr) = listener.accept()?;
33             stream.set_nonblocking(true)?;
34             Ok((stream, addr))
35         }
36     }
37 }
38 
/// Unique id for use as `SelectorId`.
///
/// The counter starts at 1 and is incremented every time `Selector::new`
/// takes a value from it. It only exists when both debug assertions and the
/// `net` feature are enabled.
#[cfg(all(debug_assertions, feature = "net"))]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
42 
/// Selector that keeps a list of `wasi::Subscription`s and passes it to
/// `wasi::poll_oneoff` when selecting.
pub(crate) struct Selector {
    /// Id of this selector, taken from `NEXT_ID`. Only present when both debug
    /// assertions and the `net` feature are enabled.
    #[cfg(all(debug_assertions, feature = "net"))]
    id: usize,
    /// Subscriptions (read and write events) we're interested in.
    ///
    /// Kept behind an `Arc<Mutex<..>>` so that selectors returned by
    /// `try_clone` share the same list.
    subscriptions: Arc<Mutex<Vec<wasi::Subscription>>>,
}
49 
50 impl Selector {
new() -> io::Result<Selector>51     pub(crate) fn new() -> io::Result<Selector> {
52         Ok(Selector {
53             #[cfg(all(debug_assertions, feature = "net"))]
54             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
55             subscriptions: Arc::new(Mutex::new(Vec::new())),
56         })
57     }
58 
59     #[cfg(all(debug_assertions, feature = "net"))]
id(&self) -> usize60     pub(crate) fn id(&self) -> usize {
61         self.id
62     }
63 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>64     pub(crate) fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
65         events.clear();
66 
67         let mut subscriptions = self.subscriptions.lock().unwrap();
68 
69         // If we want to a use a timeout in the `wasi_poll_oneoff()` function
70         // we need another subscription to the list.
71         if let Some(timeout) = timeout {
72             subscriptions.push(timeout_subscription(timeout));
73         }
74 
75         // `poll_oneoff` needs the same number of events as subscriptions.
76         let length = subscriptions.len();
77         events.reserve(length);
78 
79         debug_assert!(events.capacity() >= length);
80         #[cfg(debug_assertions)]
81         if length == 0 {
82             warn!(
83                 "calling mio::Poll::poll with empty subscriptions, this likely not what you want"
84             );
85         }
86 
87         let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) };
88 
89         // Remove the timeout subscription we possibly added above.
90         if timeout.is_some() {
91             let timeout_sub = subscriptions.pop();
92             debug_assert_eq!(
93                 timeout_sub.unwrap().u.tag,
94                 wasi::EVENTTYPE_CLOCK.raw(),
95                 "failed to remove timeout subscription"
96             );
97         }
98 
99         drop(subscriptions); // Unlock.
100 
101         match res {
102             Ok(n_events) => {
103                 // Safety: `poll_oneoff` initialises the `events` for us.
104                 unsafe { events.set_len(n_events) };
105 
106                 // Remove the timeout event.
107                 if timeout.is_some() {
108                     if let Some(index) = events.iter().position(is_timeout_event) {
109                         events.swap_remove(index);
110                     }
111                 }
112 
113                 check_errors(&events)
114             }
115             Err(err) => Err(io_err(err)),
116         }
117     }
118 
try_clone(&self) -> io::Result<Selector>119     pub(crate) fn try_clone(&self) -> io::Result<Selector> {
120         Ok(Selector {
121             #[cfg(all(debug_assertions, feature = "net"))]
122             id: self.id,
123             subscriptions: self.subscriptions.clone(),
124         })
125     }
126 
127     #[cfg(feature = "net")]
register( &self, fd: wasi::Fd, token: Token, interests: Interest, ) -> io::Result<()>128     pub(crate) fn register(
129         &self,
130         fd: wasi::Fd,
131         token: Token,
132         interests: Interest,
133     ) -> io::Result<()> {
134         let mut subscriptions = self.subscriptions.lock().unwrap();
135 
136         if interests.is_writable() {
137             let subscription = wasi::Subscription {
138                 userdata: token.0 as wasi::Userdata,
139                 u: wasi::SubscriptionU {
140                     tag: wasi::EVENTTYPE_FD_WRITE.raw(),
141                     u: wasi::SubscriptionUU {
142                         fd_write: wasi::SubscriptionFdReadwrite {
143                             file_descriptor: fd,
144                         },
145                     },
146                 },
147             };
148             subscriptions.push(subscription);
149         }
150 
151         if interests.is_readable() {
152             let subscription = wasi::Subscription {
153                 userdata: token.0 as wasi::Userdata,
154                 u: wasi::SubscriptionU {
155                     tag: wasi::EVENTTYPE_FD_READ.raw(),
156                     u: wasi::SubscriptionUU {
157                         fd_read: wasi::SubscriptionFdReadwrite {
158                             file_descriptor: fd,
159                         },
160                     },
161                 },
162             };
163             subscriptions.push(subscription);
164         }
165 
166         Ok(())
167     }
168 
169     #[cfg(feature = "net")]
reregister( &self, fd: wasi::Fd, token: Token, interests: Interest, ) -> io::Result<()>170     pub(crate) fn reregister(
171         &self,
172         fd: wasi::Fd,
173         token: Token,
174         interests: Interest,
175     ) -> io::Result<()> {
176         self.deregister(fd)
177             .and_then(|()| self.register(fd, token, interests))
178     }
179 
180     #[cfg(feature = "net")]
deregister(&self, fd: wasi::Fd) -> io::Result<()>181     pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> {
182         let mut subscriptions = self.subscriptions.lock().unwrap();
183 
184         let predicate = |subscription: &wasi::Subscription| {
185             // Safety: `subscription.u.tag` defines the type of the union in
186             // `subscription.u.u`.
187             match subscription.u.tag {
188                 t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe {
189                     subscription.u.u.fd_write.file_descriptor == fd
190                 },
191                 t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe {
192                     subscription.u.u.fd_read.file_descriptor == fd
193                 },
194                 _ => false,
195             }
196         };
197 
198         let mut ret = Err(io::ErrorKind::NotFound.into());
199 
200         while let Some(index) = subscriptions.iter().position(predicate) {
201             subscriptions.swap_remove(index);
202             ret = Ok(())
203         }
204 
205         ret
206     }
207 }
208 
/// Token (`userdata`) used to add a timeout subscription, also used to
/// recognise and remove the resulting timeout event again.
const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::MAX;
211 
212 /// Returns a `wasi::Subscription` for `timeout`.
timeout_subscription(timeout: Duration) -> wasi::Subscription213 fn timeout_subscription(timeout: Duration) -> wasi::Subscription {
214     wasi::Subscription {
215         userdata: TIMEOUT_TOKEN,
216         u: wasi::SubscriptionU {
217             tag: wasi::EVENTTYPE_CLOCK.raw(),
218             u: wasi::SubscriptionUU {
219                 clock: wasi::SubscriptionClock {
220                     id: wasi::CLOCKID_MONOTONIC,
221                     // Timestamp is in nanoseconds.
222                     timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos())
223                         as wasi::Timestamp,
224                     // Give the implementation another millisecond to coalesce
225                     // events.
226                     precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp,
227                     // Zero means the `timeout` is considered relative to the
228                     // current time.
229                     flags: 0,
230                 },
231             },
232         },
233     }
234 }
235 
is_timeout_event(event: &wasi::Event) -> bool236 fn is_timeout_event(event: &wasi::Event) -> bool {
237     event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN
238 }
239 
240 /// Check all events for possible errors, it returns the first error found.
check_errors(events: &[Event]) -> io::Result<()>241 fn check_errors(events: &[Event]) -> io::Result<()> {
242     for event in events {
243         if event.error != wasi::ERRNO_SUCCESS {
244             return Err(io_err(event.error));
245         }
246     }
247     Ok(())
248 }
249 
250 /// Convert `wasi::Errno` into an `io::Error`.
io_err(errno: wasi::Errno) -> io::Error251 fn io_err(errno: wasi::Errno) -> io::Error {
252     // TODO: check if this is valid.
253     io::Error::from_raw_os_error(errno.raw() as i32)
254 }
255 
/// Collection of events filled in by `Selector::select`.
pub(crate) type Events = Vec<Event>;

/// A single event, as written by `wasi::poll_oneoff`.
pub(crate) type Event = wasi::Event;
259 
260 pub(crate) mod event {
261     use std::fmt;
262 
263     use crate::sys::Event;
264     use crate::Token;
265 
token(event: &Event) -> Token266     pub(crate) fn token(event: &Event) -> Token {
267         Token(event.userdata as usize)
268     }
269 
is_readable(event: &Event) -> bool270     pub(crate) fn is_readable(event: &Event) -> bool {
271         event.type_ == wasi::EVENTTYPE_FD_READ
272     }
273 
is_writable(event: &Event) -> bool274     pub(crate) fn is_writable(event: &Event) -> bool {
275         event.type_ == wasi::EVENTTYPE_FD_WRITE
276     }
277 
is_error(_: &Event) -> bool278     pub(crate) fn is_error(_: &Event) -> bool {
279         // Not supported? It could be that `wasi::Event.error` could be used for
280         // this, but the docs say `error that occurred while processing the
281         // subscription request`, so it's checked in `Select::select` already.
282         false
283     }
284 
is_read_closed(event: &Event) -> bool285     pub(crate) fn is_read_closed(event: &Event) -> bool {
286         event.type_ == wasi::EVENTTYPE_FD_READ
287             // Safety: checked the type of the union above.
288             && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
289     }
290 
is_write_closed(event: &Event) -> bool291     pub(crate) fn is_write_closed(event: &Event) -> bool {
292         event.type_ == wasi::EVENTTYPE_FD_WRITE
293             // Safety: checked the type of the union above.
294             && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
295     }
296 
is_priority(_: &Event) -> bool297     pub(crate) fn is_priority(_: &Event) -> bool {
298         // Not supported.
299         false
300     }
301 
is_aio(_: &Event) -> bool302     pub(crate) fn is_aio(_: &Event) -> bool {
303         // Not supported.
304         false
305     }
306 
is_lio(_: &Event) -> bool307     pub(crate) fn is_lio(_: &Event) -> bool {
308         // Not supported.
309         false
310     }
311 
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result312     pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
313         debug_detail!(
314             TypeDetails(wasi::Eventtype),
315             PartialEq::eq,
316             wasi::EVENTTYPE_CLOCK,
317             wasi::EVENTTYPE_FD_READ,
318             wasi::EVENTTYPE_FD_WRITE,
319         );
320 
321         #[allow(clippy::trivially_copy_pass_by_ref)]
322         fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool {
323             (got & want) != 0
324         }
325         debug_detail!(
326             EventrwflagsDetails(wasi::Eventrwflags),
327             check_flag,
328             wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP,
329         );
330 
331         struct EventFdReadwriteDetails(wasi::EventFdReadwrite);
332 
333         impl fmt::Debug for EventFdReadwriteDetails {
334             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335                 f.debug_struct("EventFdReadwrite")
336                     .field("nbytes", &self.0.nbytes)
337                     .field("flags", &EventrwflagsDetails(self.0.flags))
338                     .finish()
339             }
340         }
341 
342         f.debug_struct("Event")
343             .field("userdata", &event.userdata)
344             .field("error", &event.error)
345             .field("type", &TypeDetails(event.type_))
346             .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite))
347             .finish()
348     }
349 }
350 
351 cfg_os_poll! {
352     cfg_io_source! {
353         pub(crate) struct IoSourceState;
354 
355         impl IoSourceState {
356             pub(crate) fn new() -> IoSourceState {
357                 IoSourceState
358             }
359 
360             pub(crate) fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
361             where
362                 F: FnOnce(&T) -> io::Result<R>,
363             {
364                 // We don't hold state, so we can just call the function and
365                 // return.
366                 f(io)
367             }
368         }
369     }
370 }
371