1 // Signal handling
2 cfg_signal_internal_and_unix! {
3     mod signal;
4 }
5 
6 use crate::io::interest::Interest;
7 use crate::io::ready::Ready;
8 use crate::loom::sync::Mutex;
9 use crate::runtime::driver;
10 use crate::runtime::io::registration_set;
11 use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
12 
13 use mio::event::Source;
14 use std::fmt;
15 use std::io;
16 use std::sync::Arc;
17 use std::time::Duration;
18 
19 /// I/O driver, backed by Mio.
20 pub(crate) struct Driver {
21     /// True when an event with the signal token is received
22     signal_ready: bool,
23 
24     /// Reuse the `mio::Events` value across calls to poll.
25     events: mio::Events,
26 
27     /// The system event queue.
28     poll: mio::Poll,
29 }
30 
31 /// A reference to an I/O driver.
32 pub(crate) struct Handle {
33     /// Registers I/O resources.
34     registry: mio::Registry,
35 
36     /// Tracks all registrations
37     registrations: RegistrationSet,
38 
39     /// State that should be synchronized
40     synced: Mutex<registration_set::Synced>,
41 
42     /// Used to wake up the reactor from a call to `turn`.
43     /// Not supported on `Wasi` due to lack of threading support.
44     #[cfg(not(target_os = "wasi"))]
45     waker: mio::Waker,
46 
47     pub(crate) metrics: IoDriverMetrics,
48 }
49 
50 #[derive(Debug)]
51 pub(crate) struct ReadyEvent {
52     pub(super) tick: u8,
53     pub(crate) ready: Ready,
54     pub(super) is_shutdown: bool,
55 }
56 
57 cfg_net_unix!(
58     impl ReadyEvent {
59         pub(crate) fn with_ready(&self, ready: Ready) -> Self {
60             Self {
61                 ready,
62                 tick: self.tick,
63                 is_shutdown: self.is_shutdown,
64             }
65         }
66     }
67 );
68 
69 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
70 pub(super) enum Direction {
71     Read,
72     Write,
73 }
74 
75 pub(super) enum Tick {
76     Set,
77     Clear(u8),
78 }
79 
80 const TOKEN_WAKEUP: mio::Token = mio::Token(0);
81 const TOKEN_SIGNAL: mio::Token = mio::Token(1);
82 
_assert_kinds()83 fn _assert_kinds() {
84     fn _assert<T: Send + Sync>() {}
85 
86     _assert::<Handle>();
87 }
88 
89 // ===== impl Driver =====
90 
91 impl Driver {
92     /// Creates a new event loop, returning any error that happened during the
93     /// creation.
new(nevents: usize) -> io::Result<(Driver, Handle)>94     pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
95         let poll = mio::Poll::new()?;
96         #[cfg(not(target_os = "wasi"))]
97         let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
98         let registry = poll.registry().try_clone()?;
99 
100         let driver = Driver {
101             signal_ready: false,
102             events: mio::Events::with_capacity(nevents),
103             poll,
104         };
105 
106         let (registrations, synced) = RegistrationSet::new();
107 
108         let handle = Handle {
109             registry,
110             registrations,
111             synced: Mutex::new(synced),
112             #[cfg(not(target_os = "wasi"))]
113             waker,
114             metrics: IoDriverMetrics::default(),
115         };
116 
117         Ok((driver, handle))
118     }
119 
park(&mut self, rt_handle: &driver::Handle)120     pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
121         let handle = rt_handle.io();
122         self.turn(handle, None);
123     }
124 
park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration)125     pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
126         let handle = rt_handle.io();
127         self.turn(handle, Some(duration));
128     }
129 
shutdown(&mut self, rt_handle: &driver::Handle)130     pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
131         let handle = rt_handle.io();
132         let ios = handle.registrations.shutdown(&mut handle.synced.lock());
133 
134         // `shutdown()` must be called without holding the lock.
135         for io in ios {
136             io.shutdown();
137         }
138     }
139 
turn(&mut self, handle: &Handle, max_wait: Option<Duration>)140     fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
141         debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
142 
143         handle.release_pending_registrations();
144 
145         let events = &mut self.events;
146 
147         // Block waiting for an event to happen, peeling out how many events
148         // happened.
149         match self.poll.poll(events, max_wait) {
150             Ok(()) => {}
151             Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
152             #[cfg(target_os = "wasi")]
153             Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
154                 // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
155                 // just return from the park, as there would be nothing, which wakes us up.
156             }
157             Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
158         }
159 
160         // Process all the events that came in, dispatching appropriately
161         let mut ready_count = 0;
162         for event in events.iter() {
163             let token = event.token();
164 
165             if token == TOKEN_WAKEUP {
166                 // Nothing to do, the event is used to unblock the I/O driver
167             } else if token == TOKEN_SIGNAL {
168                 self.signal_ready = true;
169             } else {
170                 let ready = Ready::from_mio(event);
171                 let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
172 
173                 // Safety: we ensure that the pointers used as tokens are not freed
174                 // until they are both deregistered from mio **and** we know the I/O
175                 // driver is not concurrently polling. The I/O driver holds ownership of
176                 // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
177                 let io: &ScheduledIo = unsafe { &*ptr };
178 
179                 io.set_readiness(Tick::Set, |curr| curr | ready);
180                 io.wake(ready);
181 
182                 ready_count += 1;
183             }
184         }
185 
186         handle.metrics.incr_ready_count_by(ready_count);
187     }
188 }
189 
190 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result191     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192         write!(f, "Driver")
193     }
194 }
195 
196 impl Handle {
197     /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
198     /// makes the next call to `turn` return immediately.
199     ///
200     /// This method is intended to be used in situations where a notification
201     /// needs to otherwise be sent to the main reactor. If the reactor is
202     /// currently blocked inside of `turn` then it will wake up and soon return
203     /// after this method has been called. If the reactor is not currently
204     /// blocked in `turn`, then the next call to `turn` will not block and
205     /// return immediately.
unpark(&self)206     pub(crate) fn unpark(&self) {
207         #[cfg(not(target_os = "wasi"))]
208         self.waker.wake().expect("failed to wake I/O driver");
209     }
210 
211     /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
212     ///
213     /// The registration token is returned.
add_source( &self, source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result<Arc<ScheduledIo>>214     pub(super) fn add_source(
215         &self,
216         source: &mut impl mio::event::Source,
217         interest: Interest,
218     ) -> io::Result<Arc<ScheduledIo>> {
219         let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
220         let token = scheduled_io.token();
221 
222         // we should remove the `scheduled_io` from the `registrations` set if registering
223         // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
224         if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
225             // safety: `scheduled_io` is part of the `registrations` set.
226             unsafe {
227                 self.registrations
228                     .remove(&mut self.synced.lock(), &scheduled_io)
229             };
230 
231             return Err(e);
232         }
233 
234         // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
235         self.metrics.incr_fd_count();
236 
237         Ok(scheduled_io)
238     }
239 
240     /// Deregisters an I/O resource from the reactor.
deregister_source( &self, registration: &Arc<ScheduledIo>, source: &mut impl Source, ) -> io::Result<()>241     pub(super) fn deregister_source(
242         &self,
243         registration: &Arc<ScheduledIo>,
244         source: &mut impl Source,
245     ) -> io::Result<()> {
246         // Deregister the source with the OS poller **first**
247         self.registry.deregister(source)?;
248 
249         if self
250             .registrations
251             .deregister(&mut self.synced.lock(), registration)
252         {
253             self.unpark();
254         }
255 
256         self.metrics.dec_fd_count();
257 
258         Ok(())
259     }
260 
release_pending_registrations(&self)261     fn release_pending_registrations(&self) {
262         if self.registrations.needs_release() {
263             self.registrations.release(&mut self.synced.lock());
264         }
265     }
266 }
267 
268 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result269     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270         write!(f, "Handle")
271     }
272 }
273 
274 impl Direction {
mask(self) -> Ready275     pub(super) fn mask(self) -> Ready {
276         match self {
277             Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
278             Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
279         }
280     }
281 }
282