1 //! Frame a stream of bytes based on a length prefix 2 //! 3 //! Many protocols delimit their frames by prefacing frame data with a 4 //! frame head that specifies the length of the frame. The 5 //! `length_delimited` module provides utilities for handling the length 6 //! based framing. This allows the consumer to work with entire frames 7 //! without having to worry about buffering or other framing logic. 8 //! 9 //! # Getting started 10 //! 11 //! If implementing a protocol from scratch, using length delimited framing 12 //! is an easy way to get started. [`LengthDelimitedCodec::new()`] will 13 //! return a length delimited codec using default configuration values. 14 //! This can then be used to construct a framer to adapt a full-duplex 15 //! byte stream into a stream of frames. 16 //! 17 //! ``` 18 //! use tokio::io::{AsyncRead, AsyncWrite}; 19 //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; 20 //! 21 //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) 22 //! -> Framed<T, LengthDelimitedCodec> 23 //! { 24 //! Framed::new(io, LengthDelimitedCodec::new()) 25 //! } 26 //! # pub fn main() {} 27 //! ``` 28 //! 29 //! The returned transport implements `Sink + Stream` for `BytesMut`. It 30 //! encodes the frame with a big-endian `u32` header denoting the frame 31 //! payload length: 32 //! 33 //! ```text 34 //! +----------+--------------------------------+ 35 //! | len: u32 | frame payload | 36 //! +----------+--------------------------------+ 37 //! ``` 38 //! 39 //! Specifically, given the following: 40 //! 41 //! ``` 42 //! use tokio::io::{AsyncRead, AsyncWrite}; 43 //! use tokio_util::codec::{Framed, LengthDelimitedCodec}; 44 //! 45 //! use futures::SinkExt; 46 //! use bytes::Bytes; 47 //! 48 //! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>> 49 //! where 50 //! T: AsyncRead + AsyncWrite + Unpin, 51 //! { 52 //! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); 53 //! 
let frame = Bytes::from("hello world"); 54 //! 55 //! transport.send(frame).await?; 56 //! Ok(()) 57 //! } 58 //! ``` 59 //! 60 //! The encoded frame will look like this: 61 //! 62 //! ```text 63 //! +---- len: u32 ----+---- data ----+ 64 //! | \x00\x00\x00\x0b | hello world | 65 //! +------------------+--------------+ 66 //! ``` 67 //! 68 //! # Decoding 69 //! 70 //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], 71 //! such that each yielded [`BytesMut`] value contains the contents of an 72 //! entire frame. There are many configuration parameters enabling 73 //! [`FramedRead`] to handle a wide range of protocols. Here are some 74 //! examples that will cover the various options at a high level. 75 //! 76 //! ## Example 1 77 //! 78 //! The following will parse a `u16` length field at offset 0, omitting the 79 //! frame head in the yielded `BytesMut`. 80 //! 81 //! ``` 82 //! # use tokio_stream::StreamExt; 83 //! # use tokio_util::codec::LengthDelimitedCodec; 84 //! # #[tokio::main] 85 //! # async fn main() { 86 //! # let io: &[u8] = b"\x00\x0BHello world"; 87 //! let mut reader = LengthDelimitedCodec::builder() 88 //! .length_field_offset(0) // default value 89 //! .length_field_type::<u16>() 90 //! .length_adjustment(0) // default value 91 //! .new_read(io); 92 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 93 //! # assert_eq!(res, b"Hello world"); 94 //! # } 95 //! ``` 96 //! 97 //! The following frame will be decoded as such: 98 //! 99 //! ```text 100 //! INPUT DECODED 101 //! +-- len ---+--- Payload ---+ +--- Payload ---+ 102 //! | \x00\x0B | Hello world | --> | Hello world | 103 //! +----------+---------------+ +---------------+ 104 //! ``` 105 //! 106 //! The value of the length field is 11 (`\x0B`) which represents the length 107 //! of the payload, `hello world`. By default, [`FramedRead`] assumes that 108 //! the length field represents the number of bytes that **follows** the 109 //! length field. 
Thus, the entire frame has a length of 13: 2 bytes for the 110 //! frame head + 11 bytes for the payload. 111 //! 112 //! ## Example 2 113 //! 114 //! The following will parse a `u16` length field at offset 0, including the 115 //! frame head in the yielded `BytesMut`. 116 //! 117 //! ``` 118 //! # use tokio_stream::StreamExt; 119 //! # use tokio_util::codec::LengthDelimitedCodec; 120 //! # #[tokio::main] 121 //! # async fn main() { 122 //! # let io: &[u8] = b"\x00\x0BHello world"; 123 //! let mut reader = LengthDelimitedCodec::builder() 124 //! .length_field_offset(0) // default value 125 //! .length_field_type::<u16>() 126 //! .length_adjustment(2) // Add head size to length 127 //! .num_skip(0) // Do NOT skip the head 128 //! .new_read(io); 129 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 130 //! # assert_eq!(res, b"\x00\x0BHello world"); 131 //! # } 132 //! ``` 133 //! 134 //! The following frame will be decoded as such: 135 //! 136 //! ```text 137 //! INPUT DECODED 138 //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ 139 //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | 140 //! +----------+---------------+ +----------+---------------+ 141 //! ``` 142 //! 143 //! This is similar to the first example, the only difference is that the 144 //! frame head is **included** in the yielded `BytesMut` value. To achieve 145 //! this, we need to add the header size to the length with `length_adjustment`, 146 //! and set `num_skip` to `0` to prevent skipping the head. 147 //! 148 //! ## Example 3 149 //! 150 //! The following will parse a `u16` length field at offset 0, omitting the 151 //! frame head in the yielded `BytesMut`. In this case, the length field 152 //! **includes** the frame head length. 153 //! 154 //! ``` 155 //! # use tokio_stream::StreamExt; 156 //! # use tokio_util::codec::LengthDelimitedCodec; 157 //! # #[tokio::main] 158 //! # async fn main() { 159 //! # let io: &[u8] = b"\x00\x0DHello world"; 160 //! 
let mut reader = LengthDelimitedCodec::builder() 161 //! .length_field_offset(0) // default value 162 //! .length_field_type::<u16>() 163 //! .length_adjustment(-2) // size of head 164 //! .new_read(io); 165 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 166 //! # assert_eq!(res, b"Hello world"); 167 //! # } 168 //! ``` 169 //! 170 //! The following frame will be decoded as such: 171 //! 172 //! ```text 173 //! INPUT DECODED 174 //! +-- len ---+--- Payload ---+ +--- Payload ---+ 175 //! | \x00\x0D | Hello world | --> | Hello world | 176 //! +----------+---------------+ +---------------+ 177 //! ``` 178 //! 179 //! In most cases, the length field represents the length of the payload 180 //! only, as shown in the previous examples. However, in some protocols the 181 //! length field represents the length of the whole frame, including the 182 //! head. In such cases, we specify a negative `length_adjustment` to adjust 183 //! the value provided in the frame head to represent the payload length. 184 //! 185 //! ## Example 4 186 //! 187 //! The following will parse a 3 byte length field at offset 0 in a 5 byte 188 //! frame head, including the frame head in the yielded `BytesMut`. 189 //! 190 //! ``` 191 //! # use tokio_stream::StreamExt; 192 //! # use tokio_util::codec::LengthDelimitedCodec; 193 //! # #[tokio::main] 194 //! # async fn main() { 195 //! # let io: &[u8] = b"\x00\x00\x0B\xCA\xFEHello world"; 196 //! let mut reader = LengthDelimitedCodec::builder() 197 //! .length_field_offset(0) // default value 198 //! .length_field_length(3) 199 //! .length_adjustment(3 + 2) // len field and remaining head 200 //! .num_skip(0) 201 //! .new_read(io); 202 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 203 //! # assert_eq!(res, b"\x00\x00\x0B\xCA\xFEHello world"); 204 //! # } 205 //! ``` 206 //! 207 //! The following frame will be decoded as such: 208 //! 209 //! ```text 210 //! INPUT 211 //! 
+---- len -----+- head -+--- Payload ---+ 212 //! | \x00\x00\x0B | \xCAFE | Hello world | 213 //! +--------------+--------+---------------+ 214 //! 215 //! DECODED 216 //! +---- len -----+- head -+--- Payload ---+ 217 //! | \x00\x00\x0B | \xCAFE | Hello world | 218 //! +--------------+--------+---------------+ 219 //! ``` 220 //! 221 //! A more advanced example that shows a case where there is extra frame 222 //! head data between the length field and the payload. In such cases, it is 223 //! usually desirable to include the frame head as part of the yielded 224 //! `BytesMut`. This lets consumers of the length delimited framer 225 //! process the frame head as needed. 226 //! 227 //! The positive `length_adjustment` value lets `FramedRead` factor in the 228 //! additional head into the frame length calculation. 229 //! 230 //! ## Example 5 231 //! 232 //! The following will parse a `u16` length field at offset 1 of a 4 byte 233 //! frame head. The first byte and the length field will be omitted from the 234 //! yielded `BytesMut`, but the trailing byte of the frame head will be 235 //! included. 236 //! 237 //! ``` 238 //! # use tokio_stream::StreamExt; 239 //! # use tokio_util::codec::LengthDelimitedCodec; 240 //! # #[tokio::main] 241 //! # async fn main() { 242 //! # let io: &[u8] = b"\xCA\x00\x0B\xFEHello world"; 243 //! let mut reader = LengthDelimitedCodec::builder() 244 //! .length_field_offset(1) // length of hdr1 245 //! .length_field_type::<u16>() 246 //! .length_adjustment(1) // length of hdr2 247 //! .num_skip(3) // length of hdr1 + LEN 248 //! .new_read(io); 249 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 250 //! # assert_eq!(res, b"\xFEHello world"); 251 //! # } 252 //! ``` 253 //! 254 //! The following frame will be decoded as such: 255 //! 256 //! ```text 257 //! INPUT 258 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ 259 //! | \xCA | \x00\x0B | \xFE | Hello world | 260 //! 
+--------+----------+--------+---------------+ 261 //! 262 //! DECODED 263 //! +- hdr2 -+--- Payload ---+ 264 //! | \xFE | Hello world | 265 //! +--------+---------------+ 266 //! ``` 267 //! 268 //! The length field is situated in the middle of the frame head. In this 269 //! case, the first byte in the frame head could be a version or some other 270 //! identifier that is not needed for processing. On the other hand, the 271 //! second half of the head is needed. 272 //! 273 //! `length_field_offset` indicates how many bytes to skip before starting 274 //! to read the length field. `length_adjustment` is the number of bytes to 275 //! skip starting at the end of the length field. In this case, it is the 276 //! second half of the head. 277 //! 278 //! ## Example 6 279 //! 280 //! The following will parse a `u16` length field at offset 1 of a 4 byte 281 //! frame head. The first byte and the length field will be omitted from the 282 //! yielded `BytesMut`, but the trailing byte of the frame head will be 283 //! included. In this case, the length field **includes** the frame head 284 //! length. 285 //! 286 //! ``` 287 //! # use tokio_stream::StreamExt; 288 //! # use tokio_util::codec::LengthDelimitedCodec; 289 //! # #[tokio::main] 290 //! # async fn main() { 291 //! # let io: &[u8] = b"\xCA\x00\x0F\xFEHello world"; 292 //! let mut reader = LengthDelimitedCodec::builder() 293 //! .length_field_offset(1) // length of hdr1 294 //! .length_field_type::<u16>() 295 //! .length_adjustment(-3) // length of hdr1 + LEN, negative 296 //! .num_skip(3) 297 //! .new_read(io); 298 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 299 //! # assert_eq!(res, b"\xFEHello world"); 300 //! # } 301 //! ``` 302 //! 303 //! The following frame will be decoded as such: 304 //! 305 //! ```text 306 //! INPUT 307 //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ 308 //! | \xCA | \x00\x0F | \xFE | Hello world | 309 //! +--------+----------+--------+---------------+ 310 //! 
311 //! DECODED 312 //! +- hdr2 -+--- Payload ---+ 313 //! | \xFE | Hello world | 314 //! +--------+---------------+ 315 //! ``` 316 //! 317 //! Similar to the example above, the difference is that the length field 318 //! represents the length of the entire frame instead of just the payload. 319 //! The length of `hdr1` and `len` must be counted in `length_adjustment`. 320 //! Note that the length of `hdr2` does **not** need to be explicitly set 321 //! anywhere because it already is factored into the total frame length that 322 //! is read from the byte stream. 323 //! 324 //! ## Example 7 325 //! 326 //! The following will parse a 3 byte length field at offset 0 in a 4 byte 327 //! frame head, excluding the 4th byte from the yielded `BytesMut`. 328 //! 329 //! ``` 330 //! # use tokio_stream::StreamExt; 331 //! # use tokio_util::codec::LengthDelimitedCodec; 332 //! # #[tokio::main] 333 //! # async fn main() { 334 //! # let io: &[u8] = b"\x00\x00\x0B\xFFHello world"; 335 //! let mut reader = LengthDelimitedCodec::builder() 336 //! .length_field_offset(0) // default value 337 //! .length_field_length(3) 338 //! .length_adjustment(0) // default value 339 //! .num_skip(4) // skip the first 4 bytes 340 //! .new_read(io); 341 //! # let res = reader.next().await.unwrap().unwrap().to_vec(); 342 //! # assert_eq!(res, b"Hello world"); 343 //! # } 344 //! ``` 345 //! 346 //! The following frame will be decoded as such: 347 //! 348 //! ```text 349 //! INPUT DECODED 350 //! +------- len ------+--- Payload ---+ +--- Payload ---+ 351 //! | \x00\x00\x0B\xFF | Hello world | => | Hello world | 352 //! +------------------+---------------+ +---------------+ 353 //! ``` 354 //! 355 //! A simple example where there are unused bytes between the length field 356 //! and the payload. 357 //! 358 //! # Encoding 359 //! 360 //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], 361 //! such that each submitted [`BytesMut`] is prefaced by a length field. 362 //! 
There are fewer configuration options than [`FramedRead`]. Given 363 //! protocols that have more complex frame heads, an encoder should probably 364 //! be written by hand using [`Encoder`]. 365 //! 366 //! Here is a simple example, given a `FramedWrite` with the following 367 //! configuration: 368 //! 369 //! ``` 370 //! # use tokio::io::AsyncWrite; 371 //! # use tokio_util::codec::LengthDelimitedCodec; 372 //! # fn write_frame<T: AsyncWrite>(io: T) { 373 //! # let _ = 374 //! LengthDelimitedCodec::builder() 375 //! .length_field_type::<u16>() 376 //! .new_write(io); 377 //! # } 378 //! # pub fn main() {} 379 //! ``` 380 //! 381 //! A payload of `hello world` will be encoded as: 382 //! 383 //! ```text 384 //! +- len: u16 -+---- data ----+ 385 //! | \x00\x0b | hello world | 386 //! +------------+--------------+ 387 //! ``` 388 //! 389 //! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new 390 //! [`FramedRead`]: struct@FramedRead 391 //! [`FramedWrite`]: struct@FramedWrite 392 //! [`AsyncRead`]: trait@tokio::io::AsyncRead 393 //! [`AsyncWrite`]: trait@tokio::io::AsyncWrite 394 //! [`Encoder`]: trait@Encoder 395 //! [`BytesMut`]: bytes::BytesMut 396 397 use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; 398 399 use tokio::io::{AsyncRead, AsyncWrite}; 400 401 use bytes::{Buf, BufMut, Bytes, BytesMut}; 402 use std::error::Error as StdError; 403 use std::io::{self, Cursor}; 404 use std::{cmp, fmt, mem}; 405 406 /// Configure length delimited `LengthDelimitedCodec`s. 407 /// 408 /// `Builder` enables constructing configured length delimited codecs. Note 409 /// that not all configuration settings apply to both encoding and decoding. See 410 /// the documentation for specific methods for more detail. 411 /// 412 /// Note that if the value of [`Builder::max_frame_length`] becomes larger than 413 /// what can actually fit in [`Builder::length_field_length`], it will be clipped to 414 /// the maximum value that can fit. 
415 #[derive(Debug, Clone, Copy)] 416 pub struct Builder { 417 // Maximum frame length 418 max_frame_len: usize, 419 420 // Number of bytes representing the field length 421 length_field_len: usize, 422 423 // Number of bytes in the header before the length field 424 length_field_offset: usize, 425 426 // Adjust the length specified in the header field by this amount 427 length_adjustment: isize, 428 429 // Total number of bytes to skip before reading the payload, if not set, 430 // `length_field_len + length_field_offset` 431 num_skip: Option<usize>, 432 433 // Length field byte order (little or big endian) 434 length_field_is_big_endian: bool, 435 } 436 437 /// An error when the number of bytes read is more than max frame length. 438 pub struct LengthDelimitedCodecError { 439 _priv: (), 440 } 441 442 /// A codec for frames delimited by a frame head specifying their lengths. 443 /// 444 /// This allows the consumer to work with entire frames without having to worry 445 /// about buffering or other framing logic. 446 /// 447 /// See [module level] documentation for more detail. 448 /// 449 /// [module level]: index.html 450 #[derive(Debug, Clone)] 451 pub struct LengthDelimitedCodec { 452 // Configuration values 453 builder: Builder, 454 455 // Read state 456 state: DecodeState, 457 } 458 459 #[derive(Debug, Clone, Copy)] 460 enum DecodeState { 461 Head, 462 Data(usize), 463 } 464 465 // ===== impl LengthDelimitedCodec ====== 466 467 impl LengthDelimitedCodec { 468 /// Creates a new `LengthDelimitedCodec` with the default configuration values. new() -> Self469 pub fn new() -> Self { 470 Self { 471 builder: Builder::new(), 472 state: DecodeState::Head, 473 } 474 } 475 476 /// Creates a new length delimited codec builder with default configuration 477 /// values. builder() -> Builder478 pub fn builder() -> Builder { 479 Builder::new() 480 } 481 482 /// Returns the current max frame setting 483 /// 484 /// This is the largest size this codec will accept from the wire. 
Larger 485 /// frames will be rejected. max_frame_length(&self) -> usize486 pub fn max_frame_length(&self) -> usize { 487 self.builder.max_frame_len 488 } 489 490 /// Updates the max frame setting. 491 /// 492 /// The change takes effect the next time a frame is decoded. In other 493 /// words, if a frame is currently in process of being decoded with a frame 494 /// size greater than `val` but less than the max frame length in effect 495 /// before calling this function, then the frame will be allowed. set_max_frame_length(&mut self, val: usize)496 pub fn set_max_frame_length(&mut self, val: usize) { 497 self.builder.max_frame_length(val); 498 } 499 decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>>500 fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { 501 let head_len = self.builder.num_head_bytes(); 502 let field_len = self.builder.length_field_len; 503 504 if src.len() < head_len { 505 // Not enough data 506 return Ok(None); 507 } 508 509 let n = { 510 let mut src = Cursor::new(&mut *src); 511 512 // Skip the required bytes 513 src.advance(self.builder.length_field_offset); 514 515 // match endianness 516 let n = if self.builder.length_field_is_big_endian { 517 src.get_uint(field_len) 518 } else { 519 src.get_uint_le(field_len) 520 }; 521 522 if n > self.builder.max_frame_len as u64 { 523 return Err(io::Error::new( 524 io::ErrorKind::InvalidData, 525 LengthDelimitedCodecError { _priv: () }, 526 )); 527 } 528 529 // The check above ensures there is no overflow 530 let n = n as usize; 531 532 // Adjust `n` with bounds checking 533 let n = if self.builder.length_adjustment < 0 { 534 n.checked_sub(-self.builder.length_adjustment as usize) 535 } else { 536 n.checked_add(self.builder.length_adjustment as usize) 537 }; 538 539 // Error handling 540 match n { 541 Some(n) => n, 542 None => { 543 return Err(io::Error::new( 544 io::ErrorKind::InvalidInput, 545 "provided length would overflow after adjustment", 546 )); 547 } 
548 } 549 }; 550 551 src.advance(self.builder.get_num_skip()); 552 553 // Ensure that the buffer has enough space to read the incoming 554 // payload 555 src.reserve(n.saturating_sub(src.len())); 556 557 Ok(Some(n)) 558 } 559 decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut>560 fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> { 561 // At this point, the buffer has already had the required capacity 562 // reserved. All there is to do is read. 563 if src.len() < n { 564 return None; 565 } 566 567 Some(src.split_to(n)) 568 } 569 } 570 571 impl Decoder for LengthDelimitedCodec { 572 type Item = BytesMut; 573 type Error = io::Error; 574 decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>>575 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { 576 let n = match self.state { 577 DecodeState::Head => match self.decode_head(src)? { 578 Some(n) => { 579 self.state = DecodeState::Data(n); 580 n 581 } 582 None => return Ok(None), 583 }, 584 DecodeState::Data(n) => n, 585 }; 586 587 match self.decode_data(n, src) { 588 Some(data) => { 589 // Update the decode state 590 self.state = DecodeState::Head; 591 592 // Make sure the buffer has enough space to read the next head 593 src.reserve(self.builder.num_head_bytes().saturating_sub(src.len())); 594 595 Ok(Some(data)) 596 } 597 None => Ok(None), 598 } 599 } 600 } 601 602 impl Encoder<Bytes> for LengthDelimitedCodec { 603 type Error = io::Error; 604 encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error>605 fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { 606 let n = data.len(); 607 608 if n > self.builder.max_frame_len { 609 return Err(io::Error::new( 610 io::ErrorKind::InvalidInput, 611 LengthDelimitedCodecError { _priv: () }, 612 )); 613 } 614 615 // Adjust `n` with bounds checking 616 let n = if self.builder.length_adjustment < 0 { 617 n.checked_add(-self.builder.length_adjustment as usize) 618 } 
else { 619 n.checked_sub(self.builder.length_adjustment as usize) 620 }; 621 622 let n = n.ok_or_else(|| { 623 io::Error::new( 624 io::ErrorKind::InvalidInput, 625 "provided length would overflow after adjustment", 626 ) 627 })?; 628 629 // Reserve capacity in the destination buffer to fit the frame and 630 // length field (plus adjustment). 631 dst.reserve(self.builder.length_field_len + n); 632 633 if self.builder.length_field_is_big_endian { 634 dst.put_uint(n as u64, self.builder.length_field_len); 635 } else { 636 dst.put_uint_le(n as u64, self.builder.length_field_len); 637 } 638 639 // Write the frame to the buffer 640 dst.extend_from_slice(&data[..]); 641 642 Ok(()) 643 } 644 } 645 646 impl Default for LengthDelimitedCodec { default() -> Self647 fn default() -> Self { 648 Self::new() 649 } 650 } 651 652 // ===== impl Builder ===== 653 654 mod builder { 655 /// Types that can be used with `Builder::length_field_type`. 656 pub trait LengthFieldType {} 657 658 impl LengthFieldType for u8 {} 659 impl LengthFieldType for u16 {} 660 impl LengthFieldType for u32 {} 661 impl LengthFieldType for u64 {} 662 663 #[cfg(any( 664 target_pointer_width = "16", 665 target_pointer_width = "32", 666 target_pointer_width = "64", 667 ))] 668 impl LengthFieldType for usize {} 669 } 670 671 impl Builder { 672 /// Creates a new length delimited codec builder with default configuration 673 /// values. 
674 /// 675 /// # Examples 676 /// 677 /// ``` 678 /// # use tokio::io::AsyncRead; 679 /// use tokio_util::codec::LengthDelimitedCodec; 680 /// 681 /// # fn bind_read<T: AsyncRead>(io: T) { 682 /// LengthDelimitedCodec::builder() 683 /// .length_field_offset(0) 684 /// .length_field_type::<u16>() 685 /// .length_adjustment(0) 686 /// .num_skip(0) 687 /// .new_read(io); 688 /// # } 689 /// # pub fn main() {} 690 /// ``` new() -> Builder691 pub fn new() -> Builder { 692 Builder { 693 // Default max frame length of 8MB 694 max_frame_len: 8 * 1_024 * 1_024, 695 696 // Default byte length of 4 697 length_field_len: 4, 698 699 // Default to the header field being at the start of the header. 700 length_field_offset: 0, 701 702 length_adjustment: 0, 703 704 // Total number of bytes to skip before reading the payload, if not set, 705 // `length_field_len + length_field_offset` 706 num_skip: None, 707 708 // Default to reading the length field in network (big) endian. 709 length_field_is_big_endian: true, 710 } 711 } 712 713 /// Read the length field as a big endian integer 714 /// 715 /// This is the default setting. 716 /// 717 /// This configuration option applies to both encoding and decoding. 718 /// 719 /// # Examples 720 /// 721 /// ``` 722 /// # use tokio::io::AsyncRead; 723 /// use tokio_util::codec::LengthDelimitedCodec; 724 /// 725 /// # fn bind_read<T: AsyncRead>(io: T) { 726 /// LengthDelimitedCodec::builder() 727 /// .big_endian() 728 /// .new_read(io); 729 /// # } 730 /// # pub fn main() {} 731 /// ``` big_endian(&mut self) -> &mut Self732 pub fn big_endian(&mut self) -> &mut Self { 733 self.length_field_is_big_endian = true; 734 self 735 } 736 737 /// Read the length field as a little endian integer 738 /// 739 /// The default setting is big endian. 740 /// 741 /// This configuration option applies to both encoding and decoding. 
742 /// 743 /// # Examples 744 /// 745 /// ``` 746 /// # use tokio::io::AsyncRead; 747 /// use tokio_util::codec::LengthDelimitedCodec; 748 /// 749 /// # fn bind_read<T: AsyncRead>(io: T) { 750 /// LengthDelimitedCodec::builder() 751 /// .little_endian() 752 /// .new_read(io); 753 /// # } 754 /// # pub fn main() {} 755 /// ``` little_endian(&mut self) -> &mut Self756 pub fn little_endian(&mut self) -> &mut Self { 757 self.length_field_is_big_endian = false; 758 self 759 } 760 761 /// Read the length field as a native endian integer 762 /// 763 /// The default setting is big endian. 764 /// 765 /// This configuration option applies to both encoding and decoding. 766 /// 767 /// # Examples 768 /// 769 /// ``` 770 /// # use tokio::io::AsyncRead; 771 /// use tokio_util::codec::LengthDelimitedCodec; 772 /// 773 /// # fn bind_read<T: AsyncRead>(io: T) { 774 /// LengthDelimitedCodec::builder() 775 /// .native_endian() 776 /// .new_read(io); 777 /// # } 778 /// # pub fn main() {} 779 /// ``` native_endian(&mut self) -> &mut Self780 pub fn native_endian(&mut self) -> &mut Self { 781 if cfg!(target_endian = "big") { 782 self.big_endian() 783 } else { 784 self.little_endian() 785 } 786 } 787 788 /// Sets the max frame length in bytes 789 /// 790 /// This configuration option applies to both encoding and decoding. The 791 /// default value is 8MB. 792 /// 793 /// When decoding, the length field read from the byte stream is checked 794 /// against this setting **before** any adjustments are applied. When 795 /// encoding, the length of the submitted payload is checked against this 796 /// setting. 797 /// 798 /// When frames exceed the max length, an `io::Error` with the custom value 799 /// of the `LengthDelimitedCodecError` type will be returned. 
800 /// 801 /// # Examples 802 /// 803 /// ``` 804 /// # use tokio::io::AsyncRead; 805 /// use tokio_util::codec::LengthDelimitedCodec; 806 /// 807 /// # fn bind_read<T: AsyncRead>(io: T) { 808 /// LengthDelimitedCodec::builder() 809 /// .max_frame_length(8 * 1024 * 1024) 810 /// .new_read(io); 811 /// # } 812 /// # pub fn main() {} 813 /// ``` max_frame_length(&mut self, val: usize) -> &mut Self814 pub fn max_frame_length(&mut self, val: usize) -> &mut Self { 815 self.max_frame_len = val; 816 self 817 } 818 819 /// Sets the unsigned integer type used to represent the length field. 820 /// 821 /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on 822 /// 64-bit targets). 823 /// 824 /// # Examples 825 /// 826 /// ``` 827 /// # use tokio::io::AsyncRead; 828 /// use tokio_util::codec::LengthDelimitedCodec; 829 /// 830 /// # fn bind_read<T: AsyncRead>(io: T) { 831 /// LengthDelimitedCodec::builder() 832 /// .length_field_type::<u32>() 833 /// .new_read(io); 834 /// # } 835 /// # pub fn main() {} 836 /// ``` 837 /// 838 /// Unlike [`Builder::length_field_length`], this does not fail at runtime 839 /// and instead produces a compile error: 840 /// 841 /// ```compile_fail 842 /// # use tokio::io::AsyncRead; 843 /// # use tokio_util::codec::LengthDelimitedCodec; 844 /// # fn bind_read<T: AsyncRead>(io: T) { 845 /// LengthDelimitedCodec::builder() 846 /// .length_field_type::<u128>() 847 /// .new_read(io); 848 /// # } 849 /// # pub fn main() {} 850 /// ``` length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self851 pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self { 852 self.length_field_length(mem::size_of::<T>()) 853 } 854 855 /// Sets the number of bytes used to represent the length field 856 /// 857 /// The default value is `4`. The max value is `8`. 858 /// 859 /// This configuration option applies to both encoding and decoding. 
860 /// 861 /// # Examples 862 /// 863 /// ``` 864 /// # use tokio::io::AsyncRead; 865 /// use tokio_util::codec::LengthDelimitedCodec; 866 /// 867 /// # fn bind_read<T: AsyncRead>(io: T) { 868 /// LengthDelimitedCodec::builder() 869 /// .length_field_length(4) 870 /// .new_read(io); 871 /// # } 872 /// # pub fn main() {} 873 /// ``` length_field_length(&mut self, val: usize) -> &mut Self874 pub fn length_field_length(&mut self, val: usize) -> &mut Self { 875 assert!(val > 0 && val <= 8, "invalid length field length"); 876 self.length_field_len = val; 877 self 878 } 879 880 /// Sets the number of bytes in the header before the length field 881 /// 882 /// This configuration option only applies to decoding. 883 /// 884 /// # Examples 885 /// 886 /// ``` 887 /// # use tokio::io::AsyncRead; 888 /// use tokio_util::codec::LengthDelimitedCodec; 889 /// 890 /// # fn bind_read<T: AsyncRead>(io: T) { 891 /// LengthDelimitedCodec::builder() 892 /// .length_field_offset(1) 893 /// .new_read(io); 894 /// # } 895 /// # pub fn main() {} 896 /// ``` length_field_offset(&mut self, val: usize) -> &mut Self897 pub fn length_field_offset(&mut self, val: usize) -> &mut Self { 898 self.length_field_offset = val; 899 self 900 } 901 902 /// Delta between the payload length specified in the header and the real 903 /// payload length 904 /// 905 /// # Examples 906 /// 907 /// ``` 908 /// # use tokio::io::AsyncRead; 909 /// use tokio_util::codec::LengthDelimitedCodec; 910 /// 911 /// # fn bind_read<T: AsyncRead>(io: T) { 912 /// LengthDelimitedCodec::builder() 913 /// .length_adjustment(-2) 914 /// .new_read(io); 915 /// # } 916 /// # pub fn main() {} 917 /// ``` length_adjustment(&mut self, val: isize) -> &mut Self918 pub fn length_adjustment(&mut self, val: isize) -> &mut Self { 919 self.length_adjustment = val; 920 self 921 } 922 923 /// Sets the number of bytes to skip before reading the payload 924 /// 925 /// Default value is `length_field_len + length_field_offset` 926 /// 927 /// 
This configuration option only applies to decoding 928 /// 929 /// # Examples 930 /// 931 /// ``` 932 /// # use tokio::io::AsyncRead; 933 /// use tokio_util::codec::LengthDelimitedCodec; 934 /// 935 /// # fn bind_read<T: AsyncRead>(io: T) { 936 /// LengthDelimitedCodec::builder() 937 /// .num_skip(4) 938 /// .new_read(io); 939 /// # } 940 /// # pub fn main() {} 941 /// ``` num_skip(&mut self, val: usize) -> &mut Self942 pub fn num_skip(&mut self, val: usize) -> &mut Self { 943 self.num_skip = Some(val); 944 self 945 } 946 947 /// Create a configured length delimited `LengthDelimitedCodec` 948 /// 949 /// # Examples 950 /// 951 /// ``` 952 /// use tokio_util::codec::LengthDelimitedCodec; 953 /// # pub fn main() { 954 /// LengthDelimitedCodec::builder() 955 /// .length_field_offset(0) 956 /// .length_field_type::<u16>() 957 /// .length_adjustment(0) 958 /// .num_skip(0) 959 /// .new_codec(); 960 /// # } 961 /// ``` new_codec(&self) -> LengthDelimitedCodec962 pub fn new_codec(&self) -> LengthDelimitedCodec { 963 let mut builder = *self; 964 965 builder.adjust_max_frame_len(); 966 967 LengthDelimitedCodec { 968 builder, 969 state: DecodeState::Head, 970 } 971 } 972 973 /// Create a configured length delimited `FramedRead` 974 /// 975 /// # Examples 976 /// 977 /// ``` 978 /// # use tokio::io::AsyncRead; 979 /// use tokio_util::codec::LengthDelimitedCodec; 980 /// 981 /// # fn bind_read<T: AsyncRead>(io: T) { 982 /// LengthDelimitedCodec::builder() 983 /// .length_field_offset(0) 984 /// .length_field_type::<u16>() 985 /// .length_adjustment(0) 986 /// .num_skip(0) 987 /// .new_read(io); 988 /// # } 989 /// # pub fn main() {} 990 /// ``` new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> where T: AsyncRead,991 pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> 992 where 993 T: AsyncRead, 994 { 995 FramedRead::new(upstream, self.new_codec()) 996 } 997 998 /// Create a configured length delimited `FramedWrite` 999 /// 1000 
/// # Examples 1001 /// 1002 /// ``` 1003 /// # use tokio::io::AsyncWrite; 1004 /// # use tokio_util::codec::LengthDelimitedCodec; 1005 /// # fn write_frame<T: AsyncWrite>(io: T) { 1006 /// LengthDelimitedCodec::builder() 1007 /// .length_field_type::<u16>() 1008 /// .new_write(io); 1009 /// # } 1010 /// # pub fn main() {} 1011 /// ``` new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> where T: AsyncWrite,1012 pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> 1013 where 1014 T: AsyncWrite, 1015 { 1016 FramedWrite::new(inner, self.new_codec()) 1017 } 1018 1019 /// Create a configured length delimited `Framed` 1020 /// 1021 /// # Examples 1022 /// 1023 /// ``` 1024 /// # use tokio::io::{AsyncRead, AsyncWrite}; 1025 /// # use tokio_util::codec::LengthDelimitedCodec; 1026 /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { 1027 /// # let _ = 1028 /// LengthDelimitedCodec::builder() 1029 /// .length_field_type::<u16>() 1030 /// .new_framed(io); 1031 /// # } 1032 /// # pub fn main() {} 1033 /// ``` new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> where T: AsyncRead + AsyncWrite,1034 pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> 1035 where 1036 T: AsyncRead + AsyncWrite, 1037 { 1038 Framed::new(inner, self.new_codec()) 1039 } 1040 num_head_bytes(&self) -> usize1041 fn num_head_bytes(&self) -> usize { 1042 let num = self.length_field_offset + self.length_field_len; 1043 cmp::max(num, self.num_skip.unwrap_or(0)) 1044 } 1045 get_num_skip(&self) -> usize1046 fn get_num_skip(&self) -> usize { 1047 self.num_skip 1048 .unwrap_or(self.length_field_offset + self.length_field_len) 1049 } 1050 adjust_max_frame_len(&mut self)1051 fn adjust_max_frame_len(&mut self) { 1052 // Calculate the maximum number that can be represented using `length_field_len` bytes. 
1053 let max_number = match 1u64.checked_shl((8 * self.length_field_len) as u32) { 1054 Some(shl) => shl - 1, 1055 None => u64::MAX, 1056 }; 1057 1058 let max_allowed_len = max_number.saturating_add_signed(self.length_adjustment as i64); 1059 1060 if self.max_frame_len as u64 > max_allowed_len { 1061 self.max_frame_len = usize::try_from(max_allowed_len).unwrap_or(usize::MAX); 1062 } 1063 } 1064 } 1065 1066 impl Default for Builder { default() -> Self1067 fn default() -> Self { 1068 Self::new() 1069 } 1070 } 1071 1072 // ===== impl LengthDelimitedCodecError ===== 1073 1074 impl fmt::Debug for LengthDelimitedCodecError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1075 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1076 f.debug_struct("LengthDelimitedCodecError").finish() 1077 } 1078 } 1079 1080 impl fmt::Display for LengthDelimitedCodecError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1081 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1082 f.write_str("frame size too big") 1083 } 1084 } 1085 1086 impl StdError for LengthDelimitedCodecError {} 1087