1 #![cfg(feature = "io-util")]
2 #![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
3
4 use std::error::Error;
5 use std::io::{Cursor, Read, Result as IoResult, Write};
6 use tokio::io::{AsyncRead, AsyncReadExt};
7 use tokio_util::io::SyncIoBridge;
8
test_reader_len( r: impl AsyncRead + Unpin + Send + 'static, expected_len: usize, ) -> IoResult<()>9 async fn test_reader_len(
10 r: impl AsyncRead + Unpin + Send + 'static,
11 expected_len: usize,
12 ) -> IoResult<()> {
13 let mut r = SyncIoBridge::new(r);
14 let res = tokio::task::spawn_blocking(move || {
15 let mut buf = Vec::new();
16 r.read_to_end(&mut buf)?;
17 Ok::<_, std::io::Error>(buf)
18 })
19 .await?;
20 assert_eq!(res?.len(), expected_len);
21 Ok(())
22 }
23
24 #[tokio::test]
test_async_read_to_sync() -> Result<(), Box<dyn Error>>25 async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> {
26 test_reader_len(tokio::io::empty(), 0).await?;
27 let buf = b"hello world";
28 test_reader_len(Cursor::new(buf), buf.len()).await?;
29 Ok(())
30 }
31
32 #[tokio::test]
test_async_write_to_sync() -> Result<(), Box<dyn Error>>33 async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
34 let mut dest = Vec::new();
35 let src = b"hello world";
36 let dest = tokio::task::spawn_blocking(move || -> Result<_, String> {
37 let mut w = SyncIoBridge::new(Cursor::new(&mut dest));
38 std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?;
39 Ok(dest)
40 })
41 .await??;
42 assert_eq!(dest.as_slice(), src);
43 Ok(())
44 }
45
46 #[tokio::test]
test_into_inner() -> Result<(), Box<dyn Error>>47 async fn test_into_inner() -> Result<(), Box<dyn Error>> {
48 let mut buf = Vec::new();
49 SyncIoBridge::new(tokio::io::empty())
50 .into_inner()
51 .read_to_end(&mut buf)
52 .await
53 .unwrap();
54 assert_eq!(buf.len(), 0);
55 Ok(())
56 }
57
58 #[tokio::test]
test_shutdown() -> Result<(), Box<dyn Error>>59 async fn test_shutdown() -> Result<(), Box<dyn Error>> {
60 let (s1, mut s2) = tokio::io::duplex(1024);
61 let (_rh, wh) = tokio::io::split(s1);
62 tokio::task::spawn_blocking(move || -> std::io::Result<_> {
63 let mut wh = SyncIoBridge::new(wh);
64 wh.write_all(b"hello")?;
65 wh.shutdown()?;
66 assert!(wh.write_all(b" world").is_err());
67 Ok(())
68 })
69 .await??;
70 let mut buf = vec![];
71 s2.read_to_end(&mut buf).await?;
72 assert_eq!(buf, b"hello");
73 Ok(())
74 }
75