1 //! Frame a stream of bytes based on a length prefix
2 //!
3 //! Many protocols delimit their frames by prefacing frame data with a
4 //! frame head that specifies the length of the frame. The
5 //! `length_delimited` module provides utilities for handling the length
6 //! based framing. This allows the consumer to work with entire frames
7 //! without having to worry about buffering or other framing logic.
8 //!
9 //! # Getting started
10 //!
11 //! If implementing a protocol from scratch, using length delimited framing
12 //! is an easy way to get started. [`LengthDelimitedCodec::new()`] will
13 //! return a length delimited codec using default configuration values.
14 //! This can then be used to construct a framer to adapt a full-duplex
15 //! byte stream into a stream of frames.
16 //!
17 //! ```
18 //! use tokio::io::{AsyncRead, AsyncWrite};
19 //! use tokio_util::codec::{Framed, LengthDelimitedCodec};
20 //!
21 //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
22 //!     -> Framed<T, LengthDelimitedCodec>
23 //! {
24 //!     Framed::new(io, LengthDelimitedCodec::new())
25 //! }
26 //! # pub fn main() {}
27 //! ```
28 //!
//! The returned transport is a `Sink` of `Bytes` and a `Stream` of
//! `BytesMut`. It encodes the frame with a big-endian `u32` header denoting
//! the frame payload length:
32 //!
33 //! ```text
34 //! +----------+--------------------------------+
35 //! | len: u32 |          frame payload         |
36 //! +----------+--------------------------------+
37 //! ```
38 //!
39 //! Specifically, given the following:
40 //!
41 //! ```
42 //! use tokio::io::{AsyncRead, AsyncWrite};
43 //! use tokio_util::codec::{Framed, LengthDelimitedCodec};
44 //!
45 //! use futures::SinkExt;
46 //! use bytes::Bytes;
47 //!
48 //! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
49 //! where
50 //!     T: AsyncRead + AsyncWrite + Unpin,
51 //! {
52 //!     let mut transport = Framed::new(io, LengthDelimitedCodec::new());
53 //!     let frame = Bytes::from("hello world");
54 //!
55 //!     transport.send(frame).await?;
56 //!     Ok(())
57 //! }
58 //! ```
59 //!
60 //! The encoded frame will look like this:
61 //!
62 //! ```text
63 //! +---- len: u32 ----+---- data ----+
64 //! | \x00\x00\x00\x0b |  hello world |
65 //! +------------------+--------------+
66 //! ```
67 //!
68 //! # Decoding
69 //!
70 //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
71 //! such that each yielded [`BytesMut`] value contains the contents of an
72 //! entire frame. There are many configuration parameters enabling
73 //! [`FramedRead`] to handle a wide range of protocols. Here are some
74 //! examples that will cover the various options at a high level.
75 //!
76 //! ## Example 1
77 //!
78 //! The following will parse a `u16` length field at offset 0, omitting the
79 //! frame head in the yielded `BytesMut`.
80 //!
81 //! ```
82 //! # use tokio_stream::StreamExt;
83 //! # use tokio_util::codec::LengthDelimitedCodec;
84 //! # #[tokio::main]
85 //! # async fn main() {
86 //! # let io: &[u8] = b"\x00\x0BHello world";
87 //! let mut reader = LengthDelimitedCodec::builder()
88 //!     .length_field_offset(0) // default value
89 //!     .length_field_type::<u16>()
90 //!     .length_adjustment(0)   // default value
91 //!     .new_read(io);
92 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
93 //! # assert_eq!(res, b"Hello world");
94 //! # }
95 //! ```
96 //!
97 //! The following frame will be decoded as such:
98 //!
99 //! ```text
100 //!          INPUT                        DECODED
101 //! +-- len ---+--- Payload ---+     +--- Payload ---+
102 //! | \x00\x0B |  Hello world  | --> |  Hello world  |
103 //! +----------+---------------+     +---------------+
104 //! ```
105 //!
//! The value of the length field is 11 (`\x0B`) which represents the length
//! of the payload, `Hello world`. By default, [`FramedRead`] assumes that
108 //! the length field represents the number of bytes that **follows** the
109 //! length field. Thus, the entire frame has a length of 13: 2 bytes for the
110 //! frame head + 11 bytes for the payload.
111 //!
112 //! ## Example 2
113 //!
114 //! The following will parse a `u16` length field at offset 0, including the
115 //! frame head in the yielded `BytesMut`.
116 //!
117 //! ```
118 //! # use tokio_stream::StreamExt;
119 //! # use tokio_util::codec::LengthDelimitedCodec;
120 //! # #[tokio::main]
121 //! # async fn main() {
122 //! # let io: &[u8] = b"\x00\x0BHello world";
123 //! let mut reader = LengthDelimitedCodec::builder()
124 //!     .length_field_offset(0) // default value
125 //!     .length_field_type::<u16>()
126 //!     .length_adjustment(2)   // Add head size to length
127 //!     .num_skip(0)            // Do NOT skip the head
128 //!     .new_read(io);
129 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
130 //! # assert_eq!(res, b"\x00\x0BHello world");
131 //! # }
132 //! ```
133 //!
134 //! The following frame will be decoded as such:
135 //!
136 //! ```text
137 //!          INPUT                           DECODED
138 //! +-- len ---+--- Payload ---+     +-- len ---+--- Payload ---+
139 //! | \x00\x0B |  Hello world  | --> | \x00\x0B |  Hello world  |
140 //! +----------+---------------+     +----------+---------------+
141 //! ```
142 //!
143 //! This is similar to the first example, the only difference is that the
144 //! frame head is **included** in the yielded `BytesMut` value. To achieve
145 //! this, we need to add the header size to the length with `length_adjustment`,
146 //! and set `num_skip` to `0` to prevent skipping the head.
147 //!
148 //! ## Example 3
149 //!
150 //! The following will parse a `u16` length field at offset 0, omitting the
151 //! frame head in the yielded `BytesMut`. In this case, the length field
152 //! **includes** the frame head length.
153 //!
154 //! ```
155 //! # use tokio_stream::StreamExt;
156 //! # use tokio_util::codec::LengthDelimitedCodec;
157 //! # #[tokio::main]
158 //! # async fn main() {
159 //! # let io: &[u8] = b"\x00\x0DHello world";
160 //! let mut reader = LengthDelimitedCodec::builder()
161 //!     .length_field_offset(0) // default value
162 //!     .length_field_type::<u16>()
163 //!     .length_adjustment(-2)  // size of head
164 //!     .new_read(io);
165 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
166 //! # assert_eq!(res, b"Hello world");
167 //! # }
168 //! ```
169 //!
170 //! The following frame will be decoded as such:
171 //!
172 //! ```text
173 //!          INPUT                           DECODED
174 //! +-- len ---+--- Payload ---+     +--- Payload ---+
175 //! | \x00\x0D |  Hello world  | --> |  Hello world  |
176 //! +----------+---------------+     +---------------+
177 //! ```
178 //!
179 //! In most cases, the length field represents the length of the payload
180 //! only, as shown in the previous examples. However, in some protocols the
181 //! length field represents the length of the whole frame, including the
182 //! head. In such cases, we specify a negative `length_adjustment` to adjust
183 //! the value provided in the frame head to represent the payload length.
184 //!
185 //! ## Example 4
186 //!
187 //! The following will parse a 3 byte length field at offset 0 in a 5 byte
188 //! frame head, including the frame head in the yielded `BytesMut`.
189 //!
190 //! ```
191 //! # use tokio_stream::StreamExt;
192 //! # use tokio_util::codec::LengthDelimitedCodec;
193 //! # #[tokio::main]
194 //! # async fn main() {
195 //! # let io: &[u8] = b"\x00\x00\x0B\xCA\xFEHello world";
196 //! let mut reader = LengthDelimitedCodec::builder()
197 //!     .length_field_offset(0) // default value
198 //!     .length_field_length(3)
199 //!     .length_adjustment(3 + 2)  // len field and remaining head
200 //!     .num_skip(0)
201 //!     .new_read(io);
202 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
203 //! # assert_eq!(res, b"\x00\x00\x0B\xCA\xFEHello world");
204 //! # }
205 //! ```
206 //!
207 //! The following frame will be decoded as such:
208 //!
209 //! ```text
//!                    INPUT
//! +---- len -----+-- head --+--- Payload ---+
//! | \x00\x00\x0B | \xCA\xFE |  Hello world  |
//! +--------------+----------+---------------+
//!
//!                   DECODED
//! +---- len -----+-- head --+--- Payload ---+
//! | \x00\x00\x0B | \xCA\xFE |  Hello world  |
//! +--------------+----------+---------------+
//! ```
220 //!
//! A more advanced example that shows a case where there is extra frame
//! head data between the length field and the payload. In such cases, it is
//! usually desirable to include the frame head as part of the yielded
//! `BytesMut`. This lets consumers of the length delimited framer
//! process the frame head as needed.
226 //!
227 //! The positive `length_adjustment` value lets `FramedRead` factor in the
228 //! additional head into the frame length calculation.
229 //!
230 //! ## Example 5
231 //!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing byte of the frame head will be
//! included.
236 //!
237 //! ```
238 //! # use tokio_stream::StreamExt;
239 //! # use tokio_util::codec::LengthDelimitedCodec;
240 //! # #[tokio::main]
241 //! # async fn main() {
242 //! # let io: &[u8] = b"\xCA\x00\x0B\xFEHello world";
243 //! let mut reader = LengthDelimitedCodec::builder()
244 //!     .length_field_offset(1) // length of hdr1
245 //!     .length_field_type::<u16>()
246 //!     .length_adjustment(1)  // length of hdr2
247 //!     .num_skip(3) // length of hdr1 + LEN
248 //!     .new_read(io);
249 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
250 //! # assert_eq!(res, b"\xFEHello world");
251 //! # }
252 //! ```
253 //!
254 //! The following frame will be decoded as such:
255 //!
256 //! ```text
257 //!                  INPUT
258 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
259 //! |  \xCA  | \x00\x0B |  \xFE  |  Hello world  |
260 //! +--------+----------+--------+---------------+
261 //!
262 //!          DECODED
263 //! +- hdr2 -+--- Payload ---+
264 //! |  \xFE  |  Hello world  |
265 //! +--------+---------------+
266 //! ```
267 //!
268 //! The length field is situated in the middle of the frame head. In this
269 //! case, the first byte in the frame head could be a version or some other
270 //! identifier that is not needed for processing. On the other hand, the
271 //! second half of the head is needed.
272 //!
//! `length_field_offset` indicates how many bytes to skip before starting
//! to read the length field. `length_adjustment` is added to the value read
//! from the length field to obtain the length of the yielded frame. In this
//! case, it is the length of the second half of the head, which follows the
//! length field but is not counted by it.
277 //!
278 //! ## Example 6
279 //!
//! The following will parse a `u16` length field at offset 1 of a 4 byte
//! frame head. The first byte and the length field will be omitted from the
//! yielded `BytesMut`, but the trailing byte of the frame head will be
//! included. In this case, the length field **includes** the frame head
//! length.
285 //!
286 //! ```
287 //! # use tokio_stream::StreamExt;
288 //! # use tokio_util::codec::LengthDelimitedCodec;
289 //! # #[tokio::main]
290 //! # async fn main() {
291 //! # let io: &[u8] = b"\xCA\x00\x0F\xFEHello world";
292 //! let mut reader = LengthDelimitedCodec::builder()
293 //!     .length_field_offset(1) // length of hdr1
294 //!     .length_field_type::<u16>()
295 //!     .length_adjustment(-3)  // length of hdr1 + LEN, negative
296 //!     .num_skip(3)
297 //!     .new_read(io);
298 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
299 //! # assert_eq!(res, b"\xFEHello world");
300 //! # }
301 //! ```
302 //!
303 //! The following frame will be decoded as such:
304 //!
305 //! ```text
306 //!                  INPUT
307 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
308 //! |  \xCA  | \x00\x0F |  \xFE  |  Hello world  |
309 //! +--------+----------+--------+---------------+
310 //!
311 //!          DECODED
312 //! +- hdr2 -+--- Payload ---+
313 //! |  \xFE  |  Hello world  |
314 //! +--------+---------------+
315 //! ```
316 //!
317 //! Similar to the example above, the difference is that the length field
318 //! represents the length of the entire frame instead of just the payload.
319 //! The length of `hdr1` and `len` must be counted in `length_adjustment`.
320 //! Note that the length of `hdr2` does **not** need to be explicitly set
321 //! anywhere because it already is factored into the total frame length that
322 //! is read from the byte stream.
323 //!
324 //! ## Example 7
325 //!
326 //! The following will parse a 3 byte length field at offset 0 in a 4 byte
327 //! frame head, excluding the 4th byte from the yielded `BytesMut`.
328 //!
329 //! ```
330 //! # use tokio_stream::StreamExt;
331 //! # use tokio_util::codec::LengthDelimitedCodec;
332 //! # #[tokio::main]
333 //! # async fn main() {
334 //! # let io: &[u8] = b"\x00\x00\x0B\xFFHello world";
335 //! let mut reader = LengthDelimitedCodec::builder()
336 //!     .length_field_offset(0) // default value
337 //!     .length_field_length(3)
338 //!     .length_adjustment(0)  // default value
339 //!     .num_skip(4) // skip the first 4 bytes
340 //!     .new_read(io);
341 //! # let res = reader.next().await.unwrap().unwrap().to_vec();
342 //! # assert_eq!(res, b"Hello world");
343 //! # }
344 //! ```
345 //!
346 //! The following frame will be decoded as such:
347 //!
348 //! ```text
349 //!                  INPUT                       DECODED
350 //! +------- len ------+--- Payload ---+    +--- Payload ---+
351 //! | \x00\x00\x0B\xFF |  Hello world  | => |  Hello world  |
352 //! +------------------+---------------+    +---------------+
353 //! ```
354 //!
355 //! A simple example where there are unused bytes between the length field
356 //! and the payload.
357 //!
358 //! # Encoding
359 //!
//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of `Bytes`,
//! such that each submitted `Bytes` is prefaced by a length field.
362 //! There are fewer configuration options than [`FramedRead`]. Given
363 //! protocols that have more complex frame heads, an encoder should probably
364 //! be written by hand using [`Encoder`].
365 //!
366 //! Here is a simple example, given a `FramedWrite` with the following
367 //! configuration:
368 //!
369 //! ```
370 //! # use tokio::io::AsyncWrite;
371 //! # use tokio_util::codec::LengthDelimitedCodec;
372 //! # fn write_frame<T: AsyncWrite>(io: T) {
373 //! # let _ =
374 //! LengthDelimitedCodec::builder()
375 //!     .length_field_type::<u16>()
376 //!     .new_write(io);
377 //! # }
378 //! # pub fn main() {}
379 //! ```
380 //!
381 //! A payload of `hello world` will be encoded as:
382 //!
383 //! ```text
384 //! +- len: u16 -+---- data ----+
385 //! |  \x00\x0b  |  hello world |
386 //! +------------+--------------+
387 //! ```
388 //!
389 //! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new
390 //! [`FramedRead`]: struct@FramedRead
391 //! [`FramedWrite`]: struct@FramedWrite
392 //! [`AsyncRead`]: trait@tokio::io::AsyncRead
393 //! [`AsyncWrite`]: trait@tokio::io::AsyncWrite
394 //! [`Encoder`]: trait@Encoder
395 //! [`BytesMut`]: bytes::BytesMut
396 
397 use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
398 
399 use tokio::io::{AsyncRead, AsyncWrite};
400 
401 use bytes::{Buf, BufMut, Bytes, BytesMut};
402 use std::error::Error as StdError;
403 use std::io::{self, Cursor};
404 use std::{cmp, fmt, mem};
405 
/// Configure length delimited `LengthDelimitedCodec`s.
///
/// A `Builder` collects the framing parameters (length field width, offset
/// and byte order, length adjustment, number of bytes to skip, and maximum
/// frame length) from which length delimited codecs are constructed. Not
/// every setting applies to both encoding and decoding; see the documentation
/// of the individual methods for details.
///
/// Note that if the value of [`Builder::max_frame_length`] becomes larger than
/// what can actually fit in [`Builder::length_field_length`], it will be clipped to
/// the maximum value that can fit.
#[derive(Debug, Clone, Copy)]
pub struct Builder {
    // Largest frame length that is accepted.
    max_frame_len: usize,

