1 use crate::codec::decoder::Decoder; 2 use crate::codec::encoder::Encoder; 3 use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; 4 5 use futures_core::Stream; 6 use tokio::io::{AsyncRead, AsyncWrite}; 7 8 use bytes::BytesMut; 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 use std::fmt; 12 use std::io; 13 use std::pin::Pin; 14 use std::task::{Context, Poll}; 15 16 pin_project! { 17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using 18 /// the `Encoder` and `Decoder` traits to encode and decode frames. 19 /// 20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or 21 /// by using the `new` function seen below. 22 /// 23 /// # Cancellation safety 24 /// 25 /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a 26 /// `tokio::select!` statement and some other branch completes first, then it is 27 /// guaranteed that the message was not sent, but the message itself is lost. 28 /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned 29 /// future only holds onto a reference to the underlying stream, so dropping it will 30 /// never lose a value. 31 /// 32 /// [`Stream`]: futures_core::Stream 33 /// [`Sink`]: futures_sink::Sink 34 /// [`AsyncRead`]: tokio::io::AsyncRead 35 /// [`Decoder::framed`]: crate::codec::Decoder::framed() 36 /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send 37 /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next 38 pub struct Framed<T, U> { 39 #[pin] 40 inner: FramedImpl<T, U, RWFrames> 41 } 42 } 43 44 impl<T, U> Framed<T, U> 45 where 46 T: AsyncRead + AsyncWrite, 47 { 48 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 49 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. 50 /// 51 /// Raw I/O objects work with byte sequences, but higher-level code usually 52 /// wants to batch these into meaningful chunks, called "frames". This 53 /// method layers framing on top of an I/O object, by using the codec 54 /// traits to handle encoding and decoding of messages frames. Note that 55 /// the incoming and outgoing frame types may be distinct. 56 /// 57 /// This function returns a *single* object that is both [`Stream`] and 58 /// [`Sink`]; grouping this into a single object is often useful for layering 59 /// things like gzip or TLS, which require both read and write access to the 60 /// underlying object. 61 /// 62 /// If you want to work more directly with the streams and sink, consider 63 /// calling [`split`] on the `Framed` returned by this method, which will 64 /// break them into separate objects, allowing them to interact more easily. 65 /// 66 /// Note that, for some byte sources, the stream can be resumed after an EOF 67 /// by reading from it, even after it has returned `None`. Repeated attempts 68 /// to do so, without new data available, continue to return `None` without 69 /// creating more (closing) frames. 70 /// 71 /// [`Stream`]: futures_core::Stream 72 /// [`Sink`]: futures_sink::Sink 73 /// [`Decode`]: crate::codec::Decoder 74 /// [`Encoder`]: crate::codec::Encoder 75 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split new(inner: T, codec: U) -> Framed<T, U>76 pub fn new(inner: T, codec: U) -> Framed<T, U> { 77 Framed { 78 inner: FramedImpl { 79 inner, 80 codec, 81 state: Default::default(), 82 }, 83 } 84 } 85 86 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 87 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data, 88 /// with a specific read buffer initial capacity. 89 /// 90 /// Raw I/O objects work with byte sequences, but higher-level code usually 91 /// wants to batch these into meaningful chunks, called "frames". This 92 /// method layers framing on top of an I/O object, by using the codec 93 /// traits to handle encoding and decoding of messages frames. Note that 94 /// the incoming and outgoing frame types may be distinct. 95 /// 96 /// This function returns a *single* object that is both [`Stream`] and 97 /// [`Sink`]; grouping this into a single object is often useful for layering 98 /// things like gzip or TLS, which require both read and write access to the 99 /// underlying object. 100 /// 101 /// If you want to work more directly with the streams and sink, consider 102 /// calling [`split`] on the `Framed` returned by this method, which will 103 /// break them into separate objects, allowing them to interact more easily. 104 /// 105 /// [`Stream`]: futures_core::Stream 106 /// [`Sink`]: futures_sink::Sink 107 /// [`Decode`]: crate::codec::Decoder 108 /// [`Encoder`]: crate::codec::Encoder 109 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U>110 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { 111 Framed { 112 inner: FramedImpl { 113 inner, 114 codec, 115 state: RWFrames { 116 read: ReadFrame { 117 eof: false, 118 is_readable: false, 119 buffer: BytesMut::with_capacity(capacity), 120 has_errored: false, 121 }, 122 write: WriteFrame::default(), 123 }, 124 }, 125 } 126 } 127 } 128 129 impl<T, U> Framed<T, U> { 130 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this 131 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. 132 /// 133 /// Raw I/O objects work with byte sequences, but higher-level code usually 134 /// wants to batch these into meaningful chunks, called "frames". This 135 /// method layers framing on top of an I/O object, by using the `Codec` 136 /// traits to handle encoding and decoding of messages frames. Note that 137 /// the incoming and outgoing frame types may be distinct. 138 /// 139 /// This function returns a *single* object that is both [`Stream`] and 140 /// [`Sink`]; grouping this into a single object is often useful for layering 141 /// things like gzip or TLS, which require both read and write access to the 142 /// underlying object. 143 /// 144 /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field 145 /// can be obtained from an existing `Framed` with the [`into_parts`] method. 146 /// 147 /// If you want to work more directly with the streams and sink, consider 148 /// calling [`split`] on the `Framed` returned by this method, which will 149 /// break them into separate objects, allowing them to interact more easily. 150 /// 151 /// [`Stream`]: futures_core::Stream 152 /// [`Sink`]: futures_sink::Sink 153 /// [`Decoder`]: crate::codec::Decoder 154 /// [`Encoder`]: crate::codec::Encoder 155 /// [`into_parts`]: crate::codec::Framed::into_parts() 156 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split from_parts(parts: FramedParts<T, U>) -> Framed<T, U>157 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { 158 Framed { 159 inner: FramedImpl { 160 inner: parts.io, 161 codec: parts.codec, 162 state: RWFrames { 163 read: parts.read_buf.into(), 164 write: parts.write_buf.into(), 165 }, 166 }, 167 } 168 } 169 170 /// Returns a reference to the underlying I/O stream wrapped by 171 /// `Framed`. 172 /// 173 /// Note that care should be taken to not tamper with the underlying stream 174 /// of data coming in as it may corrupt the stream of frames otherwise 175 /// being worked with. get_ref(&self) -> &T176 pub fn get_ref(&self) -> &T { 177 &self.inner.inner 178 } 179 180 /// Returns a mutable reference to the underlying I/O stream wrapped by 181 /// `Framed`. 182 /// 183 /// Note that care should be taken to not tamper with the underlying stream 184 /// of data coming in as it may corrupt the stream of frames otherwise 185 /// being worked with. get_mut(&mut self) -> &mut T186 pub fn get_mut(&mut self) -> &mut T { 187 &mut self.inner.inner 188 } 189 190 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by 191 /// `Framed`. 192 /// 193 /// Note that care should be taken to not tamper with the underlying stream 194 /// of data coming in as it may corrupt the stream of frames otherwise 195 /// being worked with. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>196 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 197 self.project().inner.project().inner 198 } 199 200 /// Returns a reference to the underlying codec wrapped by 201 /// `Framed`. 202 /// 203 /// Note that care should be taken to not tamper with the underlying codec 204 /// as it may corrupt the stream of frames otherwise being worked with. codec(&self) -> &U205 pub fn codec(&self) -> &U { 206 &self.inner.codec 207 } 208 209 /// Returns a mutable reference to the underlying codec wrapped by 210 /// `Framed`. 211 /// 212 /// Note that care should be taken to not tamper with the underlying codec 213 /// as it may corrupt the stream of frames otherwise being worked with. codec_mut(&mut self) -> &mut U214 pub fn codec_mut(&mut self) -> &mut U { 215 &mut self.inner.codec 216 } 217 218 /// Maps the codec `U` to `C`, preserving the read and write buffers 219 /// wrapped by `Framed`. 220 /// 221 /// Note that care should be taken to not tamper with the underlying codec 222 /// as it may corrupt the stream of frames otherwise being worked with. map_codec<C, F>(self, map: F) -> Framed<T, C> where F: FnOnce(U) -> C,223 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C> 224 where 225 F: FnOnce(U) -> C, 226 { 227 // This could be potentially simplified once rust-lang/rust#86555 hits stable 228 let parts = self.into_parts(); 229 Framed::from_parts(FramedParts { 230 io: parts.io, 231 codec: map(parts.codec), 232 read_buf: parts.read_buf, 233 write_buf: parts.write_buf, 234 _priv: (), 235 }) 236 } 237 238 /// Returns a mutable reference to the underlying codec wrapped by 239 /// `Framed`. 240 /// 241 /// Note that care should be taken to not tamper with the underlying codec 242 /// as it may corrupt the stream of frames otherwise being worked with. codec_pin_mut(self: Pin<&mut Self>) -> &mut U243 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U { 244 self.project().inner.project().codec 245 } 246 247 /// Returns a reference to the read buffer. read_buffer(&self) -> &BytesMut248 pub fn read_buffer(&self) -> &BytesMut { 249 &self.inner.state.read.buffer 250 } 251 252 /// Returns a mutable reference to the read buffer. read_buffer_mut(&mut self) -> &mut BytesMut253 pub fn read_buffer_mut(&mut self) -> &mut BytesMut { 254 &mut self.inner.state.read.buffer 255 } 256 257 /// Returns a reference to the write buffer. write_buffer(&self) -> &BytesMut258 pub fn write_buffer(&self) -> &BytesMut { 259 &self.inner.state.write.buffer 260 } 261 262 /// Returns a mutable reference to the write buffer. write_buffer_mut(&mut self) -> &mut BytesMut263 pub fn write_buffer_mut(&mut self) -> &mut BytesMut { 264 &mut self.inner.state.write.buffer 265 } 266 267 /// Returns backpressure boundary backpressure_boundary(&self) -> usize268 pub fn backpressure_boundary(&self) -> usize { 269 self.inner.state.write.backpressure_boundary 270 } 271 272 /// Updates backpressure boundary set_backpressure_boundary(&mut self, boundary: usize)273 pub fn set_backpressure_boundary(&mut self, boundary: usize) { 274 self.inner.state.write.backpressure_boundary = boundary; 275 } 276 277 /// Consumes the `Framed`, returning its underlying I/O stream. 278 /// 279 /// Note that care should be taken to not tamper with the underlying stream 280 /// of data coming in as it may corrupt the stream of frames otherwise 281 /// being worked with. into_inner(self) -> T282 pub fn into_inner(self) -> T { 283 self.inner.inner 284 } 285 286 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer 287 /// with unprocessed data, and the codec. 288 /// 289 /// Note that care should be taken to not tamper with the underlying stream 290 /// of data coming in as it may corrupt the stream of frames otherwise 291 /// being worked with. into_parts(self) -> FramedParts<T, U>292 pub fn into_parts(self) -> FramedParts<T, U> { 293 FramedParts { 294 io: self.inner.inner, 295 codec: self.inner.codec, 296 read_buf: self.inner.state.read.buffer, 297 write_buf: self.inner.state.write.buffer, 298 _priv: (), 299 } 300 } 301 } 302 303 // This impl just defers to the underlying FramedImpl 304 impl<T, U> Stream for Framed<T, U> 305 where 306 T: AsyncRead, 307 U: Decoder, 308 { 309 type Item = Result<U::Item, U::Error>; 310 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>311 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 312 self.project().inner.poll_next(cx) 313 } 314 } 315 316 // This impl just defers to the underlying FramedImpl 317 impl<T, I, U> Sink<I> for Framed<T, U> 318 where 319 T: AsyncWrite, 320 U: Encoder<I>, 321 U::Error: From<io::Error>, 322 { 323 type Error = U::Error; 324 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>325 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 326 self.project().inner.poll_ready(cx) 327 } 328 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>329 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 330 self.project().inner.start_send(item) 331 } 332 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>333 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 334 self.project().inner.poll_flush(cx) 335 } 336 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>337 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 338 self.project().inner.poll_close(cx) 339 } 340 } 341 342 impl<T, U> fmt::Debug for Framed<T, U> 343 where 344 T: fmt::Debug, 345 U: fmt::Debug, 346 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 348 f.debug_struct("Framed") 349 .field("io", self.get_ref()) 350 .field("codec", self.codec()) 351 .finish() 352 } 353 } 354 355 /// `FramedParts` contains an export of the data of a Framed transport. 356 /// It can be used to construct a new [`Framed`] with a different codec. 357 /// It contains all current buffers and the inner transport. 358 /// 359 /// [`Framed`]: crate::codec::Framed 360 #[derive(Debug)] 361 #[allow(clippy::manual_non_exhaustive)] 362 pub struct FramedParts<T, U> { 363 /// The inner transport used to read bytes to and write bytes to 364 pub io: T, 365 366 /// The codec 367 pub codec: U, 368 369 /// The buffer with read but unprocessed data. 370 pub read_buf: BytesMut, 371 372 /// A buffer with unprocessed data which are not written yet. 373 pub write_buf: BytesMut, 374 375 /// This private field allows us to add additional fields in the future in a 376 /// backwards compatible way. 377 _priv: (), 378 } 379 380 impl<T, U> FramedParts<T, U> { 381 /// Create a new, default, `FramedParts` new<I>(io: T, codec: U) -> FramedParts<T, U> where U: Encoder<I>,382 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U> 383 where 384 U: Encoder<I>, 385 { 386 FramedParts { 387 io, 388 codec, 389 read_buf: BytesMut::new(), 390 write_buf: BytesMut::new(), 391 _priv: (), 392 } 393 } 394 } 395