1 //! # Notes
2 //!
//! The current implementation is somewhat limited. The `Waker` is not
//! implemented, as at the time of writing there is no way to wake up a thread
//! that is blocked in a call to `poll_oneoff`.
6 //!
7 //! Furthermore the (re/de)register functions also don't work while concurrently
8 //! polling as both registering and polling requires a lock on the
9 //! `subscriptions`.
10 //!
//! Finally `Selector::try_clone`, required by `Registry::try_clone`, shares the
//! `subscriptions` between clones by use of an `Arc`, so the locking limitation
//! described above applies to every clone as well.
13 //!
14 //! In summary, this only (barely) works using a single thread.
15
16 use std::cmp::min;
17 use std::io;
18 #[cfg(all(feature = "net", debug_assertions))]
19 use std::sync::atomic::{AtomicUsize, Ordering};
20 use std::sync::{Arc, Mutex};
21 use std::time::Duration;
22
23 #[cfg(feature = "net")]
24 use crate::{Interest, Token};
25
26 cfg_net! {
27 pub(crate) mod tcp {
28 use std::io;
29 use std::net::{self, SocketAddr};
30
31 pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
32 let (stream, addr) = listener.accept()?;
33 stream.set_nonblocking(true)?;
34 Ok((stream, addr))
35 }
36 }
37 }
38
/// Unique id for use as `SelectorId`.
///
/// Starts at 1; `Selector::new` takes the current value and increments the
/// counter. Only compiled in debug builds with the `net` feature enabled.
#[cfg(all(debug_assertions, feature = "net"))]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
42
/// Selector backed by WASI's `poll_oneoff`.
///
/// All state lives in the `subscriptions` list, which is handed to
/// `poll_oneoff` on every call to `select`.
pub(crate) struct Selector {
    /// Id of this selector, taken from `NEXT_ID` (debug builds with the `net`
    /// feature only). Clones made with `try_clone` share the same id.
    #[cfg(all(debug_assertions, feature = "net"))]
    id: usize,
    /// Subscriptions (reads events) we're interested in.
    ///
    /// Behind an `Arc<Mutex<..>>` so that clones of the selector operate on
    /// the same list.
    subscriptions: Arc<Mutex<Vec<wasi::Subscription>>>,
}
49
50 impl Selector {
new() -> io::Result<Selector>51 pub(crate) fn new() -> io::Result<Selector> {
52 Ok(Selector {
53 #[cfg(all(debug_assertions, feature = "net"))]
54 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
55 subscriptions: Arc::new(Mutex::new(Vec::new())),
56 })
57 }
58
/// Returns the id assigned to this selector when it was created.
///
/// Only available in debug builds with the `net` feature enabled.
#[cfg(all(debug_assertions, feature = "net"))]
pub(crate) fn id(&self) -> usize {
    self.id
}
63
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>64 pub(crate) fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
65 events.clear();
66
67 let mut subscriptions = self.subscriptions.lock().unwrap();
68
69 // If we want to a use a timeout in the `wasi_poll_oneoff()` function
70 // we need another subscription to the list.
71 if let Some(timeout) = timeout {
72 subscriptions.push(timeout_subscription(timeout));
73 }
74
75 // `poll_oneoff` needs the same number of events as subscriptions.
76 let length = subscriptions.len();
77 events.reserve(length);
78
79 debug_assert!(events.capacity() >= length);
80 #[cfg(debug_assertions)]
81 if length == 0 {
82 warn!(
83 "calling mio::Poll::poll with empty subscriptions, this likely not what you want"
84 );
85 }
86
87 let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) };
88
89 // Remove the timeout subscription we possibly added above.
90 if timeout.is_some() {
91 let timeout_sub = subscriptions.pop();
92 debug_assert_eq!(
93 timeout_sub.unwrap().u.tag,
94 wasi::EVENTTYPE_CLOCK.raw(),
95 "failed to remove timeout subscription"
96 );
97 }
98
99 drop(subscriptions); // Unlock.
100
101 match res {
102 Ok(n_events) => {
103 // Safety: `poll_oneoff` initialises the `events` for us.
104 unsafe { events.set_len(n_events) };
105
106 // Remove the timeout event.
107 if timeout.is_some() {
108 if let Some(index) = events.iter().position(is_timeout_event) {
109 events.swap_remove(index);
110 }
111 }
112
113 check_errors(&events)
114 }
115 Err(err) => Err(io_err(err)),
116 }
117 }
118
try_clone(&self) -> io::Result<Selector>119 pub(crate) fn try_clone(&self) -> io::Result<Selector> {
120 Ok(Selector {
121 #[cfg(all(debug_assertions, feature = "net"))]
122 id: self.id,
123 subscriptions: self.subscriptions.clone(),
124 })
125 }
126
/// Adds subscriptions for `fd` according to `interests`.
///
/// One `FD_WRITE` subscription is pushed if `interests` is writable and one
/// `FD_READ` subscription if it is readable (in that order); both carry
/// `token` as their userdata. No check is made for existing subscriptions on
/// the same `fd`.
///
/// # Panics
///
/// Panics if the `subscriptions` mutex is poisoned.
#[cfg(feature = "net")]
pub(crate) fn register(
    &self,
    fd: wasi::Fd,
    token: Token,
    interests: Interest,
) -> io::Result<()> {
    let userdata = token.0 as wasi::Userdata;
    // Both event kinds describe their target with the same struct.
    let target = || wasi::SubscriptionFdReadwrite {
        file_descriptor: fd,
    };

    let mut list = self.subscriptions.lock().unwrap();

    if interests.is_writable() {
        list.push(wasi::Subscription {
            userdata,
            u: wasi::SubscriptionU {
                tag: wasi::EVENTTYPE_FD_WRITE.raw(),
                u: wasi::SubscriptionUU { fd_write: target() },
            },
        });
    }

    if interests.is_readable() {
        list.push(wasi::Subscription {
            userdata,
            u: wasi::SubscriptionU {
                tag: wasi::EVENTTYPE_FD_READ.raw(),
                u: wasi::SubscriptionUU { fd_read: target() },
            },
        });
    }

    Ok(())
}
168
/// Replaces the subscriptions of `fd` with new ones for `token` and
/// `interests`.
///
/// This is a `deregister` followed by a `register`; the `subscriptions` lock
/// is released in between the two steps.
///
/// # Errors
///
/// Returns the error of `deregister` (in which case nothing is registered),
/// or otherwise the result of `register`.
#[cfg(feature = "net")]
pub(crate) fn reregister(
    &self,
    fd: wasi::Fd,
    token: Token,
    interests: Interest,
) -> io::Result<()> {
    self.deregister(fd)?;
    self.register(fd, token, interests)
}
179
/// Removes every read and write subscription that refers to `fd`.
///
/// The list is filtered in a single pass and the relative order of the
/// remaining subscriptions is kept.
///
/// # Errors
///
/// Returns an error of kind `io::ErrorKind::NotFound` if no subscription for
/// `fd` was registered.
///
/// # Panics
///
/// Panics if the `subscriptions` mutex is poisoned.
#[cfg(feature = "net")]
pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> {
    let mut subscriptions = self.subscriptions.lock().unwrap();

    let registered = subscriptions.len();

    subscriptions.retain(|subscription| {
        // Safety: `subscription.u.tag` defines the type of the union in
        // `subscription.u.u`.
        match subscription.u.tag {
            t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe {
                subscription.u.u.fd_write.file_descriptor != fd
            },
            t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe {
                subscription.u.u.fd_read.file_descriptor != fd
            },
            // Anything else is not tied to a file descriptor; keep it.
            _ => true,
        }
    });

    // At least one removed subscription means `fd` was registered.
    if subscriptions.len() < registered {
        Ok(())
    } else {
        Err(io::ErrorKind::NotFound.into())
    }
}
207 }
208
/// Token (userdata) used to add a timeout subscription; also used to
/// recognise the resulting timeout event so it can be removed again.
const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::MAX;
211
212 /// Returns a `wasi::Subscription` for `timeout`.
timeout_subscription(timeout: Duration) -> wasi::Subscription213 fn timeout_subscription(timeout: Duration) -> wasi::Subscription {
214 wasi::Subscription {
215 userdata: TIMEOUT_TOKEN,
216 u: wasi::SubscriptionU {
217 tag: wasi::EVENTTYPE_CLOCK.raw(),
218 u: wasi::SubscriptionUU {
219 clock: wasi::SubscriptionClock {
220 id: wasi::CLOCKID_MONOTONIC,
221 // Timestamp is in nanoseconds.
222 timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos())
223 as wasi::Timestamp,
224 // Give the implementation another millisecond to coalesce
225 // events.
226 precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp,
227 // Zero means the `timeout` is considered relative to the
228 // current time.
229 flags: 0,
230 },
231 },
232 },
233 }
234 }
235
is_timeout_event(event: &wasi::Event) -> bool236 fn is_timeout_event(event: &wasi::Event) -> bool {
237 event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN
238 }
239
240 /// Check all events for possible errors, it returns the first error found.
check_errors(events: &[Event]) -> io::Result<()>241 fn check_errors(events: &[Event]) -> io::Result<()> {
242 for event in events {
243 if event.error != wasi::ERRNO_SUCCESS {
244 return Err(io_err(event.error));
245 }
246 }
247 Ok(())
248 }
249
250 /// Convert `wasi::Errno` into an `io::Error`.
io_err(errno: wasi::Errno) -> io::Error251 fn io_err(errno: wasi::Errno) -> io::Error {
252 // TODO: check if this is valid.
253 io::Error::from_raw_os_error(errno.raw() as i32)
254 }
255
/// Buffer of events filled in by `Selector::select`.
pub(crate) type Events = Vec<Event>;

