1 use crate::{
2     body::{self, Bytes, HttpBody},
3     response::{IntoResponse, Response},
4     BoxError, Error,
5 };
6 use futures_util::{
7     ready,
8     stream::{self, TryStream},
9 };
10 use http::HeaderMap;
11 use pin_project_lite::pin_project;
12 use std::{
13     fmt,
14     pin::Pin,
15     task::{Context, Poll},
16 };
17 use sync_wrapper::SyncWrapper;
18 
19 pin_project! {
20     /// An [`http_body::Body`] created from a [`Stream`].
21     ///
22     /// The purpose of this type is to be used in responses. If you want to
23     /// extract the request body as a stream consider using
24     /// [`BodyStream`](crate::extract::BodyStream).
25     ///
26     /// # Example
27     ///
28     /// ```
29     /// use axum::{
30     ///     Router,
31     ///     routing::get,
32     ///     body::StreamBody,
33     ///     response::IntoResponse,
34     /// };
35     /// use futures_util::stream::{self, Stream};
36     /// use std::io;
37     ///
38     /// async fn handler() -> StreamBody<impl Stream<Item = io::Result<&'static str>>> {
39     ///     let chunks: Vec<io::Result<_>> = vec![
40     ///         Ok("Hello,"),
41     ///         Ok(" "),
42     ///         Ok("world!"),
43     ///     ];
44     ///     let stream = stream::iter(chunks);
45     ///     StreamBody::new(stream)
46     /// }
47     ///
48     /// let app = Router::new().route("/", get(handler));
49     /// # async {
50     /// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
51     /// # };
52     /// ```
53     ///
54     /// [`Stream`]: futures_util::stream::Stream
55     #[must_use]
56     pub struct StreamBody<S> {
57         #[pin]
58         stream: SyncWrapper<S>,
59     }
60 }
61 
62 impl<S> From<S> for StreamBody<S>
63 where
64     S: TryStream + Send + 'static,
65     S::Ok: Into<Bytes>,
66     S::Error: Into<BoxError>,
67 {
from(stream: S) -> Self68     fn from(stream: S) -> Self {
69         Self::new(stream)
70     }
71 }
72 
73 impl<S> StreamBody<S> {
74     /// Create a new `StreamBody` from a [`Stream`].
75     ///
76     /// [`Stream`]: futures_util::stream::Stream
new(stream: S) -> Self where S: TryStream + Send + 'static, S::Ok: Into<Bytes>, S::Error: Into<BoxError>,77     pub fn new(stream: S) -> Self
78     where
79         S: TryStream + Send + 'static,
80         S::Ok: Into<Bytes>,
81         S::Error: Into<BoxError>,
82     {
83         Self {
84             stream: SyncWrapper::new(stream),
85         }
86     }
87 }
88 
89 impl<S> IntoResponse for StreamBody<S>
90 where
91     S: TryStream + Send + 'static,
92     S::Ok: Into<Bytes>,
93     S::Error: Into<BoxError>,
94 {
into_response(self) -> Response95     fn into_response(self) -> Response {
96         Response::new(body::boxed(self))
97     }
98 }
99 
100 impl Default for StreamBody<futures_util::stream::Empty<Result<Bytes, Error>>> {
default() -> Self101     fn default() -> Self {
102         Self::new(stream::empty())
103     }
104 }
105 
106 impl<S> fmt::Debug for StreamBody<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108         f.debug_tuple("StreamBody").finish()
109     }
110 }
111 
112 impl<S> HttpBody for StreamBody<S>
113 where
114     S: TryStream,
115     S::Ok: Into<Bytes>,
116     S::Error: Into<BoxError>,
117 {
118     type Data = Bytes;
119     type Error = Error;
120 
poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>121     fn poll_data(
122         self: Pin<&mut Self>,
123         cx: &mut Context<'_>,
124     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
125         let stream = self.project().stream.get_pin_mut();
126         match ready!(stream.try_poll_next(cx)) {
127             Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
128             Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
129             None => Poll::Ready(None),
130         }
131     }
132 
poll_trailers( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>133     fn poll_trailers(
134         self: Pin<&mut Self>,
135         _cx: &mut Context<'_>,
136     ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
137         Poll::Ready(Ok(None))
138     }
139 }
140 
141 #[test]
stream_body_traits()142 fn stream_body_traits() {
143     use futures_util::stream::Empty;
144 
145     type EmptyStream = StreamBody<Empty<Result<Bytes, BoxError>>>;
146 
147     crate::test_helpers::assert_send::<EmptyStream>();
148     crate::test_helpers::assert_sync::<EmptyStream>();
149     crate::test_helpers::assert_unpin::<EmptyStream>();
150 }
151