    // Width of the length field, in bytes.
    length_field_len: usize,

    // Number of header bytes that precede the length field.
    length_field_offset: usize,

    // Signed amount by which the length carried in the header field is
    // adjusted.
    length_adjustment: isize,

    // Total number of bytes to skip before reading the payload. When unset,
    // `length_field_len + length_field_offset` is used.
    num_skip: Option<usize>,

    // Byte order of the length field: `true` for big endian, `false` for
    // little endian.
    length_field_is_big_endian: bool,
}
436 
/// An error when the number of bytes read is more than max frame length.
///
/// The same error value is also produced when a payload submitted for
/// encoding is longer than the max frame length.
pub struct LengthDelimitedCodecError {
    // Private unit field: the type cannot be constructed with a struct
    // literal from outside this module.
    _priv: (),
}
441 
442 /// A codec for frames delimited by a frame head specifying their lengths.
443 ///
444 /// This allows the consumer to work with entire frames without having to worry
445 /// about buffering or other framing logic.
446 ///
447 /// See [module level] documentation for more detail.
448 ///
449 /// [module level]: index.html
450 #[derive(Debug, Clone)]
451 pub struct LengthDelimitedCodec {
452     // Configuration values
453     builder: Builder,
454 
455     // Read state
456     state: DecodeState,
457 }
458 
/// Progress of the decoder within the current frame.
#[derive(Debug, Clone, Copy)]
enum DecodeState {
    /// The next bytes to decode belong to a frame head.
    Head,
    /// The head has been decoded; the value is the number of bytes that make
    /// up the frame to be yielded.
    Data(usize),
}
464 
465 // ===== impl LengthDelimitedCodec ======
466 
467 impl LengthDelimitedCodec {
468     /// Creates a new `LengthDelimitedCodec` with the default configuration values.
new() -> Self469     pub fn new() -> Self {
470         Self {
471             builder: Builder::new(),
472             state: DecodeState::Head,
473         }
474     }
475 
476     /// Creates a new length delimited codec builder with default configuration
477     /// values.
builder() -> Builder478     pub fn builder() -> Builder {
479         Builder::new()
480     }
481 
482     /// Returns the current max frame setting
483     ///
484     /// This is the largest size this codec will accept from the wire. Larger
485     /// frames will be rejected.
max_frame_length(&self) -> usize486     pub fn max_frame_length(&self) -> usize {
487         self.builder.max_frame_len
488     }
489 
490     /// Updates the max frame setting.
491     ///
492     /// The change takes effect the next time a frame is decoded. In other
493     /// words, if a frame is currently in process of being decoded with a frame
494     /// size greater than `val` but less than the max frame length in effect
495     /// before calling this function, then the frame will be allowed.
set_max_frame_length(&mut self, val: usize)496     pub fn set_max_frame_length(&mut self, val: usize) {
497         self.builder.max_frame_length(val);
498     }
499 
decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>>500     fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
501         let head_len = self.builder.num_head_bytes();
502         let field_len = self.builder.length_field_len;
503 
504         if src.len() < head_len {
505             // Not enough data
506             return Ok(None);
507         }
508 
509         let n = {
510             let mut src = Cursor::new(&mut *src);
511 
512             // Skip the required bytes
513             src.advance(self.builder.length_field_offset);
514 
515             // match endianness
516             let n = if self.builder.length_field_is_big_endian {
517                 src.get_uint(field_len)
518             } else {
519                 src.get_uint_le(field_len)
520             };
521 
522             if n > self.builder.max_frame_len as u64 {
523                 return Err(io::Error::new(
524                     io::ErrorKind::InvalidData,
525                     LengthDelimitedCodecError { _priv: () },
526                 ));
527             }
528 
529             // The check above ensures there is no overflow
530             let n = n as usize;
531 
532             // Adjust `n` with bounds checking
533             let n = if self.builder.length_adjustment < 0 {
534                 n.checked_sub(-self.builder.length_adjustment as usize)
535             } else {
536                 n.checked_add(self.builder.length_adjustment as usize)
537             };
538 
539             // Error handling
540             match n {
541                 Some(n) => n,
542                 None => {
543                     return Err(io::Error::new(
544                         io::ErrorKind::InvalidInput,
545                         "provided length would overflow after adjustment",
546                     ));
547                 }
548             }
549         };
550 
551         src.advance(self.builder.get_num_skip());
552 
553         // Ensure that the buffer has enough space to read the incoming
554         // payload
555         src.reserve(n.saturating_sub(src.len()));
556 
557         Ok(Some(n))
558     }
559 
decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut>560     fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> {
561         // At this point, the buffer has already had the required capacity
562         // reserved. All there is to do is read.
563         if src.len() < n {
564             return None;
565         }
566 
567         Some(src.split_to(n))
568     }
569 }
570 
571 impl Decoder for LengthDelimitedCodec {
572     type Item = BytesMut;
573     type Error = io::Error;
574 
decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>>575     fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
576         let n = match self.state {
577             DecodeState::Head => match self.decode_head(src)? {
578                 Some(n) => {
579                     self.state = DecodeState::Data(n);
580                     n
581                 }
582                 None => return Ok(None),
583             },
584             DecodeState::Data(n) => n,
585         };
586 
587         match self.decode_data(n, src) {
588             Some(data) => {
589                 // Update the decode state
590                 self.state = DecodeState::Head;
591 
592                 // Make sure the buffer has enough space to read the next head
593                 src.reserve(self.builder.num_head_bytes().saturating_sub(src.len()));
594 
595                 Ok(Some(data))
596             }
597             None => Ok(None),
598         }
599     }
600 }
601 
602 impl Encoder<Bytes> for LengthDelimitedCodec {
603     type Error = io::Error;
604 
encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error>605     fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
606         let n = data.len();
607 
608         if n > self.builder.max_frame_len {
609             return Err(io::Error::new(
610                 io::ErrorKind::InvalidInput,
611                 LengthDelimitedCodecError { _priv: () },
612             ));
613         }
614 
615         // Adjust `n` with bounds checking
616         let n = if self.builder.length_adjustment < 0 {
617             n.checked_add(-self.builder.length_adjustment as usize)
618         } else {
619             n.checked_sub(self.builder.length_adjustment as usize)
620         };
621 
622         let n = n.ok_or_else(|| {
623             io::Error::new(
624                 io::ErrorKind::InvalidInput,
625                 "provided length would overflow after adjustment",
626             )
627         })?;
628 
629         // Reserve capacity in the destination buffer to fit the frame and
630         // length field (plus adjustment).
631         dst.reserve(self.builder.length_field_len + n);
632 
633         if self.builder.length_field_is_big_endian {
634             dst.put_uint(n as u64, self.builder.length_field_len);
635         } else {
636             dst.put_uint_le(n as u64, self.builder.length_field_len);
637         }
638 
639         // Write the frame to the buffer
640         dst.extend_from_slice(&data[..]);
641 
642         Ok(())
643     }
644 }
645 
646 impl Default for LengthDelimitedCodec {
default() -> Self647     fn default() -> Self {
648         Self::new()
649     }
650 }
651 
652 // ===== impl Builder =====
653 
mod builder {
    /// Types that can be used with `Builder::length_field_type`.
    ///
    /// The trait has no items; it only marks the unsigned integer types whose
    /// size is an acceptable length field width.
    pub trait LengthFieldType {}

