1 //! Use POSIX AIO futures with Tokio. 2 3 use crate::io::interest::Interest; 4 use crate::runtime::io::{ReadyEvent, Registration}; 5 use crate::runtime::scheduler; 6 use mio::event::Source; 7 use mio::Registry; 8 use mio::Token; 9 use std::fmt; 10 use std::io; 11 use std::ops::{Deref, DerefMut}; 12 use std::os::unix::io::AsRawFd; 13 use std::os::unix::prelude::RawFd; 14 use std::task::{ready, Context, Poll}; 15 16 /// Like [`mio::event::Source`], but for POSIX AIO only. 17 /// 18 /// Tokio's consumer must pass an implementor of this trait to create a 19 /// [`Aio`] object. 20 pub trait AioSource { 21 /// Registers this AIO event source with Tokio's reactor. register(&mut self, kq: RawFd, token: usize)22 fn register(&mut self, kq: RawFd, token: usize); 23 24 /// Deregisters this AIO event source with Tokio's reactor. deregister(&mut self)25 fn deregister(&mut self); 26 } 27 28 /// Wraps the user's AioSource in order to implement mio::event::Source, which 29 /// is what the rest of the crate wants. 30 struct MioSource<T>(T); 31 32 impl<T: AioSource> Source for MioSource<T> { register( &mut self, registry: &Registry, token: Token, interests: mio::Interest, ) -> io::Result<()>33 fn register( 34 &mut self, 35 registry: &Registry, 36 token: Token, 37 interests: mio::Interest, 38 ) -> io::Result<()> { 39 assert!(interests.is_aio() || interests.is_lio()); 40 self.0.register(registry.as_raw_fd(), usize::from(token)); 41 Ok(()) 42 } 43 deregister(&mut self, _registry: &Registry) -> io::Result<()>44 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { 45 self.0.deregister(); 46 Ok(()) 47 } 48 reregister( &mut self, registry: &Registry, token: Token, interests: mio::Interest, ) -> io::Result<()>49 fn reregister( 50 &mut self, 51 registry: &Registry, 52 token: Token, 53 interests: mio::Interest, 54 ) -> io::Result<()> { 55 assert!(interests.is_aio() || interests.is_lio()); 56 self.0.register(registry.as_raw_fd(), usize::from(token)); 57 Ok(()) 58 } 59 } 60 61 /// Associates a POSIX AIO control block with the reactor that drives it. 62 /// 63 /// `Aio`'s wrapped type must implement [`AioSource`] to be driven 64 /// by the reactor. 65 /// 66 /// The wrapped source may be accessed through the `Aio` via the `Deref` and 67 /// `DerefMut` traits. 68 /// 69 /// ## Clearing readiness 70 /// 71 /// If [`Aio::poll_ready`] returns ready, but the consumer determines that the 72 /// Source is not completely ready and must return to the Pending state, 73 /// [`Aio::clear_ready`] may be used. This can be useful with 74 /// [`lio_listio`], which may generate a kevent when only a portion of the 75 /// operations have completed. 76 /// 77 /// ## Platforms 78 /// 79 /// Only FreeBSD implements POSIX AIO with kqueue notification, so 80 /// `Aio` is only available for that operating system. 81 /// 82 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html 83 // Note: Unlike every other kqueue event source, POSIX AIO registers events not 84 // via kevent(2) but when the aiocb is submitted to the kernel via aio_read, 85 // aio_write, etc. It needs the kqueue's file descriptor to do that. So 86 // AsyncFd can't be used for POSIX AIO. 87 // 88 // Note that Aio doesn't implement Drop. There's no need. Unlike other 89 // kqueue sources, simply dropping the object effectively deregisters it. 90 pub struct Aio<E> { 91 io: MioSource<E>, 92 registration: Registration, 93 } 94 95 // ===== impl Aio ===== 96 97 impl<E: AioSource> Aio<E> { 98 /// Creates a new `Aio` suitable for use with POSIX AIO functions. 99 /// 100 /// It will be associated with the default reactor. The runtime is usually 101 /// set implicitly when this function is called from a future driven by a 102 /// Tokio runtime, otherwise runtime can be set explicitly with 103 /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. new_for_aio(io: E) -> io::Result<Self>104 pub fn new_for_aio(io: E) -> io::Result<Self> { 105 Self::new_with_interest(io, Interest::AIO) 106 } 107 108 /// Creates a new `Aio` suitable for use with [`lio_listio`]. 109 /// 110 /// It will be associated with the default reactor. The runtime is usually 111 /// set implicitly when this function is called from a future driven by a 112 /// Tokio runtime, otherwise runtime can be set explicitly with 113 /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. 114 /// 115 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html new_for_lio(io: E) -> io::Result<Self>116 pub fn new_for_lio(io: E) -> io::Result<Self> { 117 Self::new_with_interest(io, Interest::LIO) 118 } 119 new_with_interest(io: E, interest: Interest) -> io::Result<Self>120 fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { 121 let mut io = MioSource(io); 122 let handle = scheduler::Handle::current(); 123 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; 124 Ok(Self { io, registration }) 125 } 126 127 /// Indicates to Tokio that the source is no longer ready. The internal 128 /// readiness flag will be cleared, and tokio will wait for the next 129 /// edge-triggered readiness notification from the OS. 130 /// 131 /// It is critical that this method not be called unless your code 132 /// _actually observes_ that the source is _not_ ready. The OS must 133 /// deliver a subsequent notification, or this source will block 134 /// forever. It is equally critical that you `do` call this method if you 135 /// resubmit the same structure to the kernel and poll it again. 136 /// 137 /// This method is not very useful with AIO readiness, since each `aiocb` 138 /// structure is typically only used once. It's main use with 139 /// [`lio_listio`], which will sometimes send notification when only a 140 /// portion of its elements are complete. In that case, the caller must 141 /// call `clear_ready` before resubmitting it. 142 /// 143 /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html clear_ready(&self, ev: AioEvent)144 pub fn clear_ready(&self, ev: AioEvent) { 145 self.registration.clear_readiness(ev.0) 146 } 147 148 /// Destroy the [`Aio`] and return its inner source. into_inner(self) -> E149 pub fn into_inner(self) -> E { 150 self.io.0 151 } 152 153 /// Polls for readiness. Either AIO or LIO counts. 154 /// 155 /// This method returns: 156 /// * `Poll::Pending` if the underlying operation is not complete, whether 157 /// or not it completed successfully. This will be true if the OS is 158 /// still processing it, or if it has not yet been submitted to the OS. 159 /// * `Poll::Ready(Ok(_))` if the underlying operation is complete. 160 /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does 161 /// _not_ indicate that the underlying operation encountered an error. 162 /// 163 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` 164 /// is scheduled to receive a wakeup when the underlying operation 165 /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the 166 /// `Context` passed to the most recent call is scheduled to receive a wakeup. poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>>167 pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> { 168 let ev = ready!(self.registration.poll_read_ready(cx))?; 169 Poll::Ready(Ok(AioEvent(ev))) 170 } 171 } 172 173 impl<E: AioSource> Deref for Aio<E> { 174 type Target = E; 175 deref(&self) -> &E176 fn deref(&self) -> &E { 177 &self.io.0 178 } 179 } 180 181 impl<E: AioSource> DerefMut for Aio<E> { deref_mut(&mut self) -> &mut E182 fn deref_mut(&mut self) -> &mut E { 183 &mut self.io.0 184 } 185 } 186 187 impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 189 f.debug_struct("Aio").field("io", &self.io.0).finish() 190 } 191 } 192 193 /// Opaque data returned by [`Aio::poll_ready`]. 194 /// 195 /// It can be fed back to [`Aio::clear_ready`]. 196 #[derive(Debug)] 197 pub struct AioEvent(ReadyEvent); 198