1 use std::io::{self, BufRead, Read};
2 
3 #[cfg(any(
4     feature = "futures-03",
5     feature = "tokio-02",
6     feature = "tokio-03",
7     feature = "tokio"
8 ))]
9 use std::pin::Pin;
10 
11 #[cfg(any(feature = "futures-03", feature = "tokio-02", feature = "tokio-03"))]
12 use std::mem::MaybeUninit;
13 
14 #[cfg(feature = "futures-core-03")]
15 use std::task::{Context, Poll};
16 
17 #[cfg(feature = "futures-03")]
18 use std::future::Future;
19 
20 use bytes::{Buf, BufMut, BytesMut};
21 
22 #[cfg(feature = "pin-project-lite")]
23 use pin_project_lite::pin_project;
24 
25 #[cfg(feature = "tokio-03")]
26 use tokio_03_dep::io::AsyncBufRead as _;
27 
28 #[cfg(feature = "tokio")]
29 use tokio_dep::io::AsyncBufRead as _;
30 
31 #[cfg(feature = "futures-core-03")]
32 use futures_core_03::ready;
33 
34 #[cfg(feature = "pin-project-lite")]
35 pin_project! {
36     /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
37     ///
38     /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
39     #[derive(Debug)]
40     pub struct BufReader<R> {
41         #[pin]
42         inner: R,
43         buf: BytesMut
44     }
45 }
46 
47 #[cfg(not(feature = "pin-project-lite"))]
48 /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
49 ///
50 /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
51 #[derive(Debug)]
52 pub struct BufReader<R> {
53     inner: R,
54     buf: BytesMut,
55 }
56 
57 impl<R> BufReader<R> {
58     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
59     /// but may change in the future.
new(inner: R) -> Self60     pub fn new(inner: R) -> Self {
61         Self::with_capacity(8096, inner)
62     }
63 
64     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self65     pub fn with_capacity(capacity: usize, inner: R) -> Self {
66         let buf = BytesMut::with_capacity(capacity);
67 
68         Self { inner, buf }
69     }
70 
71     /// Gets a reference to the underlying reader.
72     ///
73     /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R74     pub fn get_ref(&self) -> &R {
75         &self.inner
76     }
77 
78     /// Gets a mutable reference to the underlying reader.
79     ///
80     /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R81     pub fn get_mut(&mut self) -> &mut R {
82         &mut self.inner
83     }
84 
85     #[cfg(feature = "pin-project-lite")]
86     /// Gets a pinned mutable reference to the underlying reader.
87     ///
88     /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>89     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
90         self.project().inner
91     }
92 
93     /// Consumes this `BufWriter`, returning the underlying reader.
94     ///
95     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R96     pub fn into_inner(self) -> R {
97         self.inner
98     }
99 
100     /// Returns a reference to the internally buffered data.
101     ///
102     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]103     pub fn buffer(&self) -> &[u8] {
104         &self.buf
105     }
106 
107     /// Invalidates all data in the internal buffer.
108     #[inline]
109     #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
discard_buffer(self: Pin<&mut Self>)110     fn discard_buffer(self: Pin<&mut Self>) {
111         let me = self.project();
112         me.buf.clear();
113     }
114 }
115 
116 mod sealed {
117     pub trait Sealed {}
118 }
119 
120 #[doc(hidden)]
121 pub trait CombineBuffer<R>: sealed::Sealed {
buffer<'a>(&'a self, read: &'a R) -> &'a [u8]122     fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];
123 
advance(&mut self, read: &mut R, len: usize)124     fn advance(&mut self, read: &mut R, len: usize);
125 
126     #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut R>, len: usize)127     fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
128 }
129 
130 #[doc(hidden)]
131 pub trait CombineSyncRead<R>: CombineBuffer<R> {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>132     fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
133 }
134 
135 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
136 #[doc(hidden)]
137 pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>138     fn poll_extend_buf(
139         &mut self,
140         cx: &mut Context<'_>,
141         read: Pin<&mut R>,
142     ) -> Poll<io::Result<usize>>;
143 }
144 
145 #[cfg(feature = "futures-03")]
146 #[doc(hidden)]
147 pub trait CombineAsyncRead<R>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>148     fn poll_extend_buf(
149         &mut self,
150         cx: &mut Context<'_>,
151         read: Pin<&mut R>,
152     ) -> Poll<io::Result<usize>>;
153 
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> where Self: Sized154     fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
155     where
156         Self: Sized;
157 }
158 
159 #[cfg(feature = "futures-03")]
160 pin_project_lite::pin_project! {
161     #[doc(hidden)]
162     pub struct ExtendBuf<'a, C, R> {
163         buffer: &'a mut C,
164         read: Pin<&'a mut R>
165     }
166 }
167 
168 #[cfg(feature = "futures-03")]
169 impl<'a, C, R> Future for ExtendBuf<'a, C, R>
170 where
171     C: CombineAsyncRead<R>,
172 {
173     type Output = io::Result<usize>;
174 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>175     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176         let me = self.project();
177         me.buffer.poll_extend_buf(cx, me.read.as_mut())
178     }
179 }
180 
181 /// Marker used by `Decoder` for an internal buffer
182 #[derive(Default)]
183 pub struct Buffer(pub(crate) BytesMut);
184 
185 impl sealed::Sealed for Buffer {}
186 
187 impl<R> CombineBuffer<R> for Buffer {
buffer<'a>(&'a self, _read: &'a R) -> &'a [u8]188     fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
189         &self.0
190     }
191 
advance(&mut self, _read: &mut R, len: usize)192     fn advance(&mut self, _read: &mut R, len: usize) {
193         self.0.advance(len);
194     }
195 
196     #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, _read: Pin<&mut R>, len: usize)197     fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
198         self.0.advance(len);
199     }
200 }
201 
202 impl<R> CombineSyncRead<R> for Buffer
203 where
204     R: Read,
205 {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>206     fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
207         extend_buf_sync(&mut self.0, read)
208     }
209 }
210 
211 #[cfg(feature = "futures-03")]
212 impl<R> CombineAsyncRead<R> for Buffer
213 where
214     R: futures_io_03::AsyncRead,
215 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>216     fn poll_extend_buf(
217         &mut self,
218         cx: &mut Context<'_>,
219         read: Pin<&mut R>,
220     ) -> Poll<io::Result<usize>> {
221         poll_extend_buf(&mut self.0, cx, read)
222     }
223 
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>224     fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
225         if !self.0.has_remaining_mut() {
226             self.0.reserve(8 * 1024);
227         }
228         // Copy of tokio's read_buf method (but it has to force initialize the buffer)
229         let bs = self.0.chunk_mut();
230 
231         for i in 0..bs.len() {
232             bs.write_byte(i, 0);
233         }
234         ExtendBuf { buffer: self, read }
235     }
236 }
237 
238 #[cfg(feature = "tokio-02")]
239 impl<R> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer
240 where
241     R: tokio_02_dep::io::AsyncRead,
242 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>243     fn poll_extend_buf(
244         &mut self,
245         cx: &mut Context<'_>,
246         read: Pin<&mut R>,
247     ) -> Poll<io::Result<usize>> {
248         if !self.0.has_remaining_mut() {
249             self.0.reserve(8 * 1024);
250         }
251         read.poll_read_buf(cx, &mut Bytes05(&mut self.0))
252     }
253 }
254 
255 #[cfg(feature = "tokio-03")]
tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_>256 fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
257     let uninit = bs.chunk_mut();
258     unsafe {
259         tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
260             uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
261             uninit.len(),
262         ))
263     }
264 }
265 
266 #[cfg(feature = "tokio-03")]
267 impl<R> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer
268 where
269     R: tokio_03_dep::io::AsyncRead,
270 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>271     fn poll_extend_buf(
272         &mut self,
273         cx: &mut Context<'_>,
274         read: Pin<&mut R>,
275     ) -> Poll<io::Result<usize>> {
276         tokio_03_read_buf(cx, read, &mut self.0)
277     }
278 }
279 
280 #[cfg(feature = "tokio-03")]
tokio_03_read_buf( cx: &mut Context<'_>, read: Pin<&mut impl tokio_03_dep::io::AsyncRead>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>281 fn tokio_03_read_buf(
282     cx: &mut Context<'_>,
283     read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
284     bs: &mut bytes::BytesMut,
285 ) -> Poll<io::Result<usize>> {
286     if !bs.has_remaining_mut() {
287         bs.reserve(8 * 1024);
288     }
289 
290     let mut buf = tokio_03_to_read_buf(bs);
291     ready!(read.poll_read(cx, &mut buf))?;
292     unsafe {
293         let n = buf.filled().len();
294         bs.advance_mut(n);
295         Poll::Ready(Ok(n))
296     }
297 }
298 
299 #[cfg(feature = "tokio")]
300 impl<R> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer
301 where
302     R: tokio_dep::io::AsyncRead,
303 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>304     fn poll_extend_buf(
305         &mut self,
306         cx: &mut Context<'_>,
307         read: Pin<&mut R>,
308     ) -> Poll<io::Result<usize>> {
309         tokio_read_buf(read, cx, &mut self.0)
310     }
311 }
312 
313 #[cfg(feature = "tokio")]
tokio_read_buf( read: Pin<&mut impl tokio_dep::io::AsyncRead>, cx: &mut Context<'_>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>314 fn tokio_read_buf(
315     read: Pin<&mut impl tokio_dep::io::AsyncRead>,
316     cx: &mut Context<'_>,
317     bs: &mut bytes::BytesMut,
318 ) -> Poll<io::Result<usize>> {
319     if !bs.has_remaining_mut() {
320         bs.reserve(8 * 1024);
321     }
322 
323     tokio_util::io::poll_read_buf(read, cx, bs)
324 }
325 
326 /// Marker used by `Decoder` for an external buffer
327 #[derive(Default)]
328 pub struct Bufferless;
329 
330 impl sealed::Sealed for Bufferless {}
331 
332 impl<R> CombineBuffer<BufReader<R>> for Bufferless {
buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8]333     fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
334         &read.buf
335     }
336 
advance(&mut self, read: &mut BufReader<R>, len: usize)337     fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
338         read.buf.advance(len);
339     }
340 
341     #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize)342     fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
343         read.project().buf.advance(len);
344     }
345 }
346 
347 impl<R> CombineSyncRead<BufReader<R>> for Bufferless
348 where
349     R: Read,
350 {
extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize>351     fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
352         extend_buf_sync(&mut read.buf, &mut read.inner)
353     }
354 }
355 
extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize> where R: Read,356 fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
357 where
358     R: Read,
359 {
360     let size = 8 * 1024;
361     if !buf.has_remaining_mut() {
362         buf.reserve(size);
363     }
364 
365     // Copy of tokio's poll_read_buf method (but it has to force initialize the buffer)
366     let n = {
367         let bs = buf.chunk_mut();
368 
369         let initial_size = bs.len().min(size);
370         let bs = &mut bs[..initial_size];
371         for i in 0..bs.len() {
372             bs.write_byte(i, 0);
373         }
374 
375         // Convert to `&mut [u8]`
376         // SAFETY: the entire buffer is preinitialized above
377         let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
378 
379         let n = read.read(bs)?;
380         assert!(
381             n <= bs.len(),
382             "AsyncRead reported that it initialized more than the number of bytes in the buffer"
383         );
384         n
385     };
386 
387     // SAFETY: the entire buffer has been preinitialized
388     unsafe { buf.advance_mut(n) };
389 
390     Ok(n)
391 }
392 
393 #[cfg(feature = "tokio-02")]
394 struct Bytes05<'a>(&'a mut BytesMut);
395 
396 #[cfg(feature = "tokio-02")]
397 impl bytes_05::BufMut for Bytes05<'_> {
remaining_mut(&self) -> usize398     fn remaining_mut(&self) -> usize {
399         self.0.remaining_mut()
400     }
advance_mut(&mut self, cnt: usize)401     unsafe fn advance_mut(&mut self, cnt: usize) {
402         self.0.advance_mut(cnt)
403     }
bytes_mut(&mut self) -> &mut [MaybeUninit<u8>]404     fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
405         unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
406     }
407 }
408 
409 #[cfg(feature = "tokio-02")]
410 impl<R> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead> for Bufferless
411 where
412     R: tokio_02_dep::io::AsyncRead,
413 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>414     fn poll_extend_buf(
415         &mut self,
416         cx: &mut Context<'_>,
417         read: Pin<&mut BufReader<R>>,
418     ) -> Poll<io::Result<usize>> {
419         let me = read.project();
420 
421         if !me.buf.has_remaining_mut() {
422             me.buf.reserve(8 * 1024);
423         }
424         tokio_02_dep::io::AsyncRead::poll_read_buf(me.inner, cx, &mut Bytes05(me.buf))
425     }
426 }
427 
428 #[cfg(feature = "tokio-03")]
429 impl<R> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead> for Bufferless
430 where
431     R: tokio_03_dep::io::AsyncRead,
432 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>433     fn poll_extend_buf(
434         &mut self,
435         cx: &mut Context<'_>,
436         read: Pin<&mut BufReader<R>>,
437     ) -> Poll<io::Result<usize>> {
438         let me = read.project();
439 
440         tokio_03_read_buf(cx, me.inner, me.buf)
441     }
442 }
443 
444 #[cfg(feature = "tokio")]
445 impl<R> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead> for Bufferless
446 where
447     R: tokio_dep::io::AsyncRead,
448 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>449     fn poll_extend_buf(
450         &mut self,
451         cx: &mut Context<'_>,
452         read: Pin<&mut BufReader<R>>,
453     ) -> Poll<io::Result<usize>> {
454         let me = read.project();
455 
456         tokio_read_buf(me.inner, cx, me.buf)
457     }
458 }
459 
460 #[cfg(feature = "futures-03")]
461 impl<R> CombineAsyncRead<BufReader<R>> for Bufferless
462 where
463     R: futures_io_03::AsyncRead,
464 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>465     fn poll_extend_buf(
466         &mut self,
467         cx: &mut Context<'_>,
468         read: Pin<&mut BufReader<R>>,
469     ) -> Poll<io::Result<usize>> {
470         let me = read.project();
471 
472         poll_extend_buf(me.buf, cx, me.inner)
473     }
474 
extend_buf<'a>( &'a mut self, mut read: Pin<&'a mut BufReader<R>>, ) -> ExtendBuf<'a, Self, BufReader<R>>475     fn extend_buf<'a>(
476         &'a mut self,
477         mut read: Pin<&'a mut BufReader<R>>,
478     ) -> ExtendBuf<'a, Self, BufReader<R>> {
479         let me = read.as_mut().project();
480 
481         if !me.buf.has_remaining_mut() {
482             me.buf.reserve(8 * 1024);
483         }
484         // Copy of tokio's read_buf method (but it has to force initialize the buffer)
485         let bs = me.buf.chunk_mut();
486 
487         for i in 0..bs.len() {
488             bs.write_byte(i, 0);
489         }
490         ExtendBuf { buffer: self, read }
491     }
492 }
493 
494 #[cfg(feature = "futures-03")]
poll_extend_buf<R>( buf: &mut BytesMut, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>> where R: futures_io_03::AsyncRead,495 fn poll_extend_buf<R>(
496     buf: &mut BytesMut,
497     cx: &mut Context<'_>,
498     read: Pin<&mut R>,
499 ) -> Poll<io::Result<usize>>
500 where
501     R: futures_io_03::AsyncRead,
502 {
503     // Copy of tokio's read_buf method (but it has to force initialize the buffer)
504     let n = {
505         let bs = buf.chunk_mut();
506         // preinit the buffer
507         for i in 0..bs.len() {
508             bs.write_byte(i, 0);
509         }
510 
511         // Convert to `&mut [u8]`
512         // SAFETY: preinitialize the buffer
513         let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
514 
515         let n = ready!(read.poll_read(cx, bs))?;
516         assert!(
517             n <= bs.len(),
518             "AsyncRead reported that it initialized more than the number of bytes in the buffer"
519         );
520         n
521     };
522     // SAFETY: the buffer was preinitialized
523     unsafe { buf.advance_mut(n) };
524     Poll::Ready(Ok(n))
525 }
526 
527 #[cfg(feature = "tokio-02")]
528 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>529     fn poll_read(
530         mut self: Pin<&mut Self>,
531         cx: &mut Context<'_>,
532         buf: &mut [u8],
533     ) -> Poll<io::Result<usize>> {
534         use tokio_02_dep::io::AsyncBufRead;
535 
536         // If we don't have any buffered data and we're doing a massive read
537         // (larger than our internal buffer), bypass our internal buffer
538         // entirely.
539         if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
540             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
541             self.discard_buffer();
542             return Poll::Ready(res);
543         }
544         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
545         let nread = rem.read(buf)?;
546         self.consume(nread);
547         Poll::Ready(Ok(nread))
548     }
549 
550     // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool551     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
552         self.inner.prepare_uninitialized_buffer(buf)
553     }
554 }
555 
556 #[cfg(feature = "tokio-02")]
557 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>558     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
559         let me = self.project();
560 
561         // If we've reached the end of our internal buffer then we need to fetch
562         // some more data from the underlying reader.
563         // Branch using `>=` instead of the more correct `==`
564         // to tell the compiler that the pos..cap slice is always valid.
565 
566         if me.buf.is_empty() {
567             ready!(me.inner.poll_read_buf(cx, &mut Bytes05(me.buf)))?;
568         }
569         Poll::Ready(Ok(&me.buf[..]))
570     }
571 
consume(self: Pin<&mut Self>, amt: usize)572     fn consume(self: Pin<&mut Self>, amt: usize) {
573         let me = self.project();
574         me.buf.advance(amt);
575     }
576 }
577 
578 #[cfg(feature = "tokio-02")]
579 impl<R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite> tokio_02_dep::io::AsyncWrite
580     for BufReader<R>
581 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>582     fn poll_write(
583         self: Pin<&mut Self>,
584         cx: &mut Context<'_>,
585         buf: &[u8],
586     ) -> Poll<io::Result<usize>> {
587         self.get_pin_mut().poll_write(cx, buf)
588     }
589 
poll_write_buf<B: bytes_05::Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>590     fn poll_write_buf<B: bytes_05::Buf>(
591         self: Pin<&mut Self>,
592         cx: &mut Context<'_>,
593         buf: &mut B,
594     ) -> Poll<io::Result<usize>> {
595         self.get_pin_mut().poll_write_buf(cx, buf)
596     }
597 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>598     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
599         self.get_pin_mut().poll_flush(cx)
600     }
601 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>602     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
603         self.get_pin_mut().poll_shutdown(cx)
604     }
605 }
606 
607 #[cfg(feature = "tokio-03")]
608 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>609     fn poll_read(
610         mut self: Pin<&mut Self>,
611         cx: &mut Context<'_>,
612         buf: &mut tokio_03_dep::io::ReadBuf<'_>,
613     ) -> Poll<io::Result<()>> {
614         // If we don't have any buffered data and we're doing a massive read
615         // (larger than our internal buffer), bypass our internal buffer
616         // entirely.
617         if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
618             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
619             self.discard_buffer();
620             return Poll::Ready(res);
621         }
622         let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
623         let amt = std::cmp::min(rem.len(), buf.remaining());
624         buf.put_slice(&rem[..amt]);
625         self.consume(amt);
626         Poll::Ready(Ok(()))
627     }
628 }
629 
630 #[cfg(feature = "tokio-03")]
631 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>632     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
633         let me = self.project();
634 
635         // If we've reached the end of our internal buffer then we need to fetch
636         // some more data from the underlying reader.
637         if me.buf.is_empty() {
638             ready!(tokio_03_read_buf(cx, me.inner, me.buf))?;
639         }
640         Poll::Ready(Ok(&me.buf[..]))
641     }
642 
consume(self: Pin<&mut Self>, amt: usize)643     fn consume(self: Pin<&mut Self>, amt: usize) {
644         let me = self.project();
645         me.buf.advance(amt);
646     }
647 }
648 
649 #[cfg(feature = "tokio-03")]
650 impl<R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite> tokio_03_dep::io::AsyncWrite
651     for BufReader<R>
652 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>653     fn poll_write(
654         self: Pin<&mut Self>,
655         cx: &mut Context<'_>,
656         buf: &[u8],
657     ) -> Poll<io::Result<usize>> {
658         self.get_pin_mut().poll_write(cx, buf)
659     }
660 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>661     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
662         self.get_pin_mut().poll_flush(cx)
663     }
664 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>665     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
666         self.get_pin_mut().poll_shutdown(cx)
667     }
668 }
669 
670 #[cfg(feature = "tokio")]
671 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>672     fn poll_read(
673         mut self: Pin<&mut Self>,
674         cx: &mut Context<'_>,
675         buf: &mut tokio_dep::io::ReadBuf<'_>,
676     ) -> Poll<io::Result<()>> {
677         // If we don't have any buffered data and we're doing a massive read
678         // (larger than our internal buffer), bypass our internal buffer
679         // entirely.
680         if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
681             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
682             self.discard_buffer();
683             return Poll::Ready(res);
684         }
685         let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
686         let amt = std::cmp::min(rem.len(), buf.remaining());
687         buf.put_slice(&rem[..amt]);
688         self.consume(amt);
689         Poll::Ready(Ok(()))
690     }
691 }
692 
693 #[cfg(feature = "tokio")]
694 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>695     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
696         let me = self.project();
697 
698         // If we've reached the end of our internal buffer then we need to fetch
699         // some more data from the underlying reader.
700         if me.buf.is_empty() {
701             ready!(tokio_read_buf(me.inner, cx, me.buf))?;
702         }
703         Poll::Ready(Ok(&me.buf[..]))
704     }
705 
consume(self: Pin<&mut Self>, amt: usize)706     fn consume(self: Pin<&mut Self>, amt: usize) {
707         let me = self.project();
708         me.buf.advance(amt);
709     }
710 }
711 
712 #[cfg(feature = "tokio")]
713 impl<R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite> tokio_dep::io::AsyncWrite
714     for BufReader<R>
715 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>716     fn poll_write(
717         self: Pin<&mut Self>,
718         cx: &mut Context<'_>,
719         buf: &[u8],
720     ) -> Poll<io::Result<usize>> {
721         self.get_pin_mut().poll_write(cx, buf)
722     }
723 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>724     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
725         self.get_pin_mut().poll_flush(cx)
726     }
727 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>728     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
729         self.get_pin_mut().poll_shutdown(cx)
730     }
731 }
732 
733 impl<R: Read> Read for BufReader<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>734     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
735         // If we don't have any buffered data and we're doing a massive read
736         // (larger than our internal buffer), bypass our internal buffer
737         // entirely.
738         if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
739             let res = self.read(buf);
740             self.buf.clear();
741             return res;
742         }
743         let nread = {
744             let mut rem = self.fill_buf()?;
745             rem.read(buf)?
746         };
747         self.consume(nread);
748         Ok(nread)
749     }
750 }
751 
752 impl<R: Read> BufRead for BufReader<R> {
fill_buf(&mut self) -> io::Result<&[u8]>753     fn fill_buf(&mut self) -> io::Result<&[u8]> {
754         // If we've reached the end of our internal buffer then we need to fetch
755         // some more data from the underlying reader.
756         // Branch using `>=` instead of the more correct `==`
757         // to tell the compiler that the pos..cap slice is always valid.
758 
759         if self.buf.is_empty() {
760             Bufferless.extend_buf_sync(self)?;
761         }
762         Ok(&self.buf[..])
763     }
764 
consume(&mut self, amt: usize)765     fn consume(&mut self, amt: usize) {
766         self.buf.advance(amt);
767     }
768 }
769 
770 #[cfg(test)]
771 #[cfg(feature = "tokio-02")]
772 mod tests {
773     use super::{BufReader, Bufferless, CombineRead};
774 
775     use std::{io, pin::Pin};
776 
777     use {
778         bytes_05::BytesMut,
779         tokio_02_dep::{
780             self as tokio,
781             io::{AsyncRead, AsyncReadExt},
782         },
783     };
784 
785     impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize>786         async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
787             crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
788         }
789     }
790 
791     #[tokio::test]
buf_reader()792     async fn buf_reader() {
793         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
794 
795         let mut buf = [0u8; 3];
796         read.read(&mut buf).await.unwrap();
797         assert_eq!(buf, [1, 2, 3]);
798 
799         let mut buf = [0u8; 3];
800         read.read(&mut buf).await.unwrap();
801         assert_eq!(buf, [4, 5, 6]);
802 
803         let mut buf = [0u8; 3];
804         read.read(&mut buf).await.unwrap();
805         assert_eq!(buf, [7, 8, 9]);
806 
807         let mut buf = [1u8; 3];
808         read.read(&mut buf).await.unwrap();
809         assert_eq!(buf, [0, 1, 1]);
810     }
811 
812     #[tokio::test]
buf_reader_buf()813     async fn buf_reader_buf() {
814         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
815 
816         let mut buf = BytesMut::with_capacity(3);
817         read.read_buf(&mut buf).await.unwrap();
818         assert_eq!(&buf[..], [1, 2, 3]);
819 
820         read.read_buf(&mut buf).await.unwrap();
821         assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
822     }
823 
824     #[tokio::test]
buf_reader_extend_buf()825     async fn buf_reader_extend_buf() {
826         let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
827         futures_03_dep::pin_mut!(read);
828 
829         assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3);
830         assert_eq!(read.buffer(), [1, 2, 3]);
831 
832         assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 7);
833         assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
834     }
835 }
836 
837 #[cfg(test)]
838 #[cfg(feature = "tokio")]
839 mod tests_tokio_1 {
840     use super::{BufReader, Bufferless, CombineRead};
841 
842     use std::{io, pin::Pin};
843 
844     use {
845         bytes::BytesMut,
846         tokio_dep::{
847             self as tokio,
848             io::{AsyncRead, AsyncReadExt},
849         },
850     };
851 
852     impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize>853         async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
854             crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
855         }
856     }
857 
858     #[tokio::test]
buf_reader()859     async fn buf_reader() {
860         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
861 
862         let mut buf = [0u8; 3];
863         read.read(&mut buf).await.unwrap();
864         assert_eq!(buf, [1, 2, 3]);
865 
866         let mut buf = [0u8; 3];
867         read.read(&mut buf).await.unwrap();
868         assert_eq!(buf, [4, 5, 6]);
869 
870         let mut buf = [0u8; 3];
871         read.read(&mut buf).await.unwrap();
872         assert_eq!(buf, [7, 8, 9]);
873 
874         let mut buf = [1u8; 3];
875         read.read(&mut buf).await.unwrap();
876         assert_eq!(buf, [0, 1, 1]);
877     }
878 
879     #[tokio::test]
buf_reader_buf()880     async fn buf_reader_buf() {
881         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
882 
883         let mut buf = BytesMut::with_capacity(3);
884         read.read_buf(&mut buf).await.unwrap();
885         assert_eq!(&buf[..], [1, 2, 3]);
886 
887         read.read_buf(&mut buf).await.unwrap();
888         assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
889     }
890 
891     #[tokio::test]
buf_reader_extend_buf()892     async fn buf_reader_extend_buf() {
893         let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
894         futures_03_dep::pin_mut!(read);
895 
896         assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3);
897         assert_eq!(read.buffer(), [1, 2, 3]);
898 
899         assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 7);
900         assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
901     }
902 }
903 
904 #[cfg(test)]
905 mod tests_sync {
906     use super::{BufReader, Bufferless, CombineSyncRead};
907 
908     use std::io::Read;
909 
910     #[test]
911     #[allow(clippy::unused_io_amount)]
buf_reader()912     fn buf_reader() {
913         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
914 
915         let mut buf = [0u8; 3];
916         read.read(&mut buf).unwrap();
917         assert_eq!(buf, [1, 2, 3]);
918 
919         let mut buf = [0u8; 3];
920         read.read(&mut buf).unwrap();
921         assert_eq!(buf, [4, 5, 6]);
922 
923         let mut buf = [0u8; 3];
924         read.read(&mut buf).unwrap();
925         assert_eq!(buf, [7, 8, 9]);
926 
927         let mut buf = [1u8; 3];
928         read.read(&mut buf).unwrap();
929         assert_eq!(buf, [0, 1, 1]);
930     }
931 
932     #[test]
buf_reader_extend_buf()933     fn buf_reader_extend_buf() {
934         let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
935 
936         assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 3);
937         assert_eq!(read.buffer(), [1, 2, 3]);
938 
939         assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 7);
940         assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
941     }
942 }
943