1 // You can run this example from the root of the mio repo:
2 // cargo run --example tcp_server --features="os-poll net"
3 use mio::event::Event;
4 use mio::net::{TcpListener, TcpStream};
5 use mio::{Events, Interest, Poll, Registry, Token};
6 use std::collections::HashMap;
7 use std::io::{self, Read, Write};
8 use std::str::from_utf8;
9 
10 // Setup some tokens to allow us to identify which event is for which socket.
11 const SERVER: Token = Token(0);
12 
13 // Some data we'll send over the connection.
14 const DATA: &[u8] = b"Hello world!\n";
15 
16 #[cfg(not(target_os = "wasi"))]
main() -> io::Result<()>17 fn main() -> io::Result<()> {
18     env_logger::init();
19 
20     // Create a poll instance.
21     let mut poll = Poll::new()?;
22     // Create storage for events.
23     let mut events = Events::with_capacity(128);
24 
25     // Setup the TCP server socket.
26     let addr = "127.0.0.1:9000".parse().unwrap();
27     let mut server = TcpListener::bind(addr)?;
28 
29     // Register the server with poll we can receive events for it.
30     poll.registry()
31         .register(&mut server, SERVER, Interest::READABLE)?;
32 
33     // Map of `Token` -> `TcpStream`.
34     let mut connections = HashMap::new();
35     // Unique token for each incoming connection.
36     let mut unique_token = Token(SERVER.0 + 1);
37 
38     println!("You can connect to the server using `nc`:");
39     println!(" $ nc 127.0.0.1 9000");
40     println!("You'll see our welcome message and anything you type will be printed here.");
41 
42     loop {
43         if let Err(err) = poll.poll(&mut events, None) {
44             if interrupted(&err) {
45                 continue;
46             }
47             return Err(err);
48         }
49 
50         for event in events.iter() {
51             match event.token() {
52                 SERVER => loop {
53                     // Received an event for the TCP server socket, which
54                     // indicates we can accept an connection.
55                     let (mut connection, address) = match server.accept() {
56                         Ok((connection, address)) => (connection, address),
57                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
58                             // If we get a `WouldBlock` error we know our
59                             // listener has no more incoming connections queued,
60                             // so we can return to polling and wait for some
61                             // more.
62                             break;
63                         }
64                         Err(e) => {
65                             // If it was any other kind of error, something went
66                             // wrong and we terminate with an error.
67                             return Err(e);
68                         }
69                     };
70 
71                     println!("Accepted connection from: {}", address);
72 
73                     let token = next(&mut unique_token);
74                     poll.registry().register(
75                         &mut connection,
76                         token,
77                         Interest::READABLE.add(Interest::WRITABLE),
78                     )?;
79 
80                     connections.insert(token, connection);
81                 },
82                 token => {
83                     // Maybe received an event for a TCP connection.
84                     let done = if let Some(connection) = connections.get_mut(&token) {
85                         handle_connection_event(poll.registry(), connection, event)?
86                     } else {
87                         // Sporadic events happen, we can safely ignore them.
88                         false
89                     };
90                     if done {
91                         if let Some(mut connection) = connections.remove(&token) {
92                             poll.registry().deregister(&mut connection)?;
93                         }
94                     }
95                 }
96             }
97         }
98     }
99 }
100 
next(current: &mut Token) -> Token101 fn next(current: &mut Token) -> Token {
102     let next = current.0;
103     current.0 += 1;
104     Token(next)
105 }
106 
107 /// Returns `true` if the connection is done.
handle_connection_event( registry: &Registry, connection: &mut TcpStream, event: &Event, ) -> io::Result<bool>108 fn handle_connection_event(
109     registry: &Registry,
110     connection: &mut TcpStream,
111     event: &Event,
112 ) -> io::Result<bool> {
113     if event.is_writable() {
114         // We can (maybe) write to the connection.
115         match connection.write(DATA) {
116             // We want to write the entire `DATA` buffer in a single go. If we
117             // write less we'll return a short write error (same as
118             // `io::Write::write_all` does).
119             Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
120             Ok(_) => {
121                 // After we've written something we'll reregister the connection
122                 // to only respond to readable events.
123                 registry.reregister(connection, event.token(), Interest::READABLE)?
124             }
125             // Would block "errors" are the OS's way of saying that the
126             // connection is not actually ready to perform this I/O operation.
127             Err(ref err) if would_block(err) => {}
128             // Got interrupted (how rude!), we'll try again.
129             Err(ref err) if interrupted(err) => {
130                 return handle_connection_event(registry, connection, event)
131             }
132             // Other errors we'll consider fatal.
133             Err(err) => return Err(err),
134         }
135     }
136 
137     if event.is_readable() {
138         let mut connection_closed = false;
139         let mut received_data = vec![0; 4096];
140         let mut bytes_read = 0;
141         // We can (maybe) read from the connection.
142         loop {
143             match connection.read(&mut received_data[bytes_read..]) {
144                 Ok(0) => {
145                     // Reading 0 bytes means the other side has closed the
146                     // connection or is done writing, then so are we.
147                     connection_closed = true;
148                     break;
149                 }
150                 Ok(n) => {
151                     bytes_read += n;
152                     if bytes_read == received_data.len() {
153                         received_data.resize(received_data.len() + 1024, 0);
154                     }
155                 }
156                 // Would block "errors" are the OS's way of saying that the
157                 // connection is not actually ready to perform this I/O operation.
158                 Err(ref err) if would_block(err) => break,
159                 Err(ref err) if interrupted(err) => continue,
160                 // Other errors we'll consider fatal.
161                 Err(err) => return Err(err),
162             }
163         }
164 
165         if bytes_read != 0 {
166             let received_data = &received_data[..bytes_read];
167             if let Ok(str_buf) = from_utf8(received_data) {
168                 println!("Received data: {}", str_buf.trim_end());
169             } else {
170                 println!("Received (none UTF-8) data: {:?}", received_data);
171             }
172         }
173 
174         if connection_closed {
175             println!("Connection closed");
176             return Ok(true);
177         }
178     }
179 
180     Ok(false)
181 }
182 
would_block(err: &io::Error) -> bool183 fn would_block(err: &io::Error) -> bool {
184     err.kind() == io::ErrorKind::WouldBlock
185 }
186 
interrupted(err: &io::Error) -> bool187 fn interrupted(err: &io::Error) -> bool {
188     err.kind() == io::ErrorKind::Interrupted
189 }
190 
191 #[cfg(target_os = "wasi")]
main()192 fn main() {
193     panic!("can't bind to an address with wasi")
194 }
195