1 #![warn(rust_2018_idioms)] 2 #![cfg(all(target_os = "freebsd", feature = "net"))] 3 4 use mio_aio::{AioFsyncMode, SourceApi}; 5 use std::{ 6 future::Future, 7 io, mem, 8 os::fd::AsFd, 9 os::unix::io::{AsRawFd, RawFd}, 10 pin::{pin, Pin}, 11 task::{Context, Poll}, 12 }; 13 use tempfile::tempfile; 14 use tokio::io::bsd::{Aio, AioSource}; 15 use tokio_test::assert_pending; 16 17 mod aio { 18 use super::*; 19 20 #[derive(Debug)] 21 struct TokioSource<'fd>(mio_aio::Source<nix::sys::aio::AioFsync<'fd>>); 22 23 impl<'fd> AioSource for TokioSource<'fd> { register(&mut self, kq: RawFd, token: usize)24 fn register(&mut self, kq: RawFd, token: usize) { 25 self.0.register_raw(kq, token) 26 } deregister(&mut self)27 fn deregister(&mut self) { 28 self.0.deregister_raw() 29 } 30 } 31 32 /// A very crude implementation of an AIO-based future 33 struct FsyncFut<'fd>(Aio<TokioSource<'fd>>); 34 35 impl<'fd> FsyncFut<'fd> { submit(self: Pin<&mut Self>) -> io::Result<()>36 pub fn submit(self: Pin<&mut Self>) -> io::Result<()> { 37 let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) }; 38 match p.submit() { 39 Ok(()) => Ok(()), 40 Err(e) => Err(io::Error::from_raw_os_error(e as i32)), 41 } 42 } 43 } 44 45 impl<'fd> Future for FsyncFut<'fd> { 46 type Output = io::Result<()>; 47 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>48 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 49 let poll_result = self.0.poll_ready(cx); 50 match poll_result { 51 Poll::Pending => Poll::Pending, 52 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 53 Poll::Ready(Ok(_ev)) => { 54 // At this point, we could clear readiness. But there's no 55 // point, since we're about to drop the Aio. 56 let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) }; 57 let result = p.aio_return(); 58 match result { 59 Ok(r) => Poll::Ready(Ok(r)), 60 Err(e) => Poll::Ready(Err(io::Error::from_raw_os_error(e as i32))), 61 } 62 } 63 } 64 } 65 } 66 67 /// Low-level AIO Source 68 /// 69 /// An example bypassing mio_aio and Nix to demonstrate how the kevent 70 /// registration actually works, under the hood. 71 struct LlSource(Pin<Box<libc::aiocb>>); 72 73 impl LlSource { fsync(mut self: Pin<&mut Self>)74 fn fsync(mut self: Pin<&mut Self>) { 75 let r = unsafe { 76 let p = self.0.as_mut().get_unchecked_mut(); 77 libc::aio_fsync(libc::O_SYNC, p) 78 }; 79 assert_eq!(0, r); 80 } 81 } 82 83 impl AioSource for LlSource { register(&mut self, kq: RawFd, token: usize)84 fn register(&mut self, kq: RawFd, token: usize) { 85 let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 86 sev.sigev_notify = libc::SIGEV_KEVENT; 87 sev.sigev_signo = kq; 88 sev.sigev_value = libc::sigval { 89 sival_ptr: token as *mut libc::c_void, 90 }; 91 self.0.aio_sigevent = sev; 92 } 93 deregister(&mut self)94 fn deregister(&mut self) { 95 unsafe { 96 self.0.aio_sigevent = mem::zeroed(); 97 } 98 } 99 } 100 101 struct LlFut(Aio<LlSource>); 102 103 impl LlFut { fsync(self: Pin<&mut Self>)104 pub fn fsync(self: Pin<&mut Self>) { 105 let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) }; 106 p.fsync(); 107 } 108 } 109 110 impl Future for LlFut { 111 type Output = std::io::Result<usize>; 112 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>113 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 114 let poll_result = self.0.poll_ready(cx); 115 match poll_result { 116 Poll::Pending => Poll::Pending, 117 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 118 Poll::Ready(Ok(ev)) => { 119 // Clearing readiness makes the future non-idempotent; the 120 // caller can't poll it repeatedly after it has already 121 // returned Ready. But that's ok; most futures behave this 122 // way. 123 self.0.clear_ready(ev); 124 let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; 125 if r >= 0 { 126 Poll::Ready(Ok(r as usize)) 127 } else { 128 Poll::Ready(Err(io::Error::last_os_error())) 129 } 130 } 131 } 132 } 133 } 134 135 #[tokio::test] fsync()136 async fn fsync() { 137 let f = tempfile().unwrap(); 138 let fd = f.as_fd(); 139 let mode = AioFsyncMode::O_SYNC; 140 let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0)); 141 let poll_aio = Aio::new_for_aio(source).unwrap(); 142 let mut fut = pin!(FsyncFut(poll_aio)); 143 fut.as_mut().submit().unwrap(); 144 fut.await.unwrap(); 145 } 146 147 #[tokio::test] ll_fsync()148 async fn ll_fsync() { 149 let f = tempfile().unwrap(); 150 let fd = f.as_raw_fd(); 151 let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 152 aiocb.aio_fildes = fd; 153 let source = LlSource(Box::pin(aiocb)); 154 let mut poll_aio = Aio::new_for_aio(source).unwrap(); 155 let r = unsafe { 156 let p = poll_aio.0.as_mut().get_unchecked_mut(); 157 libc::aio_fsync(libc::O_SYNC, p) 158 }; 159 assert_eq!(0, r); 160 let fut = LlFut(poll_aio); 161 fut.await.unwrap(); 162 } 163 164 /// A suitably crafted future type can reuse an Aio object 165 #[tokio::test] reuse()166 async fn reuse() { 167 let f = tempfile().unwrap(); 168 let fd = f.as_raw_fd(); 169 let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 170 aiocb.aio_fildes = fd; 171 let source = LlSource(Box::pin(aiocb)); 172 let poll_aio = Aio::new_for_aio(source).unwrap(); 173 174 // Send the operation to the kernel the first time 175 let mut fut = LlFut(poll_aio); 176 { 177 let mut pfut = Pin::new(&mut fut); 178 pfut.as_mut().fsync(); 179 pfut.as_mut().await.unwrap(); 180 } 181 182 // Check that readiness was cleared 183 let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); 184 assert_pending!(fut.0.poll_ready(&mut ctx)); 185 186 // and reuse the future and its Aio object 187 { 188 let mut pfut = Pin::new(&mut fut); 189 pfut.as_mut().fsync(); 190 pfut.as_mut().await.unwrap(); 191 } 192 } 193 } 194 195 mod lio { 196 use super::*; 197 198 /// Low-level source based on lio_listio 199 /// 200 /// An example demonstrating using AIO with `Interest::Lio`. mio_aio 0.8 201 /// doesn't include any bindings for lio_listio, so we've got to go 202 /// low-level. 203 struct LioSource<'a> { 204 aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>, 205 sev: libc::sigevent, 206 } 207 208 impl<'a> LioSource<'a> { new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self209 fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self { 210 LioSource { 211 aiocb, 212 sev: unsafe { mem::zeroed() }, 213 } 214 } 215 submit(mut self: Pin<&mut Self>)216 fn submit(mut self: Pin<&mut Self>) { 217 let p: *const *mut libc::aiocb = 218 unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _; 219 let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) }; 220 assert_eq!(r, 0); 221 } 222 } 223 224 impl<'a> AioSource for LioSource<'a> { register(&mut self, kq: RawFd, token: usize)225 fn register(&mut self, kq: RawFd, token: usize) { 226 let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 227 sev.sigev_notify = libc::SIGEV_KEVENT; 228 sev.sigev_signo = kq; 229 sev.sigev_value = libc::sigval { 230 sival_ptr: token as *mut libc::c_void, 231 }; 232 self.sev = sev; 233 } 234 deregister(&mut self)235 fn deregister(&mut self) { 236 unsafe { 237 self.sev = mem::zeroed(); 238 } 239 } 240 } 241 242 struct LioFut<'a>(Aio<LioSource<'a>>); 243 244 impl<'a> LioFut<'a> { submit(self: Pin<&mut Self>)245 pub fn submit(self: Pin<&mut Self>) { 246 let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) }; 247 p.submit(); 248 } 249 } 250 251 impl<'a> Future for LioFut<'a> { 252 type Output = std::io::Result<usize>; 253 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>254 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 255 let poll_result = self.0.poll_ready(cx); 256 match poll_result { 257 Poll::Pending => Poll::Pending, 258 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 259 Poll::Ready(Ok(ev)) => { 260 // Clearing readiness makes the future non-idempotent; the 261 // caller can't poll it repeatedly after it has already 262 // returned Ready. But that's ok; most futures behave this 263 // way. Clearing readiness is especially useful for 264 // lio_listio, because sometimes some operations will be 265 // ready but not all. 266 self.0.clear_ready(ev); 267 let r = unsafe { 268 let p1 = self.get_unchecked_mut(); 269 let p2: &mut [&mut libc::aiocb; 1] = 270 p1.0.aiocb.as_mut().get_unchecked_mut(); 271 let p3: &mut libc::aiocb = p2[0]; 272 libc::aio_return(p3) 273 }; 274 if r >= 0 { 275 Poll::Ready(Ok(r as usize)) 276 } else { 277 Poll::Ready(Err(io::Error::last_os_error())) 278 } 279 } 280 } 281 } 282 } 283 284 /// An lio_listio operation with one fsync element 285 #[tokio::test] onewrite()286 async fn onewrite() { 287 const WBUF: &[u8] = b"abcdef"; 288 let f = tempfile().unwrap(); 289 290 let mut aiocb: libc::aiocb = unsafe { mem::zeroed() }; 291 aiocb.aio_fildes = f.as_raw_fd(); 292 aiocb.aio_lio_opcode = libc::LIO_WRITE; 293 aiocb.aio_nbytes = WBUF.len(); 294 aiocb.aio_buf = WBUF.as_ptr() as *mut _; 295 let aiocb = pin!([&mut aiocb]); 296 let source = LioSource::new(aiocb); 297 let poll_aio = Aio::new_for_lio(source).unwrap(); 298 299 // Send the operation to the kernel 300 let mut fut = pin!(LioFut(poll_aio)); 301 fut.as_mut().submit(); 302 fut.await.unwrap(); 303 } 304 305 /// A suitably crafted future type can reuse an Aio object 306 #[tokio::test] reuse()307 async fn reuse() { 308 const WBUF: &[u8] = b"abcdef"; 309 let f = tempfile().unwrap(); 310 311 let mut aiocb: libc::aiocb = unsafe { mem::zeroed() }; 312 aiocb.aio_fildes = f.as_raw_fd(); 313 aiocb.aio_lio_opcode = libc::LIO_WRITE; 314 aiocb.aio_nbytes = WBUF.len(); 315 aiocb.aio_buf = WBUF.as_ptr() as *mut _; 316 let aiocb = pin!([&mut aiocb]); 317 let source = LioSource::new(aiocb); 318 let poll_aio = Aio::new_for_lio(source).unwrap(); 319 320 // Send the operation to the kernel the first time 321 let mut fut = LioFut(poll_aio); 322 { 323 let mut pfut = Pin::new(&mut fut); 324 pfut.as_mut().submit(); 325 pfut.as_mut().await.unwrap(); 326 } 327 328 // Check that readiness was cleared 329 let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); 330 assert_pending!(fut.0.poll_ready(&mut ctx)); 331 332 // and reuse the future and its Aio object 333 { 334 let mut pfut = Pin::new(&mut fut); 335 pfut.as_mut().submit(); 336 pfut.as_mut().await.unwrap(); 337 } 338 } 339 } 340