1 #![warn(rust_2018_idioms)]
2 #![cfg(all(target_os = "freebsd", feature = "net"))]
3 
4 use mio_aio::{AioFsyncMode, SourceApi};
5 use std::{
6     future::Future,
7     io, mem,
8     os::fd::AsFd,
9     os::unix::io::{AsRawFd, RawFd},
10     pin::{pin, Pin},
11     task::{Context, Poll},
12 };
13 use tempfile::tempfile;
14 use tokio::io::bsd::{Aio, AioSource};
15 use tokio_test::assert_pending;
16 
17 mod aio {
18     use super::*;
19 
20     #[derive(Debug)]
21     struct TokioSource<'fd>(mio_aio::Source<nix::sys::aio::AioFsync<'fd>>);
22 
23     impl<'fd> AioSource for TokioSource<'fd> {
register(&mut self, kq: RawFd, token: usize)24         fn register(&mut self, kq: RawFd, token: usize) {
25             self.0.register_raw(kq, token)
26         }
deregister(&mut self)27         fn deregister(&mut self) {
28             self.0.deregister_raw()
29         }
30     }
31 
32     /// A very crude implementation of an AIO-based future
33     struct FsyncFut<'fd>(Aio<TokioSource<'fd>>);
34 
35     impl<'fd> FsyncFut<'fd> {
submit(self: Pin<&mut Self>) -> io::Result<()>36         pub fn submit(self: Pin<&mut Self>) -> io::Result<()> {
37             let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
38             match p.submit() {
39                 Ok(()) => Ok(()),
40                 Err(e) => Err(io::Error::from_raw_os_error(e as i32)),
41             }
42         }
43     }
44 
45     impl<'fd> Future for FsyncFut<'fd> {
46         type Output = io::Result<()>;
47 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>48         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49             let poll_result = self.0.poll_ready(cx);
50             match poll_result {
51                 Poll::Pending => Poll::Pending,
52                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
53                 Poll::Ready(Ok(_ev)) => {
54                     // At this point, we could clear readiness.  But there's no
55                     // point, since we're about to drop the Aio.
56                     let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
57                     let result = p.aio_return();
58                     match result {
59                         Ok(r) => Poll::Ready(Ok(r)),
60                         Err(e) => Poll::Ready(Err(io::Error::from_raw_os_error(e as i32))),
61                     }
62                 }
63             }
64         }
65     }
66 
67     /// Low-level AIO Source
68     ///
69     /// An example bypassing mio_aio and Nix to demonstrate how the kevent
70     /// registration actually works, under the hood.
71     struct LlSource(Pin<Box<libc::aiocb>>);
72 
73     impl LlSource {
fsync(mut self: Pin<&mut Self>)74         fn fsync(mut self: Pin<&mut Self>) {
75             let r = unsafe {
76                 let p = self.0.as_mut().get_unchecked_mut();
77                 libc::aio_fsync(libc::O_SYNC, p)
78             };
79             assert_eq!(0, r);
80         }
81     }
82 
83     impl AioSource for LlSource {
register(&mut self, kq: RawFd, token: usize)84         fn register(&mut self, kq: RawFd, token: usize) {
85             let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
86             sev.sigev_notify = libc::SIGEV_KEVENT;
87             sev.sigev_signo = kq;
88             sev.sigev_value = libc::sigval {
89                 sival_ptr: token as *mut libc::c_void,
90             };
91             self.0.aio_sigevent = sev;
92         }
93 
deregister(&mut self)94         fn deregister(&mut self) {
95             unsafe {
96                 self.0.aio_sigevent = mem::zeroed();
97             }
98         }
99     }
100 
101     struct LlFut(Aio<LlSource>);
102 
103     impl LlFut {
fsync(self: Pin<&mut Self>)104         pub fn fsync(self: Pin<&mut Self>) {
105             let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
106             p.fsync();
107         }
108     }
109 
110     impl Future for LlFut {
111         type Output = std::io::Result<usize>;
112 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>113         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114             let poll_result = self.0.poll_ready(cx);
115             match poll_result {
116                 Poll::Pending => Poll::Pending,
117                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
118                 Poll::Ready(Ok(ev)) => {
119                     // Clearing readiness makes the future non-idempotent; the
120                     // caller can't poll it repeatedly after it has already
121                     // returned Ready.  But that's ok; most futures behave this
122                     // way.
123                     self.0.clear_ready(ev);
124                     let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
125                     if r >= 0 {
126                         Poll::Ready(Ok(r as usize))
127                     } else {
128                         Poll::Ready(Err(io::Error::last_os_error()))
129                     }
130                 }
131             }
132         }
133     }
134 
135     #[tokio::test]
fsync()136     async fn fsync() {
137         let f = tempfile().unwrap();
138         let fd = f.as_fd();
139         let mode = AioFsyncMode::O_SYNC;
140         let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
141         let poll_aio = Aio::new_for_aio(source).unwrap();
142         let mut fut = pin!(FsyncFut(poll_aio));
143         fut.as_mut().submit().unwrap();
144         fut.await.unwrap();
145     }
146 
147     #[tokio::test]
ll_fsync()148     async fn ll_fsync() {
149         let f = tempfile().unwrap();
150         let fd = f.as_raw_fd();
151         let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
152         aiocb.aio_fildes = fd;
153         let source = LlSource(Box::pin(aiocb));
154         let mut poll_aio = Aio::new_for_aio(source).unwrap();
155         let r = unsafe {
156             let p = poll_aio.0.as_mut().get_unchecked_mut();
157             libc::aio_fsync(libc::O_SYNC, p)
158         };
159         assert_eq!(0, r);
160         let fut = LlFut(poll_aio);
161         fut.await.unwrap();
162     }
163 
164     /// A suitably crafted future type can reuse an Aio object
165     #[tokio::test]
reuse()166     async fn reuse() {
167         let f = tempfile().unwrap();
168         let fd = f.as_raw_fd();
169         let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
170         aiocb.aio_fildes = fd;
171         let source = LlSource(Box::pin(aiocb));
172         let poll_aio = Aio::new_for_aio(source).unwrap();
173 
174         // Send the operation to the kernel the first time
175         let mut fut = LlFut(poll_aio);
176         {
177             let mut pfut = Pin::new(&mut fut);
178             pfut.as_mut().fsync();
179             pfut.as_mut().await.unwrap();
180         }
181 
182         // Check that readiness was cleared
183         let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
184         assert_pending!(fut.0.poll_ready(&mut ctx));
185 
186         // and reuse the future and its Aio object
187         {
188             let mut pfut = Pin::new(&mut fut);
189             pfut.as_mut().fsync();
190             pfut.as_mut().await.unwrap();
191         }
192     }
193 }
194 
195 mod lio {
196     use super::*;
197 
198     /// Low-level source based on lio_listio
199     ///
200     /// An example demonstrating using AIO with `Interest::Lio`.  mio_aio 0.8
201     /// doesn't include any bindings for lio_listio, so we've got to go
202     /// low-level.
203     struct LioSource<'a> {
204         aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>,
205         sev: libc::sigevent,
206     }
207 
208     impl<'a> LioSource<'a> {
new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self209         fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self {
210             LioSource {
211                 aiocb,
212                 sev: unsafe { mem::zeroed() },
213             }
214         }
215 
submit(mut self: Pin<&mut Self>)216         fn submit(mut self: Pin<&mut Self>) {
217             let p: *const *mut libc::aiocb =
218                 unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _;
219             let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) };
220             assert_eq!(r, 0);
221         }
222     }
223 
224     impl<'a> AioSource for LioSource<'a> {
register(&mut self, kq: RawFd, token: usize)225         fn register(&mut self, kq: RawFd, token: usize) {
226             let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
227             sev.sigev_notify = libc::SIGEV_KEVENT;
228             sev.sigev_signo = kq;
229             sev.sigev_value = libc::sigval {
230                 sival_ptr: token as *mut libc::c_void,
231             };
232             self.sev = sev;
233         }
234 
deregister(&mut self)235         fn deregister(&mut self) {
236             unsafe {
237                 self.sev = mem::zeroed();
238             }
239         }
240     }
241 
242     struct LioFut<'a>(Aio<LioSource<'a>>);
243 
244     impl<'a> LioFut<'a> {
submit(self: Pin<&mut Self>)245         pub fn submit(self: Pin<&mut Self>) {
246             let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
247             p.submit();
248         }
249     }
250 
251     impl<'a> Future for LioFut<'a> {
252         type Output = std::io::Result<usize>;
253 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>254         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255             let poll_result = self.0.poll_ready(cx);
256             match poll_result {
257                 Poll::Pending => Poll::Pending,
258                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
259                 Poll::Ready(Ok(ev)) => {
260                     // Clearing readiness makes the future non-idempotent; the
261                     // caller can't poll it repeatedly after it has already
262                     // returned Ready.  But that's ok; most futures behave this
263                     // way.  Clearing readiness is especially useful for
264                     // lio_listio, because sometimes some operations will be
265                     // ready but not all.
266                     self.0.clear_ready(ev);
267                     let r = unsafe {
268                         let p1 = self.get_unchecked_mut();
269                         let p2: &mut [&mut libc::aiocb; 1] =
270                             p1.0.aiocb.as_mut().get_unchecked_mut();
271                         let p3: &mut libc::aiocb = p2[0];
272                         libc::aio_return(p3)
273                     };
274                     if r >= 0 {
275                         Poll::Ready(Ok(r as usize))
276                     } else {
277                         Poll::Ready(Err(io::Error::last_os_error()))
278                     }
279                 }
280             }
281         }
282     }
283 
284     /// An lio_listio operation with one fsync element
285     #[tokio::test]
onewrite()286     async fn onewrite() {
287         const WBUF: &[u8] = b"abcdef";
288         let f = tempfile().unwrap();
289 
290         let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
291         aiocb.aio_fildes = f.as_raw_fd();
292         aiocb.aio_lio_opcode = libc::LIO_WRITE;
293         aiocb.aio_nbytes = WBUF.len();
294         aiocb.aio_buf = WBUF.as_ptr() as *mut _;
295         let aiocb = pin!([&mut aiocb]);
296         let source = LioSource::new(aiocb);
297         let poll_aio = Aio::new_for_lio(source).unwrap();
298 
299         // Send the operation to the kernel
300         let mut fut = pin!(LioFut(poll_aio));
301         fut.as_mut().submit();
302         fut.await.unwrap();
303     }
304 
305     /// A suitably crafted future type can reuse an Aio object
306     #[tokio::test]
reuse()307     async fn reuse() {
308         const WBUF: &[u8] = b"abcdef";
309         let f = tempfile().unwrap();
310 
311         let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
312         aiocb.aio_fildes = f.as_raw_fd();
313         aiocb.aio_lio_opcode = libc::LIO_WRITE;
314         aiocb.aio_nbytes = WBUF.len();
315         aiocb.aio_buf = WBUF.as_ptr() as *mut _;
316         let aiocb = pin!([&mut aiocb]);
317         let source = LioSource::new(aiocb);
318         let poll_aio = Aio::new_for_lio(source).unwrap();
319 
320         // Send the operation to the kernel the first time
321         let mut fut = LioFut(poll_aio);
322         {
323             let mut pfut = Pin::new(&mut fut);
324             pfut.as_mut().submit();
325             pfut.as_mut().await.unwrap();
326         }
327 
328         // Check that readiness was cleared
329         let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
330         assert_pending!(fut.0.poll_ready(&mut ctx));
331 
332         // and reuse the future and its Aio object
333         {
334             let mut pfut = Pin::new(&mut fut);
335             pfut.as_mut().submit();
336             pfut.as_mut().await.unwrap();
337         }
338     }
339 }
340