xref: /aosp_15_r20/external/cronet/net/websockets/websocket_basic_stream_adapters.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6 
7 #include <cstring>
8 #include <ostream>
9 #include <utility>
10 
11 #include "base/check.h"
12 #include "base/check_op.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/notreached.h"
19 #include "base/task/single_thread_task_runner.h"
20 #include "net/base/io_buffer.h"
21 #include "net/socket/client_socket_handle.h"
22 #include "net/socket/stream_socket.h"
23 #include "net/spdy/spdy_buffer.h"
24 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_header_list.h"
25 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
26 #include "net/third_party/quiche/src/quiche/quic/core/quic_ack_listener_interface.h"
27 #include "net/third_party/quiche/src/quiche/quic/core/quic_error_codes.h"
28 #include "net/websockets/websocket_quic_spdy_stream.h"
29 
30 namespace net {
31 struct NetworkTrafficAnnotationTag;
32 
WebSocketClientSocketHandleAdapter(std::unique_ptr<ClientSocketHandle> connection)33 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
34     std::unique_ptr<ClientSocketHandle> connection)
35     : connection_(std::move(connection)) {}
36 
37 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
38     default;
39 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)40 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
41                                              int buf_len,
42                                              CompletionOnceCallback callback) {
43   return connection_->socket()->Read(buf, buf_len, std::move(callback));
44 }
45 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)46 int WebSocketClientSocketHandleAdapter::Write(
47     IOBuffer* buf,
48     int buf_len,
49     CompletionOnceCallback callback,
50     const NetworkTrafficAnnotationTag& traffic_annotation) {
51   return connection_->socket()->Write(buf, buf_len, std::move(callback),
52                                       traffic_annotation);
53 }
54 
Disconnect()55 void WebSocketClientSocketHandleAdapter::Disconnect() {
56   connection_->socket()->Disconnect();
57 }
58 
is_initialized() const59 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
60   return connection_->is_initialized();
61 }
62 
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)63 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
64     base::WeakPtr<SpdyStream> stream,
65     Delegate* delegate,
66     NetLogWithSource net_log)
67     : stream_(stream), delegate_(delegate), net_log_(net_log) {
68   stream_->SetDelegate(this);
69 }
70 
~WebSocketSpdyStreamAdapter()71 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
72   if (stream_) {
73     // DetachDelegate() also cancels the stream.
74     stream_->DetachDelegate();
75   }
76 }
77 
DetachDelegate()78 void WebSocketSpdyStreamAdapter::DetachDelegate() {
79   delegate_ = nullptr;
80 }
81 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)82 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
83                                      int buf_len,
84                                      CompletionOnceCallback callback) {
85   DCHECK(!read_callback_);
86   DCHECK_LT(0, buf_len);
87 
88   DCHECK(!read_buffer_);
89   read_buffer_ = buf;
90   // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
91   // conversion is always valid.
92   DCHECK(!read_length_);
93   read_length_ = buf_len;
94 
95   if (!read_data_.IsEmpty())
96     return CopySavedReadDataIntoBuffer();
97 
98   if (!stream_)
99     return stream_error_;
100 
101   read_callback_ = std::move(callback);
102   return ERR_IO_PENDING;
103 }
104 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)105 int WebSocketSpdyStreamAdapter::Write(
106     IOBuffer* buf,
107     int buf_len,
108     CompletionOnceCallback callback,
109     const NetworkTrafficAnnotationTag& traffic_annotation) {
110   CHECK(headers_sent_);
111   DCHECK(!write_callback_);
112   DCHECK(callback);
113   DCHECK_LT(0, buf_len);
114 
115   if (!stream_)
116     return stream_error_;
117 
118   stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
119   write_callback_ = std::move(callback);
120   write_length_ = buf_len;
121   return ERR_IO_PENDING;
122 }
123 
Disconnect()124 void WebSocketSpdyStreamAdapter::Disconnect() {
125   if (stream_) {
126     stream_->DetachDelegate();
127     stream_ = nullptr;
128   }
129 }
130 
is_initialized() const131 bool WebSocketSpdyStreamAdapter::is_initialized() const {
132   return true;
133 }
134 
135 // SpdyStream::Delegate methods.
OnHeadersSent()136 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
137   headers_sent_ = true;
138   if (delegate_)
139     delegate_->OnHeadersSent();
140 }
141 
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)142 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
143     const spdy::Http2HeaderBlock& headers) {
144   // This callback should not be called for a WebSocket handshake.
145   NOTREACHED();
146 }
147 
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers)148 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
149     const spdy::Http2HeaderBlock& response_headers) {
150   if (delegate_)
151     delegate_->OnHeadersReceived(response_headers);
152 }
153 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)154 void WebSocketSpdyStreamAdapter::OnDataReceived(
155     std::unique_ptr<SpdyBuffer> buffer) {
156   if (!buffer) {
157     // This is slightly wrong semantically, as it's still possible to write to
158     // the stream at this point. However, if the server closes the stream
159     // without waiting for a close frame from us, that means it is not
160     // interested in a clean shutdown. In which case we don't need to worry
161     // about sending any remaining data we might have buffered. This results in
162     // a call to OnClose() which then informs our delegate.
163     stream_->Close();
164     return;
165   }
166 
167   read_data_.Enqueue(std::move(buffer));
168   if (read_callback_)
169     std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
170 }
171 
OnDataSent()172 void WebSocketSpdyStreamAdapter::OnDataSent() {
173   DCHECK(write_callback_);
174 
175   std::move(write_callback_).Run(write_length_);
176 }
177 
OnTrailers(const spdy::Http2HeaderBlock & trailers)178 void WebSocketSpdyStreamAdapter::OnTrailers(
179     const spdy::Http2HeaderBlock& trailers) {}
180 
OnClose(int status)181 void WebSocketSpdyStreamAdapter::OnClose(int status) {
182   DCHECK_NE(ERR_IO_PENDING, status);
183   DCHECK_LE(status, 0);
184 
185   if (status == OK) {
186     status = ERR_CONNECTION_CLOSED;
187   }
188 
189   stream_error_ = status;
190   stream_ = nullptr;
191 
192   auto self = weak_factory_.GetWeakPtr();
193 
194   if (read_callback_) {
195     DCHECK(read_data_.IsEmpty());
196     // Might destroy |this|.
197     std::move(read_callback_).Run(status);
198     if (!self)
199       return;
200   }
201   if (write_callback_) {
202     // Might destroy |this|.
203     std::move(write_callback_).Run(status);
204     if (!self)
205       return;
206   }
207 
208   // Delay calling delegate_->OnClose() until all buffered data are read.
209   if (read_data_.IsEmpty() && delegate_) {
210     // Might destroy |this|.
211     delegate_->OnClose(status);
212   }
213 }
214 
CanGreaseFrameType() const215 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
216   return false;
217 }
218 
source_dependency() const219 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
220   return net_log_.source();
221 }
222 
CopySavedReadDataIntoBuffer()223 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
224   DCHECK(read_buffer_);
225   DCHECK(read_length_);
226   int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
227   read_buffer_ = nullptr;
228   read_length_ = 0u;
229 
230   // Stream has been destroyed earlier but delegate_->OnClose() call was
231   // delayed until all buffered data are read.  PostTask so that Read() can
232   // return beforehand.
233   if (!stream_ && delegate_ && read_data_.IsEmpty()) {
234     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
235         FROM_HERE,
236         base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
237                        weak_factory_.GetWeakPtr()));
238   }
239 
240   return rv;
241 }
242 
CallDelegateOnClose()243 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
244   if (delegate_)
245     delegate_->OnClose(stream_error_);
246 }
247 
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)248 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
249     WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
250     Delegate* delegate)
251     : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
252       delegate_(delegate) {
253   websocket_quic_spdy_stream_->set_delegate(this);
254 }
255 
~WebSocketQuicStreamAdapter()256 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
257   if (websocket_quic_spdy_stream_) {
258     websocket_quic_spdy_stream_->set_delegate(nullptr);
259   }
260 }
261 
WriteHeaders(spdy::Http2HeaderBlock header_block,bool fin)262 size_t WebSocketQuicStreamAdapter::WriteHeaders(
263     spdy::Http2HeaderBlock header_block,
264     bool fin) {
265   return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
266                                                    nullptr);
267 }
268 
269 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)270 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
271                                      int buf_len,
272                                      CompletionOnceCallback callback) {
273   if (!websocket_quic_spdy_stream_) {
274     return ERR_UNEXPECTED;
275   }
276 
277   int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
278   if (rv != ERR_IO_PENDING) {
279     return rv;
280   }
281 
282   read_callback_ = std::move(callback);
283   read_buffer_ = buf;
284   read_length_ = buf_len;
285   return ERR_IO_PENDING;
286 }
287 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)288 int WebSocketQuicStreamAdapter::Write(
289     IOBuffer* buf,
290     int buf_len,
291     CompletionOnceCallback callback,
292     const NetworkTrafficAnnotationTag& traffic_annotation) {
293   // TODO(momoka): Write implementation.
294   return OK;
295 }
296 
Disconnect()297 void WebSocketQuicStreamAdapter::Disconnect() {
298   if (websocket_quic_spdy_stream_) {
299     websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
300   }
301 }
302 
is_initialized() const303 bool WebSocketQuicStreamAdapter::is_initialized() const {
304   return true;
305 }
306 
307 // WebSocketQuicSpdyStream::Delegate methods.
308 
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)309 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
310     bool fin,
311     size_t frame_len,
312     const quic::QuicHeaderList& quic_header_list) {
313   spdy::Http2HeaderBlock response_headers;
314   if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
315                                                &response_headers)) {
316     DLOG(ERROR) << "Failed to parse header list: "
317                 << quic_header_list.DebugString();
318     websocket_quic_spdy_stream_->ConsumeHeaderList();
319     websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
320     return;
321   }
322   websocket_quic_spdy_stream_->ConsumeHeaderList();
323   delegate_->OnHeadersReceived(response_headers);
324 }
325 
OnBodyAvailable()326 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
327   if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
328     // Buffer the data in the sequencer until the headers have been read.
329     return;
330   }
331 
332   if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
333     return;
334   }
335 
336   if (!read_callback_) {
337     // Wait for Read() to be called.
338     return;
339   }
340 
341   DCHECK(read_buffer_);
342   DCHECK_GT(read_length_, 0);
343 
344   int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
345 
346   if (rv == ERR_IO_PENDING) {
347     return;
348   }
349 
350   read_buffer_ = nullptr;
351   read_length_ = 0;
352   std::move(read_callback_).Run(rv);
353 }
354 
ClearStream()355 void WebSocketQuicStreamAdapter::ClearStream() {
356   if (websocket_quic_spdy_stream_) {
357     websocket_quic_spdy_stream_ = nullptr;
358   }
359 }
360 
361 }  // namespace net
362