1 use crate::{
2 body::{self, Bytes, HttpBody},
3 response::{IntoResponse, Response},
4 BoxError, Error,
5 };
6 use futures_util::{
7 ready,
8 stream::{self, TryStream},
9 };
10 use http::HeaderMap;
11 use pin_project_lite::pin_project;
12 use std::{
13 fmt,
14 pin::Pin,
15 task::{Context, Poll},
16 };
17 use sync_wrapper::SyncWrapper;
18
19 pin_project! {
20 /// An [`http_body::Body`] created from a [`Stream`].
21 ///
22 /// The purpose of this type is to be used in responses. If you want to
23 /// extract the request body as a stream consider using
24 /// [`BodyStream`](crate::extract::BodyStream).
25 ///
26 /// # Example
27 ///
28 /// ```
29 /// use axum::{
30 /// Router,
31 /// routing::get,
32 /// body::StreamBody,
33 /// response::IntoResponse,
34 /// };
35 /// use futures_util::stream::{self, Stream};
36 /// use std::io;
37 ///
38 /// async fn handler() -> StreamBody<impl Stream<Item = io::Result<&'static str>>> {
39 /// let chunks: Vec<io::Result<_>> = vec![
40 /// Ok("Hello,"),
41 /// Ok(" "),
42 /// Ok("world!"),
43 /// ];
44 /// let stream = stream::iter(chunks);
45 /// StreamBody::new(stream)
46 /// }
47 ///
48 /// let app = Router::new().route("/", get(handler));
49 /// # async {
50 /// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
51 /// # };
52 /// ```
53 ///
54 /// [`Stream`]: futures_util::stream::Stream
55 #[must_use]
56 pub struct StreamBody<S> {
57 #[pin]
58 stream: SyncWrapper<S>,
59 }
60 }
61
62 impl<S> From<S> for StreamBody<S>
63 where
64 S: TryStream + Send + 'static,
65 S::Ok: Into<Bytes>,
66 S::Error: Into<BoxError>,
67 {
from(stream: S) -> Self68 fn from(stream: S) -> Self {
69 Self::new(stream)
70 }
71 }
72
73 impl<S> StreamBody<S> {
74 /// Create a new `StreamBody` from a [`Stream`].
75 ///
76 /// [`Stream`]: futures_util::stream::Stream
new(stream: S) -> Self where S: TryStream + Send + 'static, S::Ok: Into<Bytes>, S::Error: Into<BoxError>,77 pub fn new(stream: S) -> Self
78 where
79 S: TryStream + Send + 'static,
80 S::Ok: Into<Bytes>,
81 S::Error: Into<BoxError>,
82 {
83 Self {
84 stream: SyncWrapper::new(stream),
85 }
86 }
87 }
88
89 impl<S> IntoResponse for StreamBody<S>
90 where
91 S: TryStream + Send + 'static,
92 S::Ok: Into<Bytes>,
93 S::Error: Into<BoxError>,
94 {
into_response(self) -> Response95 fn into_response(self) -> Response {
96 Response::new(body::boxed(self))
97 }
98 }
99
100 impl Default for StreamBody<futures_util::stream::Empty<Result<Bytes, Error>>> {
default() -> Self101 fn default() -> Self {
102 Self::new(stream::empty())
103 }
104 }
105
106 impl<S> fmt::Debug for StreamBody<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.debug_tuple("StreamBody").finish()
109 }
110 }
111
112 impl<S> HttpBody for StreamBody<S>
113 where
114 S: TryStream,
115 S::Ok: Into<Bytes>,
116 S::Error: Into<BoxError>,
117 {
118 type Data = Bytes;
119 type Error = Error;
120
poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>121 fn poll_data(
122 self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
125 let stream = self.project().stream.get_pin_mut();
126 match ready!(stream.try_poll_next(cx)) {
127 Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
128 Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
129 None => Poll::Ready(None),
130 }
131 }
132
poll_trailers( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>133 fn poll_trailers(
134 self: Pin<&mut Self>,
135 _cx: &mut Context<'_>,
136 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
137 Poll::Ready(Ok(None))
138 }
139 }
140
141 #[test]
stream_body_traits()142 fn stream_body_traits() {
143 use futures_util::stream::Empty;
144
145 type EmptyStream = StreamBody<Empty<Result<Bytes, BoxError>>>;
146
147 crate::test_helpers::assert_send::<EmptyStream>();
148 crate::test_helpers::assert_sync::<EmptyStream>();
149 crate::test_helpers::assert_unpin::<EmptyStream>();
150 }
151