/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Defines a backing task to keep an HTTP/3 connection running

use crate::boot_time;
use crate::boot_time::BootTime;
use crate::metrics::log_handshake_event_stats;
use log::{debug, info, warn};
use quiche::h3;
use std::collections::HashMap;
use std::default::Default;
use std::future;
use std::io;
use std::time::Instant;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};

use super::Status;

#[derive(Copy, Clone, Debug)]
pub enum Cause {
    Probe,
    Reconnect,
    Retry,
}

#[derive(Clone)]
#[allow(dead_code)]
pub enum HandshakeResult {
    Unknown,
    Success,
    Timeout,
    TlsFail,
    ServerUnreachable,
}

#[derive(Copy, Clone, Debug)]
pub struct HandshakeInfo {
    pub cause: Cause,
    pub sent_bytes: u64,
    pub recv_bytes: u64,
    pub elapsed: u128,
    pub quic_version: u32,
    pub network_type: u32,
    pub private_dns_mode: u32,
    pub session_hit_checker: bool,
}

impl std::fmt::Display for HandshakeInfo {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}",
            self.cause,
            self.sent_bytes,
            self.recv_bytes,
            self.quic_version,
            self.session_hit_checker
        )
    }
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("network IO error: {0}")]
    Network(#[from] io::Error),
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    #[error("Response delivery error: {0}")]
    StreamSend(#[from] mpsc::error::SendError<Stream>),
    #[error("Connection closed")]
    Closed,
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
/// HTTP/3 Request to be sent on the connection
pub struct Request {
    /// Request headers
    pub headers: Vec<h3::Header>,
    /// Expiry time for the request, relative to `CLOCK_BOOTTIME`
    pub expiry: Option<BootTime>,
    /// Channel to send the response to
    pub response_tx: oneshot::Sender<Stream>,
}

#[derive(Debug)]
/// HTTP/3 Response
pub struct Stream {
    /// Response headers
    #[allow(dead_code)]
    pub headers: Vec<h3::Header>,
    /// Response body
    pub data: Vec<u8>,
    /// Error code if stream was reset
    pub error: Option<u64>,
}

impl Stream {
    fn new(headers: Vec<h3::Header>) -> Self {
        Self { headers, data: Vec::new(), error: None }
    }
}

const MAX_UDP_PACKET_SIZE: usize = 65536;

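/// Drives the QUIC layer of a connection: shuttles packets between the UDP socket and quiche,
/// and reports connection status over `status_tx`.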
struct Driver {
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    // This buffer is large; boxing it keeps it off the stack and prevents it
    // from being copied when the driver is moved.
    buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
    net_id: u32,
    // Used to check if the connection has entered the closing or draining state. A connection
    // can enter the closing state if the sending half of request_rx's channel has been dropped.
    // Note that we can't check if a receiver is dead without potentially receiving a message, and
    // if we poll on a dead receiver in a select! it will immediately return None. As a result, we
    // need this to gate whether or not to include .recv() in our select!
    closing: bool,
    handshake_info: HandshakeInfo,
    connection_start: Instant,
}

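/// Layers HTTP/3 request/response handling on top of an established `Driver`.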
struct H3Driver {
    driver: Driver,
    // h3_conn sometimes can't "fit" a request in its available windows.
    // This value holds a peeked request in that case, waiting for
    // transmission to become possible.
    buffered_request: Option<Request>,
    h3_conn: h3::Connection,
    requests: HashMap<u64, Request>,
    streams: HashMap<u64, Stream>,
}

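/// Sleeps for `timeout` if one is provided, and never completes otherwise.
/// Used to wait on quiche's optional timer.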
async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
    info!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
    match timeout {
        Some(timeout) => boot_time::sleep(timeout).await,
        None => future::pending().await,
    }
}

/// Creates a future which, when polled, will handle events related to an HTTP/3 connection.
/// The returned error explains why the connection terminated.
pub async fn drive(
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    net_id: u32,
    handshake_info: HandshakeInfo,
) -> Result<()> {
    Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await
}

impl Driver {
    fn new(
        request_rx: mpsc::Receiver<Request>,
        status_tx: watch::Sender<Status>,
        quiche_conn: quiche::Connection,
        socket: UdpSocket,
        net_id: u32,
        handshake_info: HandshakeInfo,
    ) -> Self {
        Self {
            request_rx,
            status_tx,
            quiche_conn,
            socket,
            buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
            net_id,
            closing: false,
            handshake_info,
            connection_start: Instant::now(),
        }
    }

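    /// Runs the connection: primes it with an initial flush, then loops over `drive_once`
    /// until the connection closes or an error occurs.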
    async fn drive(mut self) -> Result<()> {
        self.connection_start = Instant::now();
        // Prime connection
        self.flush_tx().await?;
        loop {
            self = self.drive_once().await?
        }
    }

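    /// If the QUIC connection has closed, reports `Status::Dead` (with any resumable session)
    /// to the status watcher and returns `Error::Closed`; otherwise does nothing.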
    fn handle_closed(&self) -> Result<()> {
        if self.quiche_conn.is_closed() {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} closed on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });
            Err(Error::Closed)
        } else {
            Ok(())
        }
    }

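    /// If the QUIC connection has entered the draining state, reports `Status::Dead` to the
    /// status watcher, stops accepting new requests, and drains the pending ones so their
    /// futures fail quickly instead of timing out.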
    fn handle_draining(&mut self) {
        if self.quiche_conn.is_draining() && !self.closing {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} is draining on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });

            self.request_rx.close();
            // Drain the pending DNS requests from the queue so their corresponding future
            // tasks return an error quickly rather than timing out. However, the DNS requests
            // that have already been sent will still time out.
            // TODO: re-issue the outstanding DNS requests, such as passing H3Driver.requests
            // along with Status::Dead to the `Network` that can re-issue the DNS requests.
            while self.request_rx.try_recv().is_ok() {}
            self.closing = true;
        }
    }

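    /// Runs a single iteration of the event loop: upgrades to HTTP/3 once the QUIC connection
    /// is established or has early data available, waits on quiche's timer and incoming
    /// packets, flushes outgoing packets, and checks whether the connection is draining or
    /// closed.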
    async fn drive_once(mut self) -> Result<Self> {
        // If the QUIC connection is live, but HTTP/3 is not, try to bring it up
        if self.quiche_conn.is_established() || self.quiche_conn.is_in_early_data() {
            info!(
                "Connection {} established on network {}",
                self.quiche_conn.trace_id(),
                self.net_id
            );
            self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
            // In Stats, sent_bytes omits the length of any padding data appended to the
            // datagram.
            self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes;
            self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes;
            self.handshake_info.quic_version = quiche::PROTOCOL_VERSION;
            log_handshake_event_stats(HandshakeResult::Success, self.handshake_info);
            let h3_config = h3::Config::new()?;
            let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
            self = H3Driver::new(self, h3_conn).drive().await?;
            let _ = self.status_tx.send(Status::QUIC);
        }

        let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
        select! {
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("Driver: Timer expired on network {}", self.net_id);
                self.quiche_conn.on_timeout();

                if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() {
                    info!(
                        "Connection {} timed out on network {}",
                        self.quiche_conn.trace_id(),
                        self.net_id
                    );
                    self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
                    log_handshake_event_stats(
                        HandshakeResult::Timeout,
                        self.handshake_info,
                    );
                }
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
                let local = self.socket.local_addr()?;
                self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from, to: local })?;
                debug!("Received {} bytes on network {}", size, self.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.flush_tx().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Additionally, per
        // the Quiche documentation, the connection should not be dropped until is_closed()
        // returns true. This tokio task will become unowned and get dropped when is_closed()
        // returns true.
        self.handle_draining();

        // If the connection has closed, tear down
        self.handle_closed()?;

        Ok(self)
    }

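    /// Sends outgoing packets produced by quiche to the peer until quiche reports `Done`.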
    async fn flush_tx(&mut self) -> Result<()> {
        let send_buf = self.buffer.as_mut();
        loop {
            match self.quiche_conn.send(send_buf) {
                Err(quiche::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((valid_len, send_info)) => {
                    self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
                    debug!("Sent {} bytes on network {}", valid_len, self.net_id);
                }
            }
        }
    }
}

impl H3Driver {
    fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
        Self {
            driver,
            h3_conn,
            requests: HashMap::new(),
            streams: HashMap::new(),
            buffered_request: None,
        }
    }

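    /// Runs the HTTP/3 event loop: reports `Status::H3` on entry, then loops over `drive_once`,
    /// reporting `Status::Dead` (with any resumable session) if the loop fails.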
    async fn drive(mut self) -> Result<Driver> {
        let _ = self.driver.status_tx.send(Status::H3);
        loop {
            if let Err(e) = self.drive_once().await {
                let session = self.driver.quiche_conn.session().map(<[_]>::to_vec);
                let _ = self.driver.status_tx.send(Status::Dead { session });
                return Err(e);
            }
        }
    }

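    /// Runs a single iteration of the HTTP/3 event loop: retries any buffered request, waits
    /// for a new request, timer expiry, or incoming packets, then flushes outgoing packets,
    /// processes pending HTTP/3 events, and checks for draining or closure.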
    async fn drive_once(&mut self) -> Result<()> {
        // We can't call self.driver.drive_once at the same time as
        // self.driver.request_rx.recv() due to ownership
        let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
        // If we've buffered a request (due to the connection being full)
        // try to resend that first
        if let Some(request) = self.buffered_request.take() {
            self.handle_request(request)?;
            self.driver.flush_tx().await?;
        }
        select! {
            // Only attempt to enqueue new requests if we have no buffered request and aren't
            // closing. Maybe limit the number of in-flight queries if the handshake
            // still hasn't finished.
            msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
                match msg {
                    Some(request) => self.handle_request(request)?,
                    None => self.shutdown(true, b"DONE").await?,
                }
            },
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("H3Driver: Timer expired on network {}", self.driver.net_id);
                self.driver.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
                let local = self.driver.socket.local_addr()?;
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from, to: local }).map(|_| ())?;

                debug!("Received {} bytes on network {}", size, self.driver.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.driver.flush_tx().await?;

        // Process any incoming HTTP/3 events
        self.flush_h3().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Additionally, per
        // the Quiche documentation, the connection should not be dropped until is_closed()
        // returns true. This tokio task will become unowned and get dropped when is_closed()
        // returns true.
        self.driver.handle_draining();

        // If the connection has closed, tear down
        self.driver.handle_closed()
    }

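    /// Issues a DNS request over HTTP/3. Requests that have already expired are dropped, and
    /// the request is buffered for a later retry if the stream is blocked or the peer's stream
    /// limit has been reached.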
    fn handle_request(&mut self, request: Request) -> Result<()> {
        info!("Handling DNS request on network {}, is_in_early_data={}, stats=[{:?}], peer_streams_left_bidi={}, peer_streams_left_uni={}",
                self.driver.net_id, self.driver.quiche_conn.is_in_early_data(), self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
        // If the request has already timed out, don't issue it to the server.
        if let Some(expiry) = request.expiry {
            if BootTime::now() > expiry {
                warn!("Abandoning expired DNS request");
                return Ok(());
            }
        }
        let stream_id =
            // If h3_conn says the stream is blocked, this error is recoverable just by trying
            // again once the stream has made progress. Buffer the request for a later retry.
            match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
                Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
                    // We only call handle_request on a value that has just come out of
                    // buffered_request, or when buffered_request is empty. This assert just
                    // validates that we don't break that assumption later, as it could result in
                    // requests being dropped on the floor under high load.
                    info!("Stream has become blocked, buffering one request.");
                    assert!(self.buffered_request.is_none());
                    self.buffered_request = Some(request);
                    return Ok(())
                }
                result => result?,
            };
        info!(
            "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
            stream_id,
            self.driver.net_id,
            self.driver.quiche_conn.stream_capacity(stream_id)
        );
        self.requests.insert(stream_id, request);
        Ok(())
    }

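    /// Reads all currently available response body data for `stream_id` into its tracked
    /// `Stream`, growing the buffer in fixed-size chunks.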
    async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
        const STREAM_READ_CHUNK: usize = 4096;
        if let Some(stream) = self.streams.get_mut(&stream_id) {
            loop {
                let base_len = stream.data.len();
                stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
                match self.h3_conn.recv_body(
                    &mut self.driver.quiche_conn,
                    stream_id,
                    &mut stream.data[base_len..],
                ) {
                    Err(h3::Error::Done) => {
                        stream.data.truncate(base_len);
                        return Ok(());
                    }
                    Err(e) => {
                        info!("recv_body: Error={:?}", e);
                        stream.data.truncate(base_len);
                        return Err(e.into());
                    }
                    Ok(recvd) => {
                        stream.data.truncate(base_len + recvd);
                        info!(
                            "Got {} bytes of response data from stream ID {} on network {}",
                            recvd, stream_id, self.driver.net_id
                        );
                    }
                }
            }
        } else {
            warn!("Received body for untracked stream ID {}", stream_id);
        }
        Ok(())
    }

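    /// Reads and discards any pending HTTP/3 datagrams; we never expect to receive them.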
    fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
        loop {
            match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
            {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                _ => (),
            }
        }
    }

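    /// Polls the HTTP/3 connection and processes every pending event.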
    async fn flush_h3(&mut self) -> Result<()> {
        loop {
            match self.h3_conn.poll(&mut self.driver.quiche_conn) {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
            }
        }
    }

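    /// Handles a single HTTP/3 event: records response headers and body data, delivers
    /// completed or reset streams back to their requestors, discards unexpected datagrams, and
    /// shuts the connection down on GOAWAY.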
    async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
        if !self.requests.contains_key(&stream_id) {
            warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
        }
        match event {
            h3::Event::Headers { list, has_body } => {
                debug!(
                    "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                let stream = Stream::new(list);
                if self.streams.insert(stream_id, stream).is_some() {
                    warn!("Re-using stream ID {} before it was completed.", stream_id)
                }
                if !has_body {
                    self.respond(stream_id);
                }
            }
            h3::Event::Data => {
                debug!(
                    "process_h3_event: h3::Event::Data on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.recv_body(stream_id).await?;
            }
            h3::Event::Finished => {
                debug!(
                    "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.respond(stream_id)
            }
            h3::Event::Reset(e) => {
                warn!(
                    "process_h3_event: h3::Event::Reset with error code {} on stream ID {}, network {}",
                    e, stream_id, self.driver.net_id
                );
                if let Some(stream) = self.streams.get_mut(&stream_id) {
                    stream.error = Some(e)
                }
                self.respond(stream_id);
            }
            h3::Event::Datagram => {
                warn!("Unexpected Datagram received");
                // We don't care if something went wrong with the datagram; we didn't
                // want it anyway.
                let _ = self.discard_datagram(stream_id);
            }
            h3::Event::PriorityUpdate => {
                debug!(
                    "process_h3_event: h3::Event::PriorityUpdate on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                // This tells us that a PRIORITY_UPDATE frame was received, but we don't
                // currently use it, so a no-op is fine.
            }
            h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
        }
        Ok(())
    }

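    /// Closes the connection: stops accepting new requests, drains the request queue,
    /// optionally sends a GOAWAY frame, and closes the underlying QUIC connection with the
    /// given message.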
    async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
        info!(
            "Closing connection {} on network {} with msg {:?}",
            self.driver.quiche_conn.trace_id(),
            self.driver.net_id,
            msg
        );
        self.driver.request_rx.close();
        while self.driver.request_rx.recv().await.is_some() {}
        self.driver.closing = true;
        if send_goaway {
            self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
        }
        if self.driver.quiche_conn.close(true, 0, msg).is_err() {
            warn!("Trying to close already closed QUIC connection");
        }
        Ok(())
    }

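    /// Delivers the completed stream for `stream_id` back to its requestor, if both the stream
    /// and the request are still tracked.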
    fn respond(&mut self, stream_id: u64) {
        match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
            (Some(stream), Some(request)) => {
                debug!(
                    "Sending answer back to resolv, stream ID: {}, network {}",
                    stream_id, self.driver.net_id
                );
                // We don't care about the error, because it means the requestor has left.
                let _ = request.response_tx.send(stream);
            }
            (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
            (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
        }
    }
}