xref: /aosp_15_r20/external/cronet/net/socket/transport_client_socket_pool.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/socket/transport_client_socket_pool.h"
6 
7 #include <string_view>
8 #include <utility>
9 
10 #include "base/auto_reset.h"
11 #include "base/barrier_closure.h"
12 #include "base/check_op.h"
13 #include "base/compiler_specific.h"
14 #include "base/containers/contains.h"
15 #include "base/format_macros.h"
16 #include "base/functional/bind.h"
17 #include "base/functional/callback_helpers.h"
18 #include "base/location.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/notreached.h"
22 #include "base/ranges/algorithm.h"
23 #include "base/strings/string_util.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/time/time.h"
26 #include "base/values.h"
27 #include "net/base/host_port_pair.h"
28 #include "net/base/net_errors.h"
29 #include "net/base/proxy_chain.h"
30 #include "net/base/proxy_server.h"
31 #include "net/log/net_log.h"
32 #include "net/log/net_log_event_type.h"
33 #include "net/log/net_log_source.h"
34 #include "net/socket/connect_job_factory.h"
35 #include "net/traffic_annotation/network_traffic_annotation.h"
36 #include "url/gurl.h"
37 
38 namespace net {
39 
40 namespace {
41 
// Switch for backup connect jobs: when true, a new transport layer connection
// should be established once a certain timeout has elapsed without an ACK
// being received for the first one.
bool g_connect_backup_jobs_enabled = true;
45 
NetLogCreateConnectJobParams(bool backup_job,const ClientSocketPool::GroupId * group_id)46 base::Value::Dict NetLogCreateConnectJobParams(
47     bool backup_job,
48     const ClientSocketPool::GroupId* group_id) {
49   return base::Value::Dict()
50       .Set("backup_job", backup_job)
51       .Set("group_id", group_id->ToString());
52 }
53 
54 }  // namespace
55 
56 const char TransportClientSocketPool::kCertDatabaseChanged[] =
57     "Cert database changed";
58 const char TransportClientSocketPool::kCertVerifierChanged[] =
59     "Cert verifier changed";
60 const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
61     "Connection was closed when it was returned to the pool";
62 const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
63     "Data received unexpectedly";
64 const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
65     "Idle time limit expired";
66 const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
67 const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
68     "Remote side closed connection";
69 const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
70     "Socket generation out of date";
71 const char TransportClientSocketPool::kSocketPoolDestroyed[] =
72     "Socket pool destroyed";
73 const char TransportClientSocketPool::kSslConfigChanged[] =
74     "SSL configuration changed";
75 
Request(ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,Flags flags,scoped_refptr<SocketParams> socket_params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,const NetLogWithSource & net_log)76 TransportClientSocketPool::Request::Request(
77     ClientSocketHandle* handle,
78     CompletionOnceCallback callback,
79     const ProxyAuthCallback& proxy_auth_callback,
80     RequestPriority priority,
81     const SocketTag& socket_tag,
82     RespectLimits respect_limits,
83     Flags flags,
84     scoped_refptr<SocketParams> socket_params,
85     const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
86     const NetLogWithSource& net_log)
87     : handle_(handle),
88       callback_(std::move(callback)),
89       proxy_auth_callback_(proxy_auth_callback),
90       priority_(priority),
91       respect_limits_(respect_limits),
92       flags_(flags),
93       socket_params_(std::move(socket_params)),
94       proxy_annotation_tag_(proxy_annotation_tag),
95       net_log_(net_log),
96       socket_tag_(socket_tag) {
97   if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
98     DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
99 }
100 
101 TransportClientSocketPool::Request::~Request() = default;
102 
AssignJob(ConnectJob * job)103 void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
104   DCHECK(job);
105   DCHECK(!job_);
106   job_ = job;
107   if (job_->priority() != priority_)
108     job_->ChangePriority(priority_);
109 }
110 
ReleaseJob()111 ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
112   DCHECK(job_);
113   ConnectJob* job = job_;
114   job_ = nullptr;
115   return job;
116 }
117 
118 struct TransportClientSocketPool::IdleSocket {
119   // An idle socket can't be used if it is disconnected or has been used
120   // before and has received data unexpectedly (hence no longer idle).  The
121   // unread data would be mistaken for the beginning of the next response if
122   // we were to use the socket for a new request.
123   //
124   // Note that a socket that has never been used before (like a preconnected
125   // socket) may be used even with unread data.  This may be, e.g., a SPDY
126   // SETTINGS frame.
127   //
128   // If the socket is not usable, |net_log_reason_utf8| is set to a string
129   // indicating why the socket is not usable.
130   bool IsUsable(const char** net_log_reason_utf8) const;
131 
132   std::unique_ptr<StreamSocket> socket;
133   base::TimeTicks start_time;
134 };
135 
TransportClientSocketPool(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,bool cleanup_on_ip_address_change)136 TransportClientSocketPool::TransportClientSocketPool(
137     int max_sockets,
138     int max_sockets_per_group,
139     base::TimeDelta unused_idle_socket_timeout,
140     const ProxyChain& proxy_chain,
141     bool is_for_websockets,
142     const CommonConnectJobParams* common_connect_job_params,
143     bool cleanup_on_ip_address_change)
144     : TransportClientSocketPool(max_sockets,
145                                 max_sockets_per_group,
146                                 unused_idle_socket_timeout,
147                                 ClientSocketPool::used_idle_socket_timeout(),
148                                 proxy_chain,
149                                 is_for_websockets,
150                                 common_connect_job_params,
151                                 cleanup_on_ip_address_change,
152                                 std::make_unique<ConnectJobFactory>(),
153                                 common_connect_job_params->ssl_client_context,
154                                 /*connect_backup_jobs_enabled=*/true) {}
155 
~TransportClientSocketPool()156 TransportClientSocketPool::~TransportClientSocketPool() {
157   // Clean up any idle sockets and pending connect jobs.  Assert that we have no
158   // remaining active sockets or pending requests.  They should have all been
159   // cleaned up prior to |this| being destroyed.
160   FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
161   DCHECK(group_map_.empty());
162   DCHECK(pending_callback_map_.empty());
163   DCHECK_EQ(0, connecting_socket_count_);
164   DCHECK_EQ(0, handed_out_socket_count_);
165   CHECK(higher_pools_.empty());
166 
167   if (ssl_client_context_)
168     ssl_client_context_->RemoveObserver(this);
169 
170   if (cleanup_on_ip_address_change_)
171     NetworkChangeNotifier::RemoveIPAddressObserver(this);
172 }
173 
174 std::unique_ptr<TransportClientSocketPool>
CreateForTesting(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)175 TransportClientSocketPool::CreateForTesting(
176     int max_sockets,
177     int max_sockets_per_group,
178     base::TimeDelta unused_idle_socket_timeout,
179     base::TimeDelta used_idle_socket_timeout,
180     const ProxyChain& proxy_chain,
181     bool is_for_websockets,
182     const CommonConnectJobParams* common_connect_job_params,
183     std::unique_ptr<ConnectJobFactory> connect_job_factory,
184     SSLClientContext* ssl_client_context,
185     bool connect_backup_jobs_enabled) {
186   return base::WrapUnique<TransportClientSocketPool>(
187       new TransportClientSocketPool(
188           max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
189           used_idle_socket_timeout, proxy_chain, is_for_websockets,
190           common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
191           std::move(connect_job_factory), ssl_client_context,
192           connect_backup_jobs_enabled));
193 }
194 
CallbackResultPair()195 TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
196     : result(OK) {}
197 
CallbackResultPair(CompletionOnceCallback callback_in,int result_in)198 TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
199     CompletionOnceCallback callback_in,
200     int result_in)
201     : callback(std::move(callback_in)), result(result_in) {}
202 
203 TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
204     TransportClientSocketPool::CallbackResultPair&& other) = default;
205 
206 TransportClientSocketPool::CallbackResultPair&
207 TransportClientSocketPool::CallbackResultPair::operator=(
208     TransportClientSocketPool::CallbackResultPair&& other) = default;
209 
210 TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;
211 
IsStalled() const212 bool TransportClientSocketPool::IsStalled() const {
213   // If fewer than |max_sockets_| are in use, then clearly |this| is not
214   // stalled.
215   if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
216     return false;
217   // So in order to be stalled, |this| must be using at least |max_sockets_| AND
218   // |this| must have a request that is actually stalled on the global socket
219   // limit.  To find such a request, look for a group that has more requests
220   // than jobs AND where the number of sockets is less than
221   // |max_sockets_per_group_|.  (If the number of sockets is equal to
222   // |max_sockets_per_group_|, then the request is stalled on the group limit,
223   // which does not count.)
224   for (const auto& it : group_map_) {
225     if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
226       return true;
227   }
228   return false;
229 }
230 
AddHigherLayeredPool(HigherLayeredPool * higher_pool)231 void TransportClientSocketPool::AddHigherLayeredPool(
232     HigherLayeredPool* higher_pool) {
233   CHECK(higher_pool);
234   CHECK(!base::Contains(higher_pools_, higher_pool));
235   higher_pools_.insert(higher_pool);
236 }
237 
RemoveHigherLayeredPool(HigherLayeredPool * higher_pool)238 void TransportClientSocketPool::RemoveHigherLayeredPool(
239     HigherLayeredPool* higher_pool) {
240   CHECK(higher_pool);
241   CHECK(base::Contains(higher_pools_, higher_pool));
242   higher_pools_.erase(higher_pool);
243 }
244 
RequestSocket(const GroupId & group_id,scoped_refptr<SocketParams> params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,const NetLogWithSource & net_log)245 int TransportClientSocketPool::RequestSocket(
246     const GroupId& group_id,
247     scoped_refptr<SocketParams> params,
248     const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
249     RequestPriority priority,
250     const SocketTag& socket_tag,
251     RespectLimits respect_limits,
252     ClientSocketHandle* handle,
253     CompletionOnceCallback callback,
254     const ProxyAuthCallback& proxy_auth_callback,
255     const NetLogWithSource& net_log) {
256   CHECK(callback);
257   CHECK(handle);
258 
259   NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);
260 
261   std::unique_ptr<Request> request = std::make_unique<Request>(
262       handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
263       respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);
264 
265   // Cleanup any timed-out idle sockets.
266   CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
267 
268   request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);
269 
270   int rv =
271       RequestSocketInternal(group_id, *request,
272                             /*preconnect_done_closure=*/base::OnceClosure());
273   if (rv != ERR_IO_PENDING) {
274     if (rv == OK) {
275       request->handle()->socket()->ApplySocketTag(request->socket_tag());
276     }
277     request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
278                                                 rv);
279     CHECK(!request->handle()->is_initialized());
280     request.reset();
281   } else {
282     Group* group = GetOrCreateGroup(group_id);
283     group->InsertUnboundRequest(std::move(request));
284     // Have to do this asynchronously, as closing sockets in higher level pools
285     // call back in to |this|, which will cause all sorts of fun and exciting
286     // re-entrancy issues if the socket pool is doing something else at the
287     // time.
288     if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
289       base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
290           FROM_HERE,
291           base::BindOnce(
292               &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
293               weak_factory_.GetWeakPtr()));
294     }
295   }
296   return rv;
297 }
298 
RequestSockets(const GroupId & group_id,scoped_refptr<SocketParams> params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,int num_sockets,CompletionOnceCallback callback,const NetLogWithSource & net_log)299 int TransportClientSocketPool::RequestSockets(
300     const GroupId& group_id,
301     scoped_refptr<SocketParams> params,
302     const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
303     int num_sockets,
304     CompletionOnceCallback callback,
305     const NetLogWithSource& net_log) {
306   // TODO(eroman): Split out the host and port parameters.
307   net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
308                    [&] { return NetLogGroupIdParams(group_id); });
309 
310   Request request(nullptr /* no handle */, CompletionOnceCallback(),
311                   ProxyAuthCallback(), IDLE, SocketTag(),
312                   RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
313                   proxy_annotation_tag, net_log);
314 
315   // Cleanup any timed-out idle sockets.
316   CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
317 
318   if (num_sockets > max_sockets_per_group_) {
319     num_sockets = max_sockets_per_group_;
320   }
321 
322   request.net_log().BeginEventWithIntParams(
323       NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
324       num_sockets);
325 
326   Group* group = GetOrCreateGroup(group_id);
327 
328   // RequestSocketsInternal() may delete the group.
329   bool deleted_group = false;
330 
331   int rv = OK;
332 
333   base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
334       num_sockets,
335       base::BindOnce(
336           [](CompletionOnceCallback callback) {
337             base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
338                 FROM_HERE, base::BindOnce(std::move(callback), OK));
339           },
340           std::move(callback)));
341   int pending_connect_job_count = 0;
342   for (int num_iterations_left = num_sockets;
343        group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
344        num_iterations_left--) {
345     rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
346     if (rv == ERR_IO_PENDING) {
347       ++pending_connect_job_count;
348     }
349     if (rv < 0 && rv != ERR_IO_PENDING) {
350       // We're encountering a synchronous error.  Give up.
351       if (!base::Contains(group_map_, group_id))
352         deleted_group = true;
353       break;
354     }
355     if (!base::Contains(group_map_, group_id)) {
356       // Unexpected.  The group should only be getting deleted on synchronous
357       // error.
358       NOTREACHED();
359       deleted_group = true;
360       break;
361     }
362   }
363 
364   if (!deleted_group && group->IsEmpty())
365     RemoveGroup(group_id);
366 
367   if (rv == ERR_IO_PENDING)
368     rv = OK;
369   request.net_log().EndEventWithNetErrorCode(
370       NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);
371 
372   // Currently we don't handle preconnect errors. So this method returns OK even
373   // if failed to preconnect.
374   // TODO(crbug.com/1330235): Consider support error handlings when needed.
375   if (pending_connect_job_count == 0)
376     return OK;
377   for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
378     preconnect_done_closure.Run();
379   }
380 
381   return ERR_IO_PENDING;
382 }
383 
RequestSocketInternal(const GroupId & group_id,const Request & request,base::OnceClosure preconnect_done_closure)384 int TransportClientSocketPool::RequestSocketInternal(
385     const GroupId& group_id,
386     const Request& request,
387     base::OnceClosure preconnect_done_closure) {
388 #if DCHECK_IS_ON()
389   DCHECK(!request_in_process_);
390   base::AutoReset<bool> auto_reset(&request_in_process_, true);
391 #endif  // DCHECK_IS_ON()
392 
393   ClientSocketHandle* const handle = request.handle();
394   const bool preconnecting = !handle;
395   DCHECK_EQ(preconnecting, !!preconnect_done_closure);
396 
397   Group* group = nullptr;
398   auto group_it = group_map_.find(group_id);
399   if (group_it != group_map_.end()) {
400     group = group_it->second;
401 
402     if (!(request.flags() & NO_IDLE_SOCKETS)) {
403       // Try to reuse a socket.
404       if (AssignIdleSocketToRequest(request, group))
405         return OK;
406     }
407 
408     // If there are more ConnectJobs than pending requests, don't need to do
409     // anything.  Can just wait for the extra job to connect, and then assign it
410     // to the request.
411     if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
412       return ERR_IO_PENDING;
413 
414     // Can we make another active socket now?
415     if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
416         request.respect_limits() == RespectLimits::ENABLED) {
417       // TODO(willchan): Consider whether or not we need to close a socket in a
418       // higher layered group. I don't think this makes sense since we would
419       // just reuse that socket then if we needed one and wouldn't make it down
420       // to this layer.
421       request.net_log().AddEvent(
422           NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
423       return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
424     }
425   }
426 
427   if (ReachedMaxSocketsLimit() &&
428       request.respect_limits() == RespectLimits::ENABLED) {
429     // NOTE(mmenke):  Wonder if we really need different code for each case
430     // here.  Only reason for them now seems to be preconnects.
431     if (idle_socket_count_ > 0) {
432       // There's an idle socket in this pool. Either that's because there's
433       // still one in this group, but we got here due to preconnecting
434       // bypassing idle sockets, or because there's an idle socket in another
435       // group.
436       bool closed = CloseOneIdleSocketExceptInGroup(group);
437       if (preconnecting && !closed)
438         return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
439     } else {
440       // We could check if we really have a stalled group here, but it
441       // requires a scan of all groups, so just flip a flag here, and do the
442       // check later.
443       request.net_log().AddEvent(
444           NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
445       return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
446     }
447   }
448 
449   // We couldn't find a socket to reuse, and there's space to allocate one,
450   // so allocate and connect a new one.
451   group = GetOrCreateGroup(group_id);
452   std::unique_ptr<ConnectJob> connect_job(
453       CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
454                        request.proxy_annotation_tag(), request.priority(),
455                        request.socket_tag(), group));
456   connect_job->net_log().AddEvent(
457       NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
458         return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
459       });
460 
461   int rv = connect_job->Connect();
462   if (rv == ERR_IO_PENDING) {
463     if (preconnect_done_closure) {
464       DCHECK(preconnecting);
465       connect_job->set_done_closure(std::move(preconnect_done_closure));
466     }
467     // If we didn't have any sockets in this group, set a timer for potentially
468     // creating a new one.  If the SYN is lost, this backup socket may complete
469     // before the slow socket, improving end user latency.
470     if (connect_backup_jobs_enabled_ && group->IsEmpty())
471       group->StartBackupJobTimer(group_id);
472     group->AddJob(std::move(connect_job), preconnecting);
473     connecting_socket_count_++;
474     return rv;
475   }
476 
477   LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
478   if (preconnecting) {
479     if (rv == OK)
480       AddIdleSocket(connect_job->PassSocket(), group);
481   } else {
482     DCHECK(handle);
483     if (rv != OK)
484       handle->SetAdditionalErrorState(connect_job.get());
485     std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
486     if (socket) {
487       HandOutSocket(std::move(socket), ClientSocketHandle::UNUSED,
488                     connect_job->connect_timing(), handle,
489                     base::TimeDelta() /* idle_time */, group,
490                     request.net_log());
491     }
492   }
493   if (group->IsEmpty())
494     RemoveGroup(group_id);
495 
496   return rv;
497 }
498 
AssignIdleSocketToRequest(const Request & request,Group * group)499 bool TransportClientSocketPool::AssignIdleSocketToRequest(
500     const Request& request,
501     Group* group) {
502   std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
503   auto idle_socket_it = idle_sockets->end();
504 
505   // Iterate through the idle sockets forwards (oldest to newest)
506   //   * Delete any disconnected ones.
507   //   * If we find a used idle socket, assign to |idle_socket|.  At the end,
508   //   the |idle_socket_it| will be set to the newest used idle socket.
509   for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
510     // Check whether socket is usable. Note that it's unlikely that the socket
511     // is not usable because this function is always invoked after a
512     // reusability check, but in theory socket can be closed asynchronously.
513     const char* net_log_reason_utf8;
514     if (!it->IsUsable(&net_log_reason_utf8)) {
515       it->socket->NetLog().AddEventWithStringParams(
516           NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
517           net_log_reason_utf8);
518       DecrementIdleCount();
519       it = idle_sockets->erase(it);
520       continue;
521     }
522 
523     if (it->socket->WasEverUsed()) {
524       // We found one we can reuse!
525       idle_socket_it = it;
526     }
527 
528     ++it;
529   }
530 
531   // If we haven't found an idle socket, that means there are no used idle
532   // sockets.  Pick the oldest (first) idle socket (FIFO).
533 
534   if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
535     idle_socket_it = idle_sockets->begin();
536 
537   if (idle_socket_it != idle_sockets->end()) {
538     DecrementIdleCount();
539     base::TimeDelta idle_time =
540         base::TimeTicks::Now() - idle_socket_it->start_time;
541     std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
542     idle_sockets->erase(idle_socket_it);
543     // TODO(davidben): If |idle_time| is under some low watermark, consider
544     // treating as UNUSED rather than UNUSED_IDLE. This will avoid
545     // HttpNetworkTransaction retrying on some errors.
546     ClientSocketHandle::SocketReuseType reuse_type =
547         socket->WasEverUsed() ? ClientSocketHandle::REUSED_IDLE
548                               : ClientSocketHandle::UNUSED_IDLE;
549 
550     HandOutSocket(std::move(socket), reuse_type,
551                   LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
552                   group, request.net_log());
553     return true;
554   }
555 
556   return false;
557 }
558 
559 // static
LogBoundConnectJobToRequest(const NetLogSource & connect_job_source,const Request & request)560 void TransportClientSocketPool::LogBoundConnectJobToRequest(
561     const NetLogSource& connect_job_source,
562     const Request& request) {
563   request.net_log().AddEventReferencingSource(
564       NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
565 }
566 
SetPriority(const GroupId & group_id,ClientSocketHandle * handle,RequestPriority priority)567 void TransportClientSocketPool::SetPriority(const GroupId& group_id,
568                                             ClientSocketHandle* handle,
569                                             RequestPriority priority) {
570   auto group_it = group_map_.find(group_id);
571   if (group_it == group_map_.end()) {
572     DCHECK(base::Contains(pending_callback_map_, handle));
573     // The Request has already completed and been destroyed; nothing to
574     // reprioritize.
575     return;
576   }
577 
578   group_it->second->SetPriority(handle, priority);
579 }
580 
CancelRequest(const GroupId & group_id,ClientSocketHandle * handle,bool cancel_connect_job)581 void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
582                                               ClientSocketHandle* handle,
583                                               bool cancel_connect_job) {
584   auto callback_it = pending_callback_map_.find(handle);
585   if (callback_it != pending_callback_map_.end()) {
586     int result = callback_it->second.result;
587     pending_callback_map_.erase(callback_it);
588     std::unique_ptr<StreamSocket> socket = handle->PassSocket();
589     if (socket) {
590       if (result != OK) {
591         socket->Disconnect();
592       } else if (cancel_connect_job) {
593         // Close the socket if |cancel_connect_job| is true and there are no
594         // other pending requests.
595         Group* group = GetOrCreateGroup(group_id);
596         if (group->unbound_request_count() == 0)
597           socket->Disconnect();
598       }
599       ReleaseSocket(handle->group_id(), std::move(socket),
600                     handle->group_generation());
601     }
602     return;
603   }
604 
605   CHECK(base::Contains(group_map_, group_id));
606   Group* group = GetOrCreateGroup(group_id);
607 
608   std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
609   if (request) {
610     --connecting_socket_count_;
611     OnAvailableSocketSlot(group_id, group);
612     CheckForStalledSocketGroups();
613     return;
614   }
615 
616   // Search |unbound_requests_| for matching handle.
617   request = group->FindAndRemoveUnboundRequest(handle);
618   if (request) {
619     request->net_log().AddEvent(NetLogEventType::CANCELLED);
620     request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);
621 
622     // Let the job run, unless |cancel_connect_job| is true, or we're at the
623     // socket limit and there are no other requests waiting on the job.
624     bool reached_limit = ReachedMaxSocketsLimit();
625     if (group->jobs().size() > group->unbound_request_count() &&
626         (cancel_connect_job || reached_limit)) {
627       RemoveConnectJob(group->jobs().begin()->get(), group);
628       if (group->IsEmpty())
629         RemoveGroup(group->group_id());
630       if (reached_limit)
631         CheckForStalledSocketGroups();
632     }
633   }
634 }
635 
CloseIdleSockets(const char * net_log_reason_utf8)636 void TransportClientSocketPool::CloseIdleSockets(
637     const char* net_log_reason_utf8) {
638   CleanupIdleSockets(true, net_log_reason_utf8);
639   DCHECK_EQ(0, idle_socket_count_);
640 }
641 
CloseIdleSocketsInGroup(const GroupId & group_id,const char * net_log_reason_utf8)642 void TransportClientSocketPool::CloseIdleSocketsInGroup(
643     const GroupId& group_id,
644     const char* net_log_reason_utf8) {
645   if (idle_socket_count_ == 0)
646     return;
647   auto it = group_map_.find(group_id);
648   if (it == group_map_.end())
649     return;
650   CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
651                             net_log_reason_utf8);
652   if (it->second->IsEmpty())
653     RemoveGroup(it);
654 }
655 
IdleSocketCount() const656 int TransportClientSocketPool::IdleSocketCount() const {
657   return idle_socket_count_;
658 }
659 
IdleSocketCountInGroup(const GroupId & group_id) const660 size_t TransportClientSocketPool::IdleSocketCountInGroup(
661     const GroupId& group_id) const {
662   auto i = group_map_.find(group_id);
663   CHECK(i != group_map_.end());
664 
665   return i->second->idle_sockets().size();
666 }
667 
GetLoadState(const GroupId & group_id,const ClientSocketHandle * handle) const668 LoadState TransportClientSocketPool::GetLoadState(
669     const GroupId& group_id,
670     const ClientSocketHandle* handle) const {
671   if (base::Contains(pending_callback_map_, handle))
672     return LOAD_STATE_CONNECTING;
673 
674   auto group_it = group_map_.find(group_id);
675   if (group_it == group_map_.end()) {
676     // TODO(mmenke):  This is actually reached in the wild, for unknown reasons.
677     // Would be great to understand why, and if it's a bug, fix it.  If not,
678     // should have a test for that case.
679     NOTREACHED();
680     return LOAD_STATE_IDLE;
681   }
682 
683   const Group& group = *group_it->second;
684   ConnectJob* job = group.GetConnectJobForHandle(handle);
685   if (job)
686     return job->GetLoadState();
687 
688   if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
689     return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
690   return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
691 }
692 
GetInfoAsValue(const std::string & name,const std::string & type) const693 base::Value TransportClientSocketPool::GetInfoAsValue(
694     const std::string& name,
695     const std::string& type) const {
696   // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
697   auto dict = base::Value::Dict()
698                   .Set("name", name)
699                   .Set("type", type)
700                   .Set("handed_out_socket_count", handed_out_socket_count_)
701                   .Set("connecting_socket_count", connecting_socket_count_)
702                   .Set("idle_socket_count", idle_socket_count_)
703                   .Set("max_socket_count", max_sockets_)
704                   .Set("max_sockets_per_group", max_sockets_per_group_);
705 
706   if (group_map_.empty())
707     return base::Value(std::move(dict));
708 
709   base::Value::Dict all_groups_dict;
710   for (const auto& entry : group_map_) {
711     const Group* group = entry.second;
712 
713     base::Value::List idle_socket_list;
714     for (const auto& idle_socket : group->idle_sockets()) {
715       int source_id = idle_socket.socket->NetLog().source().id;
716       idle_socket_list.Append(source_id);
717     }
718 
719     base::Value::List connect_jobs_list;
720     for (const auto& job : group->jobs()) {
721       int source_id = job->net_log().source().id;
722       connect_jobs_list.Append(source_id);
723     }
724 
725     auto group_dict =
726         base::Value::Dict()
727             .Set("pending_request_count",
728                  static_cast<int>(group->unbound_request_count()))
729             .Set("active_socket_count", group->active_socket_count())
730             .Set("idle_sockets", std::move(idle_socket_list))
731             .Set("connect_jobs", std::move(connect_jobs_list))
732             .Set("is_stalled",
733                  group->CanUseAdditionalSocketSlot(max_sockets_per_group_))
734             .Set("backup_job_timer_is_running",
735                  group->BackupJobTimerIsRunning());
736 
737     if (group->has_unbound_requests()) {
738       group_dict.Set("top_pending_priority",
739                      RequestPriorityToString(group->TopPendingPriority()));
740     }
741 
742     all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
743   }
744   dict.Set("groups", std::move(all_groups_dict));
745   return base::Value(std::move(dict));
746 }
747 
HasActiveSocket(const GroupId & group_id) const748 bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
749   return HasGroup(group_id);
750 }
751 
IsUsable(const char ** net_log_reason_utf8) const752 bool TransportClientSocketPool::IdleSocket::IsUsable(
753     const char** net_log_reason_utf8) const {
754   DCHECK(net_log_reason_utf8);
755   if (socket->WasEverUsed()) {
756     if (!socket->IsConnectedAndIdle()) {
757       if (!socket->IsConnected()) {
758         *net_log_reason_utf8 = kRemoteSideClosedConnection;
759       } else {
760         *net_log_reason_utf8 = kDataReceivedUnexpectedly;
761       }
762       return false;
763     }
764     return true;
765   }
766 
767   if (!socket->IsConnected()) {
768     *net_log_reason_utf8 = kRemoteSideClosedConnection;
769     return false;
770   }
771   return true;
772 }
773 
TransportClientSocketPool(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,bool cleanup_on_ip_address_change,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)774 TransportClientSocketPool::TransportClientSocketPool(
775     int max_sockets,
776     int max_sockets_per_group,
777     base::TimeDelta unused_idle_socket_timeout,
778     base::TimeDelta used_idle_socket_timeout,
779     const ProxyChain& proxy_chain,
780     bool is_for_websockets,
781     const CommonConnectJobParams* common_connect_job_params,
782     bool cleanup_on_ip_address_change,
783     std::unique_ptr<ConnectJobFactory> connect_job_factory,
784     SSLClientContext* ssl_client_context,
785     bool connect_backup_jobs_enabled)
786     : ClientSocketPool(is_for_websockets,
787                        common_connect_job_params,
788                        std::move(connect_job_factory)),
789       max_sockets_(max_sockets),
790       max_sockets_per_group_(max_sockets_per_group),
791       unused_idle_socket_timeout_(unused_idle_socket_timeout),
792       used_idle_socket_timeout_(used_idle_socket_timeout),
793       proxy_chain_(proxy_chain),
794       cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
795       connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
796                                    g_connect_backup_jobs_enabled),
797       ssl_client_context_(ssl_client_context) {
798   DCHECK_LE(0, max_sockets_per_group);
799   DCHECK_LE(max_sockets_per_group, max_sockets);
800 
801   if (cleanup_on_ip_address_change_)
802     NetworkChangeNotifier::AddIPAddressObserver(this);
803 
804   if (ssl_client_context_)
805     ssl_client_context_->AddObserver(this);
806 }
807 
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)808 void TransportClientSocketPool::OnSSLConfigChanged(
809     SSLClientContext::SSLConfigChangeType change_type) {
810   const char* message = nullptr;
811   // When the SSL config or cert verifier config changes, flush all idle
812   // sockets so they won't get re-used, and allow any active sockets to finish,
813   // but don't put them back in the socket pool.
814   switch (change_type) {
815     case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
816       message = kNetworkChanged;
817       break;
818     case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
819       message = kCertDatabaseChanged;
820       break;
821     case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
822       message = kCertVerifierChanged;
823       break;
824   };
825 
826   base::TimeTicks now = base::TimeTicks::Now();
827   for (auto it = group_map_.begin(); it != group_map_.end();) {
828     it = RefreshGroup(it, now, message);
829   }
830   CheckForStalledSocketGroups();
831 }
832 
833 // TODO(crbug.com/1206799): Get `server` as SchemeHostPort?
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)834 void TransportClientSocketPool::OnSSLConfigForServersChanged(
835     const base::flat_set<HostPortPair>& servers) {
836   // Current time value. Retrieving it once at the function start rather than
837   // inside the inner loop, since it shouldn't change by any meaningful amount.
838   //
839   // TODO(davidben): This value is not actually needed because
840   // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
841   // interfaces so the parameter is not necessary.
842   base::TimeTicks now = base::TimeTicks::Now();
843 
844   // If the proxy chain includes a server from `servers` and uses SSL settings
845   // (HTTPS or QUIC), refresh every group.
846   bool proxy_matches = false;
847   for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
848     if (proxy_server.is_secure_http_like() &&
849         servers.contains(proxy_server.host_port_pair())) {
850       proxy_matches = true;
851     }
852   }
853 
854   bool refreshed_any = false;
855   for (auto it = group_map_.begin(); it != group_map_.end();) {
856     if (proxy_matches ||
857         (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
858          servers.contains(
859              HostPortPair::FromSchemeHostPort(it->first.destination())))) {
860       refreshed_any = true;
861       // Note this call may destroy the group and invalidate |to_refresh|.
862       it = RefreshGroup(it, now, kSslConfigChanged);
863     } else {
864       ++it;
865     }
866   }
867 
868   if (refreshed_any) {
869     // Check to see if any group can use the freed up socket slots. It would be
870     // more efficient to give the slots to the refreshed groups, if the still
871     // exists and need them, but this should be rare enough that it doesn't
872     // matter. This will also make sure the slots are given to the group with
873     // the highest priority request without an assigned ConnectJob.
874     CheckForStalledSocketGroups();
875   }
876 }
877 
HasGroup(const GroupId & group_id) const878 bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
879   return base::Contains(group_map_, group_id);
880 }
881 
CleanupIdleSockets(bool force,const char * net_log_reason_utf8)882 void TransportClientSocketPool::CleanupIdleSockets(
883     bool force,
884     const char* net_log_reason_utf8) {
885   if (idle_socket_count_ == 0)
886     return;
887 
888   // Current time value. Retrieving it once at the function start rather than
889   // inside the inner loop, since it shouldn't change by any meaningful amount.
890   base::TimeTicks now = base::TimeTicks::Now();
891 
892   for (auto i = group_map_.begin(); i != group_map_.end();) {
893     Group* group = i->second;
894     CHECK(group);
895     CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
896     // Delete group if no longer needed.
897     if (group->IsEmpty()) {
898       i = RemoveGroup(i);
899     } else {
900       ++i;
901     }
902   }
903 }
904 
CloseOneIdleSocket()905 bool TransportClientSocketPool::CloseOneIdleSocket() {
906   if (idle_socket_count_ == 0)
907     return false;
908   return CloseOneIdleSocketExceptInGroup(nullptr);
909 }
910 
CloseOneIdleConnectionInHigherLayeredPool()911 bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
912   // This pool doesn't have any idle sockets. It's possible that a pool at a
913   // higher layer is holding one of this sockets active, but it's actually idle.
914   // Query the higher layers.
915   for (HigherLayeredPool* higher_pool : higher_pools_) {
916     if (higher_pool->CloseOneIdleConnection())
917       return true;
918   }
919   return false;
920 }
921 
CleanupIdleSocketsInGroup(bool force,Group * group,const base::TimeTicks & now,const char * net_log_reason_utf8)922 void TransportClientSocketPool::CleanupIdleSocketsInGroup(
923     bool force,
924     Group* group,
925     const base::TimeTicks& now,
926     const char* net_log_reason_utf8) {
927   // If |force| is true, a reason must be provided.
928   DCHECK(!force || net_log_reason_utf8);
929 
930   auto idle_socket_it = group->mutable_idle_sockets()->begin();
931   while (idle_socket_it != group->idle_sockets().end()) {
932     bool should_clean_up = force;
933     const char* reason_for_closing_socket = net_log_reason_utf8;
934     base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
935                                   ? used_idle_socket_timeout_
936                                   : unused_idle_socket_timeout_;
937 
938     // Timeout errors take precedence over the reason for flushing sockets in
939     // the group, if applicable.
940     if (now - idle_socket_it->start_time >= timeout) {
941       should_clean_up = true;
942       reason_for_closing_socket = kIdleTimeLimitExpired;
943     }
944 
945     // Usability errors take precedence over over other errors.
946     if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
947       should_clean_up = true;
948 
949     if (should_clean_up) {
950       DCHECK(reason_for_closing_socket);
951       idle_socket_it->socket->NetLog().AddEventWithStringParams(
952           NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
953           reason_for_closing_socket);
954       idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
955       DecrementIdleCount();
956     } else {
957       DCHECK(!reason_for_closing_socket);
958       ++idle_socket_it;
959     }
960   }
961 }
962 
GetOrCreateGroup(const GroupId & group_id)963 TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
964     const GroupId& group_id) {
965   auto it = group_map_.find(group_id);
966   if (it != group_map_.end())
967     return it->second;
968   Group* group = new Group(group_id, this);
969   group_map_[group_id] = group;
970   return group;
971 }
972 
RemoveGroup(const GroupId & group_id)973 void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
974   auto it = group_map_.find(group_id);
975   CHECK(it != group_map_.end());
976 
977   RemoveGroup(it);
978 }
979 
980 TransportClientSocketPool::GroupMap::iterator
RemoveGroup(GroupMap::iterator it)981 TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
982   delete it->second;
983   return group_map_.erase(it);
984 }
985 
986 // static
connect_backup_jobs_enabled()987 bool TransportClientSocketPool::connect_backup_jobs_enabled() {
988   return g_connect_backup_jobs_enabled;
989 }
990 
991 // static
set_connect_backup_jobs_enabled(bool enabled)992 bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
993   bool old_value = g_connect_backup_jobs_enabled;
994   g_connect_backup_jobs_enabled = enabled;
995   return old_value;
996 }
997 
IncrementIdleCount()998 void TransportClientSocketPool::IncrementIdleCount() {
999   ++idle_socket_count_;
1000 }
1001 
DecrementIdleCount()1002 void TransportClientSocketPool::DecrementIdleCount() {
1003   --idle_socket_count_;
1004 }
1005 
ReleaseSocket(const GroupId & group_id,std::unique_ptr<StreamSocket> socket,int64_t group_generation)1006 void TransportClientSocketPool::ReleaseSocket(
1007     const GroupId& group_id,
1008     std::unique_ptr<StreamSocket> socket,
1009     int64_t group_generation) {
1010   auto i = group_map_.find(group_id);
1011   CHECK(i != group_map_.end());
1012 
1013   Group* group = i->second;
1014   CHECK(group);
1015 
1016   CHECK_GT(handed_out_socket_count_, 0);
1017   handed_out_socket_count_--;
1018 
1019   CHECK_GT(group->active_socket_count(), 0);
1020   group->DecrementActiveSocketCount();
1021 
1022   bool can_resuse_socket = false;
1023   std::string_view not_reusable_reason;
1024   if (!socket->IsConnectedAndIdle()) {
1025     if (!socket->IsConnected()) {
1026       not_reusable_reason = kClosedConnectionReturnedToPool;
1027     } else {
1028       not_reusable_reason = kDataReceivedUnexpectedly;
1029     }
1030   } else if (group_generation != group->generation()) {
1031     not_reusable_reason = kSocketGenerationOutOfDate;
1032   } else {
1033     can_resuse_socket = true;
1034   }
1035 
1036   if (can_resuse_socket) {
1037     DCHECK(not_reusable_reason.empty());
1038 
1039     // Add it to the idle list.
1040     AddIdleSocket(std::move(socket), group);
1041     OnAvailableSocketSlot(group_id, group);
1042   } else {
1043     DCHECK(!not_reusable_reason.empty());
1044 
1045     socket->NetLog().AddEventWithStringParams(
1046         NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
1047         not_reusable_reason);
1048     if (group->IsEmpty())
1049       RemoveGroup(i);
1050     socket.reset();
1051   }
1052 
1053   CheckForStalledSocketGroups();
1054 }
1055 
CheckForStalledSocketGroups()1056 void TransportClientSocketPool::CheckForStalledSocketGroups() {
1057   // Loop until there's nothing more to do.
1058   while (true) {
1059     // If we have idle sockets, see if we can give one to the top-stalled group.
1060     GroupId top_group_id;
1061     Group* top_group = nullptr;
1062     if (!FindTopStalledGroup(&top_group, &top_group_id))
1063       return;
1064 
1065     if (ReachedMaxSocketsLimit()) {
1066       if (idle_socket_count_ > 0) {
1067         CloseOneIdleSocket();
1068       } else {
1069         // We can't activate more sockets since we're already at our global
1070         // limit.
1071         return;
1072       }
1073     }
1074 
1075     // Note that this may delete top_group.
1076     OnAvailableSocketSlot(top_group_id, top_group);
1077   }
1078 }
1079 
1080 // Search for the highest priority pending request, amongst the groups that
1081 // are not at the |max_sockets_per_group_| limit. Note: for requests with
1082 // the same priority, the winner is based on group hash ordering (and not
1083 // insertion order).
FindTopStalledGroup(Group ** group,GroupId * group_id) const1084 bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
1085                                                     GroupId* group_id) const {
1086   CHECK((group && group_id) || (!group && !group_id));
1087   Group* top_group = nullptr;
1088   const GroupId* top_group_id = nullptr;
1089   bool has_stalled_group = false;
1090   for (const auto& it : group_map_) {
1091     Group* curr_group = it.second;
1092     if (!curr_group->has_unbound_requests())
1093       continue;
1094     if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1095       if (!group)
1096         return true;
1097       has_stalled_group = true;
1098       bool has_higher_priority =
1099           !top_group ||
1100           curr_group->TopPendingPriority() > top_group->TopPendingPriority();
1101       if (has_higher_priority) {
1102         top_group = curr_group;
1103         top_group_id = &it.first;
1104       }
1105     }
1106   }
1107 
1108   if (top_group) {
1109     CHECK(group);
1110     *group = top_group;
1111     *group_id = *top_group_id;
1112   } else {
1113     CHECK(!has_stalled_group);
1114   }
1115   return has_stalled_group;
1116 }
1117 
OnIPAddressChanged()1118 void TransportClientSocketPool::OnIPAddressChanged() {
1119   DCHECK(cleanup_on_ip_address_change_);
1120   FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
1121 }
1122 
FlushWithError(int error,const char * net_log_reason_utf8)1123 void TransportClientSocketPool::FlushWithError(
1124     int error,
1125     const char* net_log_reason_utf8) {
1126   CancelAllConnectJobs();
1127   CloseIdleSockets(net_log_reason_utf8);
1128   CancelAllRequestsWithError(error);
1129   for (const auto& group : group_map_) {
1130     group.second->IncrementGeneration();
1131   }
1132 }
1133 
RemoveConnectJob(ConnectJob * job,Group * group)1134 void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
1135                                                  Group* group) {
1136   CHECK_GT(connecting_socket_count_, 0);
1137   connecting_socket_count_--;
1138 
1139   DCHECK(group);
1140   group->RemoveUnboundJob(job);
1141 }
1142 
OnAvailableSocketSlot(const GroupId & group_id,Group * group)1143 void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
1144                                                       Group* group) {
1145   DCHECK(base::Contains(group_map_, group_id));
1146   if (group->IsEmpty()) {
1147     RemoveGroup(group_id);
1148   } else if (group->has_unbound_requests()) {
1149     ProcessPendingRequest(group_id, group);
1150   }
1151 }
1152 
ProcessPendingRequest(const GroupId & group_id,Group * group)1153 void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
1154                                                       Group* group) {
1155   const Request* next_request = group->GetNextUnboundRequest();
1156   DCHECK(next_request);
1157 
1158   // If the group has no idle sockets, and can't make use of an additional slot,
1159   // either because it's at the limit or because it's at the socket per group
1160   // limit, then there's nothing to do.
1161   if (group->idle_sockets().empty() &&
1162       !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1163     return;
1164   }
1165 
1166   int rv =
1167       RequestSocketInternal(group_id, *next_request,
1168                             /*preconnect_done_closure=*/base::OnceClosure());
1169   if (rv != ERR_IO_PENDING) {
1170     std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1171     DCHECK(request);
1172     if (group->IsEmpty())
1173       RemoveGroup(group_id);
1174 
1175     request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1176                                                 rv);
1177     InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
1178                             request->socket_tag());
1179   }
1180 }
1181 
HandOutSocket(std::unique_ptr<StreamSocket> socket,ClientSocketHandle::SocketReuseType reuse_type,const LoadTimingInfo::ConnectTiming & connect_timing,ClientSocketHandle * handle,base::TimeDelta idle_time,Group * group,const NetLogWithSource & net_log)1182 void TransportClientSocketPool::HandOutSocket(
1183     std::unique_ptr<StreamSocket> socket,
1184     ClientSocketHandle::SocketReuseType reuse_type,
1185     const LoadTimingInfo::ConnectTiming& connect_timing,
1186     ClientSocketHandle* handle,
1187     base::TimeDelta idle_time,
1188     Group* group,
1189     const NetLogWithSource& net_log) {
1190   DCHECK(socket);
1191   handle->SetSocket(std::move(socket));
1192   handle->set_reuse_type(reuse_type);
1193   handle->set_idle_time(idle_time);
1194   handle->set_group_generation(group->generation());
1195   handle->set_connect_timing(connect_timing);
1196 
1197   if (reuse_type == ClientSocketHandle::REUSED_IDLE) {
1198     net_log.AddEventWithIntParams(
1199         NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
1200         static_cast<int>(idle_time.InMilliseconds()));
1201   }
1202 
1203   net_log.AddEventReferencingSource(
1204       NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
1205       handle->socket()->NetLog().source());
1206 
1207   handed_out_socket_count_++;
1208   group->IncrementActiveSocketCount();
1209 }
1210 
AddIdleSocket(std::unique_ptr<StreamSocket> socket,Group * group)1211 void TransportClientSocketPool::AddIdleSocket(
1212     std::unique_ptr<StreamSocket> socket,
1213     Group* group) {
1214   DCHECK(socket);
1215   IdleSocket idle_socket;
1216   idle_socket.socket = std::move(socket);
1217   idle_socket.start_time = base::TimeTicks::Now();
1218 
1219   group->mutable_idle_sockets()->push_back(std::move(idle_socket));
1220   IncrementIdleCount();
1221 }
1222 
CancelAllConnectJobs()1223 void TransportClientSocketPool::CancelAllConnectJobs() {
1224   for (auto i = group_map_.begin(); i != group_map_.end();) {
1225     Group* group = i->second;
1226     CHECK(group);
1227     connecting_socket_count_ -= group->jobs().size();
1228     group->RemoveAllUnboundJobs();
1229 
1230     // Delete group if no longer needed.
1231     if (group->IsEmpty()) {
1232       i = RemoveGroup(i);
1233     } else {
1234       ++i;
1235     }
1236   }
1237 }
1238 
CancelAllRequestsWithError(int error)1239 void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
1240   for (auto i = group_map_.begin(); i != group_map_.end();) {
1241     Group* group = i->second;
1242     CHECK(group);
1243 
1244     while (true) {
1245       std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1246       if (!request)
1247         break;
1248       InvokeUserCallbackLater(request->handle(), request->release_callback(),
1249                               error, request->socket_tag());
1250     }
1251 
1252     // Mark bound connect jobs as needing to fail. Can't fail them immediately
1253     // because they may have access to objects owned by the ConnectJob, and
1254     // could access them if a user callback invocation is queued. It would also
1255     // result in the consumer handling two messages at once, which in general
1256     // isn't safe for a lot of code.
1257     group->SetPendingErrorForAllBoundRequests(error);
1258 
1259     // Delete group if no longer needed.
1260     if (group->IsEmpty()) {
1261       i = RemoveGroup(i);
1262     } else {
1263       ++i;
1264     }
1265   }
1266 }
1267 
ReachedMaxSocketsLimit() const1268 bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
1269   // Each connecting socket will eventually connect and be handed out.
1270   int total =
1271       handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
1272   // There can be more sockets than the limit since some requests can ignore
1273   // the limit
1274   if (total < max_sockets_)
1275     return false;
1276   return true;
1277 }
1278 
CloseOneIdleSocketExceptInGroup(const Group * exception_group)1279 bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
1280     const Group* exception_group) {
1281   CHECK_GT(idle_socket_count_, 0);
1282 
1283   for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
1284     Group* group = i->second;
1285     CHECK(group);
1286     if (exception_group == group)
1287       continue;
1288     std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
1289 
1290     if (!idle_sockets->empty()) {
1291       idle_sockets->pop_front();
1292       DecrementIdleCount();
1293       if (group->IsEmpty())
1294         RemoveGroup(i);
1295 
1296       return true;
1297     }
1298   }
1299 
1300   return false;
1301 }
1302 
OnConnectJobComplete(Group * group,int result,ConnectJob * job)1303 void TransportClientSocketPool::OnConnectJobComplete(Group* group,
1304                                                      int result,
1305                                                      ConnectJob* job) {
1306   DCHECK_NE(ERR_IO_PENDING, result);
1307   DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1308   DCHECK_EQ(group, group_map_[group->group_id()]);
1309   DCHECK(result != OK || job->socket() != nullptr);
1310 
1311   // Check if the ConnectJob is already bound to a Request. If so, result is
1312   // returned to that specific request.
1313   std::optional<Group::BoundRequest> bound_request =
1314       group->FindAndRemoveBoundRequestForConnectJob(job);
1315   Request* request = nullptr;
1316   std::unique_ptr<Request> owned_request;
1317   if (bound_request) {
1318     --connecting_socket_count_;
1319 
1320     // If the socket pools were previously flushed with an error, return that
1321     // error to the bound request and discard the socket.
1322     if (bound_request->pending_error != OK) {
1323       InvokeUserCallbackLater(bound_request->request->handle(),
1324                               bound_request->request->release_callback(),
1325                               bound_request->pending_error,
1326                               bound_request->request->socket_tag());
1327       bound_request->request->net_log().EndEventWithNetErrorCode(
1328           NetLogEventType::SOCKET_POOL, bound_request->pending_error);
1329       OnAvailableSocketSlot(group->group_id(), group);
1330       CheckForStalledSocketGroups();
1331       return;
1332     }
1333 
1334     // If the ConnectJob is from a previous generation, add the request back to
1335     // the group, and kick off another request. The socket will be discarded.
1336     if (bound_request->generation != group->generation()) {
1337       group->InsertUnboundRequest(std::move(bound_request->request));
1338       OnAvailableSocketSlot(group->group_id(), group);
1339       CheckForStalledSocketGroups();
1340       return;
1341     }
1342 
1343     request = bound_request->request.get();
1344   } else {
1345     // In this case, RemoveConnectJob(job, _) must be called before exiting this
1346     // method. Otherwise, |job| will be leaked.
1347     owned_request = group->PopNextUnboundRequest();
1348     request = owned_request.get();
1349 
1350     if (!request) {
1351       if (result == OK)
1352         AddIdleSocket(job->PassSocket(), group);
1353       RemoveConnectJob(job, group);
1354       OnAvailableSocketSlot(group->group_id(), group);
1355       CheckForStalledSocketGroups();
1356       return;
1357     }
1358 
1359     LogBoundConnectJobToRequest(job->net_log().source(), *request);
1360   }
1361 
1362   // The case where there's no request is handled above.
1363   DCHECK(request);
1364 
1365   if (result != OK)
1366     request->handle()->SetAdditionalErrorState(job);
1367   if (job->socket()) {
1368     HandOutSocket(job->PassSocket(), ClientSocketHandle::UNUSED,
1369                   job->connect_timing(), request->handle(), base::TimeDelta(),
1370                   group, request->net_log());
1371   }
1372   request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1373                                               result);
1374   InvokeUserCallbackLater(request->handle(), request->release_callback(),
1375                           result, request->socket_tag());
1376   if (!bound_request)
1377     RemoveConnectJob(job, group);
1378   // If no socket was handed out, there's a new socket slot available.
1379   if (!request->handle()->socket()) {
1380     OnAvailableSocketSlot(group->group_id(), group);
1381     CheckForStalledSocketGroups();
1382   }
1383 }
1384 
OnNeedsProxyAuth(Group * group,const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1385 void TransportClientSocketPool::OnNeedsProxyAuth(
1386     Group* group,
1387     const HttpResponseInfo& response,
1388     HttpAuthController* auth_controller,
1389     base::OnceClosure restart_with_auth_callback,
1390     ConnectJob* job) {
1391   DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1392   DCHECK_EQ(group, group_map_[group->group_id()]);
1393 
1394   const Request* request = group->BindRequestToConnectJob(job);
1395   // If can't bind the ConnectJob to a request, treat this as a ConnectJob
1396   // failure.
1397   if (!request) {
1398     OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
1399     return;
1400   }
1401 
1402   request->proxy_auth_callback().Run(response, auth_controller,
1403                                      std::move(restart_with_auth_callback));
1404 }
1405 
InvokeUserCallbackLater(ClientSocketHandle * handle,CompletionOnceCallback callback,int rv,const SocketTag & socket_tag)1406 void TransportClientSocketPool::InvokeUserCallbackLater(
1407     ClientSocketHandle* handle,
1408     CompletionOnceCallback callback,
1409     int rv,
1410     const SocketTag& socket_tag) {
1411   CHECK(!base::Contains(pending_callback_map_, handle));
1412   pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
1413   if (rv == OK) {
1414     handle->socket()->ApplySocketTag(socket_tag);
1415   }
1416   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
1417       FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
1418                                 weak_factory_.GetWeakPtr(),
1419                                 // This is safe as `handle` is checked against a
1420                                 // map to verify it's alive before dereference.
1421                                 // This code path must only be reachable by
1422                                 // `handle`s that have had Init called.
1423                                 base::UnsafeDangling(handle)));
1424 }
1425 
InvokeUserCallback(MayBeDangling<ClientSocketHandle> handle)1426 void TransportClientSocketPool::InvokeUserCallback(
1427     MayBeDangling<ClientSocketHandle> handle) {
1428   auto it = pending_callback_map_.find(handle);
1429 
1430   // Exit if the request has already been cancelled.
1431   if (it == pending_callback_map_.end())
1432     return;
1433 
1434   CHECK(!handle->is_initialized());
1435   CompletionOnceCallback callback = std::move(it->second.callback);
1436   int result = it->second.result;
1437   pending_callback_map_.erase(it);
1438   std::move(callback).Run(result);
1439 }
1440 
TryToCloseSocketsInLayeredPools()1441 void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
1442   while (IsStalled()) {
1443     // Closing a socket will result in calling back into |this| to use the freed
1444     // socket slot, so nothing else is needed.
1445     if (!CloseOneIdleConnectionInHigherLayeredPool())
1446       return;
1447   }
1448 }
1449 
1450 TransportClientSocketPool::GroupMap::iterator
RefreshGroup(GroupMap::iterator it,const base::TimeTicks & now,const char * net_log_reason_utf8)1451 TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
1452                                         const base::TimeTicks& now,
1453                                         const char* net_log_reason_utf8) {
1454   Group* group = it->second;
1455   CHECK(group);
1456   CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);
1457 
1458   connecting_socket_count_ -= group->jobs().size();
1459   group->RemoveAllUnboundJobs();
1460 
1461   // Otherwise, prevent reuse of existing sockets.
1462   group->IncrementGeneration();
1463 
1464   // Delete group if no longer needed.
1465   if (group->IsEmpty()) {
1466     return RemoveGroup(it);
1467   }
1468   return ++it;
1469 }
1470 
Group(const GroupId & group_id,TransportClientSocketPool * client_socket_pool)1471 TransportClientSocketPool::Group::Group(
1472     const GroupId& group_id,
1473     TransportClientSocketPool* client_socket_pool)
1474     : group_id_(group_id),
1475       client_socket_pool_(client_socket_pool),
1476       unbound_requests_(NUM_PRIORITIES) {}
1477 
~Group()1478 TransportClientSocketPool::Group::~Group() {
1479   DCHECK_EQ(0u, never_assigned_job_count());
1480   DCHECK_EQ(0u, unassigned_job_count());
1481   DCHECK(unbound_requests_.empty());
1482   DCHECK(jobs_.empty());
1483   DCHECK(bound_requests_.empty());
1484 }
1485 
OnConnectJobComplete(int result,ConnectJob * job)1486 void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
1487                                                             ConnectJob* job) {
1488   DCHECK_NE(ERR_IO_PENDING, result);
1489   client_socket_pool_->OnConnectJobComplete(this, result, job);
1490 }
1491 
OnNeedsProxyAuth(const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1492 void TransportClientSocketPool::Group::OnNeedsProxyAuth(
1493     const HttpResponseInfo& response,
1494     HttpAuthController* auth_controller,
1495     base::OnceClosure restart_with_auth_callback,
1496     ConnectJob* job) {
1497   client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
1498                                         std::move(restart_with_auth_callback),
1499                                         job);
1500 }
1501 
StartBackupJobTimer(const GroupId & group_id)1502 void TransportClientSocketPool::Group::StartBackupJobTimer(
1503     const GroupId& group_id) {
1504   // Only allow one timer to run at a time.
1505   if (BackupJobTimerIsRunning())
1506     return;
1507 
1508   // Unretained here is okay because |backup_job_timer_| is
1509   // automatically cancelled when it's destroyed.
1510   backup_job_timer_.Start(FROM_HERE,
1511                           client_socket_pool_->ConnectRetryInterval(),
1512                           base::BindOnce(&Group::OnBackupJobTimerFired,
1513                                          base::Unretained(this), group_id));
1514 }
1515 
BackupJobTimerIsRunning() const1516 bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
1517   return backup_job_timer_.IsRunning();
1518 }
1519 
TryToUseNeverAssignedConnectJob()1520 bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
1521   SanityCheck();
1522 
1523   if (never_assigned_job_count_ == 0)
1524     return false;
1525   --never_assigned_job_count_;
1526   return true;
1527 }
1528 
AddJob(std::unique_ptr<ConnectJob> job,bool is_preconnect)1529 void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
1530                                               bool is_preconnect) {
1531   SanityCheck();
1532 
1533   if (is_preconnect)
1534     ++never_assigned_job_count_;
1535   jobs_.push_back(std::move(job));
1536   TryToAssignUnassignedJob(jobs_.back().get());
1537 
1538   SanityCheck();
1539 }
1540 
RemoveUnboundJob(ConnectJob * job)1541 std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
1542     ConnectJob* job) {
1543   SanityCheck();
1544 
1545   // Check that |job| is in the list.
1546   auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
1547   DCHECK(it != jobs_.end());
1548 
1549   // Check if |job| is in the unassigned jobs list. If so, remove it.
1550   auto it2 = base::ranges::find(unassigned_jobs_, job);
1551   if (it2 != unassigned_jobs_.end()) {
1552     unassigned_jobs_.erase(it2);
1553   } else {
1554     // Otherwise, |job| must be assigned to some Request. Unassign it, then
1555     // try to replace it with another job if possible (either by taking an
1556     // unassigned job or stealing from another request, if any requests after it
1557     // have a job).
1558     RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
1559     DCHECK(!request_with_job.is_null());
1560     request_with_job.value()->ReleaseJob();
1561     TryToAssignJobToRequest(request_with_job);
1562   }
1563   std::unique_ptr<ConnectJob> owned_job = std::move(*it);
1564   jobs_.erase(it);
1565 
1566   size_t job_count = jobs_.size();
1567   if (job_count < never_assigned_job_count_)
1568     never_assigned_job_count_ = job_count;
1569 
1570   // If we've got no more jobs for this group, then we no longer need a
1571   // backup job either.
1572   if (jobs_.empty()) {
1573     DCHECK(unassigned_jobs_.empty());
1574     backup_job_timer_.Stop();
1575   }
1576 
1577   SanityCheck();
1578   return owned_job;
1579 }
1580 
OnBackupJobTimerFired(const GroupId & group_id)1581 void TransportClientSocketPool::Group::OnBackupJobTimerFired(
1582     const GroupId& group_id) {
1583   // If there are no more jobs pending, there is no work to do.
1584   // If we've done our cleanups correctly, this should not happen.
1585   if (jobs_.empty()) {
1586     NOTREACHED();
1587     return;
1588   }
1589 
1590   // If the old job has already established a connection, don't start a backup
1591   // job. Backup jobs are only for issues establishing the initial TCP
1592   // connection - the timeout they used is tuned for that, and tests expect that
1593   // behavior.
1594   //
1595   // TODO(https://crbug.com/929814): Replace both this and the
1596   // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
1597   // LOAD_STATE_RESOLVING_HOST callback to start the timer (And invoke the
1598   // OnHostResolved callback of any pending requests), and the
1599   // HasEstablishedConnection() callback to stop the timer. That should result
1600   // in a more robust, testable API.
1601   if ((*jobs_.begin())->HasEstablishedConnection())
1602     return;
1603 
1604   // If our old job is waiting on DNS, or if we can't create any sockets
1605   // right now due to limits, just reset the timer.
1606   if (client_socket_pool_->ReachedMaxSocketsLimit() ||
1607       !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
1608       (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
1609     StartBackupJobTimer(group_id);
1610     return;
1611   }
1612 
1613   if (unbound_requests_.empty())
1614     return;
1615 
1616   Request* request = unbound_requests_.FirstMax().value().get();
1617   std::unique_ptr<ConnectJob> owned_backup_job =
1618       client_socket_pool_->CreateConnectJob(
1619           group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
1620           request->proxy_annotation_tag(), request->priority(),
1621           request->socket_tag(), this);
1622   owned_backup_job->net_log().AddEvent(
1623       NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
1624         return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
1625       });
1626   ConnectJob* backup_job = owned_backup_job.get();
1627   AddJob(std::move(owned_backup_job), false);
1628   client_socket_pool_->connecting_socket_count_++;
1629   int rv = backup_job->Connect();
1630   if (rv != ERR_IO_PENDING) {
1631     client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
1632   }
1633 }
1634 
SanityCheck() const1635 void TransportClientSocketPool::Group::SanityCheck() const {
1636 #if DCHECK_IS_ON()
1637   DCHECK_LE(never_assigned_job_count(), jobs_.size());
1638   DCHECK_LE(unassigned_job_count(), jobs_.size());
1639 
1640   // Check that |unassigned_jobs_| is empty iff there are at least as many
1641   // requests as jobs.
1642   DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());
1643 
1644   size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();
1645 
1646   RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1647   for (size_t i = 0; i < unbound_requests_.size();
1648        ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1649     DCHECK(!pointer.is_null());
1650     DCHECK(pointer.value());
1651     // Check that the first |num_assigned_jobs| requests have valid job
1652     // assignments.
1653     if (i < num_assigned_jobs) {
1654       // The request has a job.
1655       ConnectJob* job = pointer.value()->job();
1656       DCHECK(job);
1657       // The request's job is not in |unassigned_jobs_|
1658       DCHECK(!base::Contains(unassigned_jobs_, job));
1659       // The request's job is in |jobs_|
1660       DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1661       // The same job is not assigned to any other request with a job.
1662       RequestQueue::Pointer pointer2 =
1663           unbound_requests_.GetNextTowardsLastMin(pointer);
1664       for (size_t j = i + 1; j < num_assigned_jobs;
1665            ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
1666         DCHECK(!pointer2.is_null());
1667         ConnectJob* job2 = pointer2.value()->job();
1668         DCHECK(job2);
1669         DCHECK_NE(job, job2);
1670       }
1671       DCHECK_EQ(pointer.value()->priority(), job->priority());
1672     } else {
1673       // Check that any subsequent requests do not have a job.
1674       DCHECK(!pointer.value()->job());
1675     }
1676   }
1677 
1678   for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
1679     // Check that all unassigned jobs are in |jobs_|
1680     ConnectJob* job = *it;
1681     DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1682     // Check that there are no duplicated entries in |unassigned_jobs_|
1683     for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
1684       DCHECK_NE(job, *it2);
1685     }
1686 
1687     // Check that no |unassigned_jobs_| are in |bound_requests_|.
1688     DCHECK(!base::Contains(bound_requests_, job,
1689                            [](const BoundRequest& bound_request) {
1690                              return bound_request.connect_job.get();
1691                            }));
1692   }
1693 #endif
1694 }
1695 
RemoveAllUnboundJobs()1696 void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
1697   SanityCheck();
1698 
1699   // Remove jobs from any requests that have them.
1700   if (!unbound_requests_.empty()) {
1701     for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1702          !pointer.is_null() && pointer.value()->job();
1703          pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1704       pointer.value()->ReleaseJob();
1705     }
1706   }
1707   unassigned_jobs_.clear();
1708   never_assigned_job_count_ = 0;
1709 
1710   // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
1711   // removal from `TransportClientSocketPool::group_map_`, so if this check
1712   // fails, `this` has been deleted, likely through some reentrancy issue.
1713   CHECK(client_socket_pool_->HasGroup(group_id_));
1714 
1715   // Delete active jobs.
1716   jobs_.clear();
1717   // Stop backup job timer.
1718   backup_job_timer_.Stop();
1719 
1720   SanityCheck();
1721 }
1722 
ConnectJobCount() const1723 size_t TransportClientSocketPool::Group::ConnectJobCount() const {
1724   return bound_requests_.size() + jobs_.size();
1725 }
1726 
GetConnectJobForHandle(const ClientSocketHandle * handle) const1727 ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
1728     const ClientSocketHandle* handle) const {
1729   // Search through bound requests for |handle|.
1730   for (const auto& bound_pair : bound_requests_) {
1731     if (handle == bound_pair.request->handle())
1732       return bound_pair.connect_job.get();
1733   }
1734 
1735   // Search through the unbound requests that have corresponding jobs for a
1736   // request with |handle|.
1737   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1738        !pointer.is_null() && pointer.value()->job();
1739        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1740     if (pointer.value()->handle() == handle)
1741       return pointer.value()->job();
1742   }
1743 
1744   return nullptr;
1745 }
1746 
InsertUnboundRequest(std::unique_ptr<Request> request)1747 void TransportClientSocketPool::Group::InsertUnboundRequest(
1748     std::unique_ptr<Request> request) {
1749   SanityCheck();
1750 
1751   // Should not have a job because it is not already in |unbound_requests_|
1752   DCHECK(!request->job());
1753   // This value must be cached before we release |request|.
1754   RequestPriority priority = request->priority();
1755 
1756   RequestQueue::Pointer new_position;
1757   if (request->respect_limits() == RespectLimits::DISABLED) {
1758     // Put requests with RespectLimits::DISABLED (which should have
1759     // priority == MAXIMUM_PRIORITY) ahead of other requests with
1760     // MAXIMUM_PRIORITY.
1761     DCHECK_EQ(priority, MAXIMUM_PRIORITY);
1762     new_position =
1763         unbound_requests_.InsertAtFront(std::move(request), priority);
1764   } else {
1765     new_position = unbound_requests_.Insert(std::move(request), priority);
1766   }
1767   DCHECK(!unbound_requests_.empty());
1768 
1769   TryToAssignJobToRequest(new_position);
1770 
1771   SanityCheck();
1772 }
1773 
1774 const TransportClientSocketPool::Request*
GetNextUnboundRequest() const1775 TransportClientSocketPool::Group::GetNextUnboundRequest() const {
1776   return unbound_requests_.empty() ? nullptr
1777                                    : unbound_requests_.FirstMax().value().get();
1778 }
1779 
1780 std::unique_ptr<TransportClientSocketPool::Request>
PopNextUnboundRequest()1781 TransportClientSocketPool::Group::PopNextUnboundRequest() {
1782   if (unbound_requests_.empty())
1783     return nullptr;
1784   return RemoveUnboundRequest(unbound_requests_.FirstMax());
1785 }
1786 
1787 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveUnboundRequest(ClientSocketHandle * handle)1788 TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
1789     ClientSocketHandle* handle) {
1790   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1791        !pointer.is_null();
1792        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1793     if (pointer.value()->handle() == handle) {
1794       DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
1795                 pointer.value()->priority());
1796       std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1797       return request;
1798     }
1799   }
1800   return nullptr;
1801 }
1802 
SetPendingErrorForAllBoundRequests(int pending_error)1803 void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
1804     int pending_error) {
1805   for (auto& bound_request : bound_requests_) {
1806     // Earlier errors take precedence.
1807     if (bound_request.pending_error == OK)
1808       bound_request.pending_error = pending_error;
1809   }
1810 }
1811 
1812 const TransportClientSocketPool::Request*
BindRequestToConnectJob(ConnectJob * connect_job)1813 TransportClientSocketPool::Group::BindRequestToConnectJob(
1814     ConnectJob* connect_job) {
1815   // Check if |job| is already bound to a Request.
1816   for (const auto& bound_pair : bound_requests_) {
1817     if (bound_pair.connect_job.get() == connect_job)
1818       return bound_pair.request.get();
1819   }
1820 
1821   // If not, try to bind it to a Request.
1822   const Request* request = GetNextUnboundRequest();
1823   // If there are no pending requests, or the highest priority request has no
1824   // callback to handle auth challenges, return nullptr.
1825   if (!request || request->proxy_auth_callback().is_null())
1826     return nullptr;
1827 
1828   // Otherwise, bind the ConnectJob to the Request.
1829   std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
1830   DCHECK_EQ(owned_request.get(), request);
1831   std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
1832   LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
1833   bound_requests_.emplace_back(BoundRequest(
1834       std::move(owned_connect_job), std::move(owned_request), generation()));
1835   return request;
1836 }
1837 
1838 std::optional<TransportClientSocketPool::Group::BoundRequest>
FindAndRemoveBoundRequestForConnectJob(ConnectJob * connect_job)1839 TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
1840     ConnectJob* connect_job) {
1841   for (auto bound_pair = bound_requests_.begin();
1842        bound_pair != bound_requests_.end(); ++bound_pair) {
1843     if (bound_pair->connect_job.get() != connect_job)
1844       continue;
1845     BoundRequest ret = std::move(*bound_pair);
1846     bound_requests_.erase(bound_pair);
1847     return std::move(ret);
1848   }
1849   return std::nullopt;
1850 }
1851 
1852 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveBoundRequest(ClientSocketHandle * client_socket_handle)1853 TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
1854     ClientSocketHandle* client_socket_handle) {
1855   for (auto bound_pair = bound_requests_.begin();
1856        bound_pair != bound_requests_.end(); ++bound_pair) {
1857     if (bound_pair->request->handle() != client_socket_handle)
1858       continue;
1859     std::unique_ptr<Request> request = std::move(bound_pair->request);
1860     bound_requests_.erase(bound_pair);
1861     return request;
1862   }
1863   return nullptr;
1864 }
1865 
SetPriority(ClientSocketHandle * handle,RequestPriority priority)1866 void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
1867                                                    RequestPriority priority) {
1868   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1869        !pointer.is_null();
1870        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1871     if (pointer.value()->handle() == handle) {
1872       if (pointer.value()->priority() == priority)
1873         return;
1874 
1875       std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1876 
1877       // Requests that ignore limits much be created and remain at the highest
1878       // priority, and should not be reprioritized.
1879       DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);
1880 
1881       request->set_priority(priority);
1882       InsertUnboundRequest(std::move(request));
1883       return;
1884     }
1885   }
1886 
1887   // This function must be called with a valid ClientSocketHandle.
1888   NOTREACHED();
1889 }
1890 
RequestWithHandleHasJobForTesting(const ClientSocketHandle * handle) const1891 bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
1892     const ClientSocketHandle* handle) const {
1893   SanityCheck();
1894   if (GetConnectJobForHandle(handle))
1895     return true;
1896 
1897   // There's no corresponding ConnectJob. Verify that the handle is at least
1898   // owned by a request.
1899   RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1900   for (size_t i = 0; i < unbound_requests_.size(); ++i) {
1901     if (pointer.value()->handle() == handle)
1902       return false;
1903     pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
1904   }
1905   NOTREACHED();
1906   return false;
1907 }
1908 
BoundRequest()1909 TransportClientSocketPool::Group::BoundRequest::BoundRequest()
1910     : pending_error(OK) {}
1911 
BoundRequest(std::unique_ptr<ConnectJob> connect_job,std::unique_ptr<Request> request,int64_t generation)1912 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1913     std::unique_ptr<ConnectJob> connect_job,
1914     std::unique_ptr<Request> request,
1915     int64_t generation)
1916     : connect_job(std::move(connect_job)),
1917       request(std::move(request)),
1918       generation(generation),
1919       pending_error(OK) {}
1920 
1921 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1922     BoundRequest&& other) = default;
1923 
1924 TransportClientSocketPool::Group::BoundRequest&
1925 TransportClientSocketPool::Group::BoundRequest::operator=(
1926     BoundRequest&& other) = default;
1927 
1928 TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;
1929 
1930 std::unique_ptr<TransportClientSocketPool::Request>
RemoveUnboundRequest(const RequestQueue::Pointer & pointer)1931 TransportClientSocketPool::Group::RemoveUnboundRequest(
1932     const RequestQueue::Pointer& pointer) {
1933   SanityCheck();
1934 
1935   std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
1936   if (request->job()) {
1937     TryToAssignUnassignedJob(request->ReleaseJob());
1938   }
1939   // If there are no more unbound requests, kill the backup timer.
1940   if (unbound_requests_.empty())
1941     backup_job_timer_.Stop();
1942 
1943   SanityCheck();
1944   return request;
1945 }
1946 
1947 TransportClientSocketPool::RequestQueue::Pointer
FindUnboundRequestWithJob(const ConnectJob * job) const1948 TransportClientSocketPool::Group::FindUnboundRequestWithJob(
1949     const ConnectJob* job) const {
1950   SanityCheck();
1951 
1952   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1953        !pointer.is_null() && pointer.value()->job();
1954        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1955     if (pointer.value()->job() == job)
1956       return pointer;
1957   }
1958   // If a request with the job was not found, it must be in |unassigned_jobs_|.
1959   DCHECK(base::Contains(unassigned_jobs_, job));
1960   return RequestQueue::Pointer();
1961 }
1962 
1963 TransportClientSocketPool::RequestQueue::Pointer
GetFirstRequestWithoutJob() const1964 TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
1965   RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1966   size_t i = 0;
1967   for (; !pointer.is_null() && pointer.value()->job();
1968        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1969     ++i;
1970   }
1971   DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
1972   DCHECK(pointer.is_null() || !pointer.value()->job());
1973   return pointer;
1974 }
1975 
TryToAssignUnassignedJob(ConnectJob * job)1976 void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
1977     ConnectJob* job) {
1978   unassigned_jobs_.push_back(job);
1979   RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
1980   if (!first_request_without_job.is_null()) {
1981     first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
1982     unassigned_jobs_.pop_back();
1983   }
1984 }
1985 
TryToAssignJobToRequest(TransportClientSocketPool::RequestQueue::Pointer request_pointer)1986 void TransportClientSocketPool::Group::TryToAssignJobToRequest(
1987     TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
1988   DCHECK(!request_pointer.value()->job());
1989   if (!unassigned_jobs_.empty()) {
1990     request_pointer.value()->AssignJob(unassigned_jobs_.front());
1991     unassigned_jobs_.pop_front();
1992     return;
1993   }
1994 
1995   // If the next request in the queue does not have a job, then there are no
1996   // requests with a job after |request_pointer| from which we can steal.
1997   RequestQueue::Pointer next_request =
1998       unbound_requests_.GetNextTowardsLastMin(request_pointer);
1999   if (next_request.is_null() || !next_request.value()->job())
2000     return;
2001 
2002   // Walk down the queue to find the last request with a job.
2003   RequestQueue::Pointer cur = next_request;
2004   RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
2005   while (!next.is_null() && next.value()->job()) {
2006     cur = next;
2007     next = unbound_requests_.GetNextTowardsLastMin(next);
2008   }
2009   // Steal the job from the last request with a job.
2010   TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
2011 }
2012 
TransferJobBetweenRequests(TransportClientSocketPool::Request * source,TransportClientSocketPool::Request * dest)2013 void TransportClientSocketPool::Group::TransferJobBetweenRequests(
2014     TransportClientSocketPool::Request* source,
2015     TransportClientSocketPool::Request* dest) {
2016   DCHECK(!dest->job());
2017   DCHECK(source->job());
2018   dest->AssignJob(source->ReleaseJob());
2019 }
2020 
2021 }  // namespace net
2022