1 //! Adapters from `AsyncRead`/`AsyncWrite` to `Stream`/`Sink`
2 //!
3 //! Raw I/O objects work with byte sequences, but higher-level code usually
4 //! wants to batch these into meaningful chunks, called "frames".
5 //!
6 //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
7 //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
8 //! Framed streams are also known as transports.
9 //!
10 //! # Example encoding using `LinesCodec`
11 //!
12 //! The following example demonstrates how to use a codec such as [`LinesCodec`] to
13 //! write framed data. [`FramedWrite`] can be used to achieve this. Data sent to
14 //! [`FramedWrite`] are first framed according to a specific codec, and then sent to
15 //! an implementor of [`AsyncWrite`].
16 //!
17 //! ```
18 //! use futures::sink::SinkExt;
19 //! use tokio_util::codec::LinesCodec;
20 //! use tokio_util::codec::FramedWrite;
21 //!
22 //! #[tokio::main]
23 //! async fn main() {
24 //!     let buffer = Vec::new();
25 //!     let messages = vec!["Hello", "World"];
26 //!     let encoder = LinesCodec::new();
27 //!
28 //!     // FramedWrite is a sink which means you can send values into it
29 //!     // asynchronously.
30 //!     let mut writer = FramedWrite::new(buffer, encoder);
31 //!
32 //!     // To be able to send values into a FramedWrite, you need to bring the
33 //!     // `SinkExt` trait into scope.
34 //!     writer.send(messages[0]).await.unwrap();
35 //!     writer.send(messages[1]).await.unwrap();
36 //!
37 //!     let buffer = writer.get_ref();
38 //!
39 //!     assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
40 //! }
41 //! ```
42 //!
43 //! # Example decoding using `LinesCodec`
44 //! The following example demonstrates how to use a codec such as [`LinesCodec`] to
45 //! read a stream of framed data. [`FramedRead`] can be used to achieve this. [`FramedRead`]
46 //! will keep reading from an [`AsyncRead`] implementor until a whole frame, according to a codec,
47 //! can be parsed.
48 //!
49 //! ```
50 //! use tokio_stream::StreamExt;
51 //! use tokio_util::codec::LinesCodec;
52 //! use tokio_util::codec::FramedRead;
53 //!
54 //! #[tokio::main]
55 //! async fn main() {
56 //!     let message = "Hello\nWorld".as_bytes();
57 //!     let decoder = LinesCodec::new();
58 //!
59 //!     // FramedRead can be used to read a stream of values that are framed according to
60 //!     // a codec. FramedRead will read from its input (here `message`) until a whole frame
61 //!     // can be parsed.
62 //!     let mut reader = FramedRead::new(message, decoder);
63 //!
64 //!     // To read values from a FramedRead, you need to bring the
65 //!     // `StreamExt` trait into scope.
66 //!     let frame1 = reader.next().await.unwrap().unwrap();
67 //!     let frame2 = reader.next().await.unwrap().unwrap();
68 //!
69 //!     assert!(reader.next().await.is_none());
70 //!     assert_eq!(frame1, "Hello");
71 //!     assert_eq!(frame2, "World");
72 //! }
73 //! ```
74 //!
75 //! # The Decoder trait
76 //!
77 //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
78 //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
79 //! how sequences of bytes are turned into a sequence of frames, and to
80 //! determine where the boundaries between frames are.  The job of the
81 //! `FramedRead` is to repeatedly switch between reading more data from the IO
82 //! resource, and asking the decoder whether we have received enough data to
83 //! decode another frame of data.
84 //!
85 //! The main method on the `Decoder` trait is the [`decode`] method. This method
86 //! takes as argument the data that has been read so far, and when it is called,
87 //! it will be in one of the following situations:
88 //!
89 //!  1. The buffer contains less than a full frame.
90 //!  2. The buffer contains exactly a full frame.
91 //!  3. The buffer contains more than a full frame.
92 //!
93 //! In the first situation, the decoder should return `Ok(None)`.
94 //!
95 //! In the second situation, the decoder should clear the provided buffer and
96 //! return `Ok(Some(the_decoded_frame))`.
97 //!
98 //! In the third situation, the decoder should use a method such as [`split_to`]
99 //! or [`advance`] to modify the buffer such that the frame is removed from the
100 //! buffer, but any data in the buffer after that frame should still remain in
101 //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
102 //! this case.
103 //!
104 //! Finally the decoder may return an error if the data is invalid in some way.
105 //! The decoder should _not_ return an error just because it has yet to receive
106 //! a full frame.
107 //!
108 //! It is guaranteed that, from one call to `decode` to another, the provided
109 //! buffer will contain the exact same data as before, except that if more data
110 //! has arrived through the IO resource, that data will have been appended to
111 //! the buffer.  This means that reading frames from a `FramedRead` is
112 //! essentially equivalent to the following loop:
113 //!
114 //! ```no_run
115 //! use tokio::io::AsyncReadExt;
116 //! # // This uses async_stream to create an example that compiles.
117 //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
118 //! # use tokio_util::codec::Decoder;
119 //! # let mut decoder = tokio_util::codec::BytesCodec::new();
120 //! # let io_resource = &mut &[0u8, 1, 2, 3][..];
121 //!
122 //! let mut buf = bytes::BytesMut::new();
123 //! loop {
124 //!     // The read_buf call will append to buf rather than overwrite existing data.
125 //!     let len = io_resource.read_buf(&mut buf).await?;
126 //!
127 //!     if len == 0 {
128 //!         while let Some(frame) = decoder.decode_eof(&mut buf)? {
129 //!             yield frame;
130 //!         }
131 //!         break;
132 //!     }
133 //!
134 //!     while let Some(frame) = decoder.decode(&mut buf)? {
135 //!         yield frame;
136 //!     }
137 //! }
138 //! # }}
139 //! ```
140 //! The example above uses `yield` whenever the `Stream` produces an item.
141 //!
142 //! ## Example decoder
143 //!
144 //! As an example, consider a protocol that can be used to send strings where
145 //! each frame is a four-byte little-endian integer that contains the length of
146 //! the string, followed by that many bytes of string data. The decoder fails
147 //! with an error if the string data is not valid UTF-8 or is too long.
148 //!
149 //! Such a decoder can be written like this:
150 //! ```
151 //! use tokio_util::codec::Decoder;
152 //! use bytes::{BytesMut, Buf};
153 //!
154 //! struct MyStringDecoder {}
155 //!
156 //! const MAX: usize = 8 * 1024 * 1024;
157 //!
158 //! impl Decoder for MyStringDecoder {
159 //!     type Item = String;
160 //!     type Error = std::io::Error;
161 //!
162 //!     fn decode(
163 //!         &mut self,
164 //!         src: &mut BytesMut
165 //!     ) -> Result<Option<Self::Item>, Self::Error> {
166 //!         if src.len() < 4 {
167 //!             // Not enough data to read length marker.
168 //!             return Ok(None);
169 //!         }
170 //!
171 //!         // Read length marker.
172 //!         let mut length_bytes = [0u8; 4];
173 //!         length_bytes.copy_from_slice(&src[..4]);
174 //!         let length = u32::from_le_bytes(length_bytes) as usize;
175 //!
176 //!         // Check that the length is not too large to avoid a denial of
177 //!         // service attack where the server runs out of memory.
178 //!         if length > MAX {
179 //!             return Err(std::io::Error::new(
180 //!                 std::io::ErrorKind::InvalidData,
181 //!                 format!("Frame of length {} is too large.", length)
182 //!             ));
183 //!         }
184 //!
185 //!         if src.len() < 4 + length {
186 //!             // The full string has not yet arrived.
187 //!             //
188 //!             // We reserve more space in the buffer. This is not strictly
189 //!             // necessary, but is a good idea performance-wise.
190 //!             src.reserve(4 + length - src.len());
191 //!
192 //!             // We inform the Framed that we need more bytes to form the next
193 //!             // frame.
194 //!             return Ok(None);
195 //!         }
196 //!
197 //!         // Use advance to modify src such that it no longer contains
198 //!         // this frame.
199 //!         let data = src[4..4 + length].to_vec();
200 //!         src.advance(4 + length);
201 //!
202 //!         // Convert the data to a string, or fail if it is not valid utf-8.
203 //!         match String::from_utf8(data) {
204 //!             Ok(string) => Ok(Some(string)),
205 //!             Err(utf8_error) => {
206 //!                 Err(std::io::Error::new(
207 //!                     std::io::ErrorKind::InvalidData,
208 //!                     utf8_error.utf8_error(),
209 //!                 ))
210 //!             },
211 //!         }
212 //!     }
213 //! }
214 //! ```
215 //!
216 //! # The Encoder trait
217 //!
218 //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
219 //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
220 //! specify how frames are turned into a sequence of bytes.  The job of the
221 //! `FramedWrite` is to take the resulting sequence of bytes and write it to the
222 //! IO resource.
223 //!
224 //! The main method on the `Encoder` trait is the [`encode`] method. This method
225 //! takes an item that is being written, and a buffer to write the item to. The
226 //! buffer may already contain data, and in this case, the encoder should append
227 //! the new frame to the buffer rather than overwrite the existing data.
228 //!
229 //! It is guaranteed that, from one call to `encode` to another, the provided
230 //! buffer will contain the exact same data as before, except that some of the
231 //! data may have been removed from the front of the buffer. Writing to a
232 //! `FramedWrite` is essentially equivalent to the following loop:
233 //!
234 //! ```no_run
235 //! use tokio::io::AsyncWriteExt;
236 //! use bytes::Buf; // for advance
237 //! # use tokio_util::codec::Encoder;
238 //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
239 //! # async fn no_more_frames() { }
240 //! # #[tokio::main] async fn main() -> std::io::Result<()> {
241 //! # let mut io_resource = tokio::io::sink();
242 //! # let mut encoder = tokio_util::codec::BytesCodec::new();
243 //!
244 //! const MAX: usize = 8192;
245 //!
246 //! let mut buf = bytes::BytesMut::new();
247 //! loop {
248 //!     tokio::select! {
249 //!         num_written = io_resource.write(&buf), if !buf.is_empty() => {
250 //!             buf.advance(num_written?);
251 //!         },
252 //!         frame = next_frame(), if buf.len() < MAX => {
253 //!             encoder.encode(frame, &mut buf)?;
254 //!         },
255 //!         _ = no_more_frames() => {
256 //!             io_resource.write_all(&buf).await?;
257 //!             io_resource.shutdown().await?;
258 //!             return Ok(());
259 //!         },
260 //!     }
261 //! }
262 //! # }
263 //! ```
264 //! Here the `next_frame` method corresponds to any frames you write to the
265 //! `FramedWrite`. The `no_more_frames` method corresponds to closing the
266 //! `FramedWrite` with [`SinkExt::close`].
267 //!
268 //! ## Example encoder
269 //!
270 //! As an example, consider a protocol that can be used to send strings where
271 //! each frame is a four-byte little-endian integer that contains the length of
272 //! the string, followed by that many bytes of string data. The encoder will
273 //! fail if the string is too long.
274 //!
275 //! Such an encoder can be written like this:
276 //! ```
277 //! use tokio_util::codec::Encoder;
278 //! use bytes::BytesMut;
279 //!
280 //! struct MyStringEncoder {}
281 //!
282 //! const MAX: usize = 8 * 1024 * 1024;
283 //!
284 //! impl Encoder<String> for MyStringEncoder {
285 //!     type Error = std::io::Error;
286 //!
287 //!     fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
288 //!         // Don't send a string if it is longer than the other end will
289 //!         // accept.
290 //!         if item.len() > MAX {
291 //!             return Err(std::io::Error::new(
292 //!                 std::io::ErrorKind::InvalidData,
293 //!                 format!("Frame of length {} is too large.", item.len())
294 //!             ));
295 //!         }
296 //!
297 //!         // Convert the length into a byte array.
298 //!         // The cast to u32 cannot overflow due to the length check above.
299 //!         let len_slice = u32::to_le_bytes(item.len() as u32);
300 //!
301 //!         // Reserve space in the buffer.
302 //!         dst.reserve(4 + item.len());
303 //!
304 //!         // Write the length and string to the buffer.
305 //!         dst.extend_from_slice(&len_slice);
306 //!         dst.extend_from_slice(item.as_bytes());
307 //!         Ok(())
308 //!     }
309 //! }
310 //! ```
311 //!
312 //! [`AsyncRead`]: tokio::io::AsyncRead
313 //! [`AsyncWrite`]: tokio::io::AsyncWrite
314 //! [`Stream`]: futures_core::Stream
315 //! [`Sink`]: futures_sink::Sink
316 //! [`SinkExt`]: futures::sink::SinkExt
317 //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close
318 //! [`FramedRead`]: struct@crate::codec::FramedRead
319 //! [`FramedWrite`]: struct@crate::codec::FramedWrite
320 //! [`Framed`]: struct@crate::codec::Framed
321 //! [`Decoder`]: trait@crate::codec::Decoder
322 //! [`decode`]: fn@crate::codec::Decoder::decode
323 //! [`encode`]: fn@crate::codec::Encoder::encode
324 //! [`split_to`]: fn@bytes::BytesMut::split_to
325 //! [`advance`]: fn@bytes::Buf::advance
326 
327 mod bytes_codec;
328 pub use self::bytes_codec::BytesCodec;
329 
330 mod decoder;
331 pub use self::decoder::Decoder;
332 
333 mod encoder;
334 pub use self::encoder::Encoder;
335 
336 mod framed_impl;
337 #[allow(unused_imports)]
338 pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
339 
340 mod framed;
341 pub use self::framed::{Framed, FramedParts};
342 
343 mod framed_read;
344 pub use self::framed_read::FramedRead;
345 
346 mod framed_write;
347 pub use self::framed_write::FramedWrite;
348 
349 pub mod length_delimited;
350 pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
351 
352 mod lines_codec;
353 pub use self::lines_codec::{LinesCodec, LinesCodecError};
354 
355 mod any_delimiter_codec;
356 pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};
357