1 use std::error::Error as StdError; 2 use std::fmt; 3 use std::io; 4 use std::task::{Context, Poll}; 5 use std::usize; 6 7 use bytes::Bytes; 8 use tracing::{debug, trace}; 9 10 use super::io::MemRead; 11 use super::DecodedLength; 12 13 use self::Kind::{Chunked, Eof, Length}; 14 15 /// Maximum amount of bytes allowed in chunked extensions. 16 /// 17 /// This limit is currentlty applied for the entire body, not per chunk. 18 const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16; 19 20 /// Decoders to handle different Transfer-Encodings. 21 /// 22 /// If a message body does not include a Transfer-Encoding, it *should* 23 /// include a Content-Length header. 24 #[derive(Clone, PartialEq)] 25 pub(crate) struct Decoder { 26 kind: Kind, 27 } 28 29 #[derive(Debug, Clone, Copy, PartialEq)] 30 enum Kind { 31 /// A Reader used when a Content-Length header is passed with a positive integer. 32 Length(u64), 33 /// A Reader used when Transfer-Encoding is `chunked`. 34 Chunked { 35 state: ChunkedState, 36 chunk_len: u64, 37 extensions_cnt: u64, 38 }, 39 /// A Reader used for responses that don't indicate a length or chunked. 40 /// 41 /// The bool tracks when EOF is seen on the transport. 42 /// 43 /// Note: This should only used for `Response`s. It is illegal for a 44 /// `Request` to be made with both `Content-Length` and 45 /// `Transfer-Encoding: chunked` missing, as explained from the spec: 46 /// 47 /// > If a Transfer-Encoding header field is present in a response and 48 /// > the chunked transfer coding is not the final encoding, the 49 /// > message body length is determined by reading the connection until 50 /// > it is closed by the server. If a Transfer-Encoding header field 51 /// > is present in a request and the chunked transfer coding is not 52 /// > the final encoding, the message body length cannot be determined 53 /// > reliably; the server MUST respond with the 400 (Bad Request) 54 /// > status code and then close the connection. 55 Eof(bool), 56 } 57 58 #[derive(Debug, PartialEq, Clone, Copy)] 59 enum ChunkedState { 60 Start, 61 Size, 62 SizeLws, 63 Extension, 64 SizeLf, 65 Body, 66 BodyCr, 67 BodyLf, 68 Trailer, 69 TrailerLf, 70 EndCr, 71 EndLf, 72 End, 73 } 74 75 impl Decoder { 76 // constructors 77 length(x: u64) -> Decoder78 pub(crate) fn length(x: u64) -> Decoder { 79 Decoder { 80 kind: Kind::Length(x), 81 } 82 } 83 chunked() -> Decoder84 pub(crate) fn chunked() -> Decoder { 85 Decoder { 86 kind: Kind::Chunked { 87 state: ChunkedState::new(), 88 chunk_len: 0, 89 extensions_cnt: 0, 90 }, 91 } 92 } 93 eof() -> Decoder94 pub(crate) fn eof() -> Decoder { 95 Decoder { 96 kind: Kind::Eof(false), 97 } 98 } 99 new(len: DecodedLength) -> Self100 pub(super) fn new(len: DecodedLength) -> Self { 101 match len { 102 DecodedLength::CHUNKED => Decoder::chunked(), 103 DecodedLength::CLOSE_DELIMITED => Decoder::eof(), 104 length => Decoder::length(length.danger_len()), 105 } 106 } 107 108 // methods 109 is_eof(&self) -> bool110 pub(crate) fn is_eof(&self) -> bool { 111 matches!( 112 self.kind, 113 Length(0) 114 | Chunked { 115 state: ChunkedState::End, 116 .. 117 } 118 | Eof(true) 119 ) 120 } 121 decode<R: MemRead>( &mut self, cx: &mut Context<'_>, body: &mut R, ) -> Poll<Result<Bytes, io::Error>>122 pub(crate) fn decode<R: MemRead>( 123 &mut self, 124 cx: &mut Context<'_>, 125 body: &mut R, 126 ) -> Poll<Result<Bytes, io::Error>> { 127 trace!("decode; state={:?}", self.kind); 128 match self.kind { 129 Length(ref mut remaining) => { 130 if *remaining == 0 { 131 Poll::Ready(Ok(Bytes::new())) 132 } else { 133 let to_read = *remaining as usize; 134 let buf = ready!(body.read_mem(cx, to_read))?; 135 let num = buf.as_ref().len() as u64; 136 if num > *remaining { 137 *remaining = 0; 138 } else if num == 0 { 139 return Poll::Ready(Err(io::Error::new( 140 io::ErrorKind::UnexpectedEof, 141 IncompleteBody, 142 ))); 143 } else { 144 *remaining -= num; 145 } 146 Poll::Ready(Ok(buf)) 147 } 148 } 149 Chunked { 150 ref mut state, 151 ref mut chunk_len, 152 ref mut extensions_cnt, 153 } => { 154 loop { 155 let mut buf = None; 156 // advances the chunked state 157 *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?; 158 if *state == ChunkedState::End { 159 trace!("end of chunked"); 160 return Poll::Ready(Ok(Bytes::new())); 161 } 162 if let Some(buf) = buf { 163 return Poll::Ready(Ok(buf)); 164 } 165 } 166 } 167 Eof(ref mut is_eof) => { 168 if *is_eof { 169 Poll::Ready(Ok(Bytes::new())) 170 } else { 171 // 8192 chosen because its about 2 packets, there probably 172 // won't be that much available, so don't have MemReaders 173 // allocate buffers to big 174 body.read_mem(cx, 8192).map_ok(|slice| { 175 *is_eof = slice.is_empty(); 176 slice 177 }) 178 } 179 } 180 } 181 } 182 183 #[cfg(test)] decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error>184 async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> { 185 futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await 186 } 187 } 188 189 impl fmt::Debug for Decoder { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 191 fmt::Debug::fmt(&self.kind, f) 192 } 193 } 194 195 macro_rules! byte ( 196 ($rdr:ident, $cx:expr) => ({ 197 let buf = ready!($rdr.read_mem($cx, 1))?; 198 if !buf.is_empty() { 199 buf[0] 200 } else { 201 return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof, 202 "unexpected EOF during chunk size line"))); 203 } 204 }) 205 ); 206 207 macro_rules! or_overflow { 208 ($e:expr) => ( 209 match $e { 210 Some(val) => val, 211 None => return Poll::Ready(Err(io::Error::new( 212 io::ErrorKind::InvalidData, 213 "invalid chunk size: overflow", 214 ))), 215 } 216 ) 217 } 218 219 impl ChunkedState { new() -> ChunkedState220 fn new() -> ChunkedState { 221 ChunkedState::Start 222 } step<R: MemRead>( &self, cx: &mut Context<'_>, body: &mut R, size: &mut u64, extensions_cnt: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>223 fn step<R: MemRead>( 224 &self, 225 cx: &mut Context<'_>, 226 body: &mut R, 227 size: &mut u64, 228 extensions_cnt: &mut u64, 229 buf: &mut Option<Bytes>, 230 ) -> Poll<Result<ChunkedState, io::Error>> { 231 use self::ChunkedState::*; 232 match *self { 233 Start => ChunkedState::read_start(cx, body, size), 234 Size => ChunkedState::read_size(cx, body, size), 235 SizeLws => ChunkedState::read_size_lws(cx, body), 236 Extension => ChunkedState::read_extension(cx, body, extensions_cnt), 237 SizeLf => ChunkedState::read_size_lf(cx, body, *size), 238 Body => ChunkedState::read_body(cx, body, size, buf), 239 BodyCr => ChunkedState::read_body_cr(cx, body), 240 BodyLf => ChunkedState::read_body_lf(cx, body), 241 Trailer => ChunkedState::read_trailer(cx, body), 242 TrailerLf => ChunkedState::read_trailer_lf(cx, body), 243 EndCr => ChunkedState::read_end_cr(cx, body), 244 EndLf => ChunkedState::read_end_lf(cx, body), 245 End => Poll::Ready(Ok(ChunkedState::End)), 246 } 247 } 248 read_start<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>249 fn read_start<R: MemRead>( 250 cx: &mut Context<'_>, 251 rdr: &mut R, 252 size: &mut u64, 253 ) -> Poll<Result<ChunkedState, io::Error>> { 254 trace!("Read chunk start"); 255 256 let radix = 16; 257 match byte!(rdr, cx) { 258 b @ b'0'..=b'9' => { 259 *size = or_overflow!(size.checked_mul(radix)); 260 *size = or_overflow!(size.checked_add((b - b'0') as u64)); 261 } 262 b @ b'a'..=b'f' => { 263 *size = or_overflow!(size.checked_mul(radix)); 264 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64)); 265 } 266 b @ b'A'..=b'F' => { 267 *size = or_overflow!(size.checked_mul(radix)); 268 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64)); 269 } 270 _ => { 271 return Poll::Ready(Err(io::Error::new( 272 io::ErrorKind::InvalidInput, 273 "Invalid chunk size line: missing size digit", 274 ))); 275 } 276 } 277 278 Poll::Ready(Ok(ChunkedState::Size)) 279 } 280 read_size<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>281 fn read_size<R: MemRead>( 282 cx: &mut Context<'_>, 283 rdr: &mut R, 284 size: &mut u64, 285 ) -> Poll<Result<ChunkedState, io::Error>> { 286 trace!("Read chunk hex size"); 287 288 let radix = 16; 289 match byte!(rdr, cx) { 290 b @ b'0'..=b'9' => { 291 *size = or_overflow!(size.checked_mul(radix)); 292 *size = or_overflow!(size.checked_add((b - b'0') as u64)); 293 } 294 b @ b'a'..=b'f' => { 295 *size = or_overflow!(size.checked_mul(radix)); 296 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64)); 297 } 298 b @ b'A'..=b'F' => { 299 *size = or_overflow!(size.checked_mul(radix)); 300 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64)); 301 } 302 b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)), 303 b';' => return Poll::Ready(Ok(ChunkedState::Extension)), 304 b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)), 305 _ => { 306 return Poll::Ready(Err(io::Error::new( 307 io::ErrorKind::InvalidInput, 308 "Invalid chunk size line: Invalid Size", 309 ))); 310 } 311 } 312 Poll::Ready(Ok(ChunkedState::Size)) 313 } read_size_lws<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>314 fn read_size_lws<R: MemRead>( 315 cx: &mut Context<'_>, 316 rdr: &mut R, 317 ) -> Poll<Result<ChunkedState, io::Error>> { 318 trace!("read_size_lws"); 319 match byte!(rdr, cx) { 320 // LWS can follow the chunk size, but no more digits can come 321 b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)), 322 b';' => Poll::Ready(Ok(ChunkedState::Extension)), 323 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), 324 _ => Poll::Ready(Err(io::Error::new( 325 io::ErrorKind::InvalidInput, 326 "Invalid chunk size linear white space", 327 ))), 328 } 329 } read_extension<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, extensions_cnt: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>330 fn read_extension<R: MemRead>( 331 cx: &mut Context<'_>, 332 rdr: &mut R, 333 extensions_cnt: &mut u64, 334 ) -> Poll<Result<ChunkedState, io::Error>> { 335 trace!("read_extension"); 336 // We don't care about extensions really at all. Just ignore them. 337 // They "end" at the next CRLF. 338 // 339 // However, some implementations may not check for the CR, so to save 340 // them from themselves, we reject extensions containing plain LF as 341 // well. 342 match byte!(rdr, cx) { 343 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), 344 b'\n' => Poll::Ready(Err(io::Error::new( 345 io::ErrorKind::InvalidData, 346 "invalid chunk extension contains newline", 347 ))), 348 _ => { 349 *extensions_cnt += 1; 350 if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT { 351 Poll::Ready(Err(io::Error::new( 352 io::ErrorKind::InvalidData, 353 "chunk extensions over limit", 354 ))) 355 } else { 356 Poll::Ready(Ok(ChunkedState::Extension)) 357 } 358 } // no supported extensions 359 } 360 } read_size_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: u64, ) -> Poll<Result<ChunkedState, io::Error>>361 fn read_size_lf<R: MemRead>( 362 cx: &mut Context<'_>, 363 rdr: &mut R, 364 size: u64, 365 ) -> Poll<Result<ChunkedState, io::Error>> { 366 trace!("Chunk size is {:?}", size); 367 match byte!(rdr, cx) { 368 b'\n' => { 369 if size == 0 { 370 Poll::Ready(Ok(ChunkedState::EndCr)) 371 } else { 372 debug!("incoming chunked header: {0:#X} ({0} bytes)", size); 373 Poll::Ready(Ok(ChunkedState::Body)) 374 } 375 } 376 _ => Poll::Ready(Err(io::Error::new( 377 io::ErrorKind::InvalidInput, 378 "Invalid chunk size LF", 379 ))), 380 } 381 } 382 read_body<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, rem: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>383 fn read_body<R: MemRead>( 384 cx: &mut Context<'_>, 385 rdr: &mut R, 386 rem: &mut u64, 387 buf: &mut Option<Bytes>, 388 ) -> Poll<Result<ChunkedState, io::Error>> { 389 trace!("Chunked read, remaining={:?}", rem); 390 391 // cap remaining bytes at the max capacity of usize 392 let rem_cap = match *rem { 393 r if r > usize::MAX as u64 => usize::MAX, 394 r => r as usize, 395 }; 396 397 let to_read = rem_cap; 398 let slice = ready!(rdr.read_mem(cx, to_read))?; 399 let count = slice.len(); 400 401 if count == 0 { 402 *rem = 0; 403 return Poll::Ready(Err(io::Error::new( 404 io::ErrorKind::UnexpectedEof, 405 IncompleteBody, 406 ))); 407 } 408 *buf = Some(slice); 409 *rem -= count as u64; 410 411 if *rem > 0 { 412 Poll::Ready(Ok(ChunkedState::Body)) 413 } else { 414 Poll::Ready(Ok(ChunkedState::BodyCr)) 415 } 416 } read_body_cr<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>417 fn read_body_cr<R: MemRead>( 418 cx: &mut Context<'_>, 419 rdr: &mut R, 420 ) -> Poll<Result<ChunkedState, io::Error>> { 421 match byte!(rdr, cx) { 422 b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)), 423 _ => Poll::Ready(Err(io::Error::new( 424 io::ErrorKind::InvalidInput, 425 "Invalid chunk body CR", 426 ))), 427 } 428 } read_body_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>429 fn read_body_lf<R: MemRead>( 430 cx: &mut Context<'_>, 431 rdr: &mut R, 432 ) -> Poll<Result<ChunkedState, io::Error>> { 433 match byte!(rdr, cx) { 434 b'\n' => Poll::Ready(Ok(ChunkedState::Size)), 435 _ => Poll::Ready(Err(io::Error::new( 436 io::ErrorKind::InvalidInput, 437 "Invalid chunk body LF", 438 ))), 439 } 440 } 441 read_trailer<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>442 fn read_trailer<R: MemRead>( 443 cx: &mut Context<'_>, 444 rdr: &mut R, 445 ) -> Poll<Result<ChunkedState, io::Error>> { 446 trace!("read_trailer"); 447 match byte!(rdr, cx) { 448 b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)), 449 _ => Poll::Ready(Ok(ChunkedState::Trailer)), 450 } 451 } read_trailer_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>452 fn read_trailer_lf<R: MemRead>( 453 cx: &mut Context<'_>, 454 rdr: &mut R, 455 ) -> Poll<Result<ChunkedState, io::Error>> { 456 match byte!(rdr, cx) { 457 b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)), 458 _ => Poll::Ready(Err(io::Error::new( 459 io::ErrorKind::InvalidInput, 460 "Invalid trailer end LF", 461 ))), 462 } 463 } 464 read_end_cr<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>465 fn read_end_cr<R: MemRead>( 466 cx: &mut Context<'_>, 467 rdr: &mut R, 468 ) -> Poll<Result<ChunkedState, io::Error>> { 469 match byte!(rdr, cx) { 470 b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)), 471 _ => Poll::Ready(Ok(ChunkedState::Trailer)), 472 } 473 } read_end_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>474 fn read_end_lf<R: MemRead>( 475 cx: &mut Context<'_>, 476 rdr: &mut R, 477 ) -> Poll<Result<ChunkedState, io::Error>> { 478 match byte!(rdr, cx) { 479 b'\n' => Poll::Ready(Ok(ChunkedState::End)), 480 _ => Poll::Ready(Err(io::Error::new( 481 io::ErrorKind::InvalidInput, 482 "Invalid chunk end LF", 483 ))), 484 } 485 } 486 } 487 488 #[derive(Debug)] 489 struct IncompleteBody; 490 491 impl fmt::Display for IncompleteBody { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 493 write!(f, "end of file before message length reached") 494 } 495 } 496 497 impl StdError for IncompleteBody {} 498 499 #[cfg(test)] 500 mod tests { 501 use super::*; 502 use std::pin::Pin; 503 use std::time::Duration; 504 use tokio::io::{AsyncRead, ReadBuf}; 505 506 impl<'a> MemRead for &'a [u8] { read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>507 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { 508 let n = std::cmp::min(len, self.len()); 509 if n > 0 { 510 let (a, b) = self.split_at(n); 511 let buf = Bytes::copy_from_slice(a); 512 *self = b; 513 Poll::Ready(Ok(buf)) 514 } else { 515 Poll::Ready(Ok(Bytes::new())) 516 } 517 } 518 } 519 520 impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) { read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>521 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { 522 let mut v = vec![0; len]; 523 let mut buf = ReadBuf::new(&mut v); 524 ready!(Pin::new(self).poll_read(cx, &mut buf)?); 525 Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled()))) 526 } 527 } 528 529 impl MemRead for Bytes { read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>530 fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { 531 let n = std::cmp::min(len, self.len()); 532 let ret = self.split_to(n); 533 Poll::Ready(Ok(ret)) 534 } 535 } 536 537 /* 538 use std::io; 539 use std::io::Write; 540 use super::Decoder; 541 use super::ChunkedState; 542 use futures::{Async, Poll}; 543 use bytes::{BytesMut, Bytes}; 544 use crate::mock::AsyncIo; 545 */ 546 547 #[tokio::test] test_read_chunk_size()548 async fn test_read_chunk_size() { 549 use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof}; 550 551 async fn read(s: &str) -> u64 { 552 let mut state = ChunkedState::new(); 553 let rdr = &mut s.as_bytes(); 554 let mut size = 0; 555 let mut ext_cnt = 0; 556 loop { 557 let result = futures_util::future::poll_fn(|cx| { 558 state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None) 559 }) 560 .await; 561 let desc = format!("read_size failed for {:?}", s); 562 state = result.expect(desc.as_str()); 563 if state == ChunkedState::Body || state == ChunkedState::EndCr { 564 break; 565 } 566 } 567 size 568 } 569 570 async fn read_err(s: &str, expected_err: io::ErrorKind) { 571 let mut state = ChunkedState::new(); 572 let rdr = &mut s.as_bytes(); 573 let mut size = 0; 574 let mut ext_cnt = 0; 575 loop { 576 let result = futures_util::future::poll_fn(|cx| { 577 state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None) 578 }) 579 .await; 580 state = match result { 581 Ok(s) => s, 582 Err(e) => { 583 assert!( 584 expected_err == e.kind(), 585 "Reading {:?}, expected {:?}, but got {:?}", 586 s, 587 expected_err, 588 e.kind() 589 ); 590 return; 591 } 592 }; 593 if state == ChunkedState::Body || state == ChunkedState::End { 594 panic!("Was Ok. Expected Err for {:?}", s); 595 } 596 } 597 } 598 599 assert_eq!(1, read("1\r\n").await); 600 assert_eq!(1, read("01\r\n").await); 601 assert_eq!(0, read("0\r\n").await); 602 assert_eq!(0, read("00\r\n").await); 603 assert_eq!(10, read("A\r\n").await); 604 assert_eq!(10, read("a\r\n").await); 605 assert_eq!(255, read("Ff\r\n").await); 606 assert_eq!(255, read("Ff \r\n").await); 607 // Missing LF or CRLF 608 read_err("F\rF", InvalidInput).await; 609 read_err("F", UnexpectedEof).await; 610 // Missing digit 611 read_err("\r\n\r\n", InvalidInput).await; 612 read_err("\r\n", InvalidInput).await; 613 // Invalid hex digit 614 read_err("X\r\n", InvalidInput).await; 615 read_err("1X\r\n", InvalidInput).await; 616 read_err("-\r\n", InvalidInput).await; 617 read_err("-1\r\n", InvalidInput).await; 618 // Acceptable (if not fully valid) extensions do not influence the size 619 assert_eq!(1, read("1;extension\r\n").await); 620 assert_eq!(10, read("a;ext name=value\r\n").await); 621 assert_eq!(1, read("1;extension;extension2\r\n").await); 622 assert_eq!(1, read("1;;; ;\r\n").await); 623 assert_eq!(2, read("2; extension...\r\n").await); 624 assert_eq!(3, read("3 ; extension=123\r\n").await); 625 assert_eq!(3, read("3 ;\r\n").await); 626 assert_eq!(3, read("3 ; \r\n").await); 627 // Invalid extensions cause an error 628 read_err("1 invalid extension\r\n", InvalidInput).await; 629 read_err("1 A\r\n", InvalidInput).await; 630 read_err("1;no CRLF", UnexpectedEof).await; 631 read_err("1;reject\nnewlines\r\n", InvalidData).await; 632 // Overflow 633 read_err("f0000000000000003\r\n", InvalidData).await; 634 } 635 636 #[tokio::test] test_read_sized_early_eof()637 async fn test_read_sized_early_eof() { 638 let mut bytes = &b"foo bar"[..]; 639 let mut decoder = Decoder::length(10); 640 assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); 641 let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); 642 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); 643 } 644 645 #[tokio::test] test_read_chunked_early_eof()646 async fn test_read_chunked_early_eof() { 647 let mut bytes = &b"\ 648 9\r\n\ 649 foo bar\ 650 "[..]; 651 let mut decoder = Decoder::chunked(); 652 assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); 653 let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); 654 assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); 655 } 656 657 #[tokio::test] test_read_chunked_single_read()658 async fn test_read_chunked_single_read() { 659 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..]; 660 let buf = Decoder::chunked() 661 .decode_fut(&mut mock_buf) 662 .await 663 .expect("decode"); 664 assert_eq!(16, buf.len()); 665 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String"); 666 assert_eq!("1234567890abcdef", &result); 667 } 668 669 #[tokio::test] test_read_chunked_extensions_over_limit()670 async fn test_read_chunked_extensions_over_limit() { 671 // construct a chunked body where each individual chunked extension 672 // is totally fine, but combined is over the limit. 673 let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3; 674 let mut scratch = vec![]; 675 for _ in 0..2 { 676 scratch.extend(b"1;"); 677 scratch.extend(b"x".repeat(per_chunk as usize)); 678 scratch.extend(b"\r\nA\r\n"); 679 } 680 scratch.extend(b"0\r\n\r\n"); 681 let mut mock_buf = Bytes::from(scratch); 682 683 let mut decoder = Decoder::chunked(); 684 let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1"); 685 assert_eq!(&buf1[..], b"A"); 686 687 let err = decoder 688 .decode_fut(&mut mock_buf) 689 .await 690 .expect_err("decode2"); 691 assert_eq!(err.kind(), io::ErrorKind::InvalidData); 692 assert_eq!(err.to_string(), "chunk extensions over limit"); 693 } 694 695 #[cfg(not(miri))] 696 #[tokio::test] test_read_chunked_trailer_with_missing_lf()697 async fn test_read_chunked_trailer_with_missing_lf() { 698 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..]; 699 let mut decoder = Decoder::chunked(); 700 decoder.decode_fut(&mut mock_buf).await.expect("decode"); 701 let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err(); 702 assert_eq!(e.kind(), io::ErrorKind::InvalidInput); 703 } 704 705 #[tokio::test] test_read_chunked_after_eof()706 async fn test_read_chunked_after_eof() { 707 let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..]; 708 let mut decoder = Decoder::chunked(); 709 710 // normal read 711 let buf = decoder.decode_fut(&mut mock_buf).await.unwrap(); 712 assert_eq!(16, buf.len()); 713 let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String"); 714 assert_eq!("1234567890abcdef", &result); 715 716 // eof read 717 let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); 718 assert_eq!(0, buf.len()); 719 720 // ensure read after eof also returns eof 721 let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); 722 assert_eq!(0, buf.len()); 723 } 724 725 // perform an async read using a custom buffer size and causing a blocking 726 // read at the specified byte read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String727 async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String { 728 let mut outs = Vec::new(); 729 730 let mut ins = if block_at == 0 { 731 tokio_test::io::Builder::new() 732 .wait(Duration::from_millis(10)) 733 .read(content) 734 .build() 735 } else { 736 tokio_test::io::Builder::new() 737 .read(&content[..block_at]) 738 .wait(Duration::from_millis(10)) 739 .read(&content[block_at..]) 740 .build() 741 }; 742 743 let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin); 744 745 loop { 746 let buf = decoder 747 .decode_fut(&mut ins) 748 .await 749 .expect("unexpected decode error"); 750 if buf.is_empty() { 751 break; // eof 752 } 753 outs.extend(buf.as_ref()); 754 } 755 756 String::from_utf8(outs).expect("decode String") 757 } 758 759 // iterate over the different ways that this async read could go. 760 // tests blocking a read at each byte along the content - The shotgun approach all_async_cases(content: &str, expected: &str, decoder: Decoder)761 async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) { 762 let content_len = content.len(); 763 for block_at in 0..content_len { 764 let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await; 765 assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at); 766 } 767 } 768 769 #[tokio::test] test_read_length_async()770 async fn test_read_length_async() { 771 let content = "foobar"; 772 all_async_cases(content, content, Decoder::length(content.len() as u64)).await; 773 } 774 775 #[tokio::test] test_read_chunked_async()776 async fn test_read_chunked_async() { 777 let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n"; 778 let expected = "foobar"; 779 all_async_cases(content, expected, Decoder::chunked()).await; 780 } 781 782 #[tokio::test] test_read_eof_async()783 async fn test_read_eof_async() { 784 let content = "foobar"; 785 all_async_cases(content, content, Decoder::eof()).await; 786 } 787 788 #[cfg(feature = "nightly")] 789 #[bench] bench_decode_chunked_1kb(b: &mut test::Bencher)790 fn bench_decode_chunked_1kb(b: &mut test::Bencher) { 791 let rt = new_runtime(); 792 793 const LEN: usize = 1024; 794 let mut vec = Vec::new(); 795 vec.extend(format!("{:x}\r\n", LEN).as_bytes()); 796 vec.extend(&[0; LEN][..]); 797 vec.extend(b"\r\n"); 798 let content = Bytes::from(vec); 799 800 b.bytes = LEN as u64; 801 802 b.iter(|| { 803 let mut decoder = Decoder::chunked(); 804 rt.block_on(async { 805 let mut raw = content.clone(); 806 let chunk = decoder.decode_fut(&mut raw).await.unwrap(); 807 assert_eq!(chunk.len(), LEN); 808 }); 809 }); 810 } 811 812 #[cfg(feature = "nightly")] 813 #[bench] bench_decode_length_1kb(b: &mut test::Bencher)814 fn bench_decode_length_1kb(b: &mut test::Bencher) { 815 let rt = new_runtime(); 816 817 const LEN: usize = 1024; 818 let content = Bytes::from(&[0; LEN][..]); 819 b.bytes = LEN as u64; 820 821 b.iter(|| { 822 let mut decoder = Decoder::length(LEN as u64); 823 rt.block_on(async { 824 let mut raw = content.clone(); 825 let chunk = decoder.decode_fut(&mut raw).await.unwrap(); 826 assert_eq!(chunk.len(), LEN); 827 }); 828 }); 829 } 830 831 #[cfg(feature = "nightly")] new_runtime() -> tokio::runtime::Runtime832 fn new_runtime() -> tokio::runtime::Runtime { 833 tokio::runtime::Builder::new_current_thread() 834 .enable_all() 835 .build() 836 .expect("rt build") 837 } 838 } 839