/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Defines a backing task to keep an HTTP/3 connection running
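//!
//! The entry point is `drive`, which takes ownership of a `quiche::Connection` and its
//! `UdpSocket` and pumps packets between them. Internally, a `Driver` services QUIC-level
//! events until the handshake completes, then hands the connection to an `H3Driver`, which
//! multiplexes DNS-over-HTTP/3 requests received over an `mpsc` channel onto the connection.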
17
18 use crate::boot_time;
19 use crate::boot_time::BootTime;
20 use crate::metrics::log_handshake_event_stats;
21 use log::{debug, info, warn};
22 use quiche::h3;
23 use std::collections::HashMap;
24 use std::default::Default;
25 use std::future;
26 use std::io;
27 use std::time::Instant;
28 use thiserror::Error;
29 use tokio::net::UdpSocket;
30 use tokio::select;
31 use tokio::sync::{mpsc, oneshot, watch};
32
33 use super::Status;
34
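/// Reason a connection handshake was attempted; carried in `HandshakeInfo` and included in
/// handshake statistics logging.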
#[derive(Copy, Clone, Debug)]
pub enum Cause {
    Probe,
    Reconnect,
    Retry,
}

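/// Outcome of a handshake attempt, passed to `log_handshake_event_stats`.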
#[derive(Clone)]
#[allow(dead_code)]
pub enum HandshakeResult {
    Unknown,
    Success,
    Timeout,
    TlsFail,
    ServerUnreachable,
}

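/// Details about a handshake attempt: its cause, byte counts, elapsed time in microseconds,
/// QUIC version, and network/private-DNS metadata, used when logging handshake statistics.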
#[derive(Copy, Clone, Debug)]
pub struct HandshakeInfo {
    pub cause: Cause,
    pub sent_bytes: u64,
    pub recv_bytes: u64,
    pub elapsed: u128,
    pub quic_version: u32,
    pub network_type: u32,
    pub private_dns_mode: u32,
    pub session_hit_checker: bool,
}

impl std::fmt::Display for HandshakeInfo {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}",
            self.cause,
            self.sent_bytes,
            self.recv_bytes,
            self.quic_version,
            self.session_hit_checker
        )
    }
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("network IO error: {0}")]
    Network(#[from] io::Error),
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    #[error("Response delivery error: {0}")]
    StreamSend(#[from] mpsc::error::SendError<Stream>),
    #[error("Connection closed")]
    Closed,
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
/// HTTP/3 Request to be sent on the connection
pub struct Request {
    /// Request headers
    pub headers: Vec<h3::Header>,
    /// Expiry time for the request, relative to `CLOCK_BOOTTIME`
    pub expiry: Option<BootTime>,
    /// Channel to send the response to
    pub response_tx: oneshot::Sender<Stream>,
}

#[derive(Debug)]
/// HTTP/3 Response
pub struct Stream {
    /// Response headers
    #[allow(dead_code)]
    pub headers: Vec<h3::Header>,
    /// Response body
    pub data: Vec<u8>,
    /// Error code if stream was reset
    pub error: Option<u64>,
}

impl Stream {
    fn new(headers: Vec<h3::Header>) -> Self {
        Self { headers, data: Vec::new(), error: None }
    }
}

const MAX_UDP_PACKET_SIZE: usize = 65536;

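/// Drives the QUIC side of the connection: feeds received UDP datagrams to quiche, flushes
/// outgoing packets, services quiche timers, and reports connection status over `status_tx`
/// until the HTTP/3 layer can be brought up.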
struct Driver {
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    // This buffer is large; boxing it keeps it off the stack and prevents it from being
    // copied when the driver is moved.
    buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
    net_id: u32,
    // Used to check if the connection has entered the closing or draining state. A connection
    // can enter the closing state if the sender side of the request_rx channel has been
    // dropped. Note that we can't check whether a receiver is dead without potentially
    // receiving a message, and if we poll a dead receiver in a select! it will immediately
    // return None. As a result, we need this flag to gate whether or not to include .recv()
    // in our select!
    closing: bool,
    handshake_info: HandshakeInfo,
    connection_start: Instant,
}

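/// Layers HTTP/3 on top of `Driver` once the QUIC handshake has completed: accepts DNS
/// requests from `request_rx`, maps them to HTTP/3 streams, and delivers responses back over
/// each request's `response_tx` channel.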
struct H3Driver {
    driver: Driver,
    // h3_conn sometimes can't "fit" a request in its available windows.
    // This value holds a peeked request in that case, waiting for
    // transmission to become possible.
    buffered_request: Option<Request>,
    h3_conn: h3::Connection,
    requests: HashMap<u64, Request>,
    streams: HashMap<u64, Stream>,
}

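/// Sleeps for `timeout` if one is provided, otherwise waits forever (`future::pending`).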
async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
    info!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
    match timeout {
        Some(timeout) => boot_time::sleep(timeout).await,
        None => future::pending().await,
    }
}

/// Creates a future which, when polled, will handle events related to an HTTP/3 connection.
/// The returned error code will explain why the connection terminated.
pub async fn drive(
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    net_id: u32,
    handshake_info: HandshakeInfo,
) -> Result<()> {
    Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await
}

impl Driver {
    fn new(
        request_rx: mpsc::Receiver<Request>,
        status_tx: watch::Sender<Status>,
        quiche_conn: quiche::Connection,
        socket: UdpSocket,
        net_id: u32,
        handshake_info: HandshakeInfo,
    ) -> Self {
        Self {
            request_rx,
            status_tx,
            quiche_conn,
            socket,
            buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
            net_id,
            closing: false,
            handshake_info,
            connection_start: Instant::now(),
        }
    }

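    /// Runs the driver loop: primes the connection with an initial flush, then calls
    /// `drive_once` repeatedly until it returns an error (e.g. `Error::Closed` once the
    /// connection shuts down).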
    async fn drive(mut self) -> Result<()> {
        self.connection_start = Instant::now();
        // Prime connection
        self.flush_tx().await?;
        loop {
            self = self.drive_once().await?
        }
    }

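    /// If quiche reports the connection closed, notifies the status watcher with
    /// `Status::Dead` (carrying any session data for resumption) and returns `Error::Closed`.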
    fn handle_closed(&self) -> Result<()> {
        if self.quiche_conn.is_closed() {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} closed on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });
            Err(Error::Closed)
        } else {
            Ok(())
        }
    }

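    /// If quiche reports the connection draining, notifies the status watcher with
    /// `Status::Dead`, stops accepting new requests, and drains any queued requests so their
    /// callers see an error quickly rather than timing out.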
    fn handle_draining(&mut self) {
        if self.quiche_conn.is_draining() && !self.closing {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} is draining on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });

            self.request_rx.close();
            // Drain the pending DNS requests from the queue so that their corresponding future
            // tasks return an error quickly rather than timing out. However, DNS requests that
            // have already been sent will still time out.
            // TODO: re-issue the outstanding DNS requests, e.g. by passing H3Driver.requests
            // along with Status::Dead to the `Network` so it can re-issue them.
            while self.request_rx.try_recv().is_ok() {}
            self.closing = true;
        }
    }

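    /// Performs one iteration of the driver loop: upgrades to HTTP/3 once the handshake
    /// completes, then waits for either a quiche timeout or an incoming datagram, flushes any
    /// outgoing packets, and checks for the draining/closed state.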
    async fn drive_once(mut self) -> Result<Self> {
        // If the QUIC connection is live but HTTP/3 is not, try to bring it up
        if self.quiche_conn.is_established() || self.quiche_conn.is_in_early_data() {
            info!(
                "Connection {} established on network {}",
                self.quiche_conn.trace_id(),
                self.net_id
            );
            self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
            // In Stats, sent_bytes omits the length of any padding data appended to the
            // datagram.
            self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes;
            self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes;
            self.handshake_info.quic_version = quiche::PROTOCOL_VERSION;
            log_handshake_event_stats(HandshakeResult::Success, self.handshake_info);
            let h3_config = h3::Config::new()?;
            let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
            self = H3Driver::new(self, h3_conn).drive().await?;
            let _ = self.status_tx.send(Status::QUIC);
        }

        let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
        select! {
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("Driver: Timer expired on network {}", self.net_id);
                self.quiche_conn.on_timeout();

                if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() {
                    info!(
                        "Connection {} timed out on network {}",
                        self.quiche_conn.trace_id(),
                        self.net_id
                    );
                    self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
                    log_handshake_event_stats(
                        HandshakeResult::Timeout,
                        self.handshake_info,
                    );
                }
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
                let local = self.socket.local_addr()?;
                self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from, to: local })?;
                debug!("Received {} bytes on network {}", size, self.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.flush_tx().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Additionally, per
        // the Quiche documentation, the connection should not be dropped until is_closed()
        // returns true. This tokio task will become unowned and get dropped when is_closed()
        // returns true.
        self.handle_draining();

        // If the connection has closed, tear down
        self.handle_closed()?;

        Ok(self)
    }

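    /// Sends outgoing QUIC packets produced by quiche until it reports `Error::Done`.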
    async fn flush_tx(&mut self) -> Result<()> {
        let send_buf = self.buffer.as_mut();
        loop {
            match self.quiche_conn.send(send_buf) {
                Err(quiche::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((valid_len, send_info)) => {
                    self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
                    debug!("Sent {} bytes on network {}", valid_len, self.net_id);
                }
            }
        }
    }
}

impl H3Driver {
    fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
        Self {
            driver,
            h3_conn,
            requests: HashMap::new(),
            streams: HashMap::new(),
            buffered_request: None,
        }
    }

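    /// Runs the HTTP/3 event loop, reporting `Status::H3` on entry and `Status::Dead` if the
    /// loop exits with an error.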
    async fn drive(mut self) -> Result<Driver> {
        let _ = self.driver.status_tx.send(Status::H3);
        loop {
            if let Err(e) = self.drive_once().await {
                let session = self.driver.quiche_conn.session().map(<[_]>::to_vec);
                let _ = self.driver.status_tx.send(Status::Dead { session });
                return Err(e);
            }
        }
    }

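    /// Performs one iteration of the HTTP/3 loop: retries any buffered request, waits for a
    /// new request, a quiche timeout, or an incoming datagram, then flushes outgoing packets,
    /// processes HTTP/3 events, and checks for the draining/closed state.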
    async fn drive_once(&mut self) -> Result<()> {
        // We can't call self.driver.drive_once at the same time as
        // self.driver.request_rx.recv() due to ownership
        let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
        // If we've buffered a request (due to the connection being full),
        // try to resend that first
        if let Some(request) = self.buffered_request.take() {
            self.handle_request(request)?;
            self.driver.flush_tx().await?;
        }
        select! {
            // Only attempt to enqueue new requests if we have no buffered request and aren't
            // closing. Maybe limit the number of in-flight queries if the handshake
            // still hasn't finished.
            msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
                match msg {
                    Some(request) => self.handle_request(request)?,
                    None => self.shutdown(true, b"DONE").await?,
                }
            },
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("H3Driver: Timer expired on network {}", self.driver.net_id);
                self.driver.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
                let local = self.driver.socket.local_addr()?;
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from, to: local }).map(|_| ())?;

                debug!("Received {} bytes on network {}", size, self.driver.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.driver.flush_tx().await?;

        // Process any incoming HTTP/3 events
        self.flush_h3().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Additionally, per
        // the Quiche documentation, the connection should not be dropped until is_closed()
        // returns true. This tokio task will become unowned and get dropped when is_closed()
        // returns true.
        self.driver.handle_draining();

        // If the connection has closed, tear down
        self.driver.handle_closed()
    }

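    /// Sends a DNS request as a new HTTP/3 stream. Expired requests are dropped, and if the
    /// transport can't currently accept a new stream, the request is buffered for a retry on
    /// the next iteration.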
    fn handle_request(&mut self, request: Request) -> Result<()> {
        info!("Handling DNS request on network {}, is_in_early_data={}, stats=[{:?}], peer_streams_left_bidi={}, peer_streams_left_uni={}",
            self.driver.net_id, self.driver.quiche_conn.is_in_early_data(), self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
        // If the request has already timed out, don't issue it to the server.
        if let Some(expiry) = request.expiry {
            if BootTime::now() > expiry {
                warn!("Abandoning expired DNS request");
                return Ok(());
            }
        }
        let stream_id =
            // If h3_conn says the stream is blocked, this error is recoverable just by trying
            // again once the stream has made progress. Buffer the request for a later retry.
            match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
                Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
                    // We only call handle_request on a value that has just come out of
                    // buffered_request, or when buffered_request is empty. This assert just
                    // validates that we don't break that assumption later, as it could result in
                    // requests being dropped on the floor under high load.
                    info!("Stream has become blocked, buffering one request.");
                    assert!(self.buffered_request.is_none());
                    self.buffered_request = Some(request);
                    return Ok(())
                }
                result => result?,
            };
        info!(
            "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
            stream_id,
            self.driver.net_id,
            self.driver.quiche_conn.stream_capacity(stream_id)
        );
        self.requests.insert(stream_id, request);
        Ok(())
    }

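    /// Reads all currently available response body data for `stream_id` into its `Stream`
    /// buffer, growing the buffer in `STREAM_READ_CHUNK`-sized steps.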
    async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
        const STREAM_READ_CHUNK: usize = 4096;
        if let Some(stream) = self.streams.get_mut(&stream_id) {
            loop {
                let base_len = stream.data.len();
                stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
                match self.h3_conn.recv_body(
                    &mut self.driver.quiche_conn,
                    stream_id,
                    &mut stream.data[base_len..],
                ) {
                    Err(h3::Error::Done) => {
                        stream.data.truncate(base_len);
                        return Ok(());
                    }
                    Err(e) => {
                        info!("recv_body: Error={:?}", e);
                        stream.data.truncate(base_len);
                        return Err(e.into());
                    }
                    Ok(recvd) => {
                        stream.data.truncate(base_len + recvd);
                        info!(
                            "Got {} bytes of response data from stream ID {} on network {}",
                            recvd, stream_id, self.driver.net_id
                        );
                    }
                }
            }
        } else {
            warn!("Received body for untracked stream ID {}", stream_id);
        }
        Ok(())
    }

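    /// Reads and discards any pending HTTP/3 datagrams; this module doesn't use them.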
    fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
        loop {
            match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
            {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                _ => (),
            }
        }
    }

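    /// Polls the HTTP/3 connection and processes every pending event until it reports
    /// `h3::Error::Done`.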
    async fn flush_h3(&mut self) -> Result<()> {
        loop {
            match self.h3_conn.poll(&mut self.driver.quiche_conn) {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
            }
        }
    }

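    /// Handles a single HTTP/3 event: headers and body data are accumulated per stream,
    /// finished or reset streams are delivered back to the requester, datagrams are discarded,
    /// and a GOAWAY triggers connection shutdown.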
    async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
        if !self.requests.contains_key(&stream_id) {
            warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
        }
        match event {
            h3::Event::Headers { list, has_body } => {
                debug!(
                    "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                let stream = Stream::new(list);
                if self.streams.insert(stream_id, stream).is_some() {
                    warn!("Re-using stream ID {} before it was completed.", stream_id)
                }
                if !has_body {
                    self.respond(stream_id);
                }
            }
            h3::Event::Data => {
                debug!(
                    "process_h3_event: h3::Event::Data on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.recv_body(stream_id).await?;
            }
            h3::Event::Finished => {
                debug!(
                    "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.respond(stream_id)
            }
            h3::Event::Reset(e) => {
                warn!(
                    "process_h3_event: h3::Event::Reset with error code {} on stream ID {}, network {}",
                    e, stream_id, self.driver.net_id
                );
                if let Some(stream) = self.streams.get_mut(&stream_id) {
                    stream.error = Some(e)
                }
                self.respond(stream_id);
            }
            h3::Event::Datagram => {
                warn!("Unexpected Datagram received");
                // We don't care if something went wrong with the datagram; we didn't
                // want it anyway.
                let _ = self.discard_datagram(stream_id);
            }
            h3::Event::PriorityUpdate => {
                debug!(
                    "process_h3_event: h3::Event::PriorityUpdate on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                // This tells us that a PRIORITY_UPDATE frame was received; we don't currently
                // use it, so treating it as a no-op is fine.
            }
            h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
        }
        Ok(())
    }

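    /// Closes the connection: stops accepting requests, drains the request queue, optionally
    /// sends an HTTP/3 GOAWAY, and closes the QUIC connection with `msg` as the reason.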
    async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
        info!(
            "Closing connection {} on network {} with msg {:?}",
            self.driver.quiche_conn.trace_id(),
            self.driver.net_id,
            msg
        );
        self.driver.request_rx.close();
        while self.driver.request_rx.recv().await.is_some() {}
        self.driver.closing = true;
        if send_goaway {
            self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
        }
        if self.driver.quiche_conn.close(true, 0, msg).is_err() {
            warn!("Trying to close already closed QUIC connection");
        }
        Ok(())
    }

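    /// Delivers the completed `Stream` for `stream_id` to the requester over its oneshot
    /// channel, removing both the stream and the request from the tracking maps.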
    fn respond(&mut self, stream_id: u64) {
        match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
            (Some(stream), Some(request)) => {
                debug!(
                    "Sending answer back to resolv, stream ID: {}, network {}",
                    stream_id, self.driver.net_id
                );
                // We don't care about the error, because it means the requestor has left.
                let _ = request.response_tx.send(stream);
            }
            (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
            (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
        }
    }
}