1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))] // Wasi does not support bind or UDP
3 // No `socket` on miri.
4
5 use std::future::poll_fn;
6 use std::io;
7 use std::sync::Arc;
8 use tokio::{io::ReadBuf, net::UdpSocket};
9 use tokio_test::assert_ok;
10
11 const MSG: &[u8] = b"hello";
12 const MSG_LEN: usize = MSG.len();
13
14 #[tokio::test]
send_recv() -> std::io::Result<()>15 async fn send_recv() -> std::io::Result<()> {
16 let sender = UdpSocket::bind("127.0.0.1:0").await?;
17 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
18
19 sender.connect(receiver.local_addr()?).await?;
20 receiver.connect(sender.local_addr()?).await?;
21
22 sender.send(MSG).await?;
23
24 let mut recv_buf = [0u8; 32];
25 let len = receiver.recv(&mut recv_buf[..]).await?;
26
27 assert_eq!(&recv_buf[..len], MSG);
28 Ok(())
29 }
30
31 #[tokio::test]
send_recv_poll() -> std::io::Result<()>32 async fn send_recv_poll() -> std::io::Result<()> {
33 let sender = UdpSocket::bind("127.0.0.1:0").await?;
34 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
35
36 sender.connect(receiver.local_addr()?).await?;
37 receiver.connect(sender.local_addr()?).await?;
38
39 poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
40
41 let mut recv_buf = [0u8; 32];
42 let mut read = ReadBuf::new(&mut recv_buf);
43 poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
44
45 assert_eq!(read.filled(), MSG);
46 Ok(())
47 }
48
49 #[tokio::test]
send_to_recv_from() -> std::io::Result<()>50 async fn send_to_recv_from() -> std::io::Result<()> {
51 let sender = UdpSocket::bind("127.0.0.1:0").await?;
52 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
53
54 let receiver_addr = receiver.local_addr()?;
55 sender.send_to(MSG, &receiver_addr).await?;
56
57 let mut recv_buf = [0u8; 32];
58 let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
59
60 assert_eq!(&recv_buf[..len], MSG);
61 assert_eq!(addr, sender.local_addr()?);
62 Ok(())
63 }
64
65 #[tokio::test]
send_to_recv_from_poll() -> std::io::Result<()>66 async fn send_to_recv_from_poll() -> std::io::Result<()> {
67 let sender = UdpSocket::bind("127.0.0.1:0").await?;
68 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
69
70 let receiver_addr = receiver.local_addr()?;
71 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
72
73 let mut recv_buf = [0u8; 32];
74 let mut read = ReadBuf::new(&mut recv_buf);
75 let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
76
77 assert_eq!(read.filled(), MSG);
78 assert_eq!(addr, sender.local_addr()?);
79 Ok(())
80 }
81
82 #[tokio::test]
send_to_peek_from() -> std::io::Result<()>83 async fn send_to_peek_from() -> std::io::Result<()> {
84 let sender = UdpSocket::bind("127.0.0.1:0").await?;
85 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
86
87 let receiver_addr = receiver.local_addr()?;
88 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
89
90 // peek
91 let mut recv_buf = [0u8; 32];
92 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
93 assert_eq!(&recv_buf[..n], MSG);
94 assert_eq!(addr, sender.local_addr()?);
95
96 // peek
97 let mut recv_buf = [0u8; 32];
98 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
99 assert_eq!(&recv_buf[..n], MSG);
100 assert_eq!(addr, sender.local_addr()?);
101
102 let mut recv_buf = [0u8; 32];
103 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
104 assert_eq!(&recv_buf[..n], MSG);
105 assert_eq!(addr, sender.local_addr()?);
106
107 Ok(())
108 }
109
110 #[tokio::test]
send_to_try_peek_from() -> std::io::Result<()>111 async fn send_to_try_peek_from() -> std::io::Result<()> {
112 let sender = UdpSocket::bind("127.0.0.1:0").await?;
113 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
114
115 let receiver_addr = receiver.local_addr()?;
116 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
117
118 // peek
119 let mut recv_buf = [0u8; 32];
120
121 loop {
122 match receiver.try_peek_from(&mut recv_buf) {
123 Ok((n, addr)) => {
124 assert_eq!(&recv_buf[..n], MSG);
125 assert_eq!(addr, sender.local_addr()?);
126 break;
127 }
128 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
129 receiver.readable().await?;
130 }
131 Err(e) => return Err(e),
132 }
133 }
134
135 // peek
136 let mut recv_buf = [0u8; 32];
137 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
138 assert_eq!(&recv_buf[..n], MSG);
139 assert_eq!(addr, sender.local_addr()?);
140
141 let mut recv_buf = [0u8; 32];
142 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
143 assert_eq!(&recv_buf[..n], MSG);
144 assert_eq!(addr, sender.local_addr()?);
145
146 Ok(())
147 }
148
149 #[tokio::test]
send_to_peek_from_poll() -> std::io::Result<()>150 async fn send_to_peek_from_poll() -> std::io::Result<()> {
151 let sender = UdpSocket::bind("127.0.0.1:0").await?;
152 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
153
154 let receiver_addr = receiver.local_addr()?;
155 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
156
157 let mut recv_buf = [0u8; 32];
158 let mut read = ReadBuf::new(&mut recv_buf);
159 let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
160
161 assert_eq!(read.filled(), MSG);
162 assert_eq!(addr, sender.local_addr()?);
163
164 let mut recv_buf = [0u8; 32];
165 let mut read = ReadBuf::new(&mut recv_buf);
166 poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
167
168 assert_eq!(read.filled(), MSG);
169 let mut recv_buf = [0u8; 32];
170 let mut read = ReadBuf::new(&mut recv_buf);
171
172 poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
173 assert_eq!(read.filled(), MSG);
174 Ok(())
175 }
176
177 #[tokio::test]
peek_sender() -> std::io::Result<()>178 async fn peek_sender() -> std::io::Result<()> {
179 let sender = UdpSocket::bind("127.0.0.1:0").await?;
180 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
181
182 let sender_addr = sender.local_addr()?;
183 let receiver_addr = receiver.local_addr()?;
184
185 let msg = b"Hello, world!";
186 sender.send_to(msg, receiver_addr).await?;
187
188 let peeked_sender = receiver.peek_sender().await?;
189 assert_eq!(peeked_sender, sender_addr);
190
191 // Assert that `peek_sender()` returns the right sender but
192 // doesn't remove from the receive queue.
193 let mut recv_buf = [0u8; 32];
194 let (read, received_sender) = receiver.recv_from(&mut recv_buf).await?;
195
196 assert_eq!(&recv_buf[..read], msg);
197 assert_eq!(received_sender, peeked_sender);
198
199 Ok(())
200 }
201
202 #[tokio::test]
poll_peek_sender() -> std::io::Result<()>203 async fn poll_peek_sender() -> std::io::Result<()> {
204 let sender = UdpSocket::bind("127.0.0.1:0").await?;
205 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
206
207 let sender_addr = sender.local_addr()?;
208 let receiver_addr = receiver.local_addr()?;
209
210 let msg = b"Hello, world!";
211 poll_fn(|cx| sender.poll_send_to(cx, msg, receiver_addr)).await?;
212
213 let peeked_sender = poll_fn(|cx| receiver.poll_peek_sender(cx)).await?;
214 assert_eq!(peeked_sender, sender_addr);
215
216 // Assert that `poll_peek_sender()` returns the right sender but
217 // doesn't remove from the receive queue.
218 let mut recv_buf = [0u8; 32];
219 let mut read = ReadBuf::new(&mut recv_buf);
220 let received_sender = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
221
222 assert_eq!(read.filled(), msg);
223 assert_eq!(received_sender, peeked_sender);
224
225 Ok(())
226 }
227
228 #[tokio::test]
try_peek_sender() -> std::io::Result<()>229 async fn try_peek_sender() -> std::io::Result<()> {
230 let sender = UdpSocket::bind("127.0.0.1:0").await?;
231 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
232
233 let sender_addr = sender.local_addr()?;
234 let receiver_addr = receiver.local_addr()?;
235
236 let msg = b"Hello, world!";
237 sender.send_to(msg, receiver_addr).await?;
238
239 let peeked_sender = loop {
240 match receiver.try_peek_sender() {
241 Ok(peeked_sender) => break peeked_sender,
242 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
243 receiver.readable().await?;
244 }
245 Err(e) => return Err(e),
246 }
247 };
248
249 assert_eq!(peeked_sender, sender_addr);
250
251 // Assert that `try_peek_sender()` returns the right sender but
252 // didn't remove from the receive queue.
253 let mut recv_buf = [0u8; 32];
254 // We already peeked the sender so there must be data in the receive queue.
255 let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap();
256
257 assert_eq!(&recv_buf[..read], msg);
258 assert_eq!(received_sender, peeked_sender);
259
260 Ok(())
261 }
262
263 #[tokio::test]
split() -> std::io::Result<()>264 async fn split() -> std::io::Result<()> {
265 let socket = UdpSocket::bind("127.0.0.1:0").await?;
266 let s = Arc::new(socket);
267 let r = s.clone();
268
269 let addr = s.local_addr()?;
270 tokio::spawn(async move {
271 s.send_to(MSG, &addr).await.unwrap();
272 });
273 let mut recv_buf = [0u8; 32];
274 let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
275 assert_eq!(&recv_buf[..len], MSG);
276 Ok(())
277 }
278
279 #[tokio::test]
split_chan() -> std::io::Result<()>280 async fn split_chan() -> std::io::Result<()> {
281 // setup UdpSocket that will echo all sent items
282 let socket = UdpSocket::bind("127.0.0.1:0").await?;
283 let addr = socket.local_addr().unwrap();
284 let s = Arc::new(socket);
285 let r = s.clone();
286
287 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
288 tokio::spawn(async move {
289 while let Some((bytes, addr)) = rx.recv().await {
290 s.send_to(&bytes, &addr).await.unwrap();
291 }
292 });
293
294 tokio::spawn(async move {
295 let mut buf = [0u8; 32];
296 loop {
297 let (len, addr) = r.recv_from(&mut buf).await.unwrap();
298 tx.send((buf[..len].to_vec(), addr)).await.unwrap();
299 }
300 });
301
302 // test that we can send a value and get back some response
303 let sender = UdpSocket::bind("127.0.0.1:0").await?;
304 sender.send_to(MSG, addr).await?;
305 let mut recv_buf = [0u8; 32];
306 let (len, _) = sender.recv_from(&mut recv_buf).await?;
307 assert_eq!(&recv_buf[..len], MSG);
308 Ok(())
309 }
310
311 #[tokio::test]
split_chan_poll() -> std::io::Result<()>312 async fn split_chan_poll() -> std::io::Result<()> {
313 // setup UdpSocket that will echo all sent items
314 let socket = UdpSocket::bind("127.0.0.1:0").await?;
315 let addr = socket.local_addr().unwrap();
316 let s = Arc::new(socket);
317 let r = s.clone();
318
319 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
320 tokio::spawn(async move {
321 while let Some((bytes, addr)) = rx.recv().await {
322 poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
323 .await
324 .unwrap();
325 }
326 });
327
328 tokio::spawn(async move {
329 let mut recv_buf = [0u8; 32];
330 let mut read = ReadBuf::new(&mut recv_buf);
331 loop {
332 let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
333 tx.send((read.filled().to_vec(), addr)).await.unwrap();
334 }
335 });
336
337 // test that we can send a value and get back some response
338 let sender = UdpSocket::bind("127.0.0.1:0").await?;
339 poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
340
341 let mut recv_buf = [0u8; 32];
342 let mut read = ReadBuf::new(&mut recv_buf);
343 let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
344 assert_eq!(read.filled(), MSG);
345 Ok(())
346 }
347
348 // # Note
349 //
350 // This test is purposely written such that each time `sender` sends data on
351 // the socket, `receiver` awaits the data. On Unix, it would be okay waiting
352 // until the end of the test to receive all the data. On Windows, this would
353 // **not** be okay because it's resources are completion based (via IOCP).
354 // If data is sent and not yet received, attempting to send more data will
355 // result in `ErrorKind::WouldBlock` until the first operation completes.
356 #[tokio::test]
try_send_spawn()357 async fn try_send_spawn() {
358 const MSG2: &[u8] = b"world!";
359 const MSG2_LEN: usize = MSG2.len();
360
361 let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
362 let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
363
364 receiver
365 .connect(sender.local_addr().unwrap())
366 .await
367 .unwrap();
368
369 sender.writable().await.unwrap();
370
371 let sent = &sender
372 .try_send_to(MSG, receiver.local_addr().unwrap())
373 .unwrap();
374 assert_eq!(sent, &MSG_LEN);
375 let mut buf = [0u8; 32];
376 let mut received = receiver.recv(&mut buf[..]).await.unwrap();
377
378 sender
379 .connect(receiver.local_addr().unwrap())
380 .await
381 .unwrap();
382 let sent = &sender.try_send(MSG2).unwrap();
383 assert_eq!(sent, &MSG2_LEN);
384 received += receiver.recv(&mut buf[..]).await.unwrap();
385
386 std::thread::spawn(move || {
387 let sent = &sender.try_send(MSG).unwrap();
388 assert_eq!(sent, &MSG_LEN);
389 })
390 .join()
391 .unwrap();
392 received += receiver.recv(&mut buf[..]).await.unwrap();
393
394 assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
395 }
396
397 #[tokio::test]
try_send_recv()398 async fn try_send_recv() {
399 // Create listener
400 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
401
402 // Create socket pair
403 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
404
405 // Connect the two
406 client.connect(server.local_addr().unwrap()).await.unwrap();
407 server.connect(client.local_addr().unwrap()).await.unwrap();
408
409 for _ in 0..5 {
410 loop {
411 client.writable().await.unwrap();
412
413 match client.try_send(b"hello world") {
414 Ok(n) => {
415 assert_eq!(n, 11);
416 break;
417 }
418 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
419 Err(e) => panic!("{e:?}"),
420 }
421 }
422
423 loop {
424 server.readable().await.unwrap();
425
426 let mut buf = [0; 512];
427
428 match server.try_recv(&mut buf) {
429 Ok(n) => {
430 assert_eq!(n, 11);
431 assert_eq!(&buf[0..11], &b"hello world"[..]);
432 break;
433 }
434 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
435 Err(e) => panic!("{e:?}"),
436 }
437 }
438 }
439 }
440
441 #[tokio::test]
try_send_to_recv_from()442 async fn try_send_to_recv_from() {
443 // Create listener
444 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
445 let saddr = server.local_addr().unwrap();
446
447 // Create socket pair
448 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
449 let caddr = client.local_addr().unwrap();
450
451 for _ in 0..5 {
452 loop {
453 client.writable().await.unwrap();
454
455 match client.try_send_to(b"hello world", saddr) {
456 Ok(n) => {
457 assert_eq!(n, 11);
458 break;
459 }
460 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
461 Err(e) => panic!("{e:?}"),
462 }
463 }
464
465 loop {
466 server.readable().await.unwrap();
467
468 let mut buf = [0; 512];
469
470 match server.try_recv_from(&mut buf) {
471 Ok((n, addr)) => {
472 assert_eq!(n, 11);
473 assert_eq!(addr, caddr);
474 assert_eq!(&buf[0..11], &b"hello world"[..]);
475 break;
476 }
477 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
478 Err(e) => panic!("{e:?}"),
479 }
480 }
481 }
482 }
483
484 #[tokio::test]
try_recv_buf()485 async fn try_recv_buf() {
486 // Create listener
487 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
488
489 // Create socket pair
490 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
491
492 // Connect the two
493 client.connect(server.local_addr().unwrap()).await.unwrap();
494 server.connect(client.local_addr().unwrap()).await.unwrap();
495
496 for _ in 0..5 {
497 loop {
498 client.writable().await.unwrap();
499
500 match client.try_send(b"hello world") {
501 Ok(n) => {
502 assert_eq!(n, 11);
503 break;
504 }
505 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
506 Err(e) => panic!("{e:?}"),
507 }
508 }
509
510 loop {
511 server.readable().await.unwrap();
512
513 let mut buf = Vec::with_capacity(512);
514
515 match server.try_recv_buf(&mut buf) {
516 Ok(n) => {
517 assert_eq!(n, 11);
518 assert_eq!(&buf[0..11], &b"hello world"[..]);
519 break;
520 }
521 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
522 Err(e) => panic!("{e:?}"),
523 }
524 }
525 }
526 }
527
528 #[tokio::test]
recv_buf() -> std::io::Result<()>529 async fn recv_buf() -> std::io::Result<()> {
530 let sender = UdpSocket::bind("127.0.0.1:0").await?;
531 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
532
533 sender.connect(receiver.local_addr()?).await?;
534 receiver.connect(sender.local_addr()?).await?;
535
536 sender.send(MSG).await?;
537 let mut recv_buf = Vec::with_capacity(32);
538 let len = receiver.recv_buf(&mut recv_buf).await?;
539
540 assert_eq!(len, MSG_LEN);
541 assert_eq!(&recv_buf[..len], MSG);
542 Ok(())
543 }
544
545 #[tokio::test]
try_recv_buf_from()546 async fn try_recv_buf_from() {
547 // Create listener
548 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
549 let saddr = server.local_addr().unwrap();
550
551 // Create socket pair
552 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
553 let caddr = client.local_addr().unwrap();
554
555 for _ in 0..5 {
556 loop {
557 client.writable().await.unwrap();
558
559 match client.try_send_to(b"hello world", saddr) {
560 Ok(n) => {
561 assert_eq!(n, 11);
562 break;
563 }
564 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
565 Err(e) => panic!("{e:?}"),
566 }
567 }
568
569 loop {
570 server.readable().await.unwrap();
571
572 let mut buf = Vec::with_capacity(512);
573
574 match server.try_recv_buf_from(&mut buf) {
575 Ok((n, addr)) => {
576 assert_eq!(n, 11);
577 assert_eq!(addr, caddr);
578 assert_eq!(&buf[0..11], &b"hello world"[..]);
579 break;
580 }
581 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
582 Err(e) => panic!("{e:?}"),
583 }
584 }
585 }
586 }
587
588 #[tokio::test]
recv_buf_from() -> std::io::Result<()>589 async fn recv_buf_from() -> std::io::Result<()> {
590 let sender = UdpSocket::bind("127.0.0.1:0").await?;
591 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
592
593 sender.connect(receiver.local_addr()?).await?;
594
595 sender.send(MSG).await?;
596 let mut recv_buf = Vec::with_capacity(32);
597 let (len, caddr) = receiver.recv_buf_from(&mut recv_buf).await?;
598
599 assert_eq!(len, MSG_LEN);
600 assert_eq!(&recv_buf[..len], MSG);
601 assert_eq!(caddr, sender.local_addr()?);
602 Ok(())
603 }
604
605 #[tokio::test]
poll_ready()606 async fn poll_ready() {
607 // Create listener
608 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
609 let saddr = server.local_addr().unwrap();
610
611 // Create socket pair
612 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
613 let caddr = client.local_addr().unwrap();
614
615 for _ in 0..5 {
616 loop {
617 assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
618
619 match client.try_send_to(b"hello world", saddr) {
620 Ok(n) => {
621 assert_eq!(n, 11);
622 break;
623 }
624 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
625 Err(e) => panic!("{e:?}"),
626 }
627 }
628
629 loop {
630 assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
631
632 let mut buf = Vec::with_capacity(512);
633
634 match server.try_recv_buf_from(&mut buf) {
635 Ok((n, addr)) => {
636 assert_eq!(n, 11);
637 assert_eq!(addr, caddr);
638 assert_eq!(&buf[0..11], &b"hello world"[..]);
639 break;
640 }
641 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
642 Err(e) => panic!("{e:?}"),
643 }
644 }
645 }
646 }
647