1 use std::error::Error as StdError;
2 use std::fmt;
3 use std::io;
4 use std::task::{Context, Poll};
5 use std::usize;
6 
7 use bytes::Bytes;
8 use tracing::{debug, trace};
9 
10 use super::io::MemRead;
11 use super::DecodedLength;
12 
13 use self::Kind::{Chunked, Eof, Length};
14 
/// Upper bound on the number of chunk-extension bytes accepted (16 KiB).
///
/// The budget is shared by the whole chunked body rather than reset for each
/// chunk, so many chunks that each carry small extensions still count
/// against the same limit.
const CHUNKED_EXTENSIONS_LIMIT: u64 = 16 * 1024;
19 
20 /// Decoders to handle different Transfer-Encodings.
21 ///
22 /// If a message body does not include a Transfer-Encoding, it *should*
23 /// include a Content-Length header.
24 #[derive(Clone, PartialEq)]
25 pub(crate) struct Decoder {
26     kind: Kind,
27 }
28 
29 #[derive(Debug, Clone, Copy, PartialEq)]
30 enum Kind {
31     /// A Reader used when a Content-Length header is passed with a positive integer.
32     Length(u64),
33     /// A Reader used when Transfer-Encoding is `chunked`.
34     Chunked {
35         state: ChunkedState,
36         chunk_len: u64,
37         extensions_cnt: u64,
38     },
39     /// A Reader used for responses that don't indicate a length or chunked.
40     ///
41     /// The bool tracks when EOF is seen on the transport.
42     ///
43     /// Note: This should only used for `Response`s. It is illegal for a
44     /// `Request` to be made with both `Content-Length` and
45     /// `Transfer-Encoding: chunked` missing, as explained from the spec:
46     ///
47     /// > If a Transfer-Encoding header field is present in a response and
48     /// > the chunked transfer coding is not the final encoding, the
49     /// > message body length is determined by reading the connection until
50     /// > it is closed by the server.  If a Transfer-Encoding header field
51     /// > is present in a request and the chunked transfer coding is not
52     /// > the final encoding, the message body length cannot be determined
53     /// > reliably; the server MUST respond with the 400 (Bad Request)
54     /// > status code and then close the connection.
55     Eof(bool),
56 }
57 
/// States of the `Transfer-Encoding: chunked` parser.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ChunkedState {
    /// Expecting the first hex digit of a chunk-size line (one is required).
    Start,
    /// Reading further hex digits, or the whitespace / `;` / CR that ends them.
    Size,
    /// Whitespace after the size; only more whitespace, `;`, or CR may follow.
    SizeLws,
    /// Skipping chunk-extension bytes until CR.
    Extension,
    /// Expecting the LF that ends the chunk-size line.
    SizeLf,
    /// Reading chunk data.
    Body,
    /// Expecting the CR that follows chunk data.
    BodyCr,
    /// Expecting the LF that follows chunk data.
    BodyLf,
    /// Skipping a trailer line until CR.
    Trailer,
    /// Expecting the LF that ends a trailer line.
    TrailerLf,
    /// After the last-chunk line or a trailer line: CR starts the final CRLF,
    /// any other byte begins a trailer line.
    EndCr,
    /// Expecting the final LF of the body.
    EndLf,
    /// The chunked body has been fully consumed.
    End,
}
74 
75 impl Decoder {
76     // constructors
77 
length(x: u64) -> Decoder78     pub(crate) fn length(x: u64) -> Decoder {
79         Decoder {
80             kind: Kind::Length(x),
81         }
82     }
83 
chunked() -> Decoder84     pub(crate) fn chunked() -> Decoder {
85         Decoder {
86             kind: Kind::Chunked {
87                 state: ChunkedState::new(),
88                 chunk_len: 0,
89                 extensions_cnt: 0,
90             },
91         }
92     }
93 
eof() -> Decoder94     pub(crate) fn eof() -> Decoder {
95         Decoder {
96             kind: Kind::Eof(false),
97         }
98     }
99 
new(len: DecodedLength) -> Self100     pub(super) fn new(len: DecodedLength) -> Self {
101         match len {
102             DecodedLength::CHUNKED => Decoder::chunked(),
103             DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
104             length => Decoder::length(length.danger_len()),
105         }
106     }
107 
108     // methods
109 
is_eof(&self) -> bool110     pub(crate) fn is_eof(&self) -> bool {
111         matches!(
112             self.kind,
113             Length(0)
114                 | Chunked {
115                     state: ChunkedState::End,
116                     ..
117                 }
118                 | Eof(true)
119         )
120     }
121 
decode<R: MemRead>( &mut self, cx: &mut Context<'_>, body: &mut R, ) -> Poll<Result<Bytes, io::Error>>122     pub(crate) fn decode<R: MemRead>(
123         &mut self,
124         cx: &mut Context<'_>,
125         body: &mut R,
126     ) -> Poll<Result<Bytes, io::Error>> {
127         trace!("decode; state={:?}", self.kind);
128         match self.kind {
129             Length(ref mut remaining) => {
130                 if *remaining == 0 {
131                     Poll::Ready(Ok(Bytes::new()))
132                 } else {
133                     let to_read = *remaining as usize;
134                     let buf = ready!(body.read_mem(cx, to_read))?;
135                     let num = buf.as_ref().len() as u64;
136                     if num > *remaining {
137                         *remaining = 0;
138                     } else if num == 0 {
139                         return Poll::Ready(Err(io::Error::new(
140                             io::ErrorKind::UnexpectedEof,
141                             IncompleteBody,
142                         )));
143                     } else {
144                         *remaining -= num;
145                     }
146                     Poll::Ready(Ok(buf))
147                 }
148             }
149             Chunked {
150                 ref mut state,
151                 ref mut chunk_len,
152                 ref mut extensions_cnt,
153             } => {
154                 loop {
155                     let mut buf = None;
156                     // advances the chunked state
157                     *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?;
158                     if *state == ChunkedState::End {
159                         trace!("end of chunked");
160                         return Poll::Ready(Ok(Bytes::new()));
161                     }
162                     if let Some(buf) = buf {
163                         return Poll::Ready(Ok(buf));
164                     }
165                 }
166             }
167             Eof(ref mut is_eof) => {
168                 if *is_eof {
169                     Poll::Ready(Ok(Bytes::new()))
170                 } else {
171                     // 8192 chosen because its about 2 packets, there probably
172                     // won't be that much available, so don't have MemReaders
173                     // allocate buffers to big
174                     body.read_mem(cx, 8192).map_ok(|slice| {
175                         *is_eof = slice.is_empty();
176                         slice
177                     })
178                 }
179             }
180         }
181     }
182 
183     #[cfg(test)]
decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error>184     async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
185         futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
186     }
187 }
188 
189 impl fmt::Debug for Decoder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result190     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191         fmt::Debug::fmt(&self.kind, f)
192     }
193 }
194 
/// Reads a single byte from `$rdr`, propagating `Pending` and reader errors.
///
/// If the reader yields no data, it returns early from the enclosing function
/// with an `UnexpectedEof` error.
macro_rules! byte {
    ($rdr:ident, $cx:expr) => {{
        let one = ready!($rdr.read_mem($cx, 1))?;
        match one.first() {
            Some(&b) => b,
            None => {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "unexpected EOF during chunk size line",
                )))
            }
        }
    }};
}
206 
/// Unwraps a checked-arithmetic `Option`; on `None` it returns an
/// `InvalidData` "overflow" error from the enclosing function.
macro_rules! or_overflow {
    ($e:expr) => {
        if let Some(value) = $e {
            value
        } else {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk size: overflow",
            )));
        }
    };
}
218 
219 impl ChunkedState {
new() -> ChunkedState220     fn new() -> ChunkedState {
221         ChunkedState::Start
222     }
step<R: MemRead>( &self, cx: &mut Context<'_>, body: &mut R, size: &mut u64, extensions_cnt: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>223     fn step<R: MemRead>(
224         &self,
225         cx: &mut Context<'_>,
226         body: &mut R,
227         size: &mut u64,
228         extensions_cnt: &mut u64,
229         buf: &mut Option<Bytes>,
230     ) -> Poll<Result<ChunkedState, io::Error>> {
231         use self::ChunkedState::*;
232         match *self {
233             Start => ChunkedState::read_start(cx, body, size),
234             Size => ChunkedState::read_size(cx, body, size),
235             SizeLws => ChunkedState::read_size_lws(cx, body),
236             Extension => ChunkedState::read_extension(cx, body, extensions_cnt),
237             SizeLf => ChunkedState::read_size_lf(cx, body, *size),
238             Body => ChunkedState::read_body(cx, body, size, buf),
239             BodyCr => ChunkedState::read_body_cr(cx, body),
240             BodyLf => ChunkedState::read_body_lf(cx, body),
241             Trailer => ChunkedState::read_trailer(cx, body),
242             TrailerLf => ChunkedState::read_trailer_lf(cx, body),
243             EndCr => ChunkedState::read_end_cr(cx, body),
244             EndLf => ChunkedState::read_end_lf(cx, body),
245             End => Poll::Ready(Ok(ChunkedState::End)),
246         }
247     }
248 
read_start<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>249     fn read_start<R: MemRead>(
250         cx: &mut Context<'_>,
251         rdr: &mut R,
252         size: &mut u64,
253     ) -> Poll<Result<ChunkedState, io::Error>> {
254         trace!("Read chunk start");
255 
256         let radix = 16;
257         match byte!(rdr, cx) {
258             b @ b'0'..=b'9' => {
259                 *size = or_overflow!(size.checked_mul(radix));
260                 *size = or_overflow!(size.checked_add((b - b'0') as u64));
261             }
262             b @ b'a'..=b'f' => {
263                 *size = or_overflow!(size.checked_mul(radix));
264                 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
265             }
266             b @ b'A'..=b'F' => {
267                 *size = or_overflow!(size.checked_mul(radix));
268                 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
269             }
270             _ => {
271                 return Poll::Ready(Err(io::Error::new(
272                     io::ErrorKind::InvalidInput,
273                     "Invalid chunk size line: missing size digit",
274                 )));
275             }
276         }
277 
278         Poll::Ready(Ok(ChunkedState::Size))
279     }
280 
read_size<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>281     fn read_size<R: MemRead>(
282         cx: &mut Context<'_>,
283         rdr: &mut R,
284         size: &mut u64,
285     ) -> Poll<Result<ChunkedState, io::Error>> {
286         trace!("Read chunk hex size");
287 
288         let radix = 16;
289         match byte!(rdr, cx) {
290             b @ b'0'..=b'9' => {
291                 *size = or_overflow!(size.checked_mul(radix));
292                 *size = or_overflow!(size.checked_add((b - b'0') as u64));
293             }
294             b @ b'a'..=b'f' => {
295                 *size = or_overflow!(size.checked_mul(radix));
296                 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
297             }
298             b @ b'A'..=b'F' => {
299                 *size = or_overflow!(size.checked_mul(radix));
300                 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
301             }
302             b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
303             b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
304             b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
305             _ => {
306                 return Poll::Ready(Err(io::Error::new(
307                     io::ErrorKind::InvalidInput,
308                     "Invalid chunk size line: Invalid Size",
309                 )));
310             }
311         }
312         Poll::Ready(Ok(ChunkedState::Size))
313     }
read_size_lws<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>314     fn read_size_lws<R: MemRead>(
315         cx: &mut Context<'_>,
316         rdr: &mut R,
317     ) -> Poll<Result<ChunkedState, io::Error>> {
318         trace!("read_size_lws");
319         match byte!(rdr, cx) {
320             // LWS can follow the chunk size, but no more digits can come
321             b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
322             b';' => Poll::Ready(Ok(ChunkedState::Extension)),
323             b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
324             _ => Poll::Ready(Err(io::Error::new(
325                 io::ErrorKind::InvalidInput,
326                 "Invalid chunk size linear white space",
327             ))),
328         }
329     }
read_extension<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, extensions_cnt: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>330     fn read_extension<R: MemRead>(
331         cx: &mut Context<'_>,
332         rdr: &mut R,
333         extensions_cnt: &mut u64,
334     ) -> Poll<Result<ChunkedState, io::Error>> {
335         trace!("read_extension");
336         // We don't care about extensions really at all. Just ignore them.
337         // They "end" at the next CRLF.
338         //
339         // However, some implementations may not check for the CR, so to save
340         // them from themselves, we reject extensions containing plain LF as
341         // well.
342         match byte!(rdr, cx) {
343             b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
344             b'\n' => Poll::Ready(Err(io::Error::new(
345                 io::ErrorKind::InvalidData,
346                 "invalid chunk extension contains newline",
347             ))),
348             _ => {
349                 *extensions_cnt += 1;
350                 if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT {
351                     Poll::Ready(Err(io::Error::new(
352                         io::ErrorKind::InvalidData,
353                         "chunk extensions over limit",
354                     )))
355                 } else {
356                     Poll::Ready(Ok(ChunkedState::Extension))
357                 }
358             } // no supported extensions
359         }
360     }
read_size_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, size: u64, ) -> Poll<Result<ChunkedState, io::Error>>361     fn read_size_lf<R: MemRead>(
362         cx: &mut Context<'_>,
363         rdr: &mut R,
364         size: u64,
365     ) -> Poll<Result<ChunkedState, io::Error>> {
366         trace!("Chunk size is {:?}", size);
367         match byte!(rdr, cx) {
368             b'\n' => {
369                 if size == 0 {
370                     Poll::Ready(Ok(ChunkedState::EndCr))
371                 } else {
372                     debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
373                     Poll::Ready(Ok(ChunkedState::Body))
374                 }
375             }
376             _ => Poll::Ready(Err(io::Error::new(
377                 io::ErrorKind::InvalidInput,
378                 "Invalid chunk size LF",
379             ))),
380         }
381     }
382 
read_body<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, rem: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>383     fn read_body<R: MemRead>(
384         cx: &mut Context<'_>,
385         rdr: &mut R,
386         rem: &mut u64,
387         buf: &mut Option<Bytes>,
388     ) -> Poll<Result<ChunkedState, io::Error>> {
389         trace!("Chunked read, remaining={:?}", rem);
390 
391         // cap remaining bytes at the max capacity of usize
392         let rem_cap = match *rem {
393             r if r > usize::MAX as u64 => usize::MAX,
394             r => r as usize,
395         };
396 
397         let to_read = rem_cap;
398         let slice = ready!(rdr.read_mem(cx, to_read))?;
399         let count = slice.len();
400 
401         if count == 0 {
402             *rem = 0;
403             return Poll::Ready(Err(io::Error::new(
404                 io::ErrorKind::UnexpectedEof,
405                 IncompleteBody,
406             )));
407         }
408         *buf = Some(slice);
409         *rem -= count as u64;
410 
411         if *rem > 0 {
412             Poll::Ready(Ok(ChunkedState::Body))
413         } else {
414             Poll::Ready(Ok(ChunkedState::BodyCr))
415         }
416     }
read_body_cr<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>417     fn read_body_cr<R: MemRead>(
418         cx: &mut Context<'_>,
419         rdr: &mut R,
420     ) -> Poll<Result<ChunkedState, io::Error>> {
421         match byte!(rdr, cx) {
422             b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
423             _ => Poll::Ready(Err(io::Error::new(
424                 io::ErrorKind::InvalidInput,
425                 "Invalid chunk body CR",
426             ))),
427         }
428     }
read_body_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>429     fn read_body_lf<R: MemRead>(
430         cx: &mut Context<'_>,
431         rdr: &mut R,
432     ) -> Poll<Result<ChunkedState, io::Error>> {
433         match byte!(rdr, cx) {
434             b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
435             _ => Poll::Ready(Err(io::Error::new(
436                 io::ErrorKind::InvalidInput,
437                 "Invalid chunk body LF",
438             ))),
439         }
440     }
441 
read_trailer<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>442     fn read_trailer<R: MemRead>(
443         cx: &mut Context<'_>,
444         rdr: &mut R,
445     ) -> Poll<Result<ChunkedState, io::Error>> {
446         trace!("read_trailer");
447         match byte!(rdr, cx) {
448             b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
449             _ => Poll::Ready(Ok(ChunkedState::Trailer)),
450         }
451     }
read_trailer_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>452     fn read_trailer_lf<R: MemRead>(
453         cx: &mut Context<'_>,
454         rdr: &mut R,
455     ) -> Poll<Result<ChunkedState, io::Error>> {
456         match byte!(rdr, cx) {
457             b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
458             _ => Poll::Ready(Err(io::Error::new(
459                 io::ErrorKind::InvalidInput,
460                 "Invalid trailer end LF",
461             ))),
462         }
463     }
464 
read_end_cr<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>465     fn read_end_cr<R: MemRead>(
466         cx: &mut Context<'_>,
467         rdr: &mut R,
468     ) -> Poll<Result<ChunkedState, io::Error>> {
469         match byte!(rdr, cx) {
470             b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
471             _ => Poll::Ready(Ok(ChunkedState::Trailer)),
472         }
473     }
read_end_lf<R: MemRead>( cx: &mut Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>474     fn read_end_lf<R: MemRead>(
475         cx: &mut Context<'_>,
476         rdr: &mut R,
477     ) -> Poll<Result<ChunkedState, io::Error>> {
478         match byte!(rdr, cx) {
479             b'\n' => Poll::Ready(Ok(ChunkedState::End)),
480             _ => Poll::Ready(Err(io::Error::new(
481                 io::ErrorKind::InvalidInput,
482                 "Invalid chunk end LF",
483             ))),
484         }
485     }
486 }
487 
/// Error payload used when the transport ends before the body's declared
/// length (or the current chunk) has been fully read.
#[derive(Debug)]
struct IncompleteBody;