/// A single event, as reported by `wasi::poll_oneoff`.
pub(crate) type Event = wasi::Event;
259
260 pub(crate) mod event {
261 use std::fmt;
262
263 use crate::sys::Event;
264 use crate::Token;
265
token(event: &Event) -> Token266 pub(crate) fn token(event: &Event) -> Token {
267 Token(event.userdata as usize)
268 }
269
is_readable(event: &Event) -> bool270 pub(crate) fn is_readable(event: &Event) -> bool {
271 event.type_ == wasi::EVENTTYPE_FD_READ
272 }
273
is_writable(event: &Event) -> bool274 pub(crate) fn is_writable(event: &Event) -> bool {
275 event.type_ == wasi::EVENTTYPE_FD_WRITE
276 }
277
is_error(_: &Event) -> bool278 pub(crate) fn is_error(_: &Event) -> bool {
279 // Not supported? It could be that `wasi::Event.error` could be used for
280 // this, but the docs say `error that occurred while processing the
281 // subscription request`, so it's checked in `Select::select` already.
282 false
283 }
284
is_read_closed(event: &Event) -> bool285 pub(crate) fn is_read_closed(event: &Event) -> bool {
286 event.type_ == wasi::EVENTTYPE_FD_READ
287 // Safety: checked the type of the union above.
288 && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
289 }
290
is_write_closed(event: &Event) -> bool291 pub(crate) fn is_write_closed(event: &Event) -> bool {
292 event.type_ == wasi::EVENTTYPE_FD_WRITE
293 // Safety: checked the type of the union above.
294 && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0
295 }
296
is_priority(_: &Event) -> bool297 pub(crate) fn is_priority(_: &Event) -> bool {
298 // Not supported.
299 false
300 }
301
is_aio(_: &Event) -> bool302 pub(crate) fn is_aio(_: &Event) -> bool {
303 // Not supported.
304 false
305 }
306
is_lio(_: &Event) -> bool307 pub(crate) fn is_lio(_: &Event) -> bool {
308 // Not supported.
309 false
310 }
311
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result312 pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
313 debug_detail!(
314 TypeDetails(wasi::Eventtype),
315 PartialEq::eq,
316 wasi::EVENTTYPE_CLOCK,
317 wasi::EVENTTYPE_FD_READ,
318 wasi::EVENTTYPE_FD_WRITE,
319 );
320
321 #[allow(clippy::trivially_copy_pass_by_ref)]
322 fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool {
323 (got & want) != 0
324 }
325 debug_detail!(
326 EventrwflagsDetails(wasi::Eventrwflags),
327 check_flag,
328 wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP,
329 );
330
331 struct EventFdReadwriteDetails(wasi::EventFdReadwrite);
332
333 impl fmt::Debug for EventFdReadwriteDetails {
334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335 f.debug_struct("EventFdReadwrite")
336 .field("nbytes", &self.0.nbytes)
337 .field("flags", &EventrwflagsDetails(self.0.flags))
338 .finish()
339 }
340 }
341
342 f.debug_struct("Event")
343 .field("userdata", &event.userdata)
344 .field("error", &event.error)
345 .field("type", &TypeDetails(event.type_))
346 .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite))
347 .finish()
348 }
349 }
350
351 cfg_os_poll! {
352 cfg_io_source! {
353 pub(crate) struct IoSourceState;
354
355 impl IoSourceState {
356 pub(crate) fn new() -> IoSourceState {
357 IoSourceState
358 }
359
360 pub(crate) fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
361 where
362 F: FnOnce(&T) -> io::Result<R>,
363 {
364 // We don't hold state, so we can just call the function and
365 // return.
366 f(io)
367 }
368 }
369 }
370 }
371