1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![no_std] 5 #![doc(test( 6 no_crate_inject, 7 attr( 8 deny(warnings, rust_2018_idioms, single_use_lifetimes), 9 allow(dead_code, unused_assignments, unused_variables) 10 ) 11 ))] 12 #![warn(missing_docs, unsafe_op_in_unsafe_fn)] 13 #![cfg_attr(docsrs, feature(doc_cfg))] 14 15 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 16 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 17 18 #[cfg(feature = "alloc")] 19 extern crate alloc; 20 #[cfg(feature = "std")] 21 extern crate std; 22 23 // Macro re-exports 24 pub use futures_core::ready; 25 pub use pin_utils::pin_mut; 26 27 #[cfg(feature = "async-await")] 28 #[macro_use] 29 mod async_await; 30 #[cfg(feature = "async-await")] 31 #[doc(hidden)] 32 pub use self::async_await::*; 33 34 // Not public API. 35 #[cfg(feature = "async-await")] 36 #[doc(hidden)] 37 pub mod __private { 38 pub use crate::*; 39 pub use core::{ 40 option::Option::{self, None, Some}, 41 pin::Pin, 42 result::Result::{Err, Ok}, 43 }; 44 45 pub mod async_await { 46 pub use crate::async_await::*; 47 } 48 } 49 50 #[cfg(feature = "sink")] 51 macro_rules! delegate_sink { 52 ($field:ident, $item:ty) => { 53 fn poll_ready( 54 self: core::pin::Pin<&mut Self>, 55 cx: &mut core::task::Context<'_>, 56 ) -> core::task::Poll<Result<(), Self::Error>> { 57 self.project().$field.poll_ready(cx) 58 } 59 60 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { 61 self.project().$field.start_send(item) 62 } 63 64 fn poll_flush( 65 self: core::pin::Pin<&mut Self>, 66 cx: &mut core::task::Context<'_>, 67 ) -> core::task::Poll<Result<(), Self::Error>> { 68 self.project().$field.poll_flush(cx) 69 } 70 71 fn poll_close( 72 self: core::pin::Pin<&mut Self>, 73 cx: &mut core::task::Context<'_>, 74 ) -> core::task::Poll<Result<(), Self::Error>> { 75 self.project().$field.poll_close(cx) 76 } 77 }; 78 } 79 80 macro_rules! delegate_future { 81 ($field:ident) => { 82 fn poll( 83 self: core::pin::Pin<&mut Self>, 84 cx: &mut core::task::Context<'_>, 85 ) -> core::task::Poll<Self::Output> { 86 self.project().$field.poll(cx) 87 } 88 }; 89 } 90 91 macro_rules! delegate_stream { 92 ($field:ident) => { 93 fn poll_next( 94 self: core::pin::Pin<&mut Self>, 95 cx: &mut core::task::Context<'_>, 96 ) -> core::task::Poll<Option<Self::Item>> { 97 self.project().$field.poll_next(cx) 98 } 99 fn size_hint(&self) -> (usize, Option<usize>) { 100 self.$field.size_hint() 101 } 102 }; 103 } 104 105 #[cfg(feature = "io")] 106 #[cfg(feature = "std")] 107 macro_rules! delegate_async_write { 108 ($field:ident) => { 109 fn poll_write( 110 self: core::pin::Pin<&mut Self>, 111 cx: &mut core::task::Context<'_>, 112 buf: &[u8], 113 ) -> core::task::Poll<std::io::Result<usize>> { 114 self.project().$field.poll_write(cx, buf) 115 } 116 fn poll_write_vectored( 117 self: core::pin::Pin<&mut Self>, 118 cx: &mut core::task::Context<'_>, 119 bufs: &[std::io::IoSlice<'_>], 120 ) -> core::task::Poll<std::io::Result<usize>> { 121 self.project().$field.poll_write_vectored(cx, bufs) 122 } 123 fn poll_flush( 124 self: core::pin::Pin<&mut Self>, 125 cx: &mut core::task::Context<'_>, 126 ) -> core::task::Poll<std::io::Result<()>> { 127 self.project().$field.poll_flush(cx) 128 } 129 fn poll_close( 130 self: core::pin::Pin<&mut Self>, 131 cx: &mut core::task::Context<'_>, 132 ) -> core::task::Poll<std::io::Result<()>> { 133 self.project().$field.poll_close(cx) 134 } 135 }; 136 } 137 138 #[cfg(feature = "io")] 139 #[cfg(feature = "std")] 140 macro_rules! delegate_async_read { 141 ($field:ident) => { 142 fn poll_read( 143 self: core::pin::Pin<&mut Self>, 144 cx: &mut core::task::Context<'_>, 145 buf: &mut [u8], 146 ) -> core::task::Poll<std::io::Result<usize>> { 147 self.project().$field.poll_read(cx, buf) 148 } 149 150 fn poll_read_vectored( 151 self: core::pin::Pin<&mut Self>, 152 cx: &mut core::task::Context<'_>, 153 bufs: &mut [std::io::IoSliceMut<'_>], 154 ) -> core::task::Poll<std::io::Result<usize>> { 155 self.project().$field.poll_read_vectored(cx, bufs) 156 } 157 }; 158 } 159 160 #[cfg(feature = "io")] 161 #[cfg(feature = "std")] 162 macro_rules! delegate_async_buf_read { 163 ($field:ident) => { 164 fn poll_fill_buf( 165 self: core::pin::Pin<&mut Self>, 166 cx: &mut core::task::Context<'_>, 167 ) -> core::task::Poll<std::io::Result<&[u8]>> { 168 self.project().$field.poll_fill_buf(cx) 169 } 170 171 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 172 self.project().$field.consume(amt) 173 } 174 }; 175 } 176 177 macro_rules! delegate_access_inner { 178 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 179 /// Acquires a reference to the underlying sink or stream that this combinator is 180 /// pulling from. 181 pub fn get_ref(&self) -> &$inner { 182 (&self.$field) $($ind get_ref())* 183 } 184 185 /// Acquires a mutable reference to the underlying sink or stream that this 186 /// combinator is pulling from. 187 /// 188 /// Note that care must be taken to avoid tampering with the state of the 189 /// sink or stream which may otherwise confuse this combinator. 190 pub fn get_mut(&mut self) -> &mut $inner { 191 (&mut self.$field) $($ind get_mut())* 192 } 193 194 /// Acquires a pinned mutable reference to the underlying sink or stream that this 195 /// combinator is pulling from. 196 /// 197 /// Note that care must be taken to avoid tampering with the state of the 198 /// sink or stream which may otherwise confuse this combinator. 199 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 200 self.project().$field $($ind get_pin_mut())* 201 } 202 203 /// Consumes this combinator, returning the underlying sink or stream. 204 /// 205 /// Note that this may discard intermediate state of this combinator, so 206 /// care should be taken to avoid losing resources when this is called. 207 pub fn into_inner(self) -> $inner { 208 self.$field $($ind into_inner())* 209 } 210 } 211 } 212 213 macro_rules! delegate_all { 214 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 215 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 216 type Output = <$t as futures_core::future::Future>::Output; 217 218 delegate_future!(inner); 219 } 220 }; 221 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 222 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 223 fn is_terminated(&self) -> bool { 224 self.inner.is_terminated() 225 } 226 } 227 }; 228 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 229 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 230 type Item = <$t as futures_core::stream::Stream>::Item; 231 232 delegate_stream!(inner); 233 } 234 }; 235 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 236 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 237 fn is_terminated(&self) -> bool { 238 self.inner.is_terminated() 239 } 240 } 241 }; 242 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 243 #[cfg(feature = "sink")] 244 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 245 type Error = <$t as futures_sink::Sink<_Item>>::Error; 246 247 delegate_sink!(inner, _Item); 248 } 249 }; 250 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 251 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 252 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 253 core::fmt::Debug::fmt(&self.inner, f) 254 } 255 } 256 }; 257 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 258 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 259 delegate_access_inner!(inner, $inner, ($($ind)*)); 260 } 261 }; 262 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 263 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 264 pub(crate) fn new($($param: $paramt),*) -> Self { 265 Self { inner: $cons } 266 } 267 } 268 }; 269 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 270 pin_project_lite::pin_project! { 271 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 272 $(#[$attr])* 273 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } 274 } 275 276 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 277 $($($item)*)* 278 } 279 280 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 281 }; 282 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 283 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 284 285 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 286 }; 287 } 288 289 pub mod future; 290 #[doc(no_inline)] 291 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; 292 293 pub mod stream; 294 #[doc(no_inline)] 295 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; 296 297 #[cfg(feature = "sink")] 298 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 299 pub mod sink; 300 #[cfg(feature = "sink")] 301 #[doc(no_inline)] 302 pub use crate::sink::{Sink, SinkExt}; 303 304 pub mod task; 305 306 pub mod never; 307 308 #[cfg(feature = "compat")] 309 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] 310 pub mod compat; 311 312 #[cfg(feature = "io")] 313 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 314 #[cfg(feature = "std")] 315 pub mod io; 316 #[cfg(feature = "io")] 317 #[cfg(feature = "std")] 318 #[doc(no_inline)] 319 pub use crate::io::{ 320 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, 321 AsyncWriteExt, 322 }; 323 324 #[cfg(feature = "alloc")] 325 pub mod lock; 326 327 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 328 #[cfg(feature = "alloc")] 329 mod abortable; 330 331 mod fns; 332 mod unfold_state; 333