impl fmt::Display for IncompleteBody {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("end of file before message length reached")
    }
}

impl StdError for IncompleteBody {}
498 
#[cfg(test)]
mod tests {
    use super::*;
    use std::pin::Pin;
    use std::time::Duration;
    use tokio::io::{AsyncRead, ReadBuf};

    // `MemRead` adapters so the decoder can be driven from in-memory slices,
    // `Bytes`, and any `AsyncRead` in these tests.

    impl<'a> MemRead for &'a [u8] {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let n = std::cmp::min(len, self.len());
            if n > 0 {
                let (a, b) = self.split_at(n);
                let buf = Bytes::copy_from_slice(a);
                *self = b;
                Poll::Ready(Ok(buf))
            } else {
                Poll::Ready(Ok(Bytes::new()))
            }
        }
    }

    impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
        fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let mut v = vec![0; len];
            let mut buf = ReadBuf::new(&mut v);
            ready!(Pin::new(self).poll_read(cx, &mut buf)?);
            Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled())))
        }
    }

    impl MemRead for Bytes {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let n = std::cmp::min(len, self.len());
            let ret = self.split_to(n);
            Poll::Ready(Ok(ret))
        }
    }

    #[tokio::test]
    async fn test_read_chunk_size() {
        use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};

        /// Runs the state machine over `s` until the size line has been
        /// parsed, and returns the parsed chunk size.
        async fn read(s: &str) -> u64 {
            let mut state = ChunkedState::new();
            let rdr = &mut s.as_bytes();
            let mut size = 0;
            let mut ext_cnt = 0;
            loop {
                let result = futures_util::future::poll_fn(|cx| {
                    state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                let desc = format!("read_size failed for {:?}", s);
                state = result.expect(desc.as_str());
                if state == ChunkedState::Body || state == ChunkedState::EndCr {
                    break;
                }
            }
            size
        }

        /// Runs the state machine over `s` and asserts that it fails with
        /// `expected_err` before a size line is successfully parsed.
        async fn read_err(s: &str, expected_err: io::ErrorKind) {
            let mut state = ChunkedState::new();
            let rdr = &mut s.as_bytes();
            let mut size = 0;
            let mut ext_cnt = 0;
            loop {
                let result = futures_util::future::poll_fn(|cx| {
                    state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                state = match result {
                    Ok(s) => s,
                    Err(e) => {
                        assert!(
                            expected_err == e.kind(),
                            "Reading {:?}, expected {:?}, but got {:?}",
                            s,
                            expected_err,
                            e.kind()
                        );
                        return;
                    }
                };
                // Reaching any state that `read` treats as success (including
                // the last-chunk `EndCr`) means the input was wrongly accepted.
                if matches!(
                    state,
                    ChunkedState::Body | ChunkedState::EndCr | ChunkedState::End
                ) {
                    panic!("Was Ok. Expected Err for {:?}", s);
                }
            }
        }

        assert_eq!(1, read("1\r\n").await);
        assert_eq!(1, read("01\r\n").await);
        assert_eq!(0, read("0\r\n").await);
        assert_eq!(0, read("00\r\n").await);
        assert_eq!(10, read("A\r\n").await);
        assert_eq!(10, read("a\r\n").await);
        assert_eq!(255, read("Ff\r\n").await);
        assert_eq!(255, read("Ff   \r\n").await);
        // Missing LF or CRLF
        read_err("F\rF", InvalidInput).await;
        read_err("F", UnexpectedEof).await;
        // Missing digit
        read_err("\r\n\r\n", InvalidInput).await;
        read_err("\r\n", InvalidInput).await;
        // Invalid hex digit
        read_err("X\r\n", InvalidInput).await;
        read_err("1X\r\n", InvalidInput).await;
        read_err("-\r\n", InvalidInput).await;
        read_err("-1\r\n", InvalidInput).await;
        // Acceptable (if not fully valid) extensions do not influence the size
        assert_eq!(1, read("1;extension\r\n").await);
        assert_eq!(10, read("a;ext name=value\r\n").await);
        assert_eq!(1, read("1;extension;extension2\r\n").await);
        assert_eq!(1, read("1;;;  ;\r\n").await);
        assert_eq!(2, read("2; extension...\r\n").await);
        assert_eq!(3, read("3   ; extension=123\r\n").await);
        assert_eq!(3, read("3   ;\r\n").await);
        assert_eq!(3, read("3   ;   \r\n").await);
        // Invalid extensions cause an error
        read_err("1 invalid extension\r\n", InvalidInput).await;
        read_err("1 A\r\n", InvalidInput).await;
        read_err("1;no CRLF", UnexpectedEof).await;
        read_err("1;reject\nnewlines\r\n", InvalidData).await;
        // Overflow
        read_err("f0000000000000003\r\n", InvalidData).await;
    }

    #[tokio::test]
    async fn test_read_sized_early_eof() {
        let mut bytes = &b"foo bar"[..];
        let mut decoder = Decoder::length(10);
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_chunked_early_eof() {
        let mut bytes = &b"\
            9\r\n\
            foo bar\
        "[..];
        let mut decoder = Decoder::chunked();
        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_chunked_single_read() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
        let buf = Decoder::chunked()
            .decode_fut(&mut mock_buf)
            .await
            .expect("decode");
        assert_eq!(16, buf.len());
        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &result);
    }

    #[tokio::test]
    async fn test_read_chunked_extensions_over_limit() {
        // construct a chunked body where each individual chunked extension
        // is totally fine, but combined is over the limit.
        let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3;
        let mut scratch = vec![];
        for _ in 0..2 {
            scratch.extend(b"1;");
            scratch.extend(b"x".repeat(per_chunk as usize));
            scratch.extend(b"\r\nA\r\n");
        }
        scratch.extend(b"0\r\n\r\n");
        let mut mock_buf = Bytes::from(scratch);

        let mut decoder = Decoder::chunked();
        let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1");
        assert_eq!(&buf1[..], b"A");

        let err = decoder
            .decode_fut(&mut mock_buf)
            .await
            .expect_err("decode2");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert_eq!(err.to_string(), "chunk extensions over limit");
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn test_read_chunked_trailer_with_missing_lf() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
        let mut decoder = Decoder::chunked();
        decoder.decode_fut(&mut mock_buf).await.expect("decode");
        let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
    }

    #[tokio::test]
    async fn test_read_chunked_after_eof() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
        let mut decoder = Decoder::chunked();

        // normal read
        let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
        assert_eq!(16, buf.len());
        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &result);

        // eof read
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());

        // ensure read after eof also returns eof
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());
    }

    // perform an async read using a custom buffer size and causing a blocking
    // read at the specified byte
    async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
        let mut outs = Vec::new();

        let mut ins = if block_at == 0 {
            tokio_test::io::Builder::new()
                .wait(Duration::from_millis(10))
                .read(content)
                .build()
        } else {
            tokio_test::io::Builder::new()
                .read(&content[..block_at])
                .wait(Duration::from_millis(10))
                .read(&content[block_at..])
                .build()
        };

        let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin);

        loop {
            let buf = decoder
                .decode_fut(&mut ins)
                .await
                .expect("unexpected decode error");
            if buf.is_empty() {
                break; // eof
            }
            outs.extend(buf.as_ref());
        }

        String::from_utf8(outs).expect("decode String")
    }

    // iterate over the different ways that this async read could go.
    // tests blocking a read at each byte along the content - The shotgun approach
    async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
        let content_len = content.len();
        for block_at in 0..content_len {
            let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
            assert_eq!(expected, &actual, "Failed async. Blocking at {}", block_at);
        }
    }

    #[tokio::test]
    async fn test_read_length_async() {
        let content = "foobar";
        all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
    }

    #[tokio::test]
    async fn test_read_chunked_async() {
        let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
        let expected = "foobar";
        all_async_cases(content, expected, Decoder::chunked()).await;
    }

    #[tokio::test]
    async fn test_read_eof_async() {
        let content = "foobar";
        all_async_cases(content, content, Decoder::eof()).await;
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
        let rt = new_runtime();

        const LEN: usize = 1024;
        let mut vec = Vec::new();
        vec.extend(format!("{:x}\r\n", LEN).as_bytes());
        vec.extend(&[0; LEN][..]);
        vec.extend(b"\r\n");
        let content = Bytes::from(vec);

        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::chunked();
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_length_1kb(b: &mut test::Bencher) {
        let rt = new_runtime();

        const LEN: usize = 1024;
        let content = Bytes::from(&[0; LEN][..]);
        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::length(LEN as u64);
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    /// Single-threaded runtime used by the benchmarks.
    #[cfg(feature = "nightly")]
    fn new_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("rt build")
    }
}
839