1 use bytes::{Buf, Bytes};
2 use h2::{Reason, RecvStream, SendStream};
3 use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
4 use http::HeaderMap;
5 use pin_project_lite::pin_project;
6 use std::error::Error as StdError;
7 use std::future::Future;
8 use std::io::{self, Cursor, IoSlice};
9 use std::mem;
10 use std::pin::Pin;
11 use std::task::{Context, Poll};
12 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13 use tracing::{debug, trace, warn};
14 
15 use crate::body::HttpBody;
16 use crate::proto::h2::ping::Recorder;
17 
18 pub(crate) mod ping;
19 
20 cfg_client! {
21     pub(crate) mod client;
22     pub(crate) use self::client::ClientTask;
23 }
24 
25 cfg_server! {
26     pub(crate) mod server;
27     pub(crate) use self::server::Server;
28 }
29 
30 /// Default initial stream window size defined in HTTP2 spec.
31 pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
32 
strip_connection_headers(headers: &mut HeaderMap, is_request: bool)33 fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
34     // List of connection headers from:
35     // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
36     //
37     // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
38     // tested separately.
39     let connection_headers = [
40         HeaderName::from_lowercase(b"keep-alive").unwrap(),
41         HeaderName::from_lowercase(b"proxy-connection").unwrap(),
42         TRAILER,
43         TRANSFER_ENCODING,
44         UPGRADE,
45     ];
46 
47     for header in connection_headers.iter() {
48         if headers.remove(header).is_some() {
49             warn!("Connection header illegal in HTTP/2: {}", header.as_str());
50         }
51     }
52 
53     if is_request {
54         if headers
55             .get(TE)
56             .map(|te_header| te_header != "trailers")
57             .unwrap_or(false)
58         {
59             warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
60             headers.remove(TE);
61         }
62     } else if headers.remove(TE).is_some() {
63         warn!("TE headers illegal in HTTP/2 responses");
64     }
65 
66     if let Some(header) = headers.remove(CONNECTION) {
67         warn!(
68             "Connection header illegal in HTTP/2: {}",
69             CONNECTION.as_str()
70         );
71         let header_contents = header.to_str().unwrap();
72 
73         // A `Connection` header may have a comma-separated list of names of other headers that
74         // are meant for only this specific connection.
75         //
76         // Iterate these names and remove them as headers. Connection-specific headers are
77         // forbidden in HTTP2, as that information has been moved into frame types of the h2
78         // protocol.
79         for name in header_contents.split(',') {
80             let name = name.trim();
81             headers.remove(name);
82         }
83     }
84 }
85 
86 // body adapters used by both Client and Server
87 
88 pin_project! {
89     struct PipeToSendStream<S>
90     where
91         S: HttpBody,
92     {
93         body_tx: SendStream<SendBuf<S::Data>>,
94         data_done: bool,
95         #[pin]
96         stream: S,
97     }
98 }
99 
100 impl<S> PipeToSendStream<S>
101 where
102     S: HttpBody,
103 {
new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S>104     fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
105         PipeToSendStream {
106             body_tx: tx,
107             data_done: false,
108             stream,
109         }
110     }
111 }
112 
113 impl<S> Future for PipeToSendStream<S>
114 where
115     S: HttpBody,
116     S::Error: Into<Box<dyn StdError + Send + Sync>>,
117 {
118     type Output = crate::Result<()>;
119 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>120     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121         let mut me = self.project();
122         loop {
123             if !*me.data_done {
124                 // we don't have the next chunk of data yet, so just reserve 1 byte to make
125                 // sure there's some capacity available. h2 will handle the capacity management
126                 // for the actual body chunk.
127                 me.body_tx.reserve_capacity(1);
128 
129                 if me.body_tx.capacity() == 0 {
130                     loop {
131                         match ready!(me.body_tx.poll_capacity(cx)) {
132                             Some(Ok(0)) => {}
133                             Some(Ok(_)) => break,
134                             Some(Err(e)) => {
135                                 return Poll::Ready(Err(crate::Error::new_body_write(e)))
136                             }
137                             None => {
138                                 // None means the stream is no longer in a
139                                 // streaming state, we either finished it
140                                 // somehow, or the remote reset us.
141                                 return Poll::Ready(Err(crate::Error::new_body_write(
142                                     "send stream capacity unexpectedly closed",
143                                 )));
144                             }
145                         }
146                     }
147                 } else if let Poll::Ready(reason) = me
148                     .body_tx
149                     .poll_reset(cx)
150                     .map_err(crate::Error::new_body_write)?
151                 {
152                     debug!("stream received RST_STREAM: {:?}", reason);
153                     return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
154                         reason,
155                     ))));
156                 }
157 
158                 match ready!(me.stream.as_mut().poll_data(cx)) {
159                     Some(Ok(chunk)) => {
160                         let is_eos = me.stream.is_end_stream();
161                         trace!(
162                             "send body chunk: {} bytes, eos={}",
163                             chunk.remaining(),
164                             is_eos,
165                         );
166 
167                         let buf = SendBuf::Buf(chunk);
168                         me.body_tx
169                             .send_data(buf, is_eos)
170                             .map_err(crate::Error::new_body_write)?;
171 
172                         if is_eos {
173                             return Poll::Ready(Ok(()));
174                         }
175                     }
176                     Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
177                     None => {
178                         me.body_tx.reserve_capacity(0);
179                         let is_eos = me.stream.is_end_stream();
180                         if is_eos {
181                             return Poll::Ready(me.body_tx.send_eos_frame());
182                         } else {
183                             *me.data_done = true;
184                             // loop again to poll_trailers
185                         }
186                     }
187                 }
188             } else {
189                 if let Poll::Ready(reason) = me
190                     .body_tx
191                     .poll_reset(cx)
192                     .map_err(crate::Error::new_body_write)?
193                 {
194                     debug!("stream received RST_STREAM: {:?}", reason);
195                     return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
196                         reason,
197                     ))));
198                 }
199 
200                 match ready!(me.stream.poll_trailers(cx)) {
201                     Ok(Some(trailers)) => {
202                         me.body_tx
203                             .send_trailers(trailers)
204                             .map_err(crate::Error::new_body_write)?;
205                         return Poll::Ready(Ok(()));
206                     }
207                     Ok(None) => {
208                         // There were no trailers, so send an empty DATA frame...
209                         return Poll::Ready(me.body_tx.send_eos_frame());
210                     }
211                     Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
212                 }
213             }
214         }
215     }
216 }
217 
218 trait SendStreamExt {
on_user_err<E>(&mut self, err: E) -> crate::Error where E: Into<Box<dyn std::error::Error + Send + Sync>>219     fn on_user_err<E>(&mut self, err: E) -> crate::Error
220     where
221         E: Into<Box<dyn std::error::Error + Send + Sync>>;
send_eos_frame(&mut self) -> crate::Result<()>222     fn send_eos_frame(&mut self) -> crate::Result<()>;
223 }
224 
225 impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
on_user_err<E>(&mut self, err: E) -> crate::Error where E: Into<Box<dyn std::error::Error + Send + Sync>>,226     fn on_user_err<E>(&mut self, err: E) -> crate::Error
227     where
228         E: Into<Box<dyn std::error::Error + Send + Sync>>,
229     {
230         let err = crate::Error::new_user_body(err);
231         debug!("send body user stream error: {}", err);
232         self.send_reset(err.h2_reason());
233         err
234     }
235 
send_eos_frame(&mut self) -> crate::Result<()>236     fn send_eos_frame(&mut self) -> crate::Result<()> {
237         trace!("send body eos");
238         self.send_data(SendBuf::None, true)
239             .map_err(crate::Error::new_body_write)
240     }
241 }
242 
243 #[repr(usize)]
244 enum SendBuf<B> {
245     Buf(B),
246     Cursor(Cursor<Box<[u8]>>),
247     None,
248 }
249 
250 impl<B: Buf> Buf for SendBuf<B> {
251     #[inline]
remaining(&self) -> usize252     fn remaining(&self) -> usize {
253         match *self {
254             Self::Buf(ref b) => b.remaining(),
255             Self::Cursor(ref c) => Buf::remaining(c),
256             Self::None => 0,
257         }
258     }
259 
260     #[inline]
chunk(&self) -> &[u8]261     fn chunk(&self) -> &[u8] {
262         match *self {
263             Self::Buf(ref b) => b.chunk(),
264             Self::Cursor(ref c) => c.chunk(),
265             Self::None => &[],
266         }
267     }
268 
269     #[inline]
advance(&mut self, cnt: usize)270     fn advance(&mut self, cnt: usize) {
271         match *self {
272             Self::Buf(ref mut b) => b.advance(cnt),
273             Self::Cursor(ref mut c) => c.advance(cnt),
274             Self::None => {}
275         }
276     }
277 
chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize278     fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
279         match *self {
280             Self::Buf(ref b) => b.chunks_vectored(dst),
281             Self::Cursor(ref c) => c.chunks_vectored(dst),
282             Self::None => 0,
283         }
284     }
285 }
286 
287 struct H2Upgraded<B>
288 where
289     B: Buf,
290 {
291     ping: Recorder,
292     send_stream: UpgradedSendStream<B>,
293     recv_stream: RecvStream,
294     buf: Bytes,
295 }
296 
297 impl<B> AsyncRead for H2Upgraded<B>
298 where
299     B: Buf,
300 {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, read_buf: &mut ReadBuf<'_>, ) -> Poll<Result<(), io::Error>>301     fn poll_read(
302         mut self: Pin<&mut Self>,
303         cx: &mut Context<'_>,
304         read_buf: &mut ReadBuf<'_>,
305     ) -> Poll<Result<(), io::Error>> {
306         if self.buf.is_empty() {
307             self.buf = loop {
308                 match ready!(self.recv_stream.poll_data(cx)) {
309                     None => return Poll::Ready(Ok(())),
310                     Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
311                         continue
312                     }
313                     Some(Ok(buf)) => {
314                         self.ping.record_data(buf.len());
315                         break buf;
316                     }
317                     Some(Err(e)) => {
318                         return Poll::Ready(match e.reason() {
319                             Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
320                             Some(Reason::STREAM_CLOSED) => {
321                                 Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
322                             }
323                             _ => Err(h2_to_io_error(e)),
324                         })
325                     }
326                 }
327             };
328         }
329         let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
330         read_buf.put_slice(&self.buf[..cnt]);
331         self.buf.advance(cnt);
332         let _ = self.recv_stream.flow_control().release_capacity(cnt);
333         Poll::Ready(Ok(()))
334     }
335 }
336 
337 impl<B> AsyncWrite for H2Upgraded<B>
338 where
339     B: Buf,
340 {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>341     fn poll_write(
342         mut self: Pin<&mut Self>,
343         cx: &mut Context<'_>,
344         buf: &[u8],
345     ) -> Poll<Result<usize, io::Error>> {
346         if buf.is_empty() {
347             return Poll::Ready(Ok(0));
348         }
349         self.send_stream.reserve_capacity(buf.len());
350 
351         // We ignore all errors returned by `poll_capacity` and `write`, as we
352         // will get the correct from `poll_reset` anyway.
353         let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
354             None => Some(0),
355             Some(Ok(cnt)) => self
356                 .send_stream
357                 .write(&buf[..cnt], false)
358                 .ok()
359                 .map(|()| cnt),
360             Some(Err(_)) => None,
361         };
362 
363         if let Some(cnt) = cnt {
364             return Poll::Ready(Ok(cnt));
365         }
366 
367         Poll::Ready(Err(h2_to_io_error(
368             match ready!(self.send_stream.poll_reset(cx)) {
369                 Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370                     return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
371                 }
372                 Ok(reason) => reason.into(),
373                 Err(e) => e,
374             },
375         )))
376     }
377 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>378     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
379         Poll::Ready(Ok(()))
380     }
381 
poll_shutdown( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>382     fn poll_shutdown(
383         mut self: Pin<&mut Self>,
384         cx: &mut Context<'_>,
385     ) -> Poll<Result<(), io::Error>> {
386         if self.send_stream.write(&[], true).is_ok() {
387             return Poll::Ready(Ok(()));
388         }
389 
390         Poll::Ready(Err(h2_to_io_error(
391             match ready!(self.send_stream.poll_reset(cx)) {
392                 Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
393                 Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
394                     return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
395                 }
396                 Ok(reason) => reason.into(),
397                 Err(e) => e,
398             },
399         )))
400     }
401 }
402 
h2_to_io_error(e: h2::Error) -> io::Error403 fn h2_to_io_error(e: h2::Error) -> io::Error {
404     if e.is_io() {
405         e.into_io().unwrap()
406     } else {
407         io::Error::new(io::ErrorKind::Other, e)
408     }
409 }
410 
411 struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
412 
413 impl<B> UpgradedSendStream<B>
414 where
415     B: Buf,
416 {
new(inner: SendStream<SendBuf<B>>) -> Self417     unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
418         assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
419         Self(mem::transmute(inner))
420     }
421 
reserve_capacity(&mut self, cnt: usize)422     fn reserve_capacity(&mut self, cnt: usize) {
423         unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
424     }
425 
poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>>426     fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
427         unsafe { self.as_inner_unchecked().poll_capacity(cx) }
428     }
429 
poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>>430     fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
431         unsafe { self.as_inner_unchecked().poll_reset(cx) }
432     }
433 
write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error>434     fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> {
435         let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
436         unsafe {
437             self.as_inner_unchecked()
438                 .send_data(send_buf, end_of_stream)
439                 .map_err(h2_to_io_error)
440         }
441     }
442 
as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>>443     unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
444         &mut *(&mut self.0 as *mut _ as *mut _)
445     }
446 }
447 
448 #[repr(transparent)]
449 struct Neutered<B> {
450     _inner: B,
451     impossible: Impossible,
452 }
453 
454 enum Impossible {}
455 
456 unsafe impl<B> Send for Neutered<B> {}
457 
458 impl<B> Buf for Neutered<B> {
remaining(&self) -> usize459     fn remaining(&self) -> usize {
460         match self.impossible {}
461     }
462 
chunk(&self) -> &[u8]463     fn chunk(&self) -> &[u8] {
464         match self.impossible {}
465     }
466 
advance(&mut self, _cnt: usize)467     fn advance(&mut self, _cnt: usize) {
468         match self.impossible {}
469     }
470 }
471