xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/pick_first/pick_first.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/pick_first/pick_first.h"
20 
21 #include <inttypes.h>
22 #include <string.h>
23 
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/algorithm/container.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 
39 #include <grpc/event_engine/event_engine.h>
40 #include <grpc/impl/channel_arg_names.h>
41 #include <grpc/impl/connectivity_state.h>
42 #include <grpc/support/log.h>
43 
44 #include "src/core/load_balancing/health_check_client.h"
45 #include "src/core/lib/address_utils/sockaddr_utils.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/channel/metrics.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gpr/useful.h"
51 #include "src/core/lib/gprpp/crash.h"
52 #include "src/core/lib/gprpp/debug_location.h"
53 #include "src/core/lib/gprpp/orphanable.h"
54 #include "src/core/lib/gprpp/ref_counted_ptr.h"
55 #include "src/core/lib/gprpp/time.h"
56 #include "src/core/lib/gprpp/work_serializer.h"
57 #include "src/core/lib/iomgr/exec_ctx.h"
58 #include "src/core/lib/iomgr/iomgr_fwd.h"
59 #include "src/core/lib/iomgr/resolved_address.h"
60 #include "src/core/lib/json/json.h"
61 #include "src/core/lib/json/json_args.h"
62 #include "src/core/lib/json/json_object_loader.h"
63 #include "src/core/lib/transport/connectivity_state.h"
64 #include "src/core/load_balancing/lb_policy.h"
65 #include "src/core/load_balancing/lb_policy_factory.h"
66 #include "src/core/load_balancing/subchannel_interface.h"
67 #include "src/core/resolver/endpoint_addresses.h"
68 
69 namespace grpc_core {
70 
71 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
72 
73 namespace {
74 
75 //
76 // pick_first LB policy
77 //
78 
79 constexpr absl::string_view kPickFirst = "pick_first";
80 
81 const auto kMetricDisconnections =
82     GlobalInstrumentsRegistry::RegisterUInt64Counter(
83         "grpc.lb.pick_first.disconnections",
84         "EXPERIMENTAL.  Number of times the selected subchannel becomes "
85         "disconnected.",
86         "{disconnection}", {kMetricLabelTarget}, {}, false);
87 
88 const auto kMetricConnectionAttemptsSucceeded =
89     GlobalInstrumentsRegistry::RegisterUInt64Counter(
90         "grpc.lb.pick_first.connection_attempts_succeeded",
91         "EXPERIMENTAL.  Number of successful connection attempts.",
92         "{attempt}", {kMetricLabelTarget}, {}, false);
93 
94 const auto kMetricConnectionAttemptsFailed =
95     GlobalInstrumentsRegistry::RegisterUInt64Counter(
96         "grpc.lb.pick_first.connection_attempts_failed",
97         "EXPERIMENTAL.  Number of failed connection attempts.",
98         "{attempt}", {kMetricLabelTarget}, {}, false);
99 
100 class PickFirstConfig final : public LoadBalancingPolicy::Config {
101  public:
name() const102   absl::string_view name() const override { return kPickFirst; }
shuffle_addresses() const103   bool shuffle_addresses() const { return shuffle_addresses_; }
104 
JsonLoader(const JsonArgs &)105   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
106     static const auto kJsonLoader =
107         JsonObjectLoader<PickFirstConfig>()
108             .OptionalField("shuffleAddressList",
109                            &PickFirstConfig::shuffle_addresses_)
110             .Finish();
111     return kJsonLoader;
112   }
113 
114  private:
115   bool shuffle_addresses_ = false;
116 };
117 
118 class PickFirst final : public LoadBalancingPolicy {
119  public:
120   explicit PickFirst(Args args);
121 
name() const122   absl::string_view name() const override { return kPickFirst; }
123 
124   absl::Status UpdateLocked(UpdateArgs args) override;
125   void ExitIdleLocked() override;
126   void ResetBackoffLocked() override;
127 
128  private:
129   ~PickFirst() override;
130 
131   class SubchannelList final : public InternallyRefCounted<SubchannelList> {
132    public:
133     class SubchannelData final {
134      public:
135       SubchannelData(SubchannelList* subchannel_list, size_t index,
136                      RefCountedPtr<SubchannelInterface> subchannel);
137 
subchannel() const138       SubchannelInterface* subchannel() const { return subchannel_.get(); }
connectivity_state() const139       absl::optional<grpc_connectivity_state> connectivity_state() const {
140         return connectivity_state_;
141       }
connectivity_status() const142       const absl::Status& connectivity_status() const {
143         return connectivity_status_;
144       }
145 
146       // Resets the connection backoff.
ResetBackoffLocked()147       void ResetBackoffLocked() {
148         if (subchannel_ != nullptr) subchannel_->ResetBackoff();
149       }
150 
RequestConnection()151       void RequestConnection() { subchannel_->RequestConnection(); }
152 
153       // Requests a connection attempt to start on this subchannel,
154       // with appropriate Connection Attempt Delay.
155       // Used only during the Happy Eyeballs pass.
156       void RequestConnectionWithTimer();
157 
158       // Cancels any pending connectivity watch and unrefs the subchannel.
159       void ShutdownLocked();
160 
seen_transient_failure() const161       bool seen_transient_failure() const { return seen_transient_failure_; }
162 
163      private:
164       // Watcher for subchannel connectivity state.
165       class Watcher final
166           : public SubchannelInterface::ConnectivityStateWatcherInterface {
167        public:
Watcher(RefCountedPtr<SubchannelList> subchannel_list,size_t index)168         Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
169             : subchannel_list_(std::move(subchannel_list)), index_(index) {}
170 
~Watcher()171         ~Watcher() override {
172           subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
173         }
174 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)175         void OnConnectivityStateChange(grpc_connectivity_state new_state,
176                                        absl::Status status) override {
177           subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
178               new_state, std::move(status));
179         }
180 
interested_parties()181         grpc_pollset_set* interested_parties() override {
182           return subchannel_list_->policy_->interested_parties();
183         }
184 
185        private:
186         RefCountedPtr<SubchannelList> subchannel_list_;
187         const size_t index_;
188       };
189 
190       // This method will be invoked once soon after instantiation to report
191       // the current connectivity state, and it will then be invoked again
192       // whenever the connectivity state changes.
193       void OnConnectivityStateChange(grpc_connectivity_state new_state,
194                                      absl::Status status);
195 
196       // Processes the connectivity change to READY for an unselected
197       // subchannel.
198       void ProcessUnselectedReadyLocked();
199 
200       // Backpointer to owning subchannel list.  Not owned.
201       SubchannelList* subchannel_list_;
202       const size_t index_;
203       // The subchannel.
204       RefCountedPtr<SubchannelInterface> subchannel_;
205       // Will be non-null when the subchannel's state is being watched.
206       SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
207           nullptr;
208       // Data updated by the watcher.
209       absl::optional<grpc_connectivity_state> connectivity_state_;
210       absl::Status connectivity_status_;
211       bool seen_transient_failure_ = false;
212     };
213 
214     SubchannelList(RefCountedPtr<PickFirst> policy,
215                    EndpointAddressesIterator* addresses,
216                    const ChannelArgs& args);
217 
218     ~SubchannelList() override;
219 
220     // The number of subchannels in the list.
size() const221     size_t size() const { return subchannels_.size(); }
222 
223     // Resets connection backoff of all subchannels.
224     void ResetBackoffLocked();
225 
226     void Orphan() override;
227 
IsHappyEyeballsPassComplete() const228     bool IsHappyEyeballsPassComplete() const {
229       // Checking attempting_index_ here is just an optimization -- if
230       // we haven't actually tried all subchannels yet, then we don't
231       // need to iterate.
232       if (attempting_index_ < size()) return false;
233       for (const SubchannelData& sd : subchannels_) {
234         if (!sd.seen_transient_failure()) return false;
235       }
236       return true;
237     }
238 
239    private:
240     // Returns true if all subchannels have seen their initial
241     // connectivity state notifications.
AllSubchannelsSeenInitialState() const242     bool AllSubchannelsSeenInitialState() const {
243       return num_subchannels_seen_initial_notification_ == size();
244     }
245 
246     // Looks through subchannels_ starting from attempting_index_ to
247     // find the first one not currently in TRANSIENT_FAILURE, then
248     // triggers a connection attempt for that subchannel.  If there are
249     // no more subchannels not in TRANSIENT_FAILURE, calls
250     // MaybeFinishHappyEyeballsPass().
251     void StartConnectingNextSubchannel();
252 
253     // Checks to see if the initial Happy Eyeballs pass is complete --
254     // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
255     // If so, transitions to a mode where we try to connect to all subchannels
256     // in parallel and returns true.
257     void MaybeFinishHappyEyeballsPass();
258 
259     // Backpointer to owning policy.
260     RefCountedPtr<PickFirst> policy_;
261 
262     ChannelArgs args_;
263 
264     // The list of subchannels.
265     std::vector<SubchannelData> subchannels_;
266 
267     // Is this list shutting down? This may be true due to the shutdown of the
268     // policy itself or because a newer update has arrived while this one hadn't
269     // finished processing.
270     bool shutting_down_ = false;
271 
272     size_t num_subchannels_seen_initial_notification_ = 0;
273 
274     // The index into subchannels_ to which we are currently attempting
275     // to connect during the initial Happy Eyeballs pass.  Once the
276     // initial pass is over, this will be equal to size().
277     size_t attempting_index_ = 0;
278     // Happy Eyeballs timer handle.
279     absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
280         timer_handle_;
281 
282     // After the initial Happy Eyeballs pass, the number of failures
283     // we've seen.  Every size() failures, we trigger re-resolution.
284     size_t num_failures_ = 0;
285 
286     // The status from the last subchannel that reported TRANSIENT_FAILURE.
287     absl::Status last_failure_;
288   };
289 
290   class HealthWatcher final
291       : public SubchannelInterface::ConnectivityStateWatcherInterface {
292    public:
HealthWatcher(RefCountedPtr<PickFirst> policy)293     explicit HealthWatcher(RefCountedPtr<PickFirst> policy)
294         : policy_(std::move(policy)) {}
295 
~HealthWatcher()296     ~HealthWatcher() override {
297       policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
298     }
299 
300     void OnConnectivityStateChange(grpc_connectivity_state new_state,
301                                    absl::Status status) override;
302 
interested_parties()303     grpc_pollset_set* interested_parties() override {
304       return policy_->interested_parties();
305     }
306 
307    private:
308     RefCountedPtr<PickFirst> policy_;
309   };
310 
311   class Picker final : public SubchannelPicker {
312    public:
Picker(RefCountedPtr<SubchannelInterface> subchannel)313     explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
314         : subchannel_(std::move(subchannel)) {}
315 
Pick(PickArgs)316     PickResult Pick(PickArgs /*args*/) override {
317       return PickResult::Complete(subchannel_);
318     }
319 
320    private:
321     RefCountedPtr<SubchannelInterface> subchannel_;
322   };
323 
324   void ShutdownLocked() override;
325 
326   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
327                    RefCountedPtr<SubchannelPicker> picker);
328 
329   void AttemptToConnectUsingLatestUpdateArgsLocked();
330 
331   void UnsetSelectedSubchannel();
332 
333   // When ExitIdleLocked() is called, we create a subchannel_list_ and start
334   // trying to connect, but we don't actually change state_ until the first
335   // subchannel reports CONNECTING.  So in order to know if we're really
336   // idle, we need to check both state_ and subchannel_list_.
IsIdle() const337   bool IsIdle() const {
338     return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
339   }
340 
341   // Whether we should enable health watching.
342   const bool enable_health_watch_;
343   // Whether we should omit our status message prefix.
344   const bool omit_status_message_prefix_;
345   // Connection Attempt Delay for Happy Eyeballs.
346   const Duration connection_attempt_delay_;
347 
348   // Lateset update args.
349   UpdateArgs latest_update_args_;
350   // All our subchannels.
351   OrphanablePtr<SubchannelList> subchannel_list_;
352   // Latest pending subchannel list.
353   OrphanablePtr<SubchannelList> latest_pending_subchannel_list_;
354   // Selected subchannel in subchannel_list_.
355   SubchannelList::SubchannelData* selected_ = nullptr;
356   // Health watcher for the selected subchannel.
357   SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
358       nullptr;
359   SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
360   // Current connectivity state.
361   grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
362   // Are we shut down?
363   bool shutdown_ = false;
364   // Random bit generator used for shuffling addresses if configured
365   absl::BitGen bit_gen_;
366 };
367 
PickFirst(Args args)368 PickFirst::PickFirst(Args args)
369     : LoadBalancingPolicy(std::move(args)),
370       enable_health_watch_(
371           channel_args()
372               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
373               .value_or(false)),
374       omit_status_message_prefix_(
375           channel_args()
376               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
377               .value_or(false)),
378       connection_attempt_delay_(Duration::Milliseconds(
379           Clamp(channel_args()
380                     .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
381                     .value_or(250),
382                 100, 2000))) {
383   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
384     gpr_log(GPR_INFO, "Pick First %p created.", this);
385   }
386 }
387 
// Logs the destruction when tracing is enabled and asserts that both
// subchannel lists have already been released (ShutdownLocked() resets
// them).
PickFirst::~PickFirst() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO, "Destroying Pick First %p", this);
  }
  GPR_ASSERT(subchannel_list_ == nullptr);
  GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}
