// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/socket/transport_client_socket_pool.h"

#include <string_view>
#include <utility>

#include "base/auto_reset.h"
#include "base/barrier_closure.h"
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/containers/contains.h"
#include "base/format_macros.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/metrics/histogram_macros.h"
#include "base/notreached.h"
#include "base/ranges/algorithm.h"
#include "base/strings/string_util.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/time.h"
#include "base/values.h"
#include "net/base/host_port_pair.h"
#include "net/base/net_errors.h"
#include "net/base/proxy_chain.h"
#include "net/base/proxy_server.h"
#include "net/log/net_log.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_source.h"
#include "net/socket/connect_job_factory.h"
#include "net/traffic_annotation/network_traffic_annotation.h"
#include "url/gurl.h"

namespace net {

namespace {

// Indicates whether we should establish a new transport layer connection
// after a certain timeout has passed without receiving an ACK.
bool g_connect_backup_jobs_enabled = true;

base::Value::Dict NetLogCreateConnectJobParams(
    bool backup_job,
    const ClientSocketPool::GroupId* group_id) {
  return base::Value::Dict()
      .Set("backup_job", backup_job)
      .Set("group_id", group_id->ToString());
}

}  // namespace

const char TransportClientSocketPool::kCertDatabaseChanged[] =
    "Cert database changed";
const char TransportClientSocketPool::kCertVerifierChanged[] =
    "Cert verifier changed";
const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
    "Connection was closed when it was returned to the pool";
const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
    "Data received unexpectedly";
const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
    "Idle time limit expired";
const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
    "Remote side closed connection";
const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
    "Socket generation out of date";
const char TransportClientSocketPool::kSocketPoolDestroyed[] =
    "Socket pool destroyed";
const char TransportClientSocketPool::kSslConfigChanged[] =
    "SSL configuration changed";

TransportClientSocketPool::Request::Request(
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    Flags flags,
    scoped_refptr<SocketParams> socket_params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    const NetLogWithSource& net_log)
    : handle_(handle),
      callback_(std::move(callback)),
      proxy_auth_callback_(proxy_auth_callback),
      priority_(priority),
      respect_limits_(respect_limits),
      flags_(flags),
      socket_params_(std::move(socket_params)),
      proxy_annotation_tag_(proxy_annotation_tag),
      net_log_(net_log),
      socket_tag_(socket_tag) {
  if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
    DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
}

TransportClientSocketPool::Request::~Request() = default;

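// Associates |job| with this request and aligns the job's priority with the
// request's. A request may have at most one assigned job at a time.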
void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
  DCHECK(job);
  DCHECK(!job_);
  job_ = job;
  if (job_->priority() != priority_)
    job_->ChangePriority(priority_);
}

ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
  DCHECK(job_);
  ConnectJob* job = job_;
  job_ = nullptr;
  return job;
}

struct TransportClientSocketPool::IdleSocket {
  // An idle socket can't be used if it is disconnected or has been used
  // before and has received data unexpectedly (hence no longer idle). The
  // unread data would be mistaken for the beginning of the next response if
  // we were to use the socket for a new request.
  //
  // Note that a socket that has never been used before (like a preconnected
  // socket) may be used even with unread data. This may be, e.g., a SPDY
  // SETTINGS frame.
  //
  // If the socket is not usable, |net_log_reason_utf8| is set to a string
  // indicating why the socket is not usable.
  bool IsUsable(const char** net_log_reason_utf8) const;

  std::unique_ptr<StreamSocket> socket;
  base::TimeTicks start_time;
};

TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change)
    : TransportClientSocketPool(max_sockets,
                                max_sockets_per_group,
                                unused_idle_socket_timeout,
                                ClientSocketPool::used_idle_socket_timeout(),
                                proxy_chain,
                                is_for_websockets,
                                common_connect_job_params,
                                cleanup_on_ip_address_change,
                                std::make_unique<ConnectJobFactory>(),
                                common_connect_job_params->ssl_client_context,
                                /*connect_backup_jobs_enabled=*/true) {}

TransportClientSocketPool::~TransportClientSocketPool() {
  // Clean up any idle sockets and pending connect jobs. Assert that we have no
  // remaining active sockets or pending requests. They should have all been
  // cleaned up prior to |this| being destroyed.
  FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
  DCHECK(group_map_.empty());
  DCHECK(pending_callback_map_.empty());
  DCHECK_EQ(0, connecting_socket_count_);
  DCHECK_EQ(0, handed_out_socket_count_);
  CHECK(higher_pools_.empty());

  if (ssl_client_context_)
    ssl_client_context_->RemoveObserver(this);

  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::RemoveIPAddressObserver(this);
}

std::unique_ptr<TransportClientSocketPool>
TransportClientSocketPool::CreateForTesting(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled) {
  return base::WrapUnique<TransportClientSocketPool>(
      new TransportClientSocketPool(
          max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
          used_idle_socket_timeout, proxy_chain, is_for_websockets,
          common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
          std::move(connect_job_factory), ssl_client_context,
          connect_backup_jobs_enabled));
}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
    : result(OK) {}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    CompletionOnceCallback callback_in,
    int result_in)
    : callback(std::move(callback_in)), result(result_in) {}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair&
TransportClientSocketPool::CallbackResultPair::operator=(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;

bool TransportClientSocketPool::IsStalled() const {
  // If fewer than |max_sockets_| are in use, then clearly |this| is not
  // stalled.
  if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
    return false;
  // So in order to be stalled, |this| must be using at least |max_sockets_| AND
  // |this| must have a request that is actually stalled on the global socket
  // limit. To find such a request, look for a group that has more requests
  // than jobs AND where the number of sockets is less than
  // |max_sockets_per_group_|. (If the number of sockets is equal to
  // |max_sockets_per_group_|, then the request is stalled on the group limit,
  // which does not count.)
  for (const auto& it : group_map_) {
    if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
      return true;
  }
  return false;
}

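// Higher-layered pools register themselves here so that
// TryToCloseSocketsInLayeredPools() can ask them to close idle connections
// when this pool is stalled on the global socket limit.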
void TransportClientSocketPool::AddHigherLayeredPool(
    HigherLayeredPool* higher_pool) {
  CHECK(higher_pool);
  CHECK(!base::Contains(higher_pools_, higher_pool));
  higher_pools_.insert(higher_pool);
}

void TransportClientSocketPool::RemoveHigherLayeredPool(
    HigherLayeredPool* higher_pool) {
  CHECK(higher_pool);
  CHECK(base::Contains(higher_pools_, higher_pool));
  higher_pools_.erase(higher_pool);
}

int TransportClientSocketPool::RequestSocket(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    const NetLogWithSource& net_log) {
  CHECK(callback);
  CHECK(handle);

  NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);

  std::unique_ptr<Request> request = std::make_unique<Request>(
      handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
      respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);

  // Clean up any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);

  int rv =
      RequestSocketInternal(group_id, *request,
                            /*preconnect_done_closure=*/base::OnceClosure());
  if (rv != ERR_IO_PENDING) {
    if (rv == OK) {
      request->handle()->socket()->ApplySocketTag(request->socket_tag());
    }
    request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                                rv);
    CHECK(!request->handle()->is_initialized());
    request.reset();
  } else {
    Group* group = GetOrCreateGroup(group_id);
    group->InsertUnboundRequest(std::move(request));
    // Have to do this asynchronously, as closing sockets in higher-level pools
    // calls back into |this|, which will cause all sorts of fun and exciting
    // re-entrancy issues if the socket pool is doing something else at the
    // time.
    if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
      base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
          FROM_HERE,
          base::BindOnce(
              &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
              weak_factory_.GetWeakPtr()));
    }
  }
  return rv;
}

int TransportClientSocketPool::RequestSockets(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    int num_sockets,
    CompletionOnceCallback callback,
    const NetLogWithSource& net_log) {
  // TODO(eroman): Split out the host and port parameters.
  net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
                   [&] { return NetLogGroupIdParams(group_id); });

  Request request(nullptr /* no handle */, CompletionOnceCallback(),
                  ProxyAuthCallback(), IDLE, SocketTag(),
                  RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
                  proxy_annotation_tag, net_log);

  // Clean up any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  if (num_sockets > max_sockets_per_group_) {
    num_sockets = max_sockets_per_group_;
  }

  request.net_log().BeginEventWithIntParams(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
      num_sockets);

  Group* group = GetOrCreateGroup(group_id);

  // RequestSocketInternal() may delete the group.
  bool deleted_group = false;

  int rv = OK;

  base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
      num_sockets,
      base::BindOnce(
          [](CompletionOnceCallback callback) {
            base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
                FROM_HERE, base::BindOnce(std::move(callback), OK));
          },
          std::move(callback)));
  int pending_connect_job_count = 0;
  for (int num_iterations_left = num_sockets;
       group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
       num_iterations_left--) {
    rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
    if (rv == ERR_IO_PENDING) {
      ++pending_connect_job_count;
    }
    if (rv < 0 && rv != ERR_IO_PENDING) {
      // We're encountering a synchronous error. Give up.
      if (!base::Contains(group_map_, group_id))
        deleted_group = true;
      break;
    }
    if (!base::Contains(group_map_, group_id)) {
      // Unexpected. The group should only be getting deleted on synchronous
      // error.
      NOTREACHED();
      deleted_group = true;
      break;
    }
  }

  if (!deleted_group && group->IsEmpty())
    RemoveGroup(group_id);

  if (rv == ERR_IO_PENDING)
    rv = OK;
  request.net_log().EndEventWithNetErrorCode(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);

  // Currently we don't handle preconnect errors, so this method returns OK
  // even if preconnecting failed.
  // TODO(crbug.com/1330235): Consider supporting error handling when needed.
  if (pending_connect_job_count == 0)
    return OK;
  for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
    preconnect_done_closure.Run();
  }

  return ERR_IO_PENDING;
}

int TransportClientSocketPool::RequestSocketInternal(
    const GroupId& group_id,
    const Request& request,
    base::OnceClosure preconnect_done_closure) {
#if DCHECK_IS_ON()
  DCHECK(!request_in_process_);
  base::AutoReset<bool> auto_reset(&request_in_process_, true);
#endif  // DCHECK_IS_ON()

  ClientSocketHandle* const handle = request.handle();
  const bool preconnecting = !handle;
  DCHECK_EQ(preconnecting, !!preconnect_done_closure);

  Group* group = nullptr;
  auto group_it = group_map_.find(group_id);
  if (group_it != group_map_.end()) {
    group = group_it->second;

    if (!(request.flags() & NO_IDLE_SOCKETS)) {
      // Try to reuse a socket.
      if (AssignIdleSocketToRequest(request, group))
        return OK;
    }

    // If there are more ConnectJobs than pending requests, we don't need to do
    // anything. We can just wait for the extra job to connect, and then assign
    // it to the request.
    if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
      return ERR_IO_PENDING;

    // Can we make another active socket now?
    if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
        request.respect_limits() == RespectLimits::ENABLED) {
      // TODO(willchan): Consider whether or not we need to close a socket in a
      // higher layered group. I don't think this makes sense since we would
      // just reuse that socket then if we needed one and wouldn't make it down
      // to this layer.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  if (ReachedMaxSocketsLimit() &&
      request.respect_limits() == RespectLimits::ENABLED) {
    // NOTE(mmenke): Wonder if we really need different code for each case
    // here. Only reason for them now seems to be preconnects.
    if (idle_socket_count_ > 0) {
      // There's an idle socket in this pool. Either that's because there's
      // still one in this group, but we got here due to preconnecting
      // bypassing idle sockets, or because there's an idle socket in another
      // group.
      bool closed = CloseOneIdleSocketExceptInGroup(group);
      if (preconnecting && !closed)
        return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
    } else {
      // We could check if we really have a stalled group here, but it
      // requires a scan of all groups, so just flip a flag here, and do the
      // check later.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  // We couldn't find a socket to reuse, and there's space to allocate one,
  // so allocate and connect a new one.
  group = GetOrCreateGroup(group_id);
  std::unique_ptr<ConnectJob> connect_job(
      CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
                       request.proxy_annotation_tag(), request.priority(),
                       request.socket_tag(), group));
  connect_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
      });

  int rv = connect_job->Connect();
  if (rv == ERR_IO_PENDING) {
    if (preconnect_done_closure) {
      DCHECK(preconnecting);
      connect_job->set_done_closure(std::move(preconnect_done_closure));
    }
    // If we didn't have any sockets in this group, set a timer for potentially
    // creating a new one. If the SYN is lost, this backup socket may complete
    // before the slow socket, improving end user latency.
    if (connect_backup_jobs_enabled_ && group->IsEmpty())
      group->StartBackupJobTimer(group_id);
    group->AddJob(std::move(connect_job), preconnecting);
    connecting_socket_count_++;
    return rv;
  }

  LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
  if (preconnecting) {
    if (rv == OK)
      AddIdleSocket(connect_job->PassSocket(), group);
  } else {
    DCHECK(handle);
    if (rv != OK)
      handle->SetAdditionalErrorState(connect_job.get());
    std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
    if (socket) {
      HandOutSocket(std::move(socket), ClientSocketHandle::UNUSED,
                    connect_job->connect_timing(), handle,
                    base::TimeDelta() /* idle_time */, group,
                    request.net_log());
    }
  }
  if (group->IsEmpty())
    RemoveGroup(group_id);

  return rv;
}

bool TransportClientSocketPool::AssignIdleSocketToRequest(
    const Request& request,
    Group* group) {
  std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
  auto idle_socket_it = idle_sockets->end();

  // Iterate through the idle sockets forwards (oldest to newest):
  //   * Delete any disconnected ones.
  //   * If we find a used idle socket, assign it to |idle_socket_it|. At the
  //     end, |idle_socket_it| will point to the newest used idle socket.
  for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
    // Check whether socket is usable. Note that it's unlikely that the socket
    // is not usable because this function is always invoked after a
    // reusability check, but in theory socket can be closed asynchronously.
    const char* net_log_reason_utf8;
    if (!it->IsUsable(&net_log_reason_utf8)) {
      it->socket->NetLog().AddEventWithStringParams(
          NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
          net_log_reason_utf8);
      DecrementIdleCount();
      it = idle_sockets->erase(it);
      continue;
    }

    if (it->socket->WasEverUsed()) {
      // We found one we can reuse!
      idle_socket_it = it;
    }

    ++it;
  }

  // If we haven't found an idle socket, that means there are no used idle
  // sockets. Pick the oldest (first) idle socket (FIFO).

  if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
    idle_socket_it = idle_sockets->begin();

  if (idle_socket_it != idle_sockets->end()) {
    DecrementIdleCount();
    base::TimeDelta idle_time =
        base::TimeTicks::Now() - idle_socket_it->start_time;
    std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
    idle_sockets->erase(idle_socket_it);
    // TODO(davidben): If |idle_time| is under some low watermark, consider
    // treating as UNUSED rather than UNUSED_IDLE. This will avoid
    // HttpNetworkTransaction retrying on some errors.
    ClientSocketHandle::SocketReuseType reuse_type =
        socket->WasEverUsed() ? ClientSocketHandle::REUSED_IDLE
                              : ClientSocketHandle::UNUSED_IDLE;

    HandOutSocket(std::move(socket), reuse_type,
                  LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
                  group, request.net_log());
    return true;
  }

  return false;
}

// static
void TransportClientSocketPool::LogBoundConnectJobToRequest(
    const NetLogSource& connect_job_source,
    const Request& request) {
  request.net_log().AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
}

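// Updates the priority of the still-pending request for |handle|. If the
// request has already completed (it has an entry in |pending_callback_map_|),
// there is nothing to reprioritize.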
void TransportClientSocketPool::SetPriority(const GroupId& group_id,
                                            ClientSocketHandle* handle,
                                            RequestPriority priority) {
  auto group_it = group_map_.find(group_id);
  if (group_it == group_map_.end()) {
    DCHECK(base::Contains(pending_callback_map_, handle));
    // The Request has already completed and been destroyed; nothing to
    // reprioritize.
    return;
  }

  group_it->second->SetPriority(handle, priority);
}

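// Cancels the request for |handle|. A socket that was already assigned to the
// request is returned to the pool (disconnected first when appropriate); an
// unassigned request is simply removed from its group, optionally along with
// one of the group's ConnectJobs.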
void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
                                              ClientSocketHandle* handle,
                                              bool cancel_connect_job) {
  auto callback_it = pending_callback_map_.find(handle);
  if (callback_it != pending_callback_map_.end()) {
    int result = callback_it->second.result;
    pending_callback_map_.erase(callback_it);
    std::unique_ptr<StreamSocket> socket = handle->PassSocket();
    if (socket) {
      if (result != OK) {
        socket->Disconnect();
      } else if (cancel_connect_job) {
        // Close the socket if |cancel_connect_job| is true and there are no
        // other pending requests.
        Group* group = GetOrCreateGroup(group_id);
        if (group->unbound_request_count() == 0)
          socket->Disconnect();
      }
      ReleaseSocket(handle->group_id(), std::move(socket),
                    handle->group_generation());
    }
    return;
  }

  CHECK(base::Contains(group_map_, group_id));
  Group* group = GetOrCreateGroup(group_id);

  std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
  if (request) {
    --connecting_socket_count_;
    OnAvailableSocketSlot(group_id, group);
    CheckForStalledSocketGroups();
    return;
  }

  // Search |unbound_requests_| for matching handle.
  request = group->FindAndRemoveUnboundRequest(handle);
  if (request) {
    request->net_log().AddEvent(NetLogEventType::CANCELLED);
    request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);

    // Let the job run, unless |cancel_connect_job| is true, or we're at the
    // socket limit and there are no other requests waiting on the job.
    bool reached_limit = ReachedMaxSocketsLimit();
    if (group->jobs().size() > group->unbound_request_count() &&
        (cancel_connect_job || reached_limit)) {
      RemoveConnectJob(group->jobs().begin()->get(), group);
      if (group->IsEmpty())
        RemoveGroup(group->group_id());
      if (reached_limit)
        CheckForStalledSocketGroups();
    }
  }
}

void TransportClientSocketPool::CloseIdleSockets(
    const char* net_log_reason_utf8) {
  CleanupIdleSockets(true, net_log_reason_utf8);
  DCHECK_EQ(0, idle_socket_count_);
}

void TransportClientSocketPool::CloseIdleSocketsInGroup(
    const GroupId& group_id,
    const char* net_log_reason_utf8) {
  if (idle_socket_count_ == 0)
    return;
  auto it = group_map_.find(group_id);
  if (it == group_map_.end())
    return;
  CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
                            net_log_reason_utf8);
  if (it->second->IsEmpty())
    RemoveGroup(it);
}

int TransportClientSocketPool::IdleSocketCount() const {
  return idle_socket_count_;
}

size_t TransportClientSocketPool::IdleSocketCountInGroup(
    const GroupId& group_id) const {
  auto i = group_map_.find(group_id);
  CHECK(i != group_map_.end());

  return i->second->idle_sockets().size();
}

LoadState TransportClientSocketPool::GetLoadState(
    const GroupId& group_id,
    const ClientSocketHandle* handle) const {
  if (base::Contains(pending_callback_map_, handle))
    return LOAD_STATE_CONNECTING;

  auto group_it = group_map_.find(group_id);
  if (group_it == group_map_.end()) {
    // TODO(mmenke): This is actually reached in the wild, for unknown reasons.
    // Would be great to understand why, and if it's a bug, fix it. If not,
    // should have a test for that case.
    NOTREACHED();
    return LOAD_STATE_IDLE;
  }

  const Group& group = *group_it->second;
  ConnectJob* job = group.GetConnectJobForHandle(handle);
  if (job)
    return job->GetLoadState();

  if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
    return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
  return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
}

base::Value TransportClientSocketPool::GetInfoAsValue(
    const std::string& name,
    const std::string& type) const {
  // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
  auto dict = base::Value::Dict()
                  .Set("name", name)
                  .Set("type", type)
                  .Set("handed_out_socket_count", handed_out_socket_count_)
                  .Set("connecting_socket_count", connecting_socket_count_)
                  .Set("idle_socket_count", idle_socket_count_)
                  .Set("max_socket_count", max_sockets_)
                  .Set("max_sockets_per_group", max_sockets_per_group_);

  if (group_map_.empty())
    return base::Value(std::move(dict));

  base::Value::Dict all_groups_dict;
  for (const auto& entry : group_map_) {
    const Group* group = entry.second;

    base::Value::List idle_socket_list;
    for (const auto& idle_socket : group->idle_sockets()) {
      int source_id = idle_socket.socket->NetLog().source().id;
      idle_socket_list.Append(source_id);
    }

    base::Value::List connect_jobs_list;
    for (const auto& job : group->jobs()) {
      int source_id = job->net_log().source().id;
      connect_jobs_list.Append(source_id);
    }

    auto group_dict =
        base::Value::Dict()
            .Set("pending_request_count",
                 static_cast<int>(group->unbound_request_count()))
            .Set("active_socket_count", group->active_socket_count())
            .Set("idle_sockets", std::move(idle_socket_list))
            .Set("connect_jobs", std::move(connect_jobs_list))
            .Set("is_stalled",
                 group->CanUseAdditionalSocketSlot(max_sockets_per_group_))
            .Set("backup_job_timer_is_running",
                 group->BackupJobTimerIsRunning());

    if (group->has_unbound_requests()) {
      group_dict.Set("top_pending_priority",
                     RequestPriorityToString(group->TopPendingPriority()));
    }

    all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
  }
  dict.Set("groups", std::move(all_groups_dict));
  return base::Value(std::move(dict));
}

bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
  return HasGroup(group_id);
}

bool TransportClientSocketPool::IdleSocket::IsUsable(
    const char** net_log_reason_utf8) const {
  DCHECK(net_log_reason_utf8);
  if (socket->WasEverUsed()) {
    if (!socket->IsConnectedAndIdle()) {
      if (!socket->IsConnected()) {
        *net_log_reason_utf8 = kRemoteSideClosedConnection;
      } else {
        *net_log_reason_utf8 = kDataReceivedUnexpectedly;
      }
      return false;
    }
    return true;
  }

  if (!socket->IsConnected()) {
    *net_log_reason_utf8 = kRemoteSideClosedConnection;
    return false;
  }
  return true;
}

TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled)
    : ClientSocketPool(is_for_websockets,
                       common_connect_job_params,
                       std::move(connect_job_factory)),
      max_sockets_(max_sockets),
      max_sockets_per_group_(max_sockets_per_group),
      unused_idle_socket_timeout_(unused_idle_socket_timeout),
      used_idle_socket_timeout_(used_idle_socket_timeout),
      proxy_chain_(proxy_chain),
      cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
      connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
                                   g_connect_backup_jobs_enabled),
      ssl_client_context_(ssl_client_context) {
  DCHECK_LE(0, max_sockets_per_group);
  DCHECK_LE(max_sockets_per_group, max_sockets);

  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::AddIPAddressObserver(this);

  if (ssl_client_context_)
    ssl_client_context_->AddObserver(this);
}

void TransportClientSocketPool::OnSSLConfigChanged(
    SSLClientContext::SSLConfigChangeType change_type) {
  const char* message = nullptr;
  // When the SSL config or cert verifier config changes, flush all idle
  // sockets so they won't get re-used, and allow any active sockets to finish,
  // but don't put them back in the socket pool.
  switch (change_type) {
    case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
      message = kNetworkChanged;
      break;
    case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
      message = kCertDatabaseChanged;
      break;
    case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
      message = kCertVerifierChanged;
      break;
  }

  base::TimeTicks now = base::TimeTicks::Now();
  for (auto it = group_map_.begin(); it != group_map_.end();) {
    it = RefreshGroup(it, now, message);
  }
  CheckForStalledSocketGroups();
}

// TODO(crbug.com/1206799): Get `server` as SchemeHostPort?
void TransportClientSocketPool::OnSSLConfigForServersChanged(
    const base::flat_set<HostPortPair>& servers) {
  // Current time value. Retrieving it once at the function start rather than
  // inside the inner loop, since it shouldn't change by any meaningful amount.
  //
  // TODO(davidben): This value is not actually needed because
  // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
  // interfaces so the parameter is not necessary.
  base::TimeTicks now = base::TimeTicks::Now();

  // If the proxy chain includes a server from `servers` and uses SSL settings
  // (HTTPS or QUIC), refresh every group.
  bool proxy_matches = false;
  for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
    if (proxy_server.is_secure_http_like() &&
        servers.contains(proxy_server.host_port_pair())) {
      proxy_matches = true;
    }
  }

  bool refreshed_any = false;
  for (auto it = group_map_.begin(); it != group_map_.end();) {
    if (proxy_matches ||
        (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
         servers.contains(
             HostPortPair::FromSchemeHostPort(it->first.destination())))) {
      refreshed_any = true;
      // Note this call may destroy the group and invalidate |it|.
      it = RefreshGroup(it, now, kSslConfigChanged);
    } else {
      ++it;
    }
  }

  if (refreshed_any) {
    // Check to see if any group can use the freed-up socket slots. It would be
    // more efficient to give the slots to the refreshed groups, if they still
    // exist and need them, but this should be rare enough that it doesn't
    // matter. This will also make sure the slots are given to the group with
    // the highest priority request without an assigned ConnectJob.
    CheckForStalledSocketGroups();
  }
}

bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
  return base::Contains(group_map_, group_id);
}

void TransportClientSocketPool::CleanupIdleSockets(
    bool force,
    const char* net_log_reason_utf8) {
  if (idle_socket_count_ == 0)
    return;

  // Current time value. Retrieving it once at the function start rather than
  // inside the inner loop, since it shouldn't change by any meaningful amount.
  base::TimeTicks now = base::TimeTicks::Now();

  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);
    CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}

bool TransportClientSocketPool::CloseOneIdleSocket() {
  if (idle_socket_count_ == 0)
    return false;
  return CloseOneIdleSocketExceptInGroup(nullptr);
}

bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
  // This pool doesn't have any idle sockets. It's possible that a pool at a
  // higher layer is holding one of this pool's sockets as active even though
  // it's actually idle. Query the higher layers.
  for (HigherLayeredPool* higher_pool : higher_pools_) {
    if (higher_pool->CloseOneIdleConnection())
      return true;
  }
  return false;
}

void TransportClientSocketPool::CleanupIdleSocketsInGroup(
    bool force,
    Group* group,
    const base::TimeTicks& now,
    const char* net_log_reason_utf8) {
  // If |force| is true, a reason must be provided.
  DCHECK(!force || net_log_reason_utf8);

  auto idle_socket_it = group->mutable_idle_sockets()->begin();
  while (idle_socket_it != group->idle_sockets().end()) {
    bool should_clean_up = force;
    const char* reason_for_closing_socket = net_log_reason_utf8;
    base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
                                  ? used_idle_socket_timeout_
                                  : unused_idle_socket_timeout_;

    // Timeout errors take precedence over the reason for flushing sockets in
    // the group, if applicable.
    if (now - idle_socket_it->start_time >= timeout) {
      should_clean_up = true;
      reason_for_closing_socket = kIdleTimeLimitExpired;
    }

    // Usability errors take precedence over other errors.
    if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
      should_clean_up = true;

    if (should_clean_up) {
      DCHECK(reason_for_closing_socket);
      idle_socket_it->socket->NetLog().AddEventWithStringParams(
          NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
          reason_for_closing_socket);
      idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
      DecrementIdleCount();
    } else {
      DCHECK(!reason_for_closing_socket);
      ++idle_socket_it;
    }
  }
}

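// Returns the group for |group_id|, creating and registering a new one if it
// does not exist yet.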
TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
    const GroupId& group_id) {
  auto it = group_map_.find(group_id);
  if (it != group_map_.end())
    return it->second;
  Group* group = new Group(group_id, this);
  group_map_[group_id] = group;
  return group;
}

void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
  auto it = group_map_.find(group_id);
  CHECK(it != group_map_.end());

  RemoveGroup(it);
}

TransportClientSocketPool::GroupMap::iterator
TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
  delete it->second;
  return group_map_.erase(it);
}

// static
bool TransportClientSocketPool::connect_backup_jobs_enabled() {
  return g_connect_backup_jobs_enabled;
}

// static
bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
  bool old_value = g_connect_backup_jobs_enabled;
  g_connect_backup_jobs_enabled = enabled;
  return old_value;
}

void TransportClientSocketPool::IncrementIdleCount() {
  ++idle_socket_count_;
}

void TransportClientSocketPool::DecrementIdleCount() {
  --idle_socket_count_;
}

void TransportClientSocketPool::ReleaseSocket(
    const GroupId& group_id,
    std::unique_ptr<StreamSocket> socket,
    int64_t group_generation) {
  auto i = group_map_.find(group_id);
  CHECK(i != group_map_.end());

  Group* group = i->second;
  CHECK(group);

  CHECK_GT(handed_out_socket_count_, 0);
  handed_out_socket_count_--;

  CHECK_GT(group->active_socket_count(), 0);
  group->DecrementActiveSocketCount();

  bool can_reuse_socket = false;
  std::string_view not_reusable_reason;
  if (!socket->IsConnectedAndIdle()) {
    if (!socket->IsConnected()) {
      not_reusable_reason = kClosedConnectionReturnedToPool;
    } else {
      not_reusable_reason = kDataReceivedUnexpectedly;
    }
  } else if (group_generation != group->generation()) {
    not_reusable_reason = kSocketGenerationOutOfDate;
  } else {
    can_reuse_socket = true;
  }

  if (can_reuse_socket) {
    DCHECK(not_reusable_reason.empty());

    // Add it to the idle list.
    AddIdleSocket(std::move(socket), group);
    OnAvailableSocketSlot(group_id, group);
  } else {
    DCHECK(!not_reusable_reason.empty());

    socket->NetLog().AddEventWithStringParams(
        NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
        not_reusable_reason);
    if (group->IsEmpty())
      RemoveGroup(i);
    socket.reset();
  }

  CheckForStalledSocketGroups();
}

void TransportClientSocketPool::CheckForStalledSocketGroups() {
  // Loop until there's nothing more to do.
  while (true) {
    // If we have idle sockets, see if we can give one to the top-stalled
    // group.
    GroupId top_group_id;
    Group* top_group = nullptr;
    if (!FindTopStalledGroup(&top_group, &top_group_id))
      return;

    if (ReachedMaxSocketsLimit()) {
      if (idle_socket_count_ > 0) {
        CloseOneIdleSocket();
      } else {
        // We can't activate more sockets since we're already at our global
        // limit.
        return;
      }
    }

    // Note that this may delete top_group.
    OnAvailableSocketSlot(top_group_id, top_group);
  }
}

// Search for the highest priority pending request, amongst the groups that
// are not at the |max_sockets_per_group_| limit. Note: for requests with
// the same priority, the winner is based on group hash ordering (and not
// insertion order).
bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
                                                    GroupId* group_id) const {
  CHECK((group && group_id) || (!group && !group_id));
  Group* top_group = nullptr;
  const GroupId* top_group_id = nullptr;
  bool has_stalled_group = false;
  for (const auto& it : group_map_) {
    Group* curr_group = it.second;
    if (!curr_group->has_unbound_requests())
      continue;
    if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
      if (!group)
        return true;
      has_stalled_group = true;
      bool has_higher_priority =
          !top_group ||
          curr_group->TopPendingPriority() > top_group->TopPendingPriority();
      if (has_higher_priority) {
        top_group = curr_group;
        top_group_id = &it.first;
      }
    }
  }

  if (top_group) {
    CHECK(group);
    *group = top_group;
    *group_id = *top_group_id;
  } else {
    CHECK(!has_stalled_group);
  }
  return has_stalled_group;
}

void TransportClientSocketPool::OnIPAddressChanged() {
  DCHECK(cleanup_on_ip_address_change_);
  FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
}

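// Fails all pending requests and ConnectJobs with |error|, closes all idle
// sockets, and increments each group's generation so that handed-out sockets
// are not reused once they are returned to the pool.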
void TransportClientSocketPool::FlushWithError(
    int error,
    const char* net_log_reason_utf8) {
  CancelAllConnectJobs();
  CloseIdleSockets(net_log_reason_utf8);
  CancelAllRequestsWithError(error);
  for (const auto& group : group_map_) {
    group.second->IncrementGeneration();
  }
}

void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
                                                 Group* group) {
  CHECK_GT(connecting_socket_count_, 0);
  connecting_socket_count_--;

  DCHECK(group);
  group->RemoveUnboundJob(job);
}

void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
                                                      Group* group) {
  DCHECK(base::Contains(group_map_, group_id));
  if (group->IsEmpty()) {
    RemoveGroup(group_id);
  } else if (group->has_unbound_requests()) {
    ProcessPendingRequest(group_id, group);
  }
}

void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
                                                      Group* group) {
  const Request* next_request = group->GetNextUnboundRequest();
  DCHECK(next_request);

  // If the group has no idle sockets and can't make use of an additional
  // slot, either because the pool is at the global socket limit or the group
  // is at the per-group limit, then there's nothing to do.
  if (group->idle_sockets().empty() &&
      !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
    return;
  }

  int rv =
      RequestSocketInternal(group_id, *next_request,
                            /*preconnect_done_closure=*/base::OnceClosure());
  if (rv != ERR_IO_PENDING) {
    std::unique_ptr<Request> request = group->PopNextUnboundRequest();
    DCHECK(request);
    if (group->IsEmpty())
      RemoveGroup(group_id);

    request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                                rv);
    InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
                            request->socket_tag());
  }
}

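// Transfers ownership of |socket| to |handle|, records reuse type, idle time
// and connect timing on the handle, and updates the handed-out and per-group
// active socket counts.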
void TransportClientSocketPool::HandOutSocket(
    std::unique_ptr<StreamSocket> socket,
    ClientSocketHandle::SocketReuseType reuse_type,
    const LoadTimingInfo::ConnectTiming& connect_timing,
    ClientSocketHandle* handle,
    base::TimeDelta idle_time,
    Group* group,
    const NetLogWithSource& net_log) {
  DCHECK(socket);
  handle->SetSocket(std::move(socket));
  handle->set_reuse_type(reuse_type);
  handle->set_idle_time(idle_time);
  handle->set_group_generation(group->generation());
  handle->set_connect_timing(connect_timing);

  if (reuse_type == ClientSocketHandle::REUSED_IDLE) {
    net_log.AddEventWithIntParams(
        NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
        static_cast<int>(idle_time.InMilliseconds()));
  }

  net_log.AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
      handle->socket()->NetLog().source());

  handed_out_socket_count_++;
  group->IncrementActiveSocketCount();
}

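// Adds |socket| to |group|'s idle list, stamped with the current time so that
// idle-timeout cleanup can age it out later.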
void TransportClientSocketPool::AddIdleSocket(
    std::unique_ptr<StreamSocket> socket,
    Group* group) {
  DCHECK(socket);
  IdleSocket idle_socket;
  idle_socket.socket = std::move(socket);
  idle_socket.start_time = base::TimeTicks::Now();

  group->mutable_idle_sockets()->push_back(std::move(idle_socket));
  IncrementIdleCount();
}

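// Drops every unbound ConnectJob in every group, deleting any group that
// becomes empty as a result.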
void TransportClientSocketPool::CancelAllConnectJobs() {
  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);
    connecting_socket_count_ -= group->jobs().size();
    group->RemoveAllUnboundJobs();

    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}

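// Fails every unbound request with |error| and flags bound requests to fail
// later, deleting any group that becomes empty as a result.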
void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);

    while (true) {
      std::unique_ptr<Request> request = group->PopNextUnboundRequest();
      if (!request)
        break;
      InvokeUserCallbackLater(request->handle(), request->release_callback(),
                              error, request->socket_tag());
    }

    // Mark bound connect jobs as needing to fail. Can't fail them immediately
    // because they may have access to objects owned by the ConnectJob, and
    // could access them if a user callback invocation is queued. It would also
    // result in the consumer handling two messages at once, which in general
    // isn't safe for a lot of code.
    group->SetPendingErrorForAllBoundRequests(error);

    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}

bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
  // Each connecting socket will eventually connect and be handed out.
  int total =
      handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
  // There can be more sockets than the limit, since some requests can ignore
  // the limit.
  if (total < max_sockets_)
    return false;
  return true;
}

bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
    const Group* exception_group) {
  CHECK_GT(idle_socket_count_, 0);

  for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
    Group* group = i->second;
    CHECK(group);
    if (exception_group == group)
      continue;
    std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();

    if (!idle_sockets->empty()) {
      idle_sockets->pop_front();
      DecrementIdleCount();
      if (group->IsEmpty())
        RemoveGroup(i);

      return true;
    }
  }

  return false;
}

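// Invoked when |job| in |group| completes with |result|. The job's socket (or
// error) goes to the request bound to the job, if any; otherwise to the
// group's highest-priority unbound request, or into the idle socket list when
// no request is waiting.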
void TransportClientSocketPool::OnConnectJobComplete(Group* group,
                                                     int result,
                                                     ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);
  DCHECK(result != OK || job->socket() != nullptr);

  // Check if the ConnectJob is already bound to a Request. If so, result is
  // returned to that specific request.
  std::optional<Group::BoundRequest> bound_request =
      group->FindAndRemoveBoundRequestForConnectJob(job);
  Request* request = nullptr;
  std::unique_ptr<Request> owned_request;
  if (bound_request) {
    --connecting_socket_count_;

    // If the socket pools were previously flushed with an error, return that
    // error to the bound request and discard the socket.
    if (bound_request->pending_error != OK) {
      InvokeUserCallbackLater(bound_request->request->handle(),
                              bound_request->request->release_callback(),
                              bound_request->pending_error,
                              bound_request->request->socket_tag());
      bound_request->request->net_log().EndEventWithNetErrorCode(
          NetLogEventType::SOCKET_POOL, bound_request->pending_error);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    // If the ConnectJob is from a previous generation, add the request back to
    // the group, and kick off another request. The socket will be discarded.
    if (bound_request->generation != group->generation()) {
      group->InsertUnboundRequest(std::move(bound_request->request));
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    request = bound_request->request.get();
  } else {
    // In this case, RemoveConnectJob(job, _) must be called before exiting
    // this method. Otherwise, |job| will be leaked.
    owned_request = group->PopNextUnboundRequest();
    request = owned_request.get();

    if (!request) {
      if (result == OK)
        AddIdleSocket(job->PassSocket(), group);
      RemoveConnectJob(job, group);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    LogBoundConnectJobToRequest(job->net_log().source(), *request);
  }

  // The case where there's no request is handled above.
  DCHECK(request);

  if (result != OK)
    request->handle()->SetAdditionalErrorState(job);
  if (job->socket()) {
    HandOutSocket(job->PassSocket(), ClientSocketHandle::UNUSED,
                  job->connect_timing(), request->handle(), base::TimeDelta(),
                  group, request->net_log());
  }
  request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                              result);
  InvokeUserCallbackLater(request->handle(), request->release_callback(),
                          result, request->socket_tag());
  if (!bound_request)
    RemoveConnectJob(job, group);
  // If no socket was handed out, there's a new socket slot available.
  if (!request->handle()->socket()) {
    OnAvailableSocketSlot(group->group_id(), group);
    CheckForStalledSocketGroups();
  }
}

void TransportClientSocketPool::OnNeedsProxyAuth(
    Group* group,
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);

  const Request* request = group->BindRequestToConnectJob(job);
  // If the ConnectJob can't be bound to a request, treat this as a ConnectJob
  // failure.
  if (!request) {
    OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
    return;
  }

  request->proxy_auth_callback().Run(response, auth_controller,
                                     std::move(restart_with_auth_callback));
}

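// Schedules |callback| to be run with |rv| via the current task runner. The
// callback is stashed in |pending_callback_map_| so that it can be dropped if
// the request is cancelled before it runs.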
InvokeUserCallbackLater(ClientSocketHandle * handle,CompletionOnceCallback callback,int rv,const SocketTag & socket_tag)1406 void TransportClientSocketPool::InvokeUserCallbackLater(
1407 ClientSocketHandle* handle,
1408 CompletionOnceCallback callback,
1409 int rv,
1410 const SocketTag& socket_tag) {
1411 CHECK(!base::Contains(pending_callback_map_, handle));
1412 pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
1413 if (rv == OK) {
1414 handle->socket()->ApplySocketTag(socket_tag);
1415 }
1416 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
1417 FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
1418 weak_factory_.GetWeakPtr(),
1419 // This is safe as `handle` is checked against a
1420 // map to verify it's alive before dereference.
1421 // This code path must only be reachable by
1422 // `handle`s that have had Init called.
1423 base::UnsafeDangling(handle)));
1424 }
1425
void TransportClientSocketPool::InvokeUserCallback(
    MayBeDangling<ClientSocketHandle> handle) {
  auto it = pending_callback_map_.find(handle);

  // Exit if the request has already been cancelled.
  if (it == pending_callback_map_.end())
    return;

  CHECK(!handle->is_initialized());
  CompletionOnceCallback callback = std::move(it->second.callback);
  int result = it->second.result;
  pending_callback_map_.erase(it);
  std::move(callback).Run(result);
}

void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
  while (IsStalled()) {
    // Closing a socket will result in calling back into |this| to use the
    // freed socket slot, so nothing else is needed.
    if (!CloseOneIdleConnectionInHigherLayeredPool())
      return;
  }
}

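// Force-closes the group's idle sockets, cancels its unbound ConnectJobs, and
// increments its generation so existing sockets cannot be reused. Returns an
// iterator to the next group, since the group is removed if this leaves it
// empty.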
TransportClientSocketPool::GroupMap::iterator
TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
                                        const base::TimeTicks& now,
                                        const char* net_log_reason_utf8) {
  Group* group = it->second;
  CHECK(group);
  CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);

  connecting_socket_count_ -= group->jobs().size();
  group->RemoveAllUnboundJobs();

  // Prevent reuse of existing sockets.
  group->IncrementGeneration();

  // Delete the group if it's no longer needed.
  if (group->IsEmpty()) {
    return RemoveGroup(it);
  }
  return ++it;
}

TransportClientSocketPool::Group::Group(
    const GroupId& group_id,
    TransportClientSocketPool* client_socket_pool)
    : group_id_(group_id),
      client_socket_pool_(client_socket_pool),
      unbound_requests_(NUM_PRIORITIES) {}

TransportClientSocketPool::Group::~Group() {
  DCHECK_EQ(0u, never_assigned_job_count());
  DCHECK_EQ(0u, unassigned_job_count());
  DCHECK(unbound_requests_.empty());
  DCHECK(jobs_.empty());
  DCHECK(bound_requests_.empty());
}

void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
                                                            ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  client_socket_pool_->OnConnectJobComplete(this, result, job);
}

void TransportClientSocketPool::Group::OnNeedsProxyAuth(
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
                                        std::move(restart_with_auth_callback),
                                        job);
}

void TransportClientSocketPool::Group::StartBackupJobTimer(
    const GroupId& group_id) {
  // Only allow one timer to run at a time.
  if (BackupJobTimerIsRunning())
    return;

  // Unretained here is okay because |backup_job_timer_| is
  // automatically cancelled when it's destroyed.
  backup_job_timer_.Start(FROM_HERE,
                          client_socket_pool_->ConnectRetryInterval(),
                          base::BindOnce(&Group::OnBackupJobTimerFired,
                                         base::Unretained(this), group_id));
}

bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
  return backup_job_timer_.IsRunning();
}

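// Claims one of the jobs counted as "never assigned" (i.e. created for a
// preconnect). Returns false if there are none left to claim.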
bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
  SanityCheck();

  if (never_assigned_job_count_ == 0)
    return false;
  --never_assigned_job_count_;
  return true;
}

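// Adds |job| to this group and tries to assign it to a waiting request.
// Preconnect jobs are counted as "never assigned" until a request claims one
// via TryToUseNeverAssignedConnectJob().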
void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
                                              bool is_preconnect) {
  SanityCheck();

  if (is_preconnect)
    ++never_assigned_job_count_;
  jobs_.push_back(std::move(job));
  TryToAssignUnassignedJob(jobs_.back().get());

  SanityCheck();
}

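// Removes |job| from this group and returns ownership of it. If |job| was
// assigned to a request, that request is handed a replacement job when one is
// available.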
std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
    ConnectJob* job) {
  SanityCheck();

  // Check that |job| is in the list.
  auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
  DCHECK(it != jobs_.end());

  // Check if |job| is in the unassigned jobs list. If so, remove it.
  auto it2 = base::ranges::find(unassigned_jobs_, job);
  if (it2 != unassigned_jobs_.end()) {
    unassigned_jobs_.erase(it2);
  } else {
    // Otherwise, |job| must be assigned to some Request. Unassign it, then
    // try to replace it with another job if possible (either by taking an
    // unassigned job or stealing from another request, if any requests after
    // it have a job).
    RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
    DCHECK(!request_with_job.is_null());
    request_with_job.value()->ReleaseJob();
    TryToAssignJobToRequest(request_with_job);
  }
  std::unique_ptr<ConnectJob> owned_job = std::move(*it);
  jobs_.erase(it);

  size_t job_count = jobs_.size();
  if (job_count < never_assigned_job_count_)
    never_assigned_job_count_ = job_count;

  // If we've got no more jobs for this group, then we no longer need a
  // backup job either.
  if (jobs_.empty()) {
    DCHECK(unassigned_jobs_.empty());
    backup_job_timer_.Stop();
  }

  SanityCheck();
  return owned_job;
}

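// Fired when |backup_job_timer_| expires. If the first job has neither
// established a connection nor is waiting on DNS, and socket limits permit,
// starts a second ("backup") ConnectJob for the highest-priority unbound
// request.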
void TransportClientSocketPool::Group::OnBackupJobTimerFired(
    const GroupId& group_id) {
  // If there are no more jobs pending, there is no work to do.
  // If we've done our cleanups correctly, this should not happen.
  if (jobs_.empty()) {
    NOTREACHED();
    return;
  }

  // If the old job has already established a connection, don't start a backup
  // job. Backup jobs are only for issues establishing the initial TCP
  // connection - the timeout they use is tuned for that, and tests expect
  // that behavior.
  //
  // TODO(https://crbug.com/929814): Replace both this and the
  // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
  // LOAD_STATE_RESOLVING_HOST callback to start the timer (and invoke the
  // OnHostResolved callback of any pending requests), and the
  // HasEstablishedConnection() callback to stop the timer. That should result
  // in a more robust, testable API.
  if ((*jobs_.begin())->HasEstablishedConnection())
    return;

  // If our old job is waiting on DNS, or if we can't create any sockets
  // right now due to limits, just reset the timer.
  if (client_socket_pool_->ReachedMaxSocketsLimit() ||
      !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
      (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
    StartBackupJobTimer(group_id);
    return;
  }

  if (unbound_requests_.empty())
    return;

  Request* request = unbound_requests_.FirstMax().value().get();
  std::unique_ptr<ConnectJob> owned_backup_job =
      client_socket_pool_->CreateConnectJob(
          group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
          request->proxy_annotation_tag(), request->priority(),
          request->socket_tag(), this);
  owned_backup_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
      });
  ConnectJob* backup_job = owned_backup_job.get();
  AddJob(std::move(owned_backup_job), false);
  client_socket_pool_->connecting_socket_count_++;
  int rv = backup_job->Connect();
  if (rv != ERR_IO_PENDING) {
    client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
  }
}

void TransportClientSocketPool::Group::SanityCheck() const {
#if DCHECK_IS_ON()
  DCHECK_LE(never_assigned_job_count(), jobs_.size());
  DCHECK_LE(unassigned_job_count(), jobs_.size());

  // Check that |unassigned_jobs_| is empty iff there are at least as many
  // requests as jobs.
  DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());

  size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();

  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  for (size_t i = 0; i < unbound_requests_.size();
       ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    DCHECK(!pointer.is_null());
    DCHECK(pointer.value());
    // Check that the first |num_assigned_jobs| requests have valid job
    // assignments.
    if (i < num_assigned_jobs) {
      // The request has a job.
      ConnectJob* job = pointer.value()->job();
      DCHECK(job);
      // The request's job is not in |unassigned_jobs_|.
      DCHECK(!base::Contains(unassigned_jobs_, job));
      // The request's job is in |jobs_|.
      DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
      // The same job is not assigned to any other request with a job.
      RequestQueue::Pointer pointer2 =
          unbound_requests_.GetNextTowardsLastMin(pointer);
      for (size_t j = i + 1; j < num_assigned_jobs;
           ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
        DCHECK(!pointer2.is_null());
        ConnectJob* job2 = pointer2.value()->job();
        DCHECK(job2);
        DCHECK_NE(job, job2);
      }
      DCHECK_EQ(pointer.value()->priority(), job->priority());
    } else {
      // Check that any subsequent requests do not have a job.
      DCHECK(!pointer.value()->job());
    }
  }

  for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
    // Check that all unassigned jobs are in |jobs_|.
    ConnectJob* job = *it;
    DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
    // Check that there are no duplicated entries in |unassigned_jobs_|.
    for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
      DCHECK_NE(job, *it2);
    }

    // Check that no |unassigned_jobs_| are in |bound_requests_|.
    DCHECK(!base::Contains(bound_requests_, job,
                           [](const BoundRequest& bound_request) {
                             return bound_request.connect_job.get();
                           }));
  }
#endif  // DCHECK_IS_ON()
}

void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
  SanityCheck();

  // Remove jobs from any requests that have them.
  if (!unbound_requests_.empty()) {
    for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
         !pointer.is_null() && pointer.value()->job();
         pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
      pointer.value()->ReleaseJob();
    }
  }
  unassigned_jobs_.clear();
  never_assigned_job_count_ = 0;

  // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
  // removal from `TransportClientSocketPool::group_map_`, so if this check
  // fails, `this` has been deleted, likely through some reentrancy issue.
  CHECK(client_socket_pool_->HasGroup(group_id_));

  // Delete active jobs.
  jobs_.clear();
  // Stop the backup job timer.
  backup_job_timer_.Stop();

  SanityCheck();
}

size_t TransportClientSocketPool::Group::ConnectJobCount() const {
  return bound_requests_.size() + jobs_.size();
}

ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
    const ClientSocketHandle* handle) const {
  // Search through bound requests for |handle|.
  for (const auto& bound_pair : bound_requests_) {
    if (handle == bound_pair.request->handle())
      return bound_pair.connect_job.get();
  }

  // Search through the unbound requests that have corresponding jobs for a
  // request with |handle|.
  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null() && pointer.value()->job();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->handle() == handle)
      return pointer.value()->job();
  }

  return nullptr;
}

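// Adds |request| to the priority queue of unbound requests, placing
// RespectLimits::DISABLED requests ahead of other MAXIMUM_PRIORITY requests,
// and then tries to find the new request a ConnectJob.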
void TransportClientSocketPool::Group::InsertUnboundRequest(
    std::unique_ptr<Request> request) {
  SanityCheck();

  // The request should not have a job, since it is not yet in
  // |unbound_requests_|.
  DCHECK(!request->job());
  // This value must be cached before we release |request|.
  RequestPriority priority = request->priority();

  RequestQueue::Pointer new_position;
  if (request->respect_limits() == RespectLimits::DISABLED) {
    // Put requests with RespectLimits::DISABLED (which should have
    // priority == MAXIMUM_PRIORITY) ahead of other requests with
    // MAXIMUM_PRIORITY.
    DCHECK_EQ(priority, MAXIMUM_PRIORITY);
    new_position =
        unbound_requests_.InsertAtFront(std::move(request), priority);
  } else {
    new_position = unbound_requests_.Insert(std::move(request), priority);
  }
  DCHECK(!unbound_requests_.empty());

  TryToAssignJobToRequest(new_position);

  SanityCheck();
}

const TransportClientSocketPool::Request*
TransportClientSocketPool::Group::GetNextUnboundRequest() const {
  return unbound_requests_.empty() ? nullptr
                                   : unbound_requests_.FirstMax().value().get();
}

std::unique_ptr<TransportClientSocketPool::Request>
TransportClientSocketPool::Group::PopNextUnboundRequest() {
  if (unbound_requests_.empty())
    return nullptr;
  return RemoveUnboundRequest(unbound_requests_.FirstMax());
}

std::unique_ptr<TransportClientSocketPool::Request>
TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
    ClientSocketHandle* handle) {
  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->handle() == handle) {
      DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
                pointer.value()->priority());
      std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
      return request;
    }
  }
  return nullptr;
}

void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
    int pending_error) {
  for (auto& bound_request : bound_requests_) {
    // Earlier errors take precedence.
    if (bound_request.pending_error == OK)
      bound_request.pending_error = pending_error;
  }
}

const TransportClientSocketPool::Request*
TransportClientSocketPool::Group::BindRequestToConnectJob(
    ConnectJob* connect_job) {
  // Check if |connect_job| is already bound to a Request.
  for (const auto& bound_pair : bound_requests_) {
    if (bound_pair.connect_job.get() == connect_job)
      return bound_pair.request.get();
  }

  // If not, try to bind it to a Request.
  const Request* request = GetNextUnboundRequest();
  // If there are no pending requests, or the highest priority request has no
  // callback to handle auth challenges, return nullptr.
  if (!request || request->proxy_auth_callback().is_null())
    return nullptr;

  // Otherwise, bind the ConnectJob to the Request.
  std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
  DCHECK_EQ(owned_request.get(), request);
  std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
  LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
  bound_requests_.emplace_back(BoundRequest(
      std::move(owned_connect_job), std::move(owned_request), generation()));
  return request;
}

std::optional<TransportClientSocketPool::Group::BoundRequest>
TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
    ConnectJob* connect_job) {
  for (auto bound_pair = bound_requests_.begin();
       bound_pair != bound_requests_.end(); ++bound_pair) {
    if (bound_pair->connect_job.get() != connect_job)
      continue;
    BoundRequest ret = std::move(*bound_pair);
    bound_requests_.erase(bound_pair);
    return std::move(ret);
  }
  return std::nullopt;
}

std::unique_ptr<TransportClientSocketPool::Request>
TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
    ClientSocketHandle* client_socket_handle) {
  for (auto bound_pair = bound_requests_.begin();
       bound_pair != bound_requests_.end(); ++bound_pair) {
    if (bound_pair->request->handle() != client_socket_handle)
      continue;
    std::unique_ptr<Request> request = std::move(bound_pair->request);
    bound_requests_.erase(bound_pair);
    return request;
  }
  return nullptr;
}

void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
                                                   RequestPriority priority) {
  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->handle() == handle) {
      if (pointer.value()->priority() == priority)
        return;

      std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);

      // Requests that ignore limits must be created at, and remain at, the
      // highest priority, and should not be reprioritized.
      DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);

      request->set_priority(priority);
      InsertUnboundRequest(std::move(request));
      return;
    }
  }

  // This function must be called with a valid ClientSocketHandle.
  NOTREACHED();
}

bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
    const ClientSocketHandle* handle) const {
  SanityCheck();
  if (GetConnectJobForHandle(handle))
    return true;

  // There's no corresponding ConnectJob. Verify that the handle is at least
  // owned by a request.
  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  for (size_t i = 0; i < unbound_requests_.size(); ++i) {
    if (pointer.value()->handle() == handle)
      return false;
    pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
  }
  NOTREACHED();
  return false;
}

TransportClientSocketPool::Group::BoundRequest::BoundRequest()
    : pending_error(OK) {}

TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    std::unique_ptr<ConnectJob> connect_job,
    std::unique_ptr<Request> request,
    int64_t generation)
    : connect_job(std::move(connect_job)),
      request(std::move(request)),
      generation(generation),
      pending_error(OK) {}

TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest&
TransportClientSocketPool::Group::BoundRequest::operator=(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;

std::unique_ptr<TransportClientSocketPool::Request>
TransportClientSocketPool::Group::RemoveUnboundRequest(
    const RequestQueue::Pointer& pointer) {
  SanityCheck();

  std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
  if (request->job()) {
    TryToAssignUnassignedJob(request->ReleaseJob());
  }
  // If there are no more unbound requests, kill the backup timer.
  if (unbound_requests_.empty())
    backup_job_timer_.Stop();

  SanityCheck();
  return request;
}

TransportClientSocketPool::RequestQueue::Pointer
TransportClientSocketPool::Group::FindUnboundRequestWithJob(
    const ConnectJob* job) const {
  SanityCheck();

  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null() && pointer.value()->job();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->job() == job)
      return pointer;
  }
  // If a request with the job was not found, it must be in |unassigned_jobs_|.
  DCHECK(base::Contains(unassigned_jobs_, job));
  return RequestQueue::Pointer();
}

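// Returns a pointer to the highest-priority unbound request without an
// assigned ConnectJob, or a null pointer if all unbound requests have one.
// Relies on the invariant that requests with jobs precede requests without.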
TransportClientSocketPool::RequestQueue::Pointer
TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  size_t i = 0;
  for (; !pointer.is_null() && pointer.value()->job();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    ++i;
  }
  DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
  DCHECK(pointer.is_null() || !pointer.value()->job());
  return pointer;
}

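// Places |job| in |unassigned_jobs_|, then hands it to the first queued
// request without a job, if there is one.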
void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
    ConnectJob* job) {
  unassigned_jobs_.push_back(job);
  RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
  if (!first_request_without_job.is_null()) {
    first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
    unassigned_jobs_.pop_back();
  }
}

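// Tries to find a ConnectJob for the request at |request_pointer|: first an
// unassigned job, then, failing that, a job stolen from the last request in
// the queue that has one (requests with jobs always precede those without).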
void TransportClientSocketPool::Group::TryToAssignJobToRequest(
    TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
  DCHECK(!request_pointer.value()->job());
  if (!unassigned_jobs_.empty()) {
    request_pointer.value()->AssignJob(unassigned_jobs_.front());
    unassigned_jobs_.pop_front();
    return;
  }

  // If the next request in the queue does not have a job, then there are no
  // requests with a job after |request_pointer| from which we can steal.
  RequestQueue::Pointer next_request =
      unbound_requests_.GetNextTowardsLastMin(request_pointer);
  if (next_request.is_null() || !next_request.value()->job())
    return;

  // Walk down the queue to find the last request with a job.
  RequestQueue::Pointer cur = next_request;
  RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
  while (!next.is_null() && next.value()->job()) {
    cur = next;
    next = unbound_requests_.GetNextTowardsLastMin(next);
  }
  // Steal the job from the last request with a job.
  TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
}

void TransportClientSocketPool::Group::TransferJobBetweenRequests(
    TransportClientSocketPool::Request* source,
    TransportClientSocketPool::Request* dest) {
  DCHECK(!dest->job());
  DCHECK(source->job());
  dest->AssignJob(source->ReleaseJob());
}

}  // namespace net