1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::io::{AsyncBufReadExt, AsyncRead, Cursor};
4 use futures::stream::{self, StreamExt, TryStreamExt};
5 use futures::task::Poll;
6 use futures_test::io::AsyncReadTestExt;
7 use futures_test::task::noop_context;
8
run<F: Future + Unpin>(mut f: F) -> F::Output9 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
10 let mut cx = noop_context();
11 loop {
12 if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
13 return x;
14 }
15 }
16 }
17
/// Pulls the next item from a stream of `Result`s using `block_on` and
/// unwraps it twice: first the `Option` (panics if the stream has ended),
/// then the `Result` (panics if the item is an error).
macro_rules! block_on_next {
    ($stream:expr) => {{
        let item = block_on($stream.next());
        item.unwrap().unwrap()
    }};
}
23
/// Same as `block_on_next!`, but drives the `next()` future with `run`
/// instead of `block_on`. Unwraps the `Option` first (panics if the stream
/// has ended), then the `Result` (panics if the item is an error).
macro_rules! run_next {
    ($stream:expr) => {{
        let item = run($stream.next());
        item.unwrap().unwrap()
    }};
}
29
30 struct IOErrorRead(bool);
31
32 impl AsyncRead for IOErrorRead {
poll_read( mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, b: &mut [u8], ) -> Poll<std::io::Result<usize>>33 fn poll_read(
34 mut self: std::pin::Pin<&mut Self>,
35 _cx: &mut std::task::Context<'_>,
36 b: &mut [u8],
37 ) -> Poll<std::io::Result<usize>> {
38 if self.0 {
39 Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into()))
40 } else {
41 self.0 = true;
42 b[..16].fill(b'x');
43 Ok(16).into()
44 }
45 }
46 }
47
/// Line splitting over in-memory readers whose data is available at once.
#[test]
fn lines() {
    // A lone trailing `\r` is not a terminator: it stays in the yielded line.
    let mut line_stream = Cursor::new(&b"12\r"[..]).lines();
    assert_eq!(block_on_next!(line_stream), "12\r");
    assert!(block_on(line_stream.next()).is_none());

    // `\r\n` and `\n` both end a line and are stripped; the second line is empty.
    let mut line_stream = Cursor::new(&b"12\r\n\n"[..]).lines();
    assert_eq!(block_on_next!(line_stream), "12");
    assert_eq!(block_on_next!(line_stream), "");
    assert!(block_on(line_stream.next()).is_none());
}
61
/// Same expectations as the `lines` test, but the input arrives in several
/// chunks through a reader wrapped with `interleave_pending()`, and the
/// stream is driven by the spinning `run` helper.
#[test]
fn maybe_pending() {
    // Input split as "12" + "\r": the unterminated `\r` is kept in the line.
    let chunks = vec![&b"12"[..], &b"\r"[..]];
    let reader = stream::iter(chunks).map(Ok).into_async_read().interleave_pending();
    let mut line_stream = reader.lines();
    assert_eq!(run_next!(line_stream), "12\r");
    assert!(run(line_stream.next()).is_none());

    // Input split as "12" + "\r\n" + "\n": yields "12" and then an empty line.
    let chunks = vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]];
    let reader = stream::iter(chunks).map(Ok).into_async_read().interleave_pending();
    let mut line_stream = reader.lines();
    assert_eq!(run_next!(line_stream), "12");
    assert_eq!(run_next!(line_stream), "");
    assert!(run(line_stream.next()).is_none());
}
79
/// Regression test named after issue 2862 (presumably in the futures-rs
/// tracker; confirm). The underlying reader hands out some bytes with no
/// line terminator and then fails, so the first item of the line stream
/// must be an `Err`.
#[test]
fn issue2862() {
    let reader = futures::io::BufReader::new(IOErrorRead(false));
    let mut line_stream = reader.lines();
    let first = block_on(line_stream.next());
    assert!(first.unwrap().is_err());
}
85