1 use std::io::{self, BufRead, Read};
2
3 #[cfg(any(
4 feature = "futures-03",
5 feature = "tokio-02",
6 feature = "tokio-03",
7 feature = "tokio"
8 ))]
9 use std::pin::Pin;
10
11 #[cfg(any(feature = "futures-03", feature = "tokio-02", feature = "tokio-03"))]
12 use std::mem::MaybeUninit;
13
14 #[cfg(feature = "futures-core-03")]
15 use std::task::{Context, Poll};
16
17 #[cfg(feature = "futures-03")]
18 use std::future::Future;
19
20 use bytes::{Buf, BufMut, BytesMut};
21
22 #[cfg(feature = "pin-project-lite")]
23 use pin_project_lite::pin_project;
24
25 #[cfg(feature = "tokio-03")]
26 use tokio_03_dep::io::AsyncBufRead as _;
27
28 #[cfg(feature = "tokio")]
29 use tokio_dep::io::AsyncBufRead as _;
30
31 #[cfg(feature = "futures-core-03")]
32 use futures_core_03::ready;
33
34 #[cfg(feature = "pin-project-lite")]
35 pin_project! {
36 /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
37 ///
38 /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
39 #[derive(Debug)]
40 pub struct BufReader<R> {
41 #[pin]
42 inner: R,
43 buf: BytesMut
44 }
45 }
46
47 #[cfg(not(feature = "pin-project-lite"))]
48 /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
49 ///
50 /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
51 #[derive(Debug)]
52 pub struct BufReader<R> {
53 inner: R,
54 buf: BytesMut,
55 }
56
57 impl<R> BufReader<R> {
58 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
59 /// but may change in the future.
new(inner: R) -> Self60 pub fn new(inner: R) -> Self {
61 Self::with_capacity(8096, inner)
62 }
63
64 /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self65 pub fn with_capacity(capacity: usize, inner: R) -> Self {
66 let buf = BytesMut::with_capacity(capacity);
67
68 Self { inner, buf }
69 }
70
71 /// Gets a reference to the underlying reader.
72 ///
73 /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R74 pub fn get_ref(&self) -> &R {
75 &self.inner
76 }
77
78 /// Gets a mutable reference to the underlying reader.
79 ///
80 /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R81 pub fn get_mut(&mut self) -> &mut R {
82 &mut self.inner
83 }
84
85 #[cfg(feature = "pin-project-lite")]
86 /// Gets a pinned mutable reference to the underlying reader.
87 ///
88 /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>89 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
90 self.project().inner
91 }
92
93 /// Consumes this `BufWriter`, returning the underlying reader.
94 ///
95 /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R96 pub fn into_inner(self) -> R {
97 self.inner
98 }
99
100 /// Returns a reference to the internally buffered data.
101 ///
102 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]103 pub fn buffer(&self) -> &[u8] {
104 &self.buf
105 }
106
107 /// Invalidates all data in the internal buffer.
108 #[inline]
109 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
discard_buffer(self: Pin<&mut Self>)110 fn discard_buffer(self: Pin<&mut Self>) {
111 let me = self.project();
112 me.buf.clear();
113 }
114 }
115
116 mod sealed {
117 pub trait Sealed {}
118 }
119
120 #[doc(hidden)]
121 pub trait CombineBuffer<R>: sealed::Sealed {
buffer<'a>(&'a self, read: &'a R) -> &'a [u8]122 fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];
123
advance(&mut self, read: &mut R, len: usize)124 fn advance(&mut self, read: &mut R, len: usize);
125
126 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut R>, len: usize)127 fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
128 }
129
130 #[doc(hidden)]
131 pub trait CombineSyncRead<R>: CombineBuffer<R> {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>132 fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
133 }
134
135 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
136 #[doc(hidden)]
137 pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>138 fn poll_extend_buf(
139 &mut self,
140 cx: &mut Context<'_>,
141 read: Pin<&mut R>,
142 ) -> Poll<io::Result<usize>>;
143 }
144
145 #[cfg(feature = "futures-03")]
146 #[doc(hidden)]
147 pub trait CombineAsyncRead<R>: CombineBuffer<R> {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>148 fn poll_extend_buf(
149 &mut self,
150 cx: &mut Context<'_>,
151 read: Pin<&mut R>,
152 ) -> Poll<io::Result<usize>>;
153
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> where Self: Sized154 fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
155 where
156 Self: Sized;
157 }
158
159 #[cfg(feature = "futures-03")]
160 pin_project_lite::pin_project! {
161 #[doc(hidden)]
162 pub struct ExtendBuf<'a, C, R> {
163 buffer: &'a mut C,
164 read: Pin<&'a mut R>
165 }
166 }
167
168 #[cfg(feature = "futures-03")]
169 impl<'a, C, R> Future for ExtendBuf<'a, C, R>
170 where
171 C: CombineAsyncRead<R>,
172 {
173 type Output = io::Result<usize>;
174
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>175 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 let me = self.project();
177 me.buffer.poll_extend_buf(cx, me.read.as_mut())
178 }
179 }
180
181 /// Marker used by `Decoder` for an internal buffer
182 #[derive(Default)]
183 pub struct Buffer(pub(crate) BytesMut);
184
185 impl sealed::Sealed for Buffer {}
186
187 impl<R> CombineBuffer<R> for Buffer {
buffer<'a>(&'a self, _read: &'a R) -> &'a [u8]188 fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
189 &self.0
190 }
191
advance(&mut self, _read: &mut R, len: usize)192 fn advance(&mut self, _read: &mut R, len: usize) {
193 self.0.advance(len);
194 }
195
196 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, _read: Pin<&mut R>, len: usize)197 fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
198 self.0.advance(len);
199 }
200 }
201
202 impl<R> CombineSyncRead<R> for Buffer
203 where
204 R: Read,
205 {
extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>206 fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
207 extend_buf_sync(&mut self.0, read)
208 }
209 }
210
211 #[cfg(feature = "futures-03")]
212 impl<R> CombineAsyncRead<R> for Buffer
213 where
214 R: futures_io_03::AsyncRead,
215 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>216 fn poll_extend_buf(
217 &mut self,
218 cx: &mut Context<'_>,
219 read: Pin<&mut R>,
220 ) -> Poll<io::Result<usize>> {
221 poll_extend_buf(&mut self.0, cx, read)
222 }
223
extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>224 fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
225 if !self.0.has_remaining_mut() {
226 self.0.reserve(8 * 1024);
227 }
228 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
229 let bs = self.0.chunk_mut();
230
231 for i in 0..bs.len() {
232 bs.write_byte(i, 0);
233 }
234 ExtendBuf { buffer: self, read }
235 }
236 }
237
238 #[cfg(feature = "tokio-02")]
239 impl<R> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer
240 where
241 R: tokio_02_dep::io::AsyncRead,
242 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>243 fn poll_extend_buf(
244 &mut self,
245 cx: &mut Context<'_>,
246 read: Pin<&mut R>,
247 ) -> Poll<io::Result<usize>> {
248 if !self.0.has_remaining_mut() {
249 self.0.reserve(8 * 1024);
250 }
251 read.poll_read_buf(cx, &mut Bytes05(&mut self.0))
252 }
253 }
254
255 #[cfg(feature = "tokio-03")]
tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_>256 fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
257 let uninit = bs.chunk_mut();
258 unsafe {
259 tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
260 uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
261 uninit.len(),
262 ))
263 }
264 }
265
266 #[cfg(feature = "tokio-03")]
267 impl<R> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer
268 where
269 R: tokio_03_dep::io::AsyncRead,
270 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>271 fn poll_extend_buf(
272 &mut self,
273 cx: &mut Context<'_>,
274 read: Pin<&mut R>,
275 ) -> Poll<io::Result<usize>> {
276 tokio_03_read_buf(cx, read, &mut self.0)
277 }
278 }
279
280 #[cfg(feature = "tokio-03")]
tokio_03_read_buf( cx: &mut Context<'_>, read: Pin<&mut impl tokio_03_dep::io::AsyncRead>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>281 fn tokio_03_read_buf(
282 cx: &mut Context<'_>,
283 read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
284 bs: &mut bytes::BytesMut,
285 ) -> Poll<io::Result<usize>> {
286 if !bs.has_remaining_mut() {
287 bs.reserve(8 * 1024);
288 }
289
290 let mut buf = tokio_03_to_read_buf(bs);
291 ready!(read.poll_read(cx, &mut buf))?;
292 unsafe {
293 let n = buf.filled().len();
294 bs.advance_mut(n);
295 Poll::Ready(Ok(n))
296 }
297 }
298
299 #[cfg(feature = "tokio")]
300 impl<R> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer
301 where
302 R: tokio_dep::io::AsyncRead,
303 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>>304 fn poll_extend_buf(
305 &mut self,
306 cx: &mut Context<'_>,
307 read: Pin<&mut R>,
308 ) -> Poll<io::Result<usize>> {
309 tokio_read_buf(read, cx, &mut self.0)
310 }
311 }
312
313 #[cfg(feature = "tokio")]
tokio_read_buf( read: Pin<&mut impl tokio_dep::io::AsyncRead>, cx: &mut Context<'_>, bs: &mut bytes::BytesMut, ) -> Poll<io::Result<usize>>314 fn tokio_read_buf(
315 read: Pin<&mut impl tokio_dep::io::AsyncRead>,
316 cx: &mut Context<'_>,
317 bs: &mut bytes::BytesMut,
318 ) -> Poll<io::Result<usize>> {
319 if !bs.has_remaining_mut() {
320 bs.reserve(8 * 1024);
321 }
322
323 tokio_util::io::poll_read_buf(read, cx, bs)
324 }
325
326 /// Marker used by `Decoder` for an external buffer
327 #[derive(Default)]
328 pub struct Bufferless;
329
330 impl sealed::Sealed for Bufferless {}
331
332 impl<R> CombineBuffer<BufReader<R>> for Bufferless {
buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8]333 fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
334 &read.buf
335 }
336
advance(&mut self, read: &mut BufReader<R>, len: usize)337 fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
338 read.buf.advance(len);
339 }
340
341 #[cfg(feature = "pin-project-lite")]
advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize)342 fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
343 read.project().buf.advance(len);
344 }
345 }
346
347 impl<R> CombineSyncRead<BufReader<R>> for Bufferless
348 where
349 R: Read,
350 {
extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize>351 fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
352 extend_buf_sync(&mut read.buf, &mut read.inner)
353 }
354 }
355
extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize> where R: Read,356 fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
357 where
358 R: Read,
359 {
360 let size = 8 * 1024;
361 if !buf.has_remaining_mut() {
362 buf.reserve(size);
363 }
364
365 // Copy of tokio's poll_read_buf method (but it has to force initialize the buffer)
366 let n = {
367 let bs = buf.chunk_mut();
368
369 let initial_size = bs.len().min(size);
370 let bs = &mut bs[..initial_size];
371 for i in 0..bs.len() {
372 bs.write_byte(i, 0);
373 }
374
375 // Convert to `&mut [u8]`
376 // SAFETY: the entire buffer is preinitialized above
377 let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
378
379 let n = read.read(bs)?;
380 assert!(
381 n <= bs.len(),
382 "AsyncRead reported that it initialized more than the number of bytes in the buffer"
383 );
384 n
385 };
386
387 // SAFETY: the entire buffer has been preinitialized
388 unsafe { buf.advance_mut(n) };
389
390 Ok(n)
391 }
392
393 #[cfg(feature = "tokio-02")]
394 struct Bytes05<'a>(&'a mut BytesMut);
395
396 #[cfg(feature = "tokio-02")]
397 impl bytes_05::BufMut for Bytes05<'_> {
remaining_mut(&self) -> usize398 fn remaining_mut(&self) -> usize {
399 self.0.remaining_mut()
400 }
advance_mut(&mut self, cnt: usize)401 unsafe fn advance_mut(&mut self, cnt: usize) {
402 self.0.advance_mut(cnt)
403 }
bytes_mut(&mut self) -> &mut [MaybeUninit<u8>]404 fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
405 unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
406 }
407 }
408
409 #[cfg(feature = "tokio-02")]
410 impl<R> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead> for Bufferless
411 where
412 R: tokio_02_dep::io::AsyncRead,
413 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>414 fn poll_extend_buf(
415 &mut self,
416 cx: &mut Context<'_>,
417 read: Pin<&mut BufReader<R>>,
418 ) -> Poll<io::Result<usize>> {
419 let me = read.project();
420
421 if !me.buf.has_remaining_mut() {
422 me.buf.reserve(8 * 1024);
423 }
424 tokio_02_dep::io::AsyncRead::poll_read_buf(me.inner, cx, &mut Bytes05(me.buf))
425 }
426 }
427
428 #[cfg(feature = "tokio-03")]
429 impl<R> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead> for Bufferless
430 where
431 R: tokio_03_dep::io::AsyncRead,
432 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>433 fn poll_extend_buf(
434 &mut self,
435 cx: &mut Context<'_>,
436 read: Pin<&mut BufReader<R>>,
437 ) -> Poll<io::Result<usize>> {
438 let me = read.project();
439
440 tokio_03_read_buf(cx, me.inner, me.buf)
441 }
442 }
443
444 #[cfg(feature = "tokio")]
445 impl<R> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead> for Bufferless
446 where
447 R: tokio_dep::io::AsyncRead,
448 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>449 fn poll_extend_buf(
450 &mut self,
451 cx: &mut Context<'_>,
452 read: Pin<&mut BufReader<R>>,
453 ) -> Poll<io::Result<usize>> {
454 let me = read.project();
455
456 tokio_read_buf(me.inner, cx, me.buf)
457 }
458 }
459
460 #[cfg(feature = "futures-03")]
461 impl<R> CombineAsyncRead<BufReader<R>> for Bufferless
462 where
463 R: futures_io_03::AsyncRead,
464 {
poll_extend_buf( &mut self, cx: &mut Context<'_>, read: Pin<&mut BufReader<R>>, ) -> Poll<io::Result<usize>>465 fn poll_extend_buf(
466 &mut self,
467 cx: &mut Context<'_>,
468 read: Pin<&mut BufReader<R>>,
469 ) -> Poll<io::Result<usize>> {
470 let me = read.project();
471
472 poll_extend_buf(me.buf, cx, me.inner)
473 }
474
extend_buf<'a>( &'a mut self, mut read: Pin<&'a mut BufReader<R>>, ) -> ExtendBuf<'a, Self, BufReader<R>>475 fn extend_buf<'a>(
476 &'a mut self,
477 mut read: Pin<&'a mut BufReader<R>>,
478 ) -> ExtendBuf<'a, Self, BufReader<R>> {
479 let me = read.as_mut().project();
480
481 if !me.buf.has_remaining_mut() {
482 me.buf.reserve(8 * 1024);
483 }
484 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
485 let bs = me.buf.chunk_mut();
486
487 for i in 0..bs.len() {
488 bs.write_byte(i, 0);
489 }
490 ExtendBuf { buffer: self, read }
491 }
492 }
493
494 #[cfg(feature = "futures-03")]
poll_extend_buf<R>( buf: &mut BytesMut, cx: &mut Context<'_>, read: Pin<&mut R>, ) -> Poll<io::Result<usize>> where R: futures_io_03::AsyncRead,495 fn poll_extend_buf<R>(
496 buf: &mut BytesMut,
497 cx: &mut Context<'_>,
498 read: Pin<&mut R>,
499 ) -> Poll<io::Result<usize>>
500 where
501 R: futures_io_03::AsyncRead,
502 {
503 // Copy of tokio's read_buf method (but it has to force initialize the buffer)
504 let n = {
505 let bs = buf.chunk_mut();
506 // preinit the buffer
507 for i in 0..bs.len() {
508 bs.write_byte(i, 0);
509 }
510
511 // Convert to `&mut [u8]`
512 // SAFETY: preinitialize the buffer
513 let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
514
515 let n = ready!(read.poll_read(cx, bs))?;
516 assert!(
517 n <= bs.len(),
518 "AsyncRead reported that it initialized more than the number of bytes in the buffer"
519 );
520 n
521 };
522 // SAFETY: the buffer was preinitialized
523 unsafe { buf.advance_mut(n) };
524 Poll::Ready(Ok(n))
525 }
526
527 #[cfg(feature = "tokio-02")]
528 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>529 fn poll_read(
530 mut self: Pin<&mut Self>,
531 cx: &mut Context<'_>,
532 buf: &mut [u8],
533 ) -> Poll<io::Result<usize>> {
534 use tokio_02_dep::io::AsyncBufRead;
535
536 // If we don't have any buffered data and we're doing a massive read
537 // (larger than our internal buffer), bypass our internal buffer
538 // entirely.
539 if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
540 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
541 self.discard_buffer();
542 return Poll::Ready(res);
543 }
544 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
545 let nread = rem.read(buf)?;
546 self.consume(nread);
547 Poll::Ready(Ok(nread))
548 }
549
550 // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool551 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
552 self.inner.prepare_uninitialized_buffer(buf)
553 }
554 }
555
556 #[cfg(feature = "tokio-02")]
557 impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>558 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
559 let me = self.project();
560
561 // If we've reached the end of our internal buffer then we need to fetch
562 // some more data from the underlying reader.
563 // Branch using `>=` instead of the more correct `==`
564 // to tell the compiler that the pos..cap slice is always valid.
565
566 if me.buf.is_empty() {
567 ready!(me.inner.poll_read_buf(cx, &mut Bytes05(me.buf)))?;
568 }
569 Poll::Ready(Ok(&me.buf[..]))
570 }
571
consume(self: Pin<&mut Self>, amt: usize)572 fn consume(self: Pin<&mut Self>, amt: usize) {
573 let me = self.project();
574 me.buf.advance(amt);
575 }
576 }
577
578 #[cfg(feature = "tokio-02")]
579 impl<R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite> tokio_02_dep::io::AsyncWrite
580 for BufReader<R>
581 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>582 fn poll_write(
583 self: Pin<&mut Self>,
584 cx: &mut Context<'_>,
585 buf: &[u8],
586 ) -> Poll<io::Result<usize>> {
587 self.get_pin_mut().poll_write(cx, buf)
588 }
589
poll_write_buf<B: bytes_05::Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>590 fn poll_write_buf<B: bytes_05::Buf>(
591 self: Pin<&mut Self>,
592 cx: &mut Context<'_>,
593 buf: &mut B,
594 ) -> Poll<io::Result<usize>> {
595 self.get_pin_mut().poll_write_buf(cx, buf)
596 }
597
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>598 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
599 self.get_pin_mut().poll_flush(cx)
600 }
601
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>602 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
603 self.get_pin_mut().poll_shutdown(cx)
604 }
605 }
606
607 #[cfg(feature = "tokio-03")]
608 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>609 fn poll_read(
610 mut self: Pin<&mut Self>,
611 cx: &mut Context<'_>,
612 buf: &mut tokio_03_dep::io::ReadBuf<'_>,
613 ) -> Poll<io::Result<()>> {
614 // If we don't have any buffered data and we're doing a massive read
615 // (larger than our internal buffer), bypass our internal buffer
616 // entirely.
617 if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
618 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
619 self.discard_buffer();
620 return Poll::Ready(res);
621 }
622 let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
623 let amt = std::cmp::min(rem.len(), buf.remaining());
624 buf.put_slice(&rem[..amt]);
625 self.consume(amt);
626 Poll::Ready(Ok(()))
627 }
628 }
629
630 #[cfg(feature = "tokio-03")]
631 impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>632 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
633 let me = self.project();
634
635 // If we've reached the end of our internal buffer then we need to fetch
636 // some more data from the underlying reader.
637 if me.buf.is_empty() {
638 ready!(tokio_03_read_buf(cx, me.inner, me.buf))?;
639 }
640 Poll::Ready(Ok(&me.buf[..]))
641 }
642
consume(self: Pin<&mut Self>, amt: usize)643 fn consume(self: Pin<&mut Self>, amt: usize) {
644 let me = self.project();
645 me.buf.advance(amt);
646 }
647 }
648
649 #[cfg(feature = "tokio-03")]
650 impl<R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite> tokio_03_dep::io::AsyncWrite
651 for BufReader<R>
652 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>653 fn poll_write(
654 self: Pin<&mut Self>,
655 cx: &mut Context<'_>,
656 buf: &[u8],
657 ) -> Poll<io::Result<usize>> {
658 self.get_pin_mut().poll_write(cx, buf)
659 }
660
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>661 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
662 self.get_pin_mut().poll_flush(cx)
663 }
664
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>665 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
666 self.get_pin_mut().poll_shutdown(cx)
667 }
668 }
669
670 #[cfg(feature = "tokio")]
671 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>672 fn poll_read(
673 mut self: Pin<&mut Self>,
674 cx: &mut Context<'_>,
675 buf: &mut tokio_dep::io::ReadBuf<'_>,
676 ) -> Poll<io::Result<()>> {
677 // If we don't have any buffered data and we're doing a massive read
678 // (larger than our internal buffer), bypass our internal buffer
679 // entirely.
680 if !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len() {
681 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
682 self.discard_buffer();
683 return Poll::Ready(res);
684 }
685 let rem = ready!(self.as_mut().poll_fill_buf(cx))?;
686 let amt = std::cmp::min(rem.len(), buf.remaining());
687 buf.put_slice(&rem[..amt]);
688 self.consume(amt);
689 Poll::Ready(Ok(()))
690 }
691 }
692
693 #[cfg(feature = "tokio")]
694 impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>695 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
696 let me = self.project();
697
698 // If we've reached the end of our internal buffer then we need to fetch
699 // some more data from the underlying reader.
700 if me.buf.is_empty() {
701 ready!(tokio_read_buf(me.inner, cx, me.buf))?;
702 }
703 Poll::Ready(Ok(&me.buf[..]))
704 }
705
consume(self: Pin<&mut Self>, amt: usize)706 fn consume(self: Pin<&mut Self>, amt: usize) {
707 let me = self.project();
708 me.buf.advance(amt);
709 }
710 }
711
712 #[cfg(feature = "tokio")]
713 impl<R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite> tokio_dep::io::AsyncWrite
714 for BufReader<R>
715 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>716 fn poll_write(
717 self: Pin<&mut Self>,
718 cx: &mut Context<'_>,
719 buf: &[u8],
720 ) -> Poll<io::Result<usize>> {
721 self.get_pin_mut().poll_write(cx, buf)
722 }
723
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>724 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
725 self.get_pin_mut().poll_flush(cx)
726 }
727
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>728 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
729 self.get_pin_mut().poll_shutdown(cx)
730 }
731 }
732
733 impl<R: Read> Read for BufReader<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>734 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
735 // If we don't have any buffered data and we're doing a massive read
736 // (larger than our internal buffer), bypass our internal buffer
737 // entirely.
738 if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
739 let res = self.read(buf);
740 self.buf.clear();
741 return res;
742 }
743 let nread = {
744 let mut rem = self.fill_buf()?;
745 rem.read(buf)?
746 };
747 self.consume(nread);
748 Ok(nread)
749 }
750 }
751
752 impl<R: Read> BufRead for BufReader<R> {
fill_buf(&mut self) -> io::Result<&[u8]>753 fn fill_buf(&mut self) -> io::Result<&[u8]> {
754 // If we've reached the end of our internal buffer then we need to fetch
755 // some more data from the underlying reader.
756 // Branch using `>=` instead of the more correct `==`
757 // to tell the compiler that the pos..cap slice is always valid.
758
759 if self.buf.is_empty() {
760 Bufferless.extend_buf_sync(self)?;
761 }
762 Ok(&self.buf[..])
763 }
764
consume(&mut self, amt: usize)765 fn consume(&mut self, amt: usize) {
766 self.buf.advance(amt);
767 }
768 }
769
770 #[cfg(test)]
771 #[cfg(feature = "tokio-02")]
772 mod tests {
773 use super::{BufReader, Bufferless, CombineRead};
774
775 use std::{io, pin::Pin};
776
777 use {
778 bytes_05::BytesMut,
779 tokio_02_dep::{
780 self as tokio,
781 io::{AsyncRead, AsyncReadExt},
782 },
783 };
784
785 impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize>786 async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
787 crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
788 }
789 }
790
791 #[tokio::test]
buf_reader()792 async fn buf_reader() {
793 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
794
795 let mut buf = [0u8; 3];
796 read.read(&mut buf).await.unwrap();
797 assert_eq!(buf, [1, 2, 3]);
798
799 let mut buf = [0u8; 3];
800 read.read(&mut buf).await.unwrap();
801 assert_eq!(buf, [4, 5, 6]);
802
803 let mut buf = [0u8; 3];
804 read.read(&mut buf).await.unwrap();
805 assert_eq!(buf, [7, 8, 9]);
806
807 let mut buf = [1u8; 3];
808 read.read(&mut buf).await.unwrap();
809 assert_eq!(buf, [0, 1, 1]);
810 }
811
812 #[tokio::test]
buf_reader_buf()813 async fn buf_reader_buf() {
814 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
815
816 let mut buf = BytesMut::with_capacity(3);
817 read.read_buf(&mut buf).await.unwrap();
818 assert_eq!(&buf[..], [1, 2, 3]);
819
820 read.read_buf(&mut buf).await.unwrap();
821 assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
822 }
823
824 #[tokio::test]
buf_reader_extend_buf()825 async fn buf_reader_extend_buf() {
826 let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
827 futures_03_dep::pin_mut!(read);
828
829 assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3);
830 assert_eq!(read.buffer(), [1, 2, 3]);
831
832 assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 7);
833 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
834 }
835 }
836
837 #[cfg(test)]
838 #[cfg(feature = "tokio")]
839 mod tests_tokio_1 {
840 use super::{BufReader, Bufferless, CombineRead};
841
842 use std::{io, pin::Pin};
843
844 use {
845 bytes::BytesMut,
846 tokio_dep::{
847 self as tokio,
848 io::{AsyncRead, AsyncReadExt},
849 },
850 };
851
852 impl<R: AsyncRead> BufReader<R> {
extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize>853 async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
854 crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
855 }
856 }
857
858 #[tokio::test]
buf_reader()859 async fn buf_reader() {
860 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
861
862 let mut buf = [0u8; 3];
863 read.read(&mut buf).await.unwrap();
864 assert_eq!(buf, [1, 2, 3]);
865
866 let mut buf = [0u8; 3];
867 read.read(&mut buf).await.unwrap();
868 assert_eq!(buf, [4, 5, 6]);
869
870 let mut buf = [0u8; 3];
871 read.read(&mut buf).await.unwrap();
872 assert_eq!(buf, [7, 8, 9]);
873
874 let mut buf = [1u8; 3];
875 read.read(&mut buf).await.unwrap();
876 assert_eq!(buf, [0, 1, 1]);
877 }
878
879 #[tokio::test]
buf_reader_buf()880 async fn buf_reader_buf() {
881 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
882
883 let mut buf = BytesMut::with_capacity(3);
884 read.read_buf(&mut buf).await.unwrap();
885 assert_eq!(&buf[..], [1, 2, 3]);
886
887 read.read_buf(&mut buf).await.unwrap();
888 assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
889 }
890
891 #[tokio::test]
buf_reader_extend_buf()892 async fn buf_reader_extend_buf() {
893 let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
894 futures_03_dep::pin_mut!(read);
895
896 assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3);
897 assert_eq!(read.buffer(), [1, 2, 3]);
898
899 assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 7);
900 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
901 }
902 }
903
904 #[cfg(test)]
905 mod tests_sync {
906 use super::{BufReader, Bufferless, CombineSyncRead};
907
908 use std::io::Read;
909
910 #[test]
911 #[allow(clippy::unused_io_amount)]
buf_reader()912 fn buf_reader() {
913 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
914
915 let mut buf = [0u8; 3];
916 read.read(&mut buf).unwrap();
917 assert_eq!(buf, [1, 2, 3]);
918
919 let mut buf = [0u8; 3];
920 read.read(&mut buf).unwrap();
921 assert_eq!(buf, [4, 5, 6]);
922
923 let mut buf = [0u8; 3];
924 read.read(&mut buf).unwrap();
925 assert_eq!(buf, [7, 8, 9]);
926
927 let mut buf = [1u8; 3];
928 read.read(&mut buf).unwrap();
929 assert_eq!(buf, [0, 1, 1]);
930 }
931
932 #[test]
buf_reader_extend_buf()933 fn buf_reader_extend_buf() {
934 let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
935
936 assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 3);
937 assert_eq!(read.buffer(), [1, 2, 3]);
938
939 assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 7);
940 assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
941 }
942 }
943