1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/bidirectional_stream_quic_impl.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/raw_ptr.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/http/http_util.h"
17 #include "net/socket/next_proto.h"
18 #include "net/spdy/spdy_http_utils.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/quic_connection.h"
20 #include "net/third_party/quiche/src/quiche/spdy/core/http2_header_block.h"
21 #include "quic_http_stream.h"
22
23 namespace net {
24 namespace {
25 // Sets a boolean to a value, and restores it to the previous value once
26 // the saver goes out of scope.
27 class ScopedBoolSaver {
28 public:
ScopedBoolSaver(bool * var,bool new_val)29 ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
30 *var_ = new_val;
31 }
32
~ScopedBoolSaver()33 ~ScopedBoolSaver() { *var_ = old_val_; }
34
35 private:
36 raw_ptr<bool> var_;
37 bool old_val_;
38 };
39 } // namespace
40
BidirectionalStreamQuicImpl(std::unique_ptr<QuicChromiumClientSession::Handle> session)41 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
42 std::unique_ptr<QuicChromiumClientSession::Handle> session)
43 : session_(std::move(session)) {}
44
~BidirectionalStreamQuicImpl()45 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
46 if (stream_) {
47 delegate_ = nullptr;
48 stream_->Reset(quic::QUIC_STREAM_CANCELLED);
49 }
50 }
51
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool send_request_headers_automatically,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)52 void BidirectionalStreamQuicImpl::Start(
53 const BidirectionalStreamRequestInfo* request_info,
54 const NetLogWithSource& net_log,
55 bool send_request_headers_automatically,
56 BidirectionalStreamImpl::Delegate* delegate,
57 std::unique_ptr<base::OneShotTimer> timer,
58 const NetworkTrafficAnnotationTag& traffic_annotation) {
59 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
60 DCHECK(!stream_);
61 CHECK(delegate);
62 DLOG_IF(WARNING, !session_->IsConnected())
63 << "Trying to start request headers after session has been closed.";
64
65 net_log.AddEventReferencingSource(
66 NetLogEventType::BIDIRECTIONAL_STREAM_BOUND_TO_QUIC_SESSION,
67 session_->net_log().source());
68
69 send_request_headers_automatically_ = send_request_headers_automatically;
70 delegate_ = delegate;
71 request_info_ = request_info;
72
73 // Only allow SAFE methods to use early data, unless overridden by the caller.
74 bool use_early_data = HttpUtil::IsMethodSafe(request_info_->method);
75 use_early_data |= request_info_->allow_early_data_override;
76
77 int rv = session_->RequestStream(
78 !use_early_data,
79 base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
80 weak_factory_.GetWeakPtr()),
81 traffic_annotation);
82 if (rv == ERR_IO_PENDING)
83 return;
84
85 if (rv != OK) {
86 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
87 FROM_HERE,
88 base::BindOnce(
89 &BidirectionalStreamQuicImpl::NotifyError,
90 weak_factory_.GetWeakPtr(),
91 session_->OneRttKeysAvailable() ? rv : ERR_QUIC_HANDSHAKE_FAILED));
92 return;
93 }
94
95 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
96 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
97 weak_factory_.GetWeakPtr(), rv));
98 }
99
SendRequestHeaders()100 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
101 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
102 int rv = WriteHeaders();
103 if (rv < 0) {
104 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
105 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
106 weak_factory_.GetWeakPtr(), rv));
107 }
108 }
109
WriteHeaders()110 int BidirectionalStreamQuicImpl::WriteHeaders() {
111 DCHECK(!has_sent_headers_);
112
113 spdy::Http2HeaderBlock headers;
114 HttpRequestInfo http_request_info;
115 http_request_info.url = request_info_->url;
116 http_request_info.method = request_info_->method;
117 http_request_info.extra_headers = request_info_->extra_headers;
118
119 CreateSpdyHeadersFromHttpRequest(http_request_info, std::nullopt,
120 http_request_info.extra_headers, &headers);
121 int rv = stream_->WriteHeaders(std::move(headers),
122 request_info_->end_stream_on_headers, nullptr);
123 if (rv >= 0) {
124 headers_bytes_sent_ += rv;
125 has_sent_headers_ = true;
126 }
127 return rv;
128 }
129
ReadData(IOBuffer * buffer,int buffer_len)130 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
131 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
132 DCHECK(buffer);
133 DCHECK(buffer_len);
134
135 int rv = stream_->ReadBody(
136 buffer, buffer_len,
137 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadDataComplete,
138 weak_factory_.GetWeakPtr()));
139 if (rv == ERR_IO_PENDING) {
140 read_buffer_ = buffer;
141 read_buffer_len_ = buffer_len;
142 return ERR_IO_PENDING;
143 }
144
145 if (rv < 0)
146 return rv;
147
148 // If the write side is closed, OnFinRead() will call
149 // BidirectionalStreamQuicImpl::OnClose().
150 if (stream_->IsDoneReading())
151 stream_->OnFinRead();
152
153 return rv;
154 }
155
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)156 void BidirectionalStreamQuicImpl::SendvData(
157 const std::vector<scoped_refptr<IOBuffer>>& buffers,
158 const std::vector<int>& lengths,
159 bool end_stream) {
160 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
161 DCHECK_EQ(buffers.size(), lengths.size());
162
163 if (!stream_->IsOpen()) {
164 LOG(ERROR) << "Trying to send data after stream has been closed.";
165 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
166 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
167 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
168 return;
169 }
170
171 std::unique_ptr<quic::QuicConnection::ScopedPacketFlusher> bundler(
172 session_->CreatePacketBundler());
173 if (!has_sent_headers_) {
174 DCHECK(!send_request_headers_automatically_);
175 int rv = WriteHeaders();
176 if (rv < 0) {
177 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
178 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
179 weak_factory_.GetWeakPtr(), rv));
180 return;
181 }
182 }
183
184 int rv = stream_->WritevStreamData(
185 buffers, lengths, end_stream,
186 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
187 weak_factory_.GetWeakPtr()));
188
189 if (rv != ERR_IO_PENDING) {
190 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
191 FROM_HERE,
192 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
193 weak_factory_.GetWeakPtr(), rv));
194 }
195 }
196
GetProtocol() const197 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
198 return negotiated_protocol_;
199 }
200
GetTotalReceivedBytes() const201 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
202 if (stream_) {
203 DCHECK_LE(stream_->NumBytesConsumed(), stream_->stream_bytes_read());
204 // Only count the uniquely received bytes.
205 return stream_->NumBytesConsumed();
206 }
207 return closed_stream_received_bytes_;
208 }
209
GetTotalSentBytes() const210 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
211 if (stream_) {
212 return stream_->stream_bytes_written();
213 }
214 return closed_stream_sent_bytes_;
215 }
216
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const217 bool BidirectionalStreamQuicImpl::GetLoadTimingInfo(
218 LoadTimingInfo* load_timing_info) const {
219 bool is_first_stream = closed_is_first_stream_;
220 if (stream_)
221 is_first_stream = stream_->IsFirstStream();
222 if (is_first_stream) {
223 load_timing_info->socket_reused = false;
224 load_timing_info->connect_timing = connect_timing_;
225 } else {
226 load_timing_info->socket_reused = true;
227 }
228 return true;
229 }
230
PopulateNetErrorDetails(NetErrorDetails * details)231 void BidirectionalStreamQuicImpl::PopulateNetErrorDetails(
232 NetErrorDetails* details) {
233 DCHECK(details);
234 details->connection_info =
235 QuicHttpStream::ConnectionInfoFromQuicVersion(session_->GetQuicVersion());
236 session_->PopulateNetErrorDetails(details);
237 if (session_->OneRttKeysAvailable() && stream_)
238 details->quic_connection_error = stream_->connection_error();
239 }
240
OnStreamReady(int rv)241 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
242 DCHECK_NE(ERR_IO_PENDING, rv);
243 DCHECK(!stream_);
244 if (rv != OK) {
245 NotifyError(rv);
246 return;
247 }
248
249 stream_ = session_->ReleaseStream();
250 DCHECK(stream_);
251
252 if (!stream_->IsOpen()) {
253 NotifyError(ERR_CONNECTION_CLOSED);
254 return;
255 }
256
257 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
258 FROM_HERE,
259 base::BindOnce(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
260 weak_factory_.GetWeakPtr()));
261
262 NotifyStreamReady();
263 }
264
OnSendDataComplete(int rv)265 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
266 CHECK(may_invoke_callbacks_);
267 DCHECK_NE(ERR_IO_PENDING, rv);
268 if (rv < 0) {
269 NotifyError(rv);
270 return;
271 }
272
273 if (delegate_)
274 delegate_->OnDataSent();
275 }
276
OnReadInitialHeadersComplete(int rv)277 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
278 CHECK(may_invoke_callbacks_);
279 DCHECK_NE(ERR_IO_PENDING, rv);
280 if (rv < 0) {
281 NotifyError(rv);
282 return;
283 }
284
285 headers_bytes_received_ += rv;
286 negotiated_protocol_ = kProtoQUIC;
287 connect_timing_ = session_->GetConnectTiming();
288 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
289 FROM_HERE,
290 base::BindOnce(&BidirectionalStreamQuicImpl::ReadTrailingHeaders,
291 weak_factory_.GetWeakPtr()));
292 if (delegate_)
293 delegate_->OnHeadersReceived(initial_headers_);
294 }
295
ReadInitialHeaders()296 void BidirectionalStreamQuicImpl::ReadInitialHeaders() {
297 int rv = stream_->ReadInitialHeaders(
298 &initial_headers_,
299 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete,
300 weak_factory_.GetWeakPtr()));
301
302 if (rv != ERR_IO_PENDING)
303 OnReadInitialHeadersComplete(rv);
304 }
305
ReadTrailingHeaders()306 void BidirectionalStreamQuicImpl::ReadTrailingHeaders() {
307 int rv = stream_->ReadTrailingHeaders(
308 &trailing_headers_,
309 base::BindOnce(
310 &BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
311 weak_factory_.GetWeakPtr()));
312
313 if (rv != ERR_IO_PENDING)
314 OnReadTrailingHeadersComplete(rv);
315 }
316
OnReadTrailingHeadersComplete(int rv)317 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
318 CHECK(may_invoke_callbacks_);
319 DCHECK_NE(ERR_IO_PENDING, rv);
320 if (rv < 0) {
321 NotifyError(rv);
322 return;
323 }
324
325 headers_bytes_received_ += rv;
326
327 if (delegate_)
328 delegate_->OnTrailersReceived(trailing_headers_);
329 }
330
OnReadDataComplete(int rv)331 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
332 CHECK(may_invoke_callbacks_);
333
334 read_buffer_ = nullptr;
335 read_buffer_len_ = 0;
336
337 // If the write side is closed, OnFinRead() will call
338 // BidirectionalStreamQuicImpl::OnClose().
339 if (stream_->IsDoneReading())
340 stream_->OnFinRead();
341
342 if (!delegate_)
343 return;
344
345 if (rv < 0)
346 NotifyError(rv);
347 else
348 delegate_->OnDataRead(rv);
349 }
350
NotifyError(int error)351 void BidirectionalStreamQuicImpl::NotifyError(int error) {
352 NotifyErrorImpl(error, /*notify_delegate_later*/ false);
353 }
354
NotifyErrorImpl(int error,bool notify_delegate_later)355 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error,
356 bool notify_delegate_later) {
357 DCHECK_NE(OK, error);
358 DCHECK_NE(ERR_IO_PENDING, error);
359
360 ResetStream();
361 if (delegate_) {
362 response_status_ = error;
363 BidirectionalStreamImpl::Delegate* delegate = delegate_;
364 delegate_ = nullptr;
365 // Cancel any pending callback.
366 weak_factory_.InvalidateWeakPtrs();
367 if (notify_delegate_later) {
368 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
369 FROM_HERE,
370 base::BindOnce(&BidirectionalStreamQuicImpl::NotifyFailure,
371 weak_factory_.GetWeakPtr(), delegate, error));
372 } else {
373 NotifyFailure(delegate, error);
374 // |this| might be destroyed at this point.
375 }
376 }
377 }
378
NotifyFailure(BidirectionalStreamImpl::Delegate * delegate,int error)379 void BidirectionalStreamQuicImpl::NotifyFailure(
380 BidirectionalStreamImpl::Delegate* delegate,
381 int error) {
382 CHECK(may_invoke_callbacks_);
383 delegate->OnFailed(error);
384 // |this| might be destroyed at this point.
385 }
386
NotifyStreamReady()387 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
388 CHECK(may_invoke_callbacks_);
389 if (send_request_headers_automatically_) {
390 int rv = WriteHeaders();
391 if (rv < 0) {
392 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
393 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
394 weak_factory_.GetWeakPtr(), rv));
395 return;
396 }
397 }
398
399 if (delegate_)
400 delegate_->OnStreamReady(has_sent_headers_);
401 }
402
ResetStream()403 void BidirectionalStreamQuicImpl::ResetStream() {
404 if (!stream_)
405 return;
406 closed_stream_received_bytes_ = stream_->stream_bytes_read();
407 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
408 closed_is_first_stream_ = stream_->IsFirstStream();
409 }
410
411 } // namespace net
412