1 #![warn(rust_2018_idioms)]
2 #![cfg(not(target_os = "wasi"))] // Wasi doesn't support UDP
3 #![cfg(not(miri))] // No `socket` in Miri.
4
5 use tokio::net::UdpSocket;
6 use tokio_stream::StreamExt;
7 use tokio_util::codec::{Decoder, Encoder, LinesCodec};
8 use tokio_util::udp::UdpFramed;
9
10 use bytes::{BufMut, BytesMut};
11 use futures::future::try_join;
12 use futures::future::FutureExt;
13 use futures::sink::SinkExt;
14 use std::io;
15 use std::sync::Arc;
16
17 #[cfg_attr(
18 any(
19 target_os = "macos",
20 target_os = "ios",
21 target_os = "tvos",
22 target_os = "watchos",
23 target_os = "visionos"
24 ),
25 allow(unused_assignments)
26 )]
27 #[tokio::test]
send_framed_byte_codec() -> std::io::Result<()>28 async fn send_framed_byte_codec() -> std::io::Result<()> {
29 let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
30 let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
31
32 let a_addr = a_soc.local_addr()?;
33 let b_addr = b_soc.local_addr()?;
34
35 // test sending & receiving bytes
36 {
37 let mut a = UdpFramed::new(a_soc, ByteCodec);
38 let mut b = UdpFramed::new(b_soc, ByteCodec);
39
40 let msg = b"4567";
41
42 let send = a.send((msg, b_addr));
43 let recv = b.next().map(|e| e.unwrap());
44 let (_, received) = try_join(send, recv).await.unwrap();
45
46 let (data, addr) = received;
47 assert_eq!(msg, &*data);
48 assert_eq!(a_addr, addr);
49
50 a_soc = a.into_inner();
51 b_soc = b.into_inner();
52 }
53
54 #[cfg(not(any(
55 target_os = "macos",
56 target_os = "ios",
57 target_os = "tvos",
58 target_os = "watchos",
59 target_os = "visionos"
60 )))]
61 // test sending & receiving an empty message
62 {
63 let mut a = UdpFramed::new(a_soc, ByteCodec);
64 let mut b = UdpFramed::new(b_soc, ByteCodec);
65
66 let msg = b"";
67
68 let send = a.send((msg, b_addr));
69 let recv = b.next().map(|e| e.unwrap());
70 let (_, received) = try_join(send, recv).await.unwrap();
71
72 let (data, addr) = received;
73 assert_eq!(msg, &*data);
74 assert_eq!(a_addr, addr);
75 }
76
77 Ok(())
78 }
79
80 pub struct ByteCodec;
81
82 impl Decoder for ByteCodec {
83 type Item = Vec<u8>;
84 type Error = io::Error;
85
decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error>86 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
87 let len = buf.len();
88 Ok(Some(buf.split_to(len).to_vec()))
89 }
90 }
91
92 impl Encoder<&[u8]> for ByteCodec {
93 type Error = io::Error;
94
encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error>95 fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
96 buf.reserve(data.len());
97 buf.put_slice(data);
98 Ok(())
99 }
100 }
101
102 #[tokio::test]
send_framed_lines_codec() -> std::io::Result<()>103 async fn send_framed_lines_codec() -> std::io::Result<()> {
104 let a_soc = UdpSocket::bind("127.0.0.1:0").await?;
105 let b_soc = UdpSocket::bind("127.0.0.1:0").await?;
106
107 let a_addr = a_soc.local_addr()?;
108 let b_addr = b_soc.local_addr()?;
109
110 let mut a = UdpFramed::new(a_soc, ByteCodec);
111 let mut b = UdpFramed::new(b_soc, LinesCodec::new());
112
113 let msg = b"1\r\n2\r\n3\r\n".to_vec();
114 a.send((&msg, b_addr)).await?;
115
116 assert_eq!(b.next().await.unwrap().unwrap(), ("1".to_string(), a_addr));
117 assert_eq!(b.next().await.unwrap().unwrap(), ("2".to_string(), a_addr));
118 assert_eq!(b.next().await.unwrap().unwrap(), ("3".to_string(), a_addr));
119
120 Ok(())
121 }
122
123 #[tokio::test]
framed_half() -> std::io::Result<()>124 async fn framed_half() -> std::io::Result<()> {
125 let a_soc = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
126 let b_soc = a_soc.clone();
127
128 let a_addr = a_soc.local_addr()?;
129 let b_addr = b_soc.local_addr()?;
130
131 let mut a = UdpFramed::new(a_soc, ByteCodec);
132 let mut b = UdpFramed::new(b_soc, LinesCodec::new());
133
134 let msg = b"1\r\n2\r\n3\r\n".to_vec();
135 a.send((&msg, b_addr)).await?;
136
137 let msg = b"4\r\n5\r\n6\r\n".to_vec();
138 a.send((&msg, b_addr)).await?;
139
140 assert_eq!(b.next().await.unwrap().unwrap(), ("1".to_string(), a_addr));
141 assert_eq!(b.next().await.unwrap().unwrap(), ("2".to_string(), a_addr));
142 assert_eq!(b.next().await.unwrap().unwrap(), ("3".to_string(), a_addr));
143
144 assert_eq!(b.next().await.unwrap().unwrap(), ("4".to_string(), a_addr));
145 assert_eq!(b.next().await.unwrap().unwrap(), ("5".to_string(), a_addr));
146 assert_eq!(b.next().await.unwrap().unwrap(), ("6".to_string(), a_addr));
147
148 Ok(())
149 }
150