395 
// Marks the policy as shut down, drops the selected subchannel (cancelling
// its health data watcher, if any), and releases both subchannel lists.
void PickFirst::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
  }
  shutdown_ = true;
  // selected_ points at an entry of subchannel_list_, so clear it before
  // the list itself is released.
  UnsetSelectedSubchannel();
  subchannel_list_.reset();
  latest_pending_subchannel_list_.reset();
}
405 
ExitIdleLocked()406 void PickFirst::ExitIdleLocked() {
407   if (shutdown_) return;
408   if (IsIdle()) {
409     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
410       gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
411     }
412     AttemptToConnectUsingLatestUpdateArgsLocked();
413   }
414 }
415 
ResetBackoffLocked()416 void PickFirst::ResetBackoffLocked() {
417   if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
418   if (latest_pending_subchannel_list_ != nullptr) {
419     latest_pending_subchannel_list_->ResetBackoffLocked();
420   }
421 }
422 
AttemptToConnectUsingLatestUpdateArgsLocked()423 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
424   // Create a subchannel list from latest_update_args_.
425   EndpointAddressesIterator* addresses = nullptr;
426   if (latest_update_args_.addresses.ok()) {
427     addresses = latest_update_args_.addresses->get();
428   }
429   // Replace latest_pending_subchannel_list_.
430   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
431       latest_pending_subchannel_list_ != nullptr) {
432     gpr_log(GPR_INFO,
433             "[PF %p] Shutting down previous pending subchannel list %p", this,
434             latest_pending_subchannel_list_.get());
435   }
436   latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
437       RefAsSubclass<PickFirst>(), addresses, latest_update_args_.args);
438   // Empty update or no valid subchannels.  Put the channel in
439   // TRANSIENT_FAILURE and request re-resolution.
440   if (latest_pending_subchannel_list_->size() == 0) {
441     channel_control_helper()->RequestReresolution();
442     absl::Status status =
443         latest_update_args_.addresses.ok()
444             ? absl::UnavailableError(absl::StrCat(
445                   "empty address list: ", latest_update_args_.resolution_note))
446             : latest_update_args_.addresses.status();
447     UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
448                 MakeRefCounted<TransientFailurePicker>(status));
449   }
450   // If the new update is empty or we don't yet have a selected subchannel in
451   // the current list, replace the current subchannel list immediately.
452   if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) {
453     UnsetSelectedSubchannel();
454     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
455         subchannel_list_ != nullptr) {
456       gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p",
457               this, subchannel_list_.get());
458     }
459     subchannel_list_ = std::move(latest_pending_subchannel_list_);
460   }
461 }
462 
GetAddressFamily(const grpc_resolved_address & address)463 absl::string_view GetAddressFamily(const grpc_resolved_address& address) {
464   const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
465   return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
466 };
467 
468 // An endpoint list iterator that returns only entries for a specific
469 // address family, as indicated by the URI scheme.
470 class AddressFamilyIterator final {
471  public:
AddressFamilyIterator(absl::string_view scheme,size_t index)472   AddressFamilyIterator(absl::string_view scheme, size_t index)
473       : scheme_(scheme), index_(index) {}
474 
Next(EndpointAddressesList & endpoints,std::vector<bool> * endpoints_moved)475   EndpointAddresses* Next(EndpointAddressesList& endpoints,
476                           std::vector<bool>* endpoints_moved) {
477     for (; index_ < endpoints.size(); ++index_) {
478       if (!(*endpoints_moved)[index_] &&
479           GetAddressFamily(endpoints[index_].address()) == scheme_) {
480         (*endpoints_moved)[index_] = true;
481         return &endpoints[index_++];
482       }
483     }
484     return nullptr;
485   }
486 
487  private:
488   absl::string_view scheme_;
489   size_t index_;
490 };
491 
UpdateLocked(UpdateArgs args)492 absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
493   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
494     if (args.addresses.ok()) {
495       gpr_log(GPR_INFO, "Pick First %p received update", this);
496     } else {
497       gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
498               this, args.addresses.status().ToString().c_str());
499     }
500   }
501   // Set return status based on the address list.
502   absl::Status status;
503   if (!args.addresses.ok()) {
504     status = args.addresses.status();
505   } else {
506     EndpointAddressesList endpoints;
507     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
508       endpoints.push_back(endpoint);
509     });
510     if (endpoints.empty()) {
511       status = absl::UnavailableError("address list must not be empty");
512     } else {
513       // Shuffle the list if needed.
514       auto config = static_cast<PickFirstConfig*>(args.config.get());
515       if (config->shuffle_addresses()) {
516         absl::c_shuffle(endpoints, bit_gen_);
517       }
518       // Flatten the list so that we have one address per endpoint.
519       // While we're iterating, also determine the desired address family
520       // order and the index of the first element of each family, for use in
521       // the interleaving below.
522       std::set<absl::string_view> address_families;
523       std::vector<AddressFamilyIterator> address_family_order;
524       EndpointAddressesList flattened_endpoints;
525       for (const auto& endpoint : endpoints) {
526         for (const auto& address : endpoint.addresses()) {
527           flattened_endpoints.emplace_back(address, endpoint.args());
528           absl::string_view scheme = GetAddressFamily(address);
529           bool inserted = address_families.insert(scheme).second;
530           if (inserted) {
531             address_family_order.emplace_back(scheme,
532                                               flattened_endpoints.size() - 1);
533           }
534         }
535       }
536       endpoints = std::move(flattened_endpoints);
537       // Interleave addresses as per RFC-8305 section 4.
538       EndpointAddressesList interleaved_endpoints;
539       interleaved_endpoints.reserve(endpoints.size());
540       std::vector<bool> endpoints_moved(endpoints.size());
541       size_t scheme_index = 0;
542       for (size_t i = 0; i < endpoints.size(); ++i) {
543         EndpointAddresses* endpoint;
544         do {
545           auto& iterator = address_family_order[scheme_index++ %
546                                                 address_family_order.size()];
547           endpoint = iterator.Next(endpoints, &endpoints_moved);
548         } while (endpoint == nullptr);
549         interleaved_endpoints.emplace_back(std::move(*endpoint));
550       }
551       endpoints = std::move(interleaved_endpoints);
552       args.addresses =
553           std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
554     }
555   }
556   // If the update contains a resolver error and we have a previous update
557   // that was not a resolver error, keep using the previous addresses.
558   if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
559     args.addresses = std::move(latest_update_args_.addresses);
560   }
561   // Update latest_update_args_.
562   latest_update_args_ = std::move(args);
563   // If we are not in idle, start connection attempt immediately.
564   // Otherwise, we defer the attempt into ExitIdleLocked().
565   if (!IsIdle()) {
566     AttemptToConnectUsingLatestUpdateArgsLocked();
567   }
568   return status;
569 }
570 
// Records `state` as the policy's current connectivity state and forwards
// the state, status and picker to the channel control helper.
void PickFirst::UpdateState(grpc_connectivity_state state,
                            const absl::Status& status,
                            RefCountedPtr<SubchannelPicker> picker) {
  // state_ is consulted by IsIdle(), so it is updated before reporting.
  state_ = state;
  channel_control_helper()->UpdateState(state, status, std::move(picker));
}
577 
UnsetSelectedSubchannel()578 void PickFirst::UnsetSelectedSubchannel() {
579   if (selected_ != nullptr && health_data_watcher_ != nullptr) {
580     selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
581   }
582   selected_ = nullptr;
583   health_watcher_ = nullptr;
584   health_data_watcher_ = nullptr;
585 }
586 
587 //
588 // PickFirst::HealthWatcher
589 //
590 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)591 void PickFirst::HealthWatcher::OnConnectivityStateChange(
592     grpc_connectivity_state new_state, absl::Status status) {
593   if (policy_->health_watcher_ != this) return;
594   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
595     gpr_log(GPR_INFO, "[PF %p] health watch state update: %s (%s)",
596             policy_.get(), ConnectivityStateName(new_state),
597             status.ToString().c_str());
598   }
599   switch (new_state) {
600     case GRPC_CHANNEL_READY:
601       policy_->channel_control_helper()->UpdateState(
602           GRPC_CHANNEL_READY, absl::OkStatus(),
603           MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
604       break;
605     case GRPC_CHANNEL_IDLE:
606       // If the subchannel becomes disconnected, the health watcher
607       // might happen to see the change before the raw connectivity
608       // state watcher does.  In this case, ignore it, since the raw
609       // connectivity state watcher will handle it shortly.
610       break;
611     case GRPC_CHANNEL_CONNECTING:
612       policy_->channel_control_helper()->UpdateState(
613           new_state, absl::OkStatus(),
614           MakeRefCounted<QueuePicker>(policy_->Ref()));
615       break;
616     case GRPC_CHANNEL_TRANSIENT_FAILURE:
617       policy_->channel_control_helper()->UpdateState(
618           GRPC_CHANNEL_TRANSIENT_FAILURE, status,
619           MakeRefCounted<TransientFailurePicker>(status));
620       break;
621     case GRPC_CHANNEL_SHUTDOWN:
622       Crash("health watcher reported state SHUTDOWN");
623   }
624 }
625 
626 //
627 // PickFirst::SubchannelList::SubchannelData
628 //
629 
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)630 PickFirst::SubchannelList::SubchannelData::SubchannelData(
631     SubchannelList* subchannel_list, size_t index,
632     RefCountedPtr<SubchannelInterface> subchannel)
633     : subchannel_list_(subchannel_list),
634       index_(index),
635       subchannel_(std::move(subchannel)) {
636   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
637     gpr_log(GPR_INFO,
638             "[PF %p] subchannel list %p index %" PRIuPTR
639             " (subchannel %p): starting watch",
640             subchannel_list_->policy_.get(), subchannel_list_, index_,
641             subchannel_.get());
642   }
643   auto watcher = std::make_unique<Watcher>(
644       subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
645   pending_watcher_ = watcher.get();
646   subchannel_->WatchConnectivityState(std::move(watcher));
647 }
648 
ShutdownLocked()649 void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
650   if (subchannel_ != nullptr) {
651     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
652       gpr_log(GPR_INFO,
653               "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
654               " (subchannel %p): cancelling watch and unreffing subchannel",
655               subchannel_list_->policy_.get(), subchannel_list_, index_,
656               subchannel_list_->size(), subchannel_.get());
657     }
658     subchannel_->CancelConnectivityStateWatch(pending_watcher_);
659     pending_watcher_ = nullptr;
660     subchannel_.reset();
661   }
662 }
663 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)664 void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
665     grpc_connectivity_state new_state, absl::Status status) {
666   PickFirst* p = subchannel_list_->policy_.get();
667   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
668     gpr_log(
669         GPR_INFO,
670         "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
671         " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
672         "status=%s, shutting_down=%d, pending_watcher=%p, "
673         "seen_transient_failure=%d, p->selected_=%p, "
674         "p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p",
675         p, subchannel_list_, index_, subchannel_list_->size(),
676         subchannel_.get(),
677         (connectivity_state_.has_value()
678              ? ConnectivityStateName(*connectivity_state_)
679              : "N/A"),
680         ConnectivityStateName(new_state), status.ToString().c_str(),
681         subchannel_list_->shutting_down_, pending_watcher_,
682         seen_transient_failure_, p->selected_, p->subchannel_list_.get(),
683         p->latest_pending_subchannel_list_.get());
684   }
685   if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
686   auto& stats_plugins =
687       subchannel_list_->policy_->channel_control_helper()
688           ->GetStatsPluginGroup();
689   // The notification must be for a subchannel in either the current or
690   // latest pending subchannel lists.
691   GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
692              subchannel_list_ == p->latest_pending_subchannel_list_.get());
693   GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
694   absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
695   connectivity_state_ = new_state;
696   connectivity_status_ = std::move(status);
697   // Handle updates for the currently selected subchannel.
698   if (p->selected_ == this) {
699     GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get());
700     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
701       gpr_log(GPR_INFO,
702               "Pick First %p selected subchannel connectivity changed to %s", p,
703               ConnectivityStateName(new_state));
704     }
705     // Any state change is considered to be a failure of the existing
706     // connection.
707     stats_plugins.AddCounter(
708         kMetricDisconnections, 1,
709         {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
710     // TODO(roth): We could check the connectivity states of all the
711     // subchannels here, just in case one of them happens to be READY,
712     // and we could switch to that rather than going IDLE.
713     // Request a re-resolution.
714     // TODO(qianchengz): We may want to request re-resolution in
715     // ExitIdleLocked().
716     p->channel_control_helper()->RequestReresolution();
717     // If there is a pending update, switch to the pending update.
718     if (p->latest_pending_subchannel_list_ != nullptr) {
719       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
720         gpr_log(GPR_INFO,
721                 "Pick First %p promoting pending subchannel list %p to "
722                 "replace %p",
723                 p, p->latest_pending_subchannel_list_.get(),
724                 p->subchannel_list_.get());
725       }
726       p->UnsetSelectedSubchannel();
727       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
728       // Set our state to that of the pending subchannel list.
729       if (p->subchannel_list_->IsHappyEyeballsPassComplete()) {
730         status = absl::UnavailableError(absl::StrCat(
731             "selected subchannel failed; switching to pending update; "
732             "last failure: ",
733             p->subchannel_list_->last_failure_.ToString()));
734         p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
735                        MakeRefCounted<TransientFailurePicker>(status));
736       } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
737         p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
738                        MakeRefCounted<QueuePicker>(nullptr));
739       }
740       return;
741     }
742     // Enter idle.
743     p->UnsetSelectedSubchannel();
744     p->subchannel_list_.reset();
745     p->UpdateState(
746         GRPC_CHANNEL_IDLE, absl::Status(),
747         MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
748     return;
749   }
750   // If we get here, there are two possible cases:
751   // 1. We do not currently have a selected subchannel, and the update is
752   //    for a subchannel in p->subchannel_list_ that we're trying to
753   //    connect to.  The goal here is to find a subchannel that we can
754   //    select.
755   // 2. We do currently have a selected subchannel, and the update is
756   //    for a subchannel in p->latest_pending_subchannel_list_.  The
757   //    goal here is to find a subchannel from the update that we can
758   //    select in place of the current one.
759   // If the subchannel is READY, use it.
760   if (new_state == GRPC_CHANNEL_READY) {
761     // We consider it a successful connection attempt only if the
762     // previous state was CONNECTING.  In particular, we don't want to
763     // increment this counter if we got a new address list and found the
764     // existing connection already in state READY.
765     if (old_state == GRPC_CHANNEL_CONNECTING) {
766       stats_plugins.AddCounter(
767           kMetricConnectionAttemptsSucceeded, 1,
768           {subchannel_list_->policy_->channel_control_helper()->GetTarget()},
769           {});
770     }
771     ProcessUnselectedReadyLocked();
772     return;
773   }
774   // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
775   bool prev_seen_transient_failure = seen_transient_failure_;
776   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
777     seen_transient_failure_ = true;
778     subchannel_list_->last_failure_ = connectivity_status_;
779   }
780   // If this is the initial connectivity state update for this subchannel,
781   // increment the counter in the subchannel list.
782   if (!old_state.has_value()) {
783     ++subchannel_list_->num_subchannels_seen_initial_notification_;
784   }
785   // If we haven't yet seen the initial connectivity state notification
786   // for all subchannels, do nothing.
787   if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
788   // If we're still here and this is the initial connectivity state
789   // notification for this subchannel, that means it was the last one to
790   // see its initial notification.  Start trying to connect, starting
791   // with the first subchannel.
792   if (!old_state.has_value()) {
793     subchannel_list_->StartConnectingNextSubchannel();
794     return;
795   }
796   // We've already started trying to connect.  Any subchannel that
797   // reports TF is a connection attempt failure.
798   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
799     stats_plugins.AddCounter(
800         kMetricConnectionAttemptsFailed, 1,
801         {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
802   }
803   // Otherwise, process connectivity state change.
804   switch (*connectivity_state_) {
805     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
806       // If this is the first failure we've seen on this subchannel,
807       // then we're still in the Happy Eyeballs pass.
808       if (!prev_seen_transient_failure && seen_transient_failure_) {
809         // If a connection attempt fails before the timer fires, then
810         // cancel the timer and start connecting on the next subchannel.
811         if (index_ == subchannel_list_->attempting_index_) {
812           if (subchannel_list_->timer_handle_.has_value()) {
813             p->channel_control_helper()->GetEventEngine()->Cancel(
814                 *subchannel_list_->timer_handle_);
815           }
816           ++subchannel_list_->attempting_index_;
817           subchannel_list_->StartConnectingNextSubchannel();
818         } else {
819           // If this was the last subchannel to fail, check if the Happy
820           // Eyeballs pass is complete.
821           subchannel_list_->MaybeFinishHappyEyeballsPass();
822         }
823       } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
824         // We're done with the initial Happy Eyeballs pass and in a mode
825         // where we're attempting to connect to every subchannel in
826         // parallel.  We count the number of failed connection attempts,
827         // and when that is equal to the number of subchannels, request
828         // re-resolution and report TRANSIENT_FAILURE again, so that the
829         // caller has the most recent status message.  Note that this
830         // isn't necessarily the same as saying that we've seen one
831         // failure for each subchannel in the list, because the backoff
832         // state may be different in each subchannel, so we may have seen
833         // one subchannel fail more than once and another subchannel not
834         // fail at all.  But it's a good enough heuristic.
835         ++subchannel_list_->num_failures_;
836         if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
837           p->channel_control_helper()->RequestReresolution();
838           status = absl::UnavailableError(absl::StrCat(
839               (p->omit_status_message_prefix_
840                    ? ""
841                    : "failed to connect to all addresses; last error: "),
842               connectivity_status_.ToString()));
843           p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
844                          MakeRefCounted<TransientFailurePicker>(status));
845         }
846       }
847       break;
848     }
849     case GRPC_CHANNEL_IDLE:
850       // If we've finished the first Happy Eyeballs pass, then we go
851       // into a mode where we immediately try to connect to every
852       // subchannel in parallel.
853       if (subchannel_list_->IsHappyEyeballsPassComplete()) {
854         subchannel_->RequestConnection();
855       }
856       break;
857     case GRPC_CHANNEL_CONNECTING:
858       // Only update connectivity state in case 1, and only if we're not
859       // already in TRANSIENT_FAILURE.
860       if (subchannel_list_ == p->subchannel_list_.get() &&
861           p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
862         p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
863                        MakeRefCounted<QueuePicker>(nullptr));
864       }
865       break;
866     default:
867       // We handled READY above, and we should never see SHUTDOWN.
868       GPR_UNREACHABLE_CODE(break);
869   }
870 }
871 
RequestConnectionWithTimer()872 void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
873   GPR_ASSERT(connectivity_state_.has_value());
874   if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
875     subchannel_->RequestConnection();
876   } else {
877     GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
878   }
879   // If this is not the last subchannel in the list, start the timer.
880   if (index_ != subchannel_list_->size() - 1) {
881     PickFirst* p = subchannel_list_->policy_.get();
882     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
883       gpr_log(GPR_INFO,
884               "Pick First %p subchannel list %p: starting Connection "
885               "Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR,
886               p, subchannel_list_, p->connection_attempt_delay_.millis(),
887               index_);
888     }
889     subchannel_list_->timer_handle_ =
890         p->channel_control_helper()->GetEventEngine()->RunAfter(
891             p->connection_attempt_delay_,
892             [subchannel_list =
893                  subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
894               ApplicationCallbackExecCtx application_exec_ctx;
895               ExecCtx exec_ctx;
896               auto* sl = subchannel_list.get();
897               sl->policy_->work_serializer()->Run(
898                   [subchannel_list = std::move(subchannel_list)]() {
899                     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
900                       gpr_log(GPR_INFO,
901                               "Pick First %p subchannel list %p: Connection "
902                               "Attempt Delay timer fired (shutting_down=%d, "
903                               "selected=%p)",
904                               subchannel_list->policy_.get(),
905                               subchannel_list.get(),
906                               subchannel_list->shutting_down_,
907                               subchannel_list->policy_->selected_);
908                     }
909                     if (subchannel_list->shutting_down_) return;
910                     if (subchannel_list->policy_->selected_ != nullptr) return;
911                     ++subchannel_list->attempting_index_;
912                     subchannel_list->StartConnectingNextSubchannel();
913                   },
914                   DEBUG_LOCATION);
915             });
916   }
917 }
918 
ProcessUnselectedReadyLocked()919 void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
920   PickFirst* p = subchannel_list_->policy_.get();
921   // Cancel Happy Eyeballs timer, if any.
922   if (subchannel_list_->timer_handle_.has_value()) {
923     p->channel_control_helper()->GetEventEngine()->Cancel(
924         *subchannel_list_->timer_handle_);
925   }
926   // If we get here, there are two possible cases:
927   // 1. We do not currently have a selected subchannel, and the update is
928   //    for a subchannel in p->subchannel_list_ that we're trying to
929   //    connect to.  The goal here is to find a subchannel that we can
930   //    select.
931   // 2. We do currently have a selected subchannel, and the update is
932   //    for a subchannel in p->latest_pending_subchannel_list_.  The
933   //    goal here is to find a subchannel from the update that we can
934   //    select in place of the current one.
935   GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
936              subchannel_list_ == p->latest_pending_subchannel_list_.get());
937   // Case 2.  Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
938   if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
939     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
940       gpr_log(GPR_INFO,
941               "Pick First %p promoting pending subchannel list %p to "
942               "replace %p",
943               p, p->latest_pending_subchannel_list_.get(),
944               p->subchannel_list_.get());
945     }
946     p->UnsetSelectedSubchannel();
947     p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
948   }
949   // Cases 1 and 2.
950   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
951     gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
952             subchannel_.get());
953   }
954   p->selected_ = this;
955   // If health checking is enabled, start the health watch, but don't
956   // report a new picker -- we want to stay in CONNECTING while we wait
957   // for the health status notification.
958   // If health checking is NOT enabled, report READY.
959   if (p->enable_health_watch_) {
960     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
961       gpr_log(GPR_INFO, "[PF %p] starting health watch", p);
962     }
963     auto watcher = std::make_unique<HealthWatcher>(
964         p->RefAsSubclass<PickFirst>(DEBUG_LOCATION, "HealthWatcher"));
965     p->health_watcher_ = watcher.get();
966     auto health_data_watcher = MakeHealthCheckWatcher(
967         p->work_serializer(), subchannel_list_->args_, std::move(watcher));
968     p->health_data_watcher_ = health_data_watcher.get();
969     subchannel_->AddDataWatcher(std::move(health_data_watcher));
970   } else {
971     p->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
972                    MakeRefCounted<Picker>(subchannel()->Ref()));
973   }
974   // Unref all other subchannels in the list.
975   for (size_t i = 0; i < subchannel_list_->size(); ++i) {
976     if (i != index_) {
977       subchannel_list_->subchannels_[i].ShutdownLocked();
978     }
979   }
980 }
981 
982 //
983 // PickFirst::SubchannelList
984 //
985 
SubchannelList(RefCountedPtr<PickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args)986 PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
987                                           EndpointAddressesIterator* addresses,
988                                           const ChannelArgs& args)
989     : InternallyRefCounted<SubchannelList>(
990           GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
991                                                             : nullptr),
992       policy_(std::move(policy)),
993       args_(args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
994                 .Remove(
995                     GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {
996   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
997     gpr_log(GPR_INFO, "[PF %p] Creating subchannel list %p - channel args: %s",
998             policy_.get(), this, args_.ToString().c_str());
999   }
1000   if (addresses == nullptr) return;
1001   // Create a subchannel for each address.
1002   addresses->ForEach([&](const EndpointAddresses& address) {
1003     GPR_ASSERT(address.addresses().size() == 1);
1004     RefCountedPtr<SubchannelInterface> subchannel =
1005         policy_->channel_control_helper()->CreateSubchannel(
1006             address.address(), address.args(), args_);
1007     if (subchannel == nullptr) {
1008       // Subchannel could not be created.
1009       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1010         gpr_log(GPR_INFO,
1011                 "[PF %p] could not create subchannel for address %s, ignoring",
1012                 policy_.get(), address.ToString().c_str());
1013       }
1014       return;
1015     }
1016     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1017       gpr_log(GPR_INFO,
1018               "[PF %p] subchannel list %p index %" PRIuPTR
1019               ": Created subchannel %p for address %s",
1020               policy_.get(), this, subchannels_.size(), subchannel.get(),
1021               address.ToString().c_str());
1022     }
1023     subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
1024   });
1025 }
1026 
~SubchannelList()1027 PickFirst::SubchannelList::~SubchannelList() {
1028   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1029     gpr_log(GPR_INFO, "[PF %p] Destroying subchannel_list %p", policy_.get(),
1030             this);
1031   }
1032 }
1033 
Orphan()1034 void PickFirst::SubchannelList::Orphan() {
1035   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1036     gpr_log(GPR_INFO, "[PF %p] Shutting down subchannel_list %p", policy_.get(),
1037             this);
1038   }
1039   GPR_ASSERT(!shutting_down_);
1040   shutting_down_ = true;
1041   for (auto& sd : subchannels_) {
1042     sd.ShutdownLocked();
1043   }
1044   if (timer_handle_.has_value()) {
1045     policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1046   }
1047   Unref();
1048 }
1049 
ResetBackoffLocked()1050 void PickFirst::SubchannelList::ResetBackoffLocked() {
1051   for (auto& sd : subchannels_) {
1052     sd.ResetBackoffLocked();
1053   }
1054 }
1055 
StartConnectingNextSubchannel()1056 void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
1057   // Find the next subchannel not in state TRANSIENT_FAILURE.
1058   // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1059   // large recursion that could overflow the stack.
1060   for (; attempting_index_ < size(); ++attempting_index_) {
1061     SubchannelData* sc = &subchannels_[attempting_index_];
1062     GPR_ASSERT(sc->connectivity_state().has_value());
1063     if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1064       // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
1065       // connection attempt.
1066       sc->RequestConnectionWithTimer();
1067       return;
1068     }
1069   }
1070   // If we didn't find a subchannel to request a connection on, check to
1071   // see if the Happy Eyeballs pass is complete.
1072   MaybeFinishHappyEyeballsPass();
1073 }
1074 
MaybeFinishHappyEyeballsPass()1075 void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
1076   // Make sure all subchannels have finished a connection attempt before
1077   // we consider the Happy Eyeballs pass complete.
1078   if (!IsHappyEyeballsPassComplete()) return;
1079   // We didn't find another subchannel not in state TRANSIENT_FAILURE,
1080   // so report TRANSIENT_FAILURE and switch to a mode in which we try to
1081   // connect to all addresses in parallel.
1082   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1083     gpr_log(GPR_INFO,
1084             "Pick First %p subchannel list %p failed to connect to "
1085             "all subchannels",
1086             policy_.get(), this);
1087   }
1088   // In case 2, swap to the new subchannel list.  This means reporting
1089   // TRANSIENT_FAILURE and dropping the existing (working) connection,
1090   // but we can't ignore what the control plane has told us.
1091   if (policy_->latest_pending_subchannel_list_.get() == this) {
1092     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1093       gpr_log(GPR_INFO,
1094               "Pick First %p promoting pending subchannel list %p to "
1095               "replace %p",
1096               policy_.get(), policy_->latest_pending_subchannel_list_.get(),
1097               this);
1098     }
1099     policy_->UnsetSelectedSubchannel();
1100     policy_->subchannel_list_ =
1101         std::move(policy_->latest_pending_subchannel_list_);
1102   }
1103   // If this is the current subchannel list (either because we were
1104   // in case 1 or because we were in case 2 and just promoted it to
1105   // be the current list), re-resolve and report new state.
1106   if (policy_->subchannel_list_.get() == this) {
1107     policy_->channel_control_helper()->RequestReresolution();
1108     absl::Status status = absl::UnavailableError(
1109         absl::StrCat((policy_->omit_status_message_prefix_
1110                           ? ""
1111                           : "failed to connect to all addresses; last error: "),
1112                      last_failure_.ToString()));
1113     policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1114                          MakeRefCounted<TransientFailurePicker>(status));
1115   }
1116   // We now transition into a mode where we try to connect to all
1117   // subchannels in parallel.  For any subchannel currently in IDLE,
1118   // trigger a connection attempt.  For any subchannel not currently in
1119   // IDLE, we will trigger a connection attempt when it does report IDLE.
1120   for (SubchannelData& sd : subchannels_) {
1121     if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
1122       sd.RequestConnection();
1123     }
1124   }
1125 }
1126 
1127 //
1128 // factory
1129 //
1130 
1131 class PickFirstFactory final : public LoadBalancingPolicyFactory {
1132  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1133   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1134       LoadBalancingPolicy::Args args) const override {
1135     return MakeOrphanable<PickFirst>(std::move(args));
1136   }
1137 
name() const1138   absl::string_view name() const override { return kPickFirst; }
1139 
1140   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1141   ParseLoadBalancingConfig(const Json& json) const override {
1142     return LoadFromJson<RefCountedPtr<PickFirstConfig>>(
1143         json, JsonArgs(), "errors validating pick_first LB policy config");
1144   }
1145 };
1146 
1147 }  // namespace
1148 
RegisterPickFirstLbPolicy(CoreConfiguration::Builder * builder)1149 void RegisterPickFirstLbPolicy(CoreConfiguration::Builder* builder) {
1150   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1151       std::make_unique<PickFirstFactory>());
1152 }
1153 
1154 }  // namespace grpc_core
1155