1 use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2 use crate::codec::Decoder;
3 
4 use futures_core::Stream;
5 use tokio::io::AsyncRead;
6 
7 use bytes::BytesMut;
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 use std::fmt;
11 use std::pin::Pin;
12 use std::task::{Context, Poll};
13 
pin_project! {
    /// A [`Stream`] of messages decoded from an [`AsyncRead`].
    ///
    /// For examples of how to use `FramedRead` with a codec, see the
    /// examples on the [`codec`] module.
    ///
    /// # Cancellation safety
    ///
    /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
    ///   future only holds onto a reference to the underlying stream, so dropping it will
    ///   never lose a value.
    ///
    /// [`Stream`]: futures_core::Stream
    /// [`AsyncRead`]: tokio::io::AsyncRead
    /// [`codec`]: crate::codec
    /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
    pub struct FramedRead<T, D> {
        // Bundles the I/O object, the decoder and the read-side state
        // (`ReadFrame`). It is pinned so the wrapped I/O object can be reached
        // through `Pin<&mut Self>` projections.
        #[pin]
        inner: FramedImpl<T, D, ReadFrame>,
    }
}
34 
35 // ===== impl FramedRead =====
36 
37 impl<T, D> FramedRead<T, D>
38 where
39     T: AsyncRead,
40     D: Decoder,
41 {
42     /// Creates a new `FramedRead` with the given `decoder`.
new(inner: T, decoder: D) -> FramedRead<T, D>43     pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
44         FramedRead {
45             inner: FramedImpl {
46                 inner,
47                 codec: decoder,
48                 state: Default::default(),
49             },
50         }
51     }
52 
53     /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
54     /// initial size.
with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D>55     pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
56         FramedRead {
57             inner: FramedImpl {
58                 inner,
59                 codec: decoder,
60                 state: ReadFrame {
61                     eof: false,
62                     is_readable: false,
63                     buffer: BytesMut::with_capacity(capacity),
64                     has_errored: false,
65                 },
66             },
67         }
68     }
69 }
70 
71 impl<T, D> FramedRead<T, D> {
72     /// Returns a reference to the underlying I/O stream wrapped by
73     /// `FramedRead`.
74     ///
75     /// Note that care should be taken to not tamper with the underlying stream
76     /// of data coming in as it may corrupt the stream of frames otherwise
77     /// being worked with.
get_ref(&self) -> &T78     pub fn get_ref(&self) -> &T {
79         &self.inner.inner
80     }
81 
82     /// Returns a mutable reference to the underlying I/O stream wrapped by
83     /// `FramedRead`.
84     ///
85     /// Note that care should be taken to not tamper with the underlying stream
86     /// of data coming in as it may corrupt the stream of frames otherwise
87     /// being worked with.
get_mut(&mut self) -> &mut T88     pub fn get_mut(&mut self) -> &mut T {
89         &mut self.inner.inner
90     }
91 
92     /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
93     /// `FramedRead`.
94     ///
95     /// Note that care should be taken to not tamper with the underlying stream
96     /// of data coming in as it may corrupt the stream of frames otherwise
97     /// being worked with.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>98     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
99         self.project().inner.project().inner
100     }
101 
102     /// Consumes the `FramedRead`, returning its underlying I/O stream.
103     ///
104     /// Note that care should be taken to not tamper with the underlying stream
105     /// of data coming in as it may corrupt the stream of frames otherwise
106     /// being worked with.
into_inner(self) -> T107     pub fn into_inner(self) -> T {
108         self.inner.inner
109     }
110 
111     /// Returns a reference to the underlying decoder.
decoder(&self) -> &D112     pub fn decoder(&self) -> &D {
113         &self.inner.codec
114     }
115 
116     /// Returns a mutable reference to the underlying decoder.
decoder_mut(&mut self) -> &mut D117     pub fn decoder_mut(&mut self) -> &mut D {
118         &mut self.inner.codec
119     }
120 
121     /// Maps the decoder `D` to `C`, preserving the read buffer
122     /// wrapped by `Framed`.
map_decoder<C, F>(self, map: F) -> FramedRead<T, C> where F: FnOnce(D) -> C,123     pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
124     where
125         F: FnOnce(D) -> C,
126     {
127         // This could be potentially simplified once rust-lang/rust#86555 hits stable
128         let FramedImpl {
129             inner,
130             state,
131             codec,
132         } = self.inner;
133         FramedRead {
134             inner: FramedImpl {
135                 inner,
136                 state,
137                 codec: map(codec),
138             },
139         }
140     }
141 
142     /// Returns a mutable reference to the underlying decoder.
decoder_pin_mut(self: Pin<&mut Self>) -> &mut D143     pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
144         self.project().inner.project().codec
145     }
146 
147     /// Returns a reference to the read buffer.
read_buffer(&self) -> &BytesMut148     pub fn read_buffer(&self) -> &BytesMut {
149         &self.inner.state.buffer
150     }
151 
152     /// Returns a mutable reference to the read buffer.
read_buffer_mut(&mut self) -> &mut BytesMut153     pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
154         &mut self.inner.state.buffer
155     }
156 }
157 
158 // This impl just defers to the underlying FramedImpl
159 impl<T, D> Stream for FramedRead<T, D>
160 where
161     T: AsyncRead,
162     D: Decoder,
163 {
164     type Item = Result<D::Item, D::Error>;
165 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>166     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167         self.project().inner.poll_next(cx)
168     }
169 }
170 
171 // This impl just defers to the underlying T: Sink
172 impl<T, I, D> Sink<I> for FramedRead<T, D>
173 where
174     T: Sink<I>,
175 {
176     type Error = T::Error;
177 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>178     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
179         self.project().inner.project().inner.poll_ready(cx)
180     }
181 
start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>182     fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
183         self.project().inner.project().inner.start_send(item)
184     }
185 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>186     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187         self.project().inner.project().inner.poll_flush(cx)
188     }
189 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>190     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
191         self.project().inner.project().inner.poll_close(cx)
192     }
193 }
194 
195 impl<T, D> fmt::Debug for FramedRead<T, D>
196 where
197     T: fmt::Debug,
198     D: fmt::Debug,
199 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result200     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201         f.debug_struct("FramedRead")
202             .field("inner", &self.get_ref())
203             .field("decoder", &self.decoder())
204             .field("eof", &self.inner.state.eof)
205             .field("is_readable", &self.inner.state.is_readable)
206             .field("buffer", &self.read_buffer())
207             .finish()
208     }
209 }
210