    // Fixed-width unsigned integers of 1, 2, 4 and 8 bytes.
    impl LengthFieldType for u8 {}
    impl LengthFieldType for u16 {}
    impl LengthFieldType for u32 {}
    impl LengthFieldType for u64 {}

    // `usize` is only accepted for the listed pointer widths, all of which
    // are at most 8 bytes wide.
    #[cfg(any(
        target_pointer_width = "16",
        target_pointer_width = "32",
        target_pointer_width = "64",
    ))]
    impl LengthFieldType for usize {}
}
670 
671 impl Builder {
672     /// Creates a new length delimited codec builder with default configuration
673     /// values.
674     ///
675     /// # Examples
676     ///
677     /// ```
678     /// # use tokio::io::AsyncRead;
679     /// use tokio_util::codec::LengthDelimitedCodec;
680     ///
681     /// # fn bind_read<T: AsyncRead>(io: T) {
682     /// LengthDelimitedCodec::builder()
683     ///     .length_field_offset(0)
684     ///     .length_field_type::<u16>()
685     ///     .length_adjustment(0)
686     ///     .num_skip(0)
687     ///     .new_read(io);
688     /// # }
689     /// # pub fn main() {}
690     /// ```
new() -> Builder691     pub fn new() -> Builder {
692         Builder {
693             // Default max frame length of 8MB
694             max_frame_len: 8 * 1_024 * 1_024,
695 
696             // Default byte length of 4
697             length_field_len: 4,
698 
699             // Default to the header field being at the start of the header.
700             length_field_offset: 0,
701 
702             length_adjustment: 0,
703 
704             // Total number of bytes to skip before reading the payload, if not set,
705             // `length_field_len + length_field_offset`
706             num_skip: None,
707 
708             // Default to reading the length field in network (big) endian.
709             length_field_is_big_endian: true,
710         }
711     }
712 
713     /// Read the length field as a big endian integer
714     ///
715     /// This is the default setting.
716     ///
717     /// This configuration option applies to both encoding and decoding.
718     ///
719     /// # Examples
720     ///
721     /// ```
722     /// # use tokio::io::AsyncRead;
723     /// use tokio_util::codec::LengthDelimitedCodec;
724     ///
725     /// # fn bind_read<T: AsyncRead>(io: T) {
726     /// LengthDelimitedCodec::builder()
727     ///     .big_endian()
728     ///     .new_read(io);
729     /// # }
730     /// # pub fn main() {}
731     /// ```
big_endian(&mut self) -> &mut Self732     pub fn big_endian(&mut self) -> &mut Self {
733         self.length_field_is_big_endian = true;
734         self
735     }
736 
737     /// Read the length field as a little endian integer
738     ///
739     /// The default setting is big endian.
740     ///
741     /// This configuration option applies to both encoding and decoding.
742     ///
743     /// # Examples
744     ///
745     /// ```
746     /// # use tokio::io::AsyncRead;
747     /// use tokio_util::codec::LengthDelimitedCodec;
748     ///
749     /// # fn bind_read<T: AsyncRead>(io: T) {
750     /// LengthDelimitedCodec::builder()
751     ///     .little_endian()
752     ///     .new_read(io);
753     /// # }
754     /// # pub fn main() {}
755     /// ```
little_endian(&mut self) -> &mut Self756     pub fn little_endian(&mut self) -> &mut Self {
757         self.length_field_is_big_endian = false;
758         self
759     }
760 
761     /// Read the length field as a native endian integer
762     ///
763     /// The default setting is big endian.
764     ///
765     /// This configuration option applies to both encoding and decoding.
766     ///
767     /// # Examples
768     ///
769     /// ```
770     /// # use tokio::io::AsyncRead;
771     /// use tokio_util::codec::LengthDelimitedCodec;
772     ///
773     /// # fn bind_read<T: AsyncRead>(io: T) {
774     /// LengthDelimitedCodec::builder()
775     ///     .native_endian()
776     ///     .new_read(io);
777     /// # }
778     /// # pub fn main() {}
779     /// ```
native_endian(&mut self) -> &mut Self780     pub fn native_endian(&mut self) -> &mut Self {
781         if cfg!(target_endian = "big") {
782             self.big_endian()
783         } else {
784             self.little_endian()
785         }
786     }
787 
788     /// Sets the max frame length in bytes
789     ///
790     /// This configuration option applies to both encoding and decoding. The
791     /// default value is 8MB.
792     ///
793     /// When decoding, the length field read from the byte stream is checked
794     /// against this setting **before** any adjustments are applied. When
795     /// encoding, the length of the submitted payload is checked against this
796     /// setting.
797     ///
798     /// When frames exceed the max length, an `io::Error` with the custom value
799     /// of the `LengthDelimitedCodecError` type will be returned.
800     ///
801     /// # Examples
802     ///
803     /// ```
804     /// # use tokio::io::AsyncRead;
805     /// use tokio_util::codec::LengthDelimitedCodec;
806     ///
807     /// # fn bind_read<T: AsyncRead>(io: T) {
808     /// LengthDelimitedCodec::builder()
809     ///     .max_frame_length(8 * 1024 * 1024)
810     ///     .new_read(io);
811     /// # }
812     /// # pub fn main() {}
813     /// ```
max_frame_length(&mut self, val: usize) -> &mut Self814     pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
815         self.max_frame_len = val;
816         self
817     }
818 
819     /// Sets the unsigned integer type used to represent the length field.
820     ///
821     /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on
822     /// 64-bit targets).
823     ///
824     /// # Examples
825     ///
826     /// ```
827     /// # use tokio::io::AsyncRead;
828     /// use tokio_util::codec::LengthDelimitedCodec;
829     ///
830     /// # fn bind_read<T: AsyncRead>(io: T) {
831     /// LengthDelimitedCodec::builder()
832     ///     .length_field_type::<u32>()
833     ///     .new_read(io);
834     /// # }
835     /// # pub fn main() {}
836     /// ```
837     ///
838     /// Unlike [`Builder::length_field_length`], this does not fail at runtime
839     /// and instead produces a compile error:
840     ///
841     /// ```compile_fail
842     /// # use tokio::io::AsyncRead;
843     /// # use tokio_util::codec::LengthDelimitedCodec;
844     /// # fn bind_read<T: AsyncRead>(io: T) {
845     /// LengthDelimitedCodec::builder()
846     ///     .length_field_type::<u128>()
847     ///     .new_read(io);
848     /// # }
849     /// # pub fn main() {}
850     /// ```
length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self851     pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self {
852         self.length_field_length(mem::size_of::<T>())
853     }
854 
855     /// Sets the number of bytes used to represent the length field
856     ///
857     /// The default value is `4`. The max value is `8`.
858     ///
859     /// This configuration option applies to both encoding and decoding.
860     ///
861     /// # Examples
862     ///
863     /// ```
864     /// # use tokio::io::AsyncRead;
865     /// use tokio_util::codec::LengthDelimitedCodec;
866     ///
867     /// # fn bind_read<T: AsyncRead>(io: T) {
868     /// LengthDelimitedCodec::builder()
869     ///     .length_field_length(4)
870     ///     .new_read(io);
871     /// # }
872     /// # pub fn main() {}
873     /// ```
length_field_length(&mut self, val: usize) -> &mut Self874     pub fn length_field_length(&mut self, val: usize) -> &mut Self {
875         assert!(val > 0 && val <= 8, "invalid length field length");
876         self.length_field_len = val;
877         self
878     }
879 
880     /// Sets the number of bytes in the header before the length field
881     ///
882     /// This configuration option only applies to decoding.
883     ///
884     /// # Examples
885     ///
886     /// ```
887     /// # use tokio::io::AsyncRead;
888     /// use tokio_util::codec::LengthDelimitedCodec;
889     ///
890     /// # fn bind_read<T: AsyncRead>(io: T) {
891     /// LengthDelimitedCodec::builder()
892     ///     .length_field_offset(1)
893     ///     .new_read(io);
894     /// # }
895     /// # pub fn main() {}
896     /// ```
length_field_offset(&mut self, val: usize) -> &mut Self897     pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
898         self.length_field_offset = val;
899         self
900     }
901 
902     /// Delta between the payload length specified in the header and the real
903     /// payload length
904     ///
905     /// # Examples
906     ///
907     /// ```
908     /// # use tokio::io::AsyncRead;
909     /// use tokio_util::codec::LengthDelimitedCodec;
910     ///
911     /// # fn bind_read<T: AsyncRead>(io: T) {
912     /// LengthDelimitedCodec::builder()
913     ///     .length_adjustment(-2)
914     ///     .new_read(io);
915     /// # }
916     /// # pub fn main() {}
917     /// ```
length_adjustment(&mut self, val: isize) -> &mut Self918     pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
919         self.length_adjustment = val;
920         self
921     }
922 
923     /// Sets the number of bytes to skip before reading the payload
924     ///
925     /// Default value is `length_field_len + length_field_offset`
926     ///
927     /// This configuration option only applies to decoding
928     ///
929     /// # Examples
930     ///
931     /// ```
932     /// # use tokio::io::AsyncRead;
933     /// use tokio_util::codec::LengthDelimitedCodec;
934     ///
935     /// # fn bind_read<T: AsyncRead>(io: T) {
936     /// LengthDelimitedCodec::builder()
937     ///     .num_skip(4)
938     ///     .new_read(io);
939     /// # }
940     /// # pub fn main() {}
941     /// ```
num_skip(&mut self, val: usize) -> &mut Self942     pub fn num_skip(&mut self, val: usize) -> &mut Self {
943         self.num_skip = Some(val);
944         self
945     }
946 
947     /// Create a configured length delimited `LengthDelimitedCodec`
948     ///
949     /// # Examples
950     ///
951     /// ```
952     /// use tokio_util::codec::LengthDelimitedCodec;
953     /// # pub fn main() {
954     /// LengthDelimitedCodec::builder()
955     ///     .length_field_offset(0)
956     ///     .length_field_type::<u16>()
957     ///     .length_adjustment(0)
958     ///     .num_skip(0)
959     ///     .new_codec();
960     /// # }
961     /// ```
new_codec(&self) -> LengthDelimitedCodec962     pub fn new_codec(&self) -> LengthDelimitedCodec {
963         let mut builder = *self;
964 
965         builder.adjust_max_frame_len();
966 
967         LengthDelimitedCodec {
968             builder,
969             state: DecodeState::Head,
970         }
971     }
972 
973     /// Create a configured length delimited `FramedRead`
974     ///
975     /// # Examples
976     ///
977     /// ```
978     /// # use tokio::io::AsyncRead;
979     /// use tokio_util::codec::LengthDelimitedCodec;
980     ///
981     /// # fn bind_read<T: AsyncRead>(io: T) {
982     /// LengthDelimitedCodec::builder()
983     ///     .length_field_offset(0)
984     ///     .length_field_type::<u16>()
985     ///     .length_adjustment(0)
986     ///     .num_skip(0)
987     ///     .new_read(io);
988     /// # }
989     /// # pub fn main() {}
990     /// ```
new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> where T: AsyncRead,991     pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec>
992     where
993         T: AsyncRead,
994     {
995         FramedRead::new(upstream, self.new_codec())
996     }
997 
998     /// Create a configured length delimited `FramedWrite`
999     ///
1000     /// # Examples
1001     ///
1002     /// ```
1003     /// # use tokio::io::AsyncWrite;
1004     /// # use tokio_util::codec::LengthDelimitedCodec;
1005     /// # fn write_frame<T: AsyncWrite>(io: T) {
1006     /// LengthDelimitedCodec::builder()
1007     ///     .length_field_type::<u16>()
1008     ///     .new_write(io);
1009     /// # }
1010     /// # pub fn main() {}
1011     /// ```
new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> where T: AsyncWrite,1012     pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec>
1013     where
1014         T: AsyncWrite,
1015     {
1016         FramedWrite::new(inner, self.new_codec())
1017     }
1018 
1019     /// Create a configured length delimited `Framed`
1020     ///
1021     /// # Examples
1022     ///
1023     /// ```
1024     /// # use tokio::io::{AsyncRead, AsyncWrite};
1025     /// # use tokio_util::codec::LengthDelimitedCodec;
1026     /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
1027     /// # let _ =
1028     /// LengthDelimitedCodec::builder()
1029     ///     .length_field_type::<u16>()
1030     ///     .new_framed(io);
1031     /// # }
1032     /// # pub fn main() {}
1033     /// ```
new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> where T: AsyncRead + AsyncWrite,1034     pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec>
1035     where
1036         T: AsyncRead + AsyncWrite,
1037     {
1038         Framed::new(inner, self.new_codec())
1039     }
1040 
num_head_bytes(&self) -> usize1041     fn num_head_bytes(&self) -> usize {
1042         let num = self.length_field_offset + self.length_field_len;
1043         cmp::max(num, self.num_skip.unwrap_or(0))
1044     }
1045 
get_num_skip(&self) -> usize1046     fn get_num_skip(&self) -> usize {
1047         self.num_skip
1048             .unwrap_or(self.length_field_offset + self.length_field_len)
1049     }
1050 
adjust_max_frame_len(&mut self)1051     fn adjust_max_frame_len(&mut self) {
1052         // Calculate the maximum number that can be represented using `length_field_len` bytes.
1053         let max_number = match 1u64.checked_shl((8 * self.length_field_len) as u32) {
1054             Some(shl) => shl - 1,
1055             None => u64::MAX,
1056         };
1057 
1058         let max_allowed_len = max_number.saturating_add_signed(self.length_adjustment as i64);
1059 
1060         if self.max_frame_len as u64 > max_allowed_len {
1061             self.max_frame_len = usize::try_from(max_allowed_len).unwrap_or(usize::MAX);
1062         }
1063     }
1064 }
1065 
1066 impl Default for Builder {
default() -> Self1067     fn default() -> Self {
1068         Self::new()
1069     }
1070 }
1071 
1072 // ===== impl LengthDelimitedCodecError =====
1073 
1074 impl fmt::Debug for LengthDelimitedCodecError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1075     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1076         f.debug_struct("LengthDelimitedCodecError").finish()
1077     }
1078 }
1079 
1080 impl fmt::Display for LengthDelimitedCodecError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1081     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1082         f.write_str("frame size too big")
1083     }
1084 }
1085 
// Empty implementation: no trait methods are overridden, so the standard
// library's provided defaults apply.
impl StdError for LengthDelimitedCodecError {}
1087