1 use crate::codec::framed_impl::{FramedImpl, ReadFrame}; 2 use crate::codec::Decoder; 3 4 use futures_core::Stream; 5 use tokio::io::AsyncRead; 6 7 use bytes::BytesMut; 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 use std::fmt; 11 use std::pin::Pin; 12 use std::task::{Context, Poll}; 13 14 pin_project! { 15 /// A [`Stream`] of messages decoded from an [`AsyncRead`]. 16 /// 17 /// For examples of how to use `FramedRead` with a codec, see the 18 /// examples on the [`codec`] module. 19 /// 20 /// # Cancellation safety 21 /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned 22 /// future only holds onto a reference to the underlying stream, so dropping it will 23 /// never lose a value. 24 /// 25 /// [`Stream`]: futures_core::Stream 26 /// [`AsyncRead`]: tokio::io::AsyncRead 27 /// [`codec`]: crate::codec 28 /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next 29 pub struct FramedRead<T, D> { 30 #[pin] 31 inner: FramedImpl<T, D, ReadFrame>, 32 } 33 } 34 35 // ===== impl FramedRead ===== 36 37 impl<T, D> FramedRead<T, D> 38 where 39 T: AsyncRead, 40 D: Decoder, 41 { 42 /// Creates a new `FramedRead` with the given `decoder`. new(inner: T, decoder: D) -> FramedRead<T, D>43 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { 44 FramedRead { 45 inner: FramedImpl { 46 inner, 47 codec: decoder, 48 state: Default::default(), 49 }, 50 } 51 } 52 53 /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity` 54 /// initial size. with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D>55 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> { 56 FramedRead { 57 inner: FramedImpl { 58 inner, 59 codec: decoder, 60 state: ReadFrame { 61 eof: false, 62 is_readable: false, 63 buffer: BytesMut::with_capacity(capacity), 64 has_errored: false, 65 }, 66 }, 67 } 68 } 69 } 70 71 impl<T, D> FramedRead<T, D> { 72 /// Returns a reference to the underlying I/O stream wrapped by 73 /// `FramedRead`. 74 /// 75 /// Note that care should be taken to not tamper with the underlying stream 76 /// of data coming in as it may corrupt the stream of frames otherwise 77 /// being worked with. get_ref(&self) -> &T78 pub fn get_ref(&self) -> &T { 79 &self.inner.inner 80 } 81 82 /// Returns a mutable reference to the underlying I/O stream wrapped by 83 /// `FramedRead`. 84 /// 85 /// Note that care should be taken to not tamper with the underlying stream 86 /// of data coming in as it may corrupt the stream of frames otherwise 87 /// being worked with. get_mut(&mut self) -> &mut T88 pub fn get_mut(&mut self) -> &mut T { 89 &mut self.inner.inner 90 } 91 92 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by 93 /// `FramedRead`. 94 /// 95 /// Note that care should be taken to not tamper with the underlying stream 96 /// of data coming in as it may corrupt the stream of frames otherwise 97 /// being worked with. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>98 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 99 self.project().inner.project().inner 100 } 101 102 /// Consumes the `FramedRead`, returning its underlying I/O stream. 103 /// 104 /// Note that care should be taken to not tamper with the underlying stream 105 /// of data coming in as it may corrupt the stream of frames otherwise 106 /// being worked with. into_inner(self) -> T107 pub fn into_inner(self) -> T { 108 self.inner.inner 109 } 110 111 /// Returns a reference to the underlying decoder. decoder(&self) -> &D112 pub fn decoder(&self) -> &D { 113 &self.inner.codec 114 } 115 116 /// Returns a mutable reference to the underlying decoder. decoder_mut(&mut self) -> &mut D117 pub fn decoder_mut(&mut self) -> &mut D { 118 &mut self.inner.codec 119 } 120 121 /// Maps the decoder `D` to `C`, preserving the read buffer 122 /// wrapped by `Framed`. map_decoder<C, F>(self, map: F) -> FramedRead<T, C> where F: FnOnce(D) -> C,123 pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C> 124 where 125 F: FnOnce(D) -> C, 126 { 127 // This could be potentially simplified once rust-lang/rust#86555 hits stable 128 let FramedImpl { 129 inner, 130 state, 131 codec, 132 } = self.inner; 133 FramedRead { 134 inner: FramedImpl { 135 inner, 136 state, 137 codec: map(codec), 138 }, 139 } 140 } 141 142 /// Returns a mutable reference to the underlying decoder. decoder_pin_mut(self: Pin<&mut Self>) -> &mut D143 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D { 144 self.project().inner.project().codec 145 } 146 147 /// Returns a reference to the read buffer. read_buffer(&self) -> &BytesMut148 pub fn read_buffer(&self) -> &BytesMut { 149 &self.inner.state.buffer 150 } 151 152 /// Returns a mutable reference to the read buffer. read_buffer_mut(&mut self) -> &mut BytesMut153 pub fn read_buffer_mut(&mut self) -> &mut BytesMut { 154 &mut self.inner.state.buffer 155 } 156 } 157 158 // This impl just defers to the underlying FramedImpl 159 impl<T, D> Stream for FramedRead<T, D> 160 where 161 T: AsyncRead, 162 D: Decoder, 163 { 164 type Item = Result<D::Item, D::Error>; 165 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>166 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 167 self.project().inner.poll_next(cx) 168 } 169 } 170 171 // This impl just defers to the underlying T: Sink 172 impl<T, I, D> Sink<I> for FramedRead<T, D> 173 where 174 T: Sink<I>, 175 { 176 type Error = T::Error; 177 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>178 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 179 self.project().inner.project().inner.poll_ready(cx) 180 } 181 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>182 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 183 self.project().inner.project().inner.start_send(item) 184 } 185 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>186 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 187 self.project().inner.project().inner.poll_flush(cx) 188 } 189 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>190 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 191 self.project().inner.project().inner.poll_close(cx) 192 } 193 } 194 195 impl<T, D> fmt::Debug for FramedRead<T, D> 196 where 197 T: fmt::Debug, 198 D: fmt::Debug, 199 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 201 f.debug_struct("FramedRead") 202 .field("inner", &self.get_ref()) 203 .field("decoder", &self.decoder()) 204 .field("eof", &self.inner.state.eof) 205 .field("is_readable", &self.inner.state.is_readable) 206 .field("buffer", &self.read_buffer()) 207 .finish() 208 } 209 } 210