1 //! Use POSIX AIO futures with Tokio.
2 
3 use crate::io::interest::Interest;
4 use crate::runtime::io::{ReadyEvent, Registration};
5 use crate::runtime::scheduler;
6 use mio::event::Source;
7 use mio::Registry;
8 use mio::Token;
9 use std::fmt;
10 use std::io;
11 use std::ops::{Deref, DerefMut};
12 use std::os::unix::io::AsRawFd;
13 use std::os::unix::prelude::RawFd;
14 use std::task::{ready, Context, Poll};
15 
16 /// Like [`mio::event::Source`], but for POSIX AIO only.
17 ///
18 /// Tokio's consumer must pass an implementor of this trait to create a
19 /// [`Aio`] object.
20 pub trait AioSource {
21     /// Registers this AIO event source with Tokio's reactor.
register(&mut self, kq: RawFd, token: usize)22     fn register(&mut self, kq: RawFd, token: usize);
23 
24     /// Deregisters this AIO event source with Tokio's reactor.
deregister(&mut self)25     fn deregister(&mut self);
26 }
27 
28 /// Wraps the user's AioSource in order to implement mio::event::Source, which
29 /// is what the rest of the crate wants.
30 struct MioSource<T>(T);
31 
32 impl<T: AioSource> Source for MioSource<T> {
register( &mut self, registry: &Registry, token: Token, interests: mio::Interest, ) -> io::Result<()>33     fn register(
34         &mut self,
35         registry: &Registry,
36         token: Token,
37         interests: mio::Interest,
38     ) -> io::Result<()> {
39         assert!(interests.is_aio() || interests.is_lio());
40         self.0.register(registry.as_raw_fd(), usize::from(token));
41         Ok(())
42     }
43 
deregister(&mut self, _registry: &Registry) -> io::Result<()>44     fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
45         self.0.deregister();
46         Ok(())
47     }
48 
reregister( &mut self, registry: &Registry, token: Token, interests: mio::Interest, ) -> io::Result<()>49     fn reregister(
50         &mut self,
51         registry: &Registry,
52         token: Token,
53         interests: mio::Interest,
54     ) -> io::Result<()> {
55         assert!(interests.is_aio() || interests.is_lio());
56         self.0.register(registry.as_raw_fd(), usize::from(token));
57         Ok(())
58     }
59 }
60 
61 /// Associates a POSIX AIO control block with the reactor that drives it.
62 ///
63 /// `Aio`'s wrapped type must implement [`AioSource`] to be driven
64 /// by the reactor.
65 ///
66 /// The wrapped source may be accessed through the `Aio` via the `Deref` and
67 /// `DerefMut` traits.
68 ///
69 /// ## Clearing readiness
70 ///
71 /// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
72 /// Source is not completely ready and must return to the Pending state,
73 /// [`Aio::clear_ready`] may be used.  This can be useful with
74 /// [`lio_listio`], which may generate a kevent when only a portion of the
75 /// operations have completed.
76 ///
77 /// ## Platforms
78 ///
79 /// Only FreeBSD implements POSIX AIO with kqueue notification, so
80 /// `Aio` is only available for that operating system.
81 ///
82 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
83 // Note: Unlike every other kqueue event source, POSIX AIO registers events not
84 // via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
85 // aio_write, etc.  It needs the kqueue's file descriptor to do that.  So
86 // AsyncFd can't be used for POSIX AIO.
87 //
88 // Note that Aio doesn't implement Drop.  There's no need.  Unlike other
89 // kqueue sources, simply dropping the object effectively deregisters it.
90 pub struct Aio<E> {
91     io: MioSource<E>,
92     registration: Registration,
93 }
94 
95 // ===== impl Aio =====
96 
97 impl<E: AioSource> Aio<E> {
98     /// Creates a new `Aio` suitable for use with POSIX AIO functions.
99     ///
100     /// It will be associated with the default reactor.  The runtime is usually
101     /// set implicitly when this function is called from a future driven by a
102     /// Tokio runtime, otherwise runtime can be set explicitly with
103     /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
new_for_aio(io: E) -> io::Result<Self>104     pub fn new_for_aio(io: E) -> io::Result<Self> {
105         Self::new_with_interest(io, Interest::AIO)
106     }
107 
108     /// Creates a new `Aio` suitable for use with [`lio_listio`].
109     ///
110     /// It will be associated with the default reactor.  The runtime is usually
111     /// set implicitly when this function is called from a future driven by a
112     /// Tokio runtime, otherwise runtime can be set explicitly with
113     /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
114     ///
115     /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
new_for_lio(io: E) -> io::Result<Self>116     pub fn new_for_lio(io: E) -> io::Result<Self> {
117         Self::new_with_interest(io, Interest::LIO)
118     }
119 
new_with_interest(io: E, interest: Interest) -> io::Result<Self>120     fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
121         let mut io = MioSource(io);
122         let handle = scheduler::Handle::current();
123         let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
124         Ok(Self { io, registration })
125     }
126 
127     /// Indicates to Tokio that the source is no longer ready.  The internal
128     /// readiness flag will be cleared, and tokio will wait for the next
129     /// edge-triggered readiness notification from the OS.
130     ///
131     /// It is critical that this method not be called unless your code
132     /// _actually observes_ that the source is _not_ ready.  The OS must
133     /// deliver a subsequent notification, or this source will block
134     /// forever.  It is equally critical that you `do` call this method if you
135     /// resubmit the same structure to the kernel and poll it again.
136     ///
137     /// This method is not very useful with AIO readiness, since each `aiocb`
138     /// structure is typically only used once.  It's main use with
139     /// [`lio_listio`], which will sometimes send notification when only a
140     /// portion of its elements are complete.  In that case, the caller must
141     /// call `clear_ready` before resubmitting it.
142     ///
143     /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
clear_ready(&self, ev: AioEvent)144     pub fn clear_ready(&self, ev: AioEvent) {
145         self.registration.clear_readiness(ev.0)
146     }
147 
148     /// Destroy the [`Aio`] and return its inner source.
into_inner(self) -> E149     pub fn into_inner(self) -> E {
150         self.io.0
151     }
152 
153     /// Polls for readiness.  Either AIO or LIO counts.
154     ///
155     /// This method returns:
156     ///  * `Poll::Pending` if the underlying operation is not complete, whether
157     ///     or not it completed successfully.  This will be true if the OS is
158     ///     still processing it, or if it has not yet been submitted to the OS.
159     ///  * `Poll::Ready(Ok(_))` if the underlying operation is complete.
160     ///  * `Poll::Ready(Err(_))` if the reactor has been shutdown.  This does
161     ///     _not_ indicate that the underlying operation encountered an error.
162     ///
163     /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
164     /// is scheduled to receive a wakeup when the underlying operation
165     /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
166     /// `Context` passed to the most recent call is scheduled to receive a wakeup.
poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>>167     pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
168         let ev = ready!(self.registration.poll_read_ready(cx))?;
169         Poll::Ready(Ok(AioEvent(ev)))
170     }
171 }
172 
173 impl<E: AioSource> Deref for Aio<E> {
174     type Target = E;
175 
deref(&self) -> &E176     fn deref(&self) -> &E {
177         &self.io.0
178     }
179 }
180 
181 impl<E: AioSource> DerefMut for Aio<E> {
deref_mut(&mut self) -> &mut E182     fn deref_mut(&mut self) -> &mut E {
183         &mut self.io.0
184     }
185 }
186 
187 impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result188     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189         f.debug_struct("Aio").field("io", &self.io.0).finish()
190     }
191 }
192 
193 /// Opaque data returned by [`Aio::poll_ready`].
194 ///
195 /// It can be fed back to [`Aio::clear_ready`].
196 #[derive(Debug)]
197 pub struct AioEvent(ReadyEvent);
198