1 //! Adaptors from `AsyncRead`/`AsyncWrite` to Stream/Sink 2 //! 3 //! Raw I/O objects work with byte sequences, but higher-level code usually 4 //! wants to batch these into meaningful chunks, called "frames". 5 //! 6 //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and 7 //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. 8 //! Framed streams are also known as transports. 9 //! 10 //! # Example encoding using `LinesCodec` 11 //! 12 //! The following example demonstrates how to use a codec such as [`LinesCodec`] to 13 //! write framed data. [`FramedWrite`] can be used to achieve this. Data sent to 14 //! [`FramedWrite`] are first framed according to a specific codec, and then sent to 15 //! an implementor of [`AsyncWrite`]. 16 //! 17 //! ``` 18 //! use futures::sink::SinkExt; 19 //! use tokio_util::codec::LinesCodec; 20 //! use tokio_util::codec::FramedWrite; 21 //! 22 //! #[tokio::main] 23 //! async fn main() { 24 //! let buffer = Vec::new(); 25 //! let messages = vec!["Hello", "World"]; 26 //! let encoder = LinesCodec::new(); 27 //! 28 //! // FramedWrite is a sink which means you can send values into it 29 //! // asynchronously. 30 //! let mut writer = FramedWrite::new(buffer, encoder); 31 //! 32 //! // To be able to send values into a FramedWrite, you need to bring the 33 //! // `SinkExt` trait into scope. 34 //! writer.send(messages[0]).await.unwrap(); 35 //! writer.send(messages[1]).await.unwrap(); 36 //! 37 //! let buffer = writer.get_ref(); 38 //! 39 //! assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes()); 40 //! } 41 //!``` 42 //! 43 //! # Example decoding using `LinesCodec` 44 //! The following example demonstrates how to use a codec such as [`LinesCodec`] to 45 //! read a stream of framed data. [`FramedRead`] can be used to achieve this. [`FramedRead`] 46 //! will keep reading from an [`AsyncRead`] implementor until a whole frame, according to a codec, 47 //! can be parsed. 48 //! 
49 //! ``` 50 //! use tokio_stream::StreamExt; 51 //! use tokio_util::codec::LinesCodec; 52 //! use tokio_util::codec::FramedRead; 53 //! 54 //! #[tokio::main] 55 //! async fn main() { 56 //! let message = "Hello\nWorld".as_bytes(); 57 //! let decoder = LinesCodec::new(); 58 //! 59 //! // FramedRead can be used to read a stream of values that are framed according to 60 //! // a codec. FramedRead will read from its input (here `message`) until a whole frame 61 //! // can be parsed. 62 //! let mut reader = FramedRead::new(message, decoder); 63 //! 64 //! // To read values from a FramedRead, you need to bring the 65 //! // `StreamExt` trait into scope. 66 //! let frame1 = reader.next().await.unwrap().unwrap(); 67 //! let frame2 = reader.next().await.unwrap().unwrap(); 68 //! 69 //! assert!(reader.next().await.is_none()); 70 //! assert_eq!(frame1, "Hello"); 71 //! assert_eq!(frame2, "World"); 72 //! } 73 //! ``` 74 //! 75 //! # The Decoder trait 76 //! 77 //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an 78 //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify 79 //! how sequences of bytes are turned into a sequence of frames, and to 80 //! determine where the boundaries between frames are. The job of the 81 //! `FramedRead` is to repeatedly switch between reading more data from the IO 82 //! resource, and asking the decoder whether we have received enough data to 83 //! decode another frame of data. 84 //! 85 //! The main method on the `Decoder` trait is the [`decode`] method. This method 86 //! takes as argument the data that has been read so far, and when it is called, 87 //! it will be in one of the following situations: 88 //! 89 //! 1. The buffer contains less than a full frame. 90 //! 2. The buffer contains exactly a full frame. 91 //! 3. The buffer contains more than a full frame. 92 //! 93 //! In the first situation, the decoder should return `Ok(None)`. 94 //! 95 //! 
In the second situation, the decoder should clear the provided buffer and 96 //! return `Ok(Some(the_decoded_frame))`. 97 //! 98 //! In the third situation, the decoder should use a method such as [`split_to`] 99 //! or [`advance`] to modify the buffer such that the frame is removed from the 100 //! buffer, but any data in the buffer after that frame should still remain in 101 //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in 102 //! this case. 103 //! 104 //! Finally the decoder may return an error if the data is invalid in some way. 105 //! The decoder should _not_ return an error just because it has yet to receive 106 //! a full frame. 107 //! 108 //! It is guaranteed that, from one call to `decode` to another, the provided 109 //! buffer will contain the exact same data as before, except that if more data 110 //! has arrived through the IO resource, that data will have been appended to 111 //! the buffer. This means that reading frames from a `FramedRead` is 112 //! essentially equivalent to the following loop: 113 //! 114 //! ```no_run 115 //! use tokio::io::AsyncReadExt; 116 //! # // This uses async_stream to create an example that compiles. 117 //! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { 118 //! # use tokio_util::codec::Decoder; 119 //! # let mut decoder = tokio_util::codec::BytesCodec::new(); 120 //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; 121 //! 122 //! let mut buf = bytes::BytesMut::new(); 123 //! loop { 124 //! // The read_buf call will append to buf rather than overwrite existing data. 125 //! let len = io_resource.read_buf(&mut buf).await?; 126 //! 127 //! if len == 0 { 128 //! while let Some(frame) = decoder.decode_eof(&mut buf)? { 129 //! yield frame; 130 //! } 131 //! break; 132 //! } 133 //! 134 //! while let Some(frame) = decoder.decode(&mut buf)? { 135 //! yield frame; 136 //! } 137 //! } 138 //! # }} 139 //! ``` 140 //! 
The example above uses `yield` whenever the `Stream` produces an item. 141 //! 142 //! ## Example decoder 143 //! 144 //! As an example, consider a protocol that can be used to send strings where 145 //! each frame is a four byte integer that contains the length of the frame, 146 //! followed by that many bytes of string data. The decoder fails with an error 147 //! if the string data is not valid utf-8 or too long. 148 //! 149 //! Such a decoder can be written like this: 150 //! ``` 151 //! use tokio_util::codec::Decoder; 152 //! use bytes::{BytesMut, Buf}; 153 //! 154 //! struct MyStringDecoder {} 155 //! 156 //! const MAX: usize = 8 * 1024 * 1024; 157 //! 158 //! impl Decoder for MyStringDecoder { 159 //! type Item = String; 160 //! type Error = std::io::Error; 161 //! 162 //! fn decode( 163 //! &mut self, 164 //! src: &mut BytesMut 165 //! ) -> Result<Option<Self::Item>, Self::Error> { 166 //! if src.len() < 4 { 167 //! // Not enough data to read length marker. 168 //! return Ok(None); 169 //! } 170 //! 171 //! // Read length marker. 172 //! let mut length_bytes = [0u8; 4]; 173 //! length_bytes.copy_from_slice(&src[..4]); 174 //! let length = u32::from_le_bytes(length_bytes) as usize; 175 //! 176 //! // Check that the length is not too large to avoid a denial of 177 //! // service attack where the server runs out of memory. 178 //! if length > MAX { 179 //! return Err(std::io::Error::new( 180 //! std::io::ErrorKind::InvalidData, 181 //! format!("Frame of length {} is too large.", length) 182 //! )); 183 //! } 184 //! 185 //! if src.len() < 4 + length { 186 //! // The full string has not yet arrived. 187 //! // 188 //! // We reserve more space in the buffer. This is not strictly 189 //! // necessary, but is a good idea performance-wise. 190 //! src.reserve(4 + length - src.len()); 191 //! 192 //! // We inform the Framed that we need more bytes to form the next 193 //! // frame. 194 //! return Ok(None); 195 //! } 196 //! 197 //! 
// Use advance to modify src such that it no longer contains 198 //! // this frame. 199 //! let data = src[4..4 + length].to_vec(); 200 //! src.advance(4 + length); 201 //! 202 //! // Convert the data to a string, or fail if it is not valid utf-8. 203 //! match String::from_utf8(data) { 204 //! Ok(string) => Ok(Some(string)), 205 //! Err(utf8_error) => { 206 //! Err(std::io::Error::new( 207 //! std::io::ErrorKind::InvalidData, 208 //! utf8_error.utf8_error(), 209 //! )) 210 //! }, 211 //! } 212 //! } 213 //! } 214 //! ``` 215 //! 216 //! # The Encoder trait 217 //! 218 //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn 219 //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to 220 //! specify how frames are turned into a sequence of bytes. The job of the 221 //! `FramedWrite` is to take the resulting sequence of bytes and write it to the 222 //! IO resource. 223 //! 224 //! The main method on the `Encoder` trait is the [`encode`] method. This method 225 //! takes an item that is being written, and a buffer to write the item to. The 226 //! buffer may already contain data, and in this case, the encoder should append 227 //! the new frame to the buffer rather than overwrite the existing data. 228 //! 229 //! It is guaranteed that, from one call to `encode` to another, the provided 230 //! buffer will contain the exact same data as before, except that some of the 231 //! data may have been removed from the front of the buffer. Writing to a 232 //! `FramedWrite` is essentially equivalent to the following loop: 233 //! 234 //! ```no_run 235 //! use tokio::io::AsyncWriteExt; 236 //! use bytes::Buf; // for advance 237 //! # use tokio_util::codec::Encoder; 238 //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } 239 //! # async fn no_more_frames() { } 240 //! # #[tokio::main] async fn main() -> std::io::Result<()> { 241 //! # let mut io_resource = tokio::io::sink(); 242 //! 
# let mut encoder = tokio_util::codec::BytesCodec::new(); 243 //! 244 //! const MAX: usize = 8192; 245 //! 246 //! let mut buf = bytes::BytesMut::new(); 247 //! loop { 248 //! tokio::select! { 249 //! num_written = io_resource.write(&buf), if !buf.is_empty() => { 250 //! buf.advance(num_written?); 251 //! }, 252 //! frame = next_frame(), if buf.len() < MAX => { 253 //! encoder.encode(frame, &mut buf)?; 254 //! }, 255 //! _ = no_more_frames() => { 256 //! io_resource.write_all(&buf).await?; 257 //! io_resource.shutdown().await?; 258 //! return Ok(()); 259 //! }, 260 //! } 261 //! } 262 //! # } 263 //! ``` 264 //! Here the `next_frame` method corresponds to any frames you write to the 265 //! `FramedWrite`. The `no_more_frames` method corresponds to closing the 266 //! `FramedWrite` with [`SinkExt::close`]. 267 //! 268 //! ## Example encoder 269 //! 270 //! As an example, consider a protocol that can be used to send strings where 271 //! each frame is a four byte integer that contains the length of the frame, 272 //! followed by that many bytes of string data. The encoder will fail if the 273 //! string is too long. 274 //! 275 //! Such an encoder can be written like this: 276 //! ``` 277 //! use tokio_util::codec::Encoder; 278 //! use bytes::BytesMut; 279 //! 280 //! struct MyStringEncoder {} 281 //! 282 //! const MAX: usize = 8 * 1024 * 1024; 283 //! 284 //! impl Encoder<String> for MyStringEncoder { 285 //! type Error = std::io::Error; 286 //! 287 //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { 288 //! // Don't send a string if it is longer than the other end will 289 //! // accept. 290 //! if item.len() > MAX { 291 //! return Err(std::io::Error::new( 292 //! std::io::ErrorKind::InvalidData, 293 //! format!("Frame of length {} is too large.", item.len()) 294 //! )); 295 //! } 296 //! 297 //! // Convert the length into a byte array. 298 //! // The cast to u32 cannot overflow due to the length check above. 299 //! 
let len_slice = u32::to_le_bytes(item.len() as u32); 300 //! 301 //! // Reserve space in the buffer. 302 //! dst.reserve(4 + item.len()); 303 //! 304 //! // Write the length and string to the buffer. 305 //! dst.extend_from_slice(&len_slice); 306 //! dst.extend_from_slice(item.as_bytes()); 307 //! Ok(()) 308 //! } 309 //! } 310 //! ``` 311 //! 312 //! [`AsyncRead`]: tokio::io::AsyncRead 313 //! [`AsyncWrite`]: tokio::io::AsyncWrite 314 //! [`Stream`]: futures_core::Stream 315 //! [`Sink`]: futures_sink::Sink 316 //! [`SinkExt`]: futures::sink::SinkExt 317 //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close 318 //! [`FramedRead`]: struct@crate::codec::FramedRead 319 //! [`FramedWrite`]: struct@crate::codec::FramedWrite 320 //! [`Framed`]: struct@crate::codec::Framed 321 //! [`Decoder`]: trait@crate::codec::Decoder 322 //! [`decode`]: fn@crate::codec::Decoder::decode 323 //! [`encode`]: fn@crate::codec::Encoder::encode 324 //! [`split_to`]: fn@bytes::BytesMut::split_to 325 //! [`advance`]: fn@bytes::Buf::advance 326 327 mod bytes_codec; 328 pub use self::bytes_codec::BytesCodec; 329 330 mod decoder; 331 pub use self::decoder::Decoder; 332 333 mod encoder; 334 pub use self::encoder::Encoder; 335 336 mod framed_impl; 337 #[allow(unused_imports)] 338 pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; 339 340 mod framed; 341 pub use self::framed::{Framed, FramedParts}; 342 343 mod framed_read; 344 pub use self::framed_read::FramedRead; 345 346 mod framed_write; 347 pub use self::framed_write::FramedWrite; 348 349 pub mod length_delimited; 350 pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; 351 352 mod lines_codec; 353 pub use self::lines_codec::{LinesCodec, LinesCodecError}; 354 355 mod any_delimiter_codec; 356 pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; 357