1 #![warn(rust_2018_idioms)]
2 
3 use bytes::Bytes;
4 use futures_util::SinkExt;
5 use std::io::{self, Error, ErrorKind};
6 use tokio::io::AsyncWriteExt;
7 use tokio_util::codec::{Encoder, FramedWrite};
8 use tokio_util::io::{CopyToBytes, SinkWriter};
9 use tokio_util::sync::PollSender;
10 
11 #[tokio::test]
test_copied_sink_writer() -> Result<(), Error>12 async fn test_copied_sink_writer() -> Result<(), Error> {
13     // Construct a channel pair to send data across and wrap a pollable sink.
14     // Note that the sink must mimic a writable object, e.g. have `std::io::Error`
15     // as its error type.
16     // As `PollSender` requires an owned copy of the buffer, we wrap it additionally
17     // with a `CopyToBytes` helper.
18     let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1);
19     let mut writer = SinkWriter::new(CopyToBytes::new(
20         PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)),
21     ));
22 
23     // Write data to our interface...
24     let data: [u8; 4] = [1, 2, 3, 4];
25     let _ = writer.write(&data).await;
26 
27     // ... and receive it.
28     assert_eq!(data.to_vec(), rx.recv().await.unwrap().to_vec());
29 
30     Ok(())
31 }
32 
33 /// A trivial encoder.
34 struct SliceEncoder;
35 
36 impl SliceEncoder {
new() -> Self37     fn new() -> Self {
38         Self {}
39     }
40 }
41 
42 impl<'a> Encoder<&'a [u8]> for SliceEncoder {
43     type Error = Error;
44 
encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error>45     fn encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
46         // This is where we'd write packet headers, lengths, etc. in a real encoder.
47         // For simplicity and demonstration purposes, we just pack a copy of
48         // the slice at the end of a buffer.
49         dst.extend_from_slice(item);
50         Ok(())
51     }
52 }
53 
54 #[tokio::test]
test_direct_sink_writer() -> Result<(), Error>55 async fn test_direct_sink_writer() -> Result<(), Error> {
56     // We define a framed writer which accepts byte slices
57     // and 'reverse' this construction immediately.
58     let framed_byte_lc = FramedWrite::new(Vec::new(), SliceEncoder::new());
59     let mut writer = SinkWriter::new(framed_byte_lc);
60 
61     // Write multiple slices to the sink...
62     let _ = writer.write(&[1, 2, 3]).await;
63     let _ = writer.write(&[4, 5, 6]).await;
64 
65     // ... and compare it with the buffer.
66     assert_eq!(
67         writer.into_inner().write_buffer().to_vec().as_slice(),
68         &[1, 2, 3, 4, 5, 6]
69     );
70 
71     Ok(())
72 }
73