1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::io::{AsyncBufReadExt, AsyncRead, Cursor};
4 use futures::stream::{self, StreamExt, TryStreamExt};
5 use futures::task::Poll;
6 use futures_test::io::AsyncReadTestExt;
7 use futures_test::task::noop_context;
8 
run<F: Future + Unpin>(mut f: F) -> F::Output9 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
10     let mut cx = noop_context();
11     loop {
12         if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
13             return x;
14         }
15     }
16 }
17 
/// Fetches the next item of `$stream` via `block_on` and unwraps it twice:
/// once for the `Option` returned by `next()` and once for the item itself.
/// Panics if the stream is exhausted or the item is an error.
macro_rules! block_on_next {
    ($stream:expr) => {{
        let next_item = block_on($stream.next());
        next_item.unwrap().unwrap()
    }};
}
23 
/// Fetches the next item of `$stream` via the busy-polling `run` helper and
/// unwraps it twice: once for the `Option` returned by `next()` and once for
/// the item itself. Panics if the stream is exhausted or the item is an error.
macro_rules! run_next {
    ($stream:expr) => {{
        let next_item = run($stream.next());
        next_item.unwrap().unwrap()
    }};
}
29 
30 struct IOErrorRead(bool);
31 
32 impl AsyncRead for IOErrorRead {
poll_read( mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, b: &mut [u8], ) -> Poll<std::io::Result<usize>>33     fn poll_read(
34         mut self: std::pin::Pin<&mut Self>,
35         _cx: &mut std::task::Context<'_>,
36         b: &mut [u8],
37     ) -> Poll<std::io::Result<usize>> {
38         if self.0 {
39             Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into()))
40         } else {
41             self.0 = true;
42             b[..16].fill(b'x');
43             Ok(16).into()
44         }
45     }
46 }
47 
/// Checks line splitting on in-memory cursors read with `block_on`.
#[test]
fn lines() {
    // A trailing `\r` with no following `\n` is kept as part of the line.
    {
        let reader = Cursor::new(&b"12\r"[..]);
        let mut line_stream = reader.lines();
        assert_eq!(block_on_next!(line_stream), String::from("12\r"));
        let end = block_on(line_stream.next());
        assert!(end.is_none());
    }

    // `\r\n` and a bare `\n` both end a line and are stripped from the output.
    {
        let reader = Cursor::new(&b"12\r\n\n"[..]);
        let mut line_stream = reader.lines();
        assert_eq!(block_on_next!(line_stream), String::from("12"));
        assert_eq!(block_on_next!(line_stream), String::new());
        let end = block_on(line_stream.next());
        assert!(end.is_none());
    }
}
61 
/// Same expectations as `lines`, but the input arrives in several chunks through
/// a reader wrapped in `interleave_pending`, driven by the busy-polling `run`.
#[test]
fn maybe_pending() {
    // The `\r` arrives in its own chunk and is never followed by `\n`, so it
    // stays in the line.
    {
        let chunks = vec![&b"12"[..], &b"\r"[..]];
        let reader = stream::iter(chunks).map(Ok).into_async_read().interleave_pending();
        let mut line_stream = reader.lines();
        assert_eq!(run_next!(line_stream), String::from("12\r"));
        let end = run(line_stream.next());
        assert!(end.is_none());
    }

    // Terminators split across chunk boundaries still end (and are stripped
    // from) their lines.
    {
        let chunks = vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]];
        let reader = stream::iter(chunks).map(Ok).into_async_read().interleave_pending();
        let mut line_stream = reader.lines();
        assert_eq!(run_next!(line_stream), String::from("12"));
        assert_eq!(run_next!(line_stream), String::new());
        let end = run(line_stream.next());
        assert!(end.is_none());
    }
}
79 
/// Regression test named after issue 2862: reading lines through a `BufReader`
/// over `IOErrorRead(false)` must yield an `Err` as the first stream item.
#[test]
fn issue2862() {
    let reader = futures::io::BufReader::new(IOErrorRead(false));
    let mut lines = reader.lines();
    let first = block_on(lines.next());
    assert!(first.unwrap().is_err())
}
85