1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6
7 #include <cstring>
8 #include <ostream>
9 #include <utility>
10
11 #include "base/check.h"
12 #include "base/check_op.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/notreached.h"
19 #include "base/task/single_thread_task_runner.h"
20 #include "net/base/io_buffer.h"
21 #include "net/socket/client_socket_handle.h"
22 #include "net/socket/stream_socket.h"
23 #include "net/spdy/spdy_buffer.h"
24 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_header_list.h"
25 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
26 #include "net/third_party/quiche/src/quiche/quic/core/quic_ack_listener_interface.h"
27 #include "net/third_party/quiche/src/quiche/quic/core/quic_error_codes.h"
28 #include "net/websockets/websocket_quic_spdy_stream.h"
29
30 namespace net {
31 struct NetworkTrafficAnnotationTag;
32
WebSocketClientSocketHandleAdapter(std::unique_ptr<ClientSocketHandle> connection)33 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
34 std::unique_ptr<ClientSocketHandle> connection)
35 : connection_(std::move(connection)) {}
36
37 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
38 default;
39
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)40 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
41 int buf_len,
42 CompletionOnceCallback callback) {
43 return connection_->socket()->Read(buf, buf_len, std::move(callback));
44 }
45
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)46 int WebSocketClientSocketHandleAdapter::Write(
47 IOBuffer* buf,
48 int buf_len,
49 CompletionOnceCallback callback,
50 const NetworkTrafficAnnotationTag& traffic_annotation) {
51 return connection_->socket()->Write(buf, buf_len, std::move(callback),
52 traffic_annotation);
53 }
54
Disconnect()55 void WebSocketClientSocketHandleAdapter::Disconnect() {
56 connection_->socket()->Disconnect();
57 }
58
is_initialized() const59 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
60 return connection_->is_initialized();
61 }
62
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)63 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
64 base::WeakPtr<SpdyStream> stream,
65 Delegate* delegate,
66 NetLogWithSource net_log)
67 : stream_(stream), delegate_(delegate), net_log_(net_log) {
68 stream_->SetDelegate(this);
69 }
70
~WebSocketSpdyStreamAdapter()71 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
72 if (stream_) {
73 // DetachDelegate() also cancels the stream.
74 stream_->DetachDelegate();
75 }
76 }
77
DetachDelegate()78 void WebSocketSpdyStreamAdapter::DetachDelegate() {
79 delegate_ = nullptr;
80 }
81
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)82 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
83 int buf_len,
84 CompletionOnceCallback callback) {
85 DCHECK(!read_callback_);
86 DCHECK_LT(0, buf_len);
87
88 DCHECK(!read_buffer_);
89 read_buffer_ = buf;
90 // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
91 // conversion is always valid.
92 DCHECK(!read_length_);
93 read_length_ = buf_len;
94
95 if (!read_data_.IsEmpty())
96 return CopySavedReadDataIntoBuffer();
97
98 if (!stream_)
99 return stream_error_;
100
101 read_callback_ = std::move(callback);
102 return ERR_IO_PENDING;
103 }
104
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)105 int WebSocketSpdyStreamAdapter::Write(
106 IOBuffer* buf,
107 int buf_len,
108 CompletionOnceCallback callback,
109 const NetworkTrafficAnnotationTag& traffic_annotation) {
110 CHECK(headers_sent_);
111 DCHECK(!write_callback_);
112 DCHECK(callback);
113 DCHECK_LT(0, buf_len);
114
115 if (!stream_)
116 return stream_error_;
117
118 stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
119 write_callback_ = std::move(callback);
120 write_length_ = buf_len;
121 return ERR_IO_PENDING;
122 }
123
Disconnect()124 void WebSocketSpdyStreamAdapter::Disconnect() {
125 if (stream_) {
126 stream_->DetachDelegate();
127 stream_ = nullptr;
128 }
129 }
130
is_initialized() const131 bool WebSocketSpdyStreamAdapter::is_initialized() const {
132 return true;
133 }
134
135 // SpdyStream::Delegate methods.
OnHeadersSent()136 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
137 headers_sent_ = true;
138 if (delegate_)
139 delegate_->OnHeadersSent();
140 }
141
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)142 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
143 const spdy::Http2HeaderBlock& headers) {
144 // This callback should not be called for a WebSocket handshake.
145 NOTREACHED();
146 }
147
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers)148 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
149 const spdy::Http2HeaderBlock& response_headers) {
150 if (delegate_)
151 delegate_->OnHeadersReceived(response_headers);
152 }
153
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)154 void WebSocketSpdyStreamAdapter::OnDataReceived(
155 std::unique_ptr<SpdyBuffer> buffer) {
156 if (!buffer) {
157 // This is slightly wrong semantically, as it's still possible to write to
158 // the stream at this point. However, if the server closes the stream
159 // without waiting for a close frame from us, that means it is not
160 // interested in a clean shutdown. In which case we don't need to worry
161 // about sending any remaining data we might have buffered. This results in
162 // a call to OnClose() which then informs our delegate.
163 stream_->Close();
164 return;
165 }
166
167 read_data_.Enqueue(std::move(buffer));
168 if (read_callback_)
169 std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
170 }
171
OnDataSent()172 void WebSocketSpdyStreamAdapter::OnDataSent() {
173 DCHECK(write_callback_);
174
175 std::move(write_callback_).Run(write_length_);
176 }
177
OnTrailers(const spdy::Http2HeaderBlock & trailers)178 void WebSocketSpdyStreamAdapter::OnTrailers(
179 const spdy::Http2HeaderBlock& trailers) {}
180
OnClose(int status)181 void WebSocketSpdyStreamAdapter::OnClose(int status) {
182 DCHECK_NE(ERR_IO_PENDING, status);
183 DCHECK_LE(status, 0);
184
185 if (status == OK) {
186 status = ERR_CONNECTION_CLOSED;
187 }
188
189 stream_error_ = status;
190 stream_ = nullptr;
191
192 auto self = weak_factory_.GetWeakPtr();
193
194 if (read_callback_) {
195 DCHECK(read_data_.IsEmpty());
196 // Might destroy |this|.
197 std::move(read_callback_).Run(status);
198 if (!self)
199 return;
200 }
201 if (write_callback_) {
202 // Might destroy |this|.
203 std::move(write_callback_).Run(status);
204 if (!self)
205 return;
206 }
207
208 // Delay calling delegate_->OnClose() until all buffered data are read.
209 if (read_data_.IsEmpty() && delegate_) {
210 // Might destroy |this|.
211 delegate_->OnClose(status);
212 }
213 }
214
CanGreaseFrameType() const215 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
216 return false;
217 }
218
source_dependency() const219 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
220 return net_log_.source();
221 }
222
CopySavedReadDataIntoBuffer()223 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
224 DCHECK(read_buffer_);
225 DCHECK(read_length_);
226 int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
227 read_buffer_ = nullptr;
228 read_length_ = 0u;
229
230 // Stream has been destroyed earlier but delegate_->OnClose() call was
231 // delayed until all buffered data are read. PostTask so that Read() can
232 // return beforehand.
233 if (!stream_ && delegate_ && read_data_.IsEmpty()) {
234 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
235 FROM_HERE,
236 base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
237 weak_factory_.GetWeakPtr()));
238 }
239
240 return rv;
241 }
242
CallDelegateOnClose()243 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
244 if (delegate_)
245 delegate_->OnClose(stream_error_);
246 }
247
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)248 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
249 WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
250 Delegate* delegate)
251 : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
252 delegate_(delegate) {
253 websocket_quic_spdy_stream_->set_delegate(this);
254 }
255
~WebSocketQuicStreamAdapter()256 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
257 if (websocket_quic_spdy_stream_) {
258 websocket_quic_spdy_stream_->set_delegate(nullptr);
259 }
260 }
261
WriteHeaders(spdy::Http2HeaderBlock header_block,bool fin)262 size_t WebSocketQuicStreamAdapter::WriteHeaders(
263 spdy::Http2HeaderBlock header_block,
264 bool fin) {
265 return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
266 nullptr);
267 }
268
269 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)270 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
271 int buf_len,
272 CompletionOnceCallback callback) {
273 if (!websocket_quic_spdy_stream_) {
274 return ERR_UNEXPECTED;
275 }
276
277 int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
278 if (rv != ERR_IO_PENDING) {
279 return rv;
280 }
281
282 read_callback_ = std::move(callback);
283 read_buffer_ = buf;
284 read_length_ = buf_len;
285 return ERR_IO_PENDING;
286 }
287
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)288 int WebSocketQuicStreamAdapter::Write(
289 IOBuffer* buf,
290 int buf_len,
291 CompletionOnceCallback callback,
292 const NetworkTrafficAnnotationTag& traffic_annotation) {
293 // TODO(momoka): Write implementation.
294 return OK;
295 }
296
Disconnect()297 void WebSocketQuicStreamAdapter::Disconnect() {
298 if (websocket_quic_spdy_stream_) {
299 websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
300 }
301 }
302
is_initialized() const303 bool WebSocketQuicStreamAdapter::is_initialized() const {
304 return true;
305 }
306
307 // WebSocketQuicSpdyStream::Delegate methods.
308
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)309 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
310 bool fin,
311 size_t frame_len,
312 const quic::QuicHeaderList& quic_header_list) {
313 spdy::Http2HeaderBlock response_headers;
314 if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
315 &response_headers)) {
316 DLOG(ERROR) << "Failed to parse header list: "
317 << quic_header_list.DebugString();
318 websocket_quic_spdy_stream_->ConsumeHeaderList();
319 websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
320 return;
321 }
322 websocket_quic_spdy_stream_->ConsumeHeaderList();
323 delegate_->OnHeadersReceived(response_headers);
324 }
325
OnBodyAvailable()326 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
327 if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
328 // Buffer the data in the sequencer until the headers have been read.
329 return;
330 }
331
332 if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
333 return;
334 }
335
336 if (!read_callback_) {
337 // Wait for Read() to be called.
338 return;
339 }
340
341 DCHECK(read_buffer_);
342 DCHECK_GT(read_length_, 0);
343
344 int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
345
346 if (rv == ERR_IO_PENDING) {
347 return;
348 }
349
350 read_buffer_ = nullptr;
351 read_length_ = 0;
352 std::move(read_callback_).Run(rv);
353 }
354
ClearStream()355 void WebSocketQuicStreamAdapter::ClearStream() {
356 if (websocket_quic_spdy_stream_) {
357 websocket_quic_spdy_stream_ = nullptr;
358 }
359 }
360
361 } // namespace net
362