1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/load_balancing/pick_first/pick_first.h"
20
21 #include <inttypes.h>
22 #include <string.h>
23
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/algorithm/container.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38
39 #include <grpc/event_engine/event_engine.h>
40 #include <grpc/impl/channel_arg_names.h>
41 #include <grpc/impl/connectivity_state.h>
42 #include <grpc/support/log.h>
43
44 #include "src/core/load_balancing/health_check_client.h"
45 #include "src/core/lib/address_utils/sockaddr_utils.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/channel/metrics.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gpr/useful.h"
51 #include "src/core/lib/gprpp/crash.h"
52 #include "src/core/lib/gprpp/debug_location.h"
53 #include "src/core/lib/gprpp/orphanable.h"
54 #include "src/core/lib/gprpp/ref_counted_ptr.h"
55 #include "src/core/lib/gprpp/time.h"
56 #include "src/core/lib/gprpp/work_serializer.h"
57 #include "src/core/lib/iomgr/exec_ctx.h"
58 #include "src/core/lib/iomgr/iomgr_fwd.h"
59 #include "src/core/lib/iomgr/resolved_address.h"
60 #include "src/core/lib/json/json.h"
61 #include "src/core/lib/json/json_args.h"
62 #include "src/core/lib/json/json_object_loader.h"
63 #include "src/core/lib/transport/connectivity_state.h"
64 #include "src/core/load_balancing/lb_policy.h"
65 #include "src/core/load_balancing/lb_policy_factory.h"
66 #include "src/core/load_balancing/subchannel_interface.h"
67 #include "src/core/resolver/endpoint_addresses.h"
68
69 namespace grpc_core {
70
71 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
72
73 namespace {
74
75 //
76 // pick_first LB policy
77 //
78
79 constexpr absl::string_view kPickFirst = "pick_first";
80
81 const auto kMetricDisconnections =
82 GlobalInstrumentsRegistry::RegisterUInt64Counter(
83 "grpc.lb.pick_first.disconnections",
84 "EXPERIMENTAL. Number of times the selected subchannel becomes "
85 "disconnected.",
86 "{disconnection}", {kMetricLabelTarget}, {}, false);
87
88 const auto kMetricConnectionAttemptsSucceeded =
89 GlobalInstrumentsRegistry::RegisterUInt64Counter(
90 "grpc.lb.pick_first.connection_attempts_succeeded",
91 "EXPERIMENTAL. Number of successful connection attempts.",
92 "{attempt}", {kMetricLabelTarget}, {}, false);
93
94 const auto kMetricConnectionAttemptsFailed =
95 GlobalInstrumentsRegistry::RegisterUInt64Counter(
96 "grpc.lb.pick_first.connection_attempts_failed",
97 "EXPERIMENTAL. Number of failed connection attempts.",
98 "{attempt}", {kMetricLabelTarget}, {}, false);
99
100 class PickFirstConfig final : public LoadBalancingPolicy::Config {
101 public:
name() const102 absl::string_view name() const override { return kPickFirst; }
shuffle_addresses() const103 bool shuffle_addresses() const { return shuffle_addresses_; }
104
JsonLoader(const JsonArgs &)105 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
106 static const auto kJsonLoader =
107 JsonObjectLoader<PickFirstConfig>()
108 .OptionalField("shuffleAddressList",
109 &PickFirstConfig::shuffle_addresses_)
110 .Finish();
111 return kJsonLoader;
112 }
113
114 private:
115 bool shuffle_addresses_ = false;
116 };
117
118 class PickFirst final : public LoadBalancingPolicy {
119 public:
120 explicit PickFirst(Args args);
121
name() const122 absl::string_view name() const override { return kPickFirst; }
123
124 absl::Status UpdateLocked(UpdateArgs args) override;
125 void ExitIdleLocked() override;
126 void ResetBackoffLocked() override;
127
128 private:
129 ~PickFirst() override;
130
131 class SubchannelList final : public InternallyRefCounted<SubchannelList> {
132 public:
133 class SubchannelData final {
134 public:
135 SubchannelData(SubchannelList* subchannel_list, size_t index,
136 RefCountedPtr<SubchannelInterface> subchannel);
137
subchannel() const138 SubchannelInterface* subchannel() const { return subchannel_.get(); }
connectivity_state() const139 absl::optional<grpc_connectivity_state> connectivity_state() const {
140 return connectivity_state_;
141 }
connectivity_status() const142 const absl::Status& connectivity_status() const {
143 return connectivity_status_;
144 }
145
146 // Resets the connection backoff.
ResetBackoffLocked()147 void ResetBackoffLocked() {
148 if (subchannel_ != nullptr) subchannel_->ResetBackoff();
149 }
150
RequestConnection()151 void RequestConnection() { subchannel_->RequestConnection(); }
152
153 // Requests a connection attempt to start on this subchannel,
154 // with appropriate Connection Attempt Delay.
155 // Used only during the Happy Eyeballs pass.
156 void RequestConnectionWithTimer();
157
158 // Cancels any pending connectivity watch and unrefs the subchannel.
159 void ShutdownLocked();
160
seen_transient_failure() const161 bool seen_transient_failure() const { return seen_transient_failure_; }
162
163 private:
164 // Watcher for subchannel connectivity state.
165 class Watcher final
166 : public SubchannelInterface::ConnectivityStateWatcherInterface {
167 public:
Watcher(RefCountedPtr<SubchannelList> subchannel_list,size_t index)168 Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
169 : subchannel_list_(std::move(subchannel_list)), index_(index) {}
170
~Watcher()171 ~Watcher() override {
172 subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
173 }
174
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)175 void OnConnectivityStateChange(grpc_connectivity_state new_state,
176 absl::Status status) override {
177 subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
178 new_state, std::move(status));
179 }
180
interested_parties()181 grpc_pollset_set* interested_parties() override {
182 return subchannel_list_->policy_->interested_parties();
183 }
184
185 private:
186 RefCountedPtr<SubchannelList> subchannel_list_;
187 const size_t index_;
188 };
189
190 // This method will be invoked once soon after instantiation to report
191 // the current connectivity state, and it will then be invoked again
192 // whenever the connectivity state changes.
193 void OnConnectivityStateChange(grpc_connectivity_state new_state,
194 absl::Status status);
195
196 // Processes the connectivity change to READY for an unselected
197 // subchannel.
198 void ProcessUnselectedReadyLocked();
199
200 // Backpointer to owning subchannel list. Not owned.
201 SubchannelList* subchannel_list_;
202 const size_t index_;
203 // The subchannel.
204 RefCountedPtr<SubchannelInterface> subchannel_;
205 // Will be non-null when the subchannel's state is being watched.
206 SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
207 nullptr;
208 // Data updated by the watcher.
209 absl::optional<grpc_connectivity_state> connectivity_state_;
210 absl::Status connectivity_status_;
211 bool seen_transient_failure_ = false;
212 };
213
214 SubchannelList(RefCountedPtr<PickFirst> policy,
215 EndpointAddressesIterator* addresses,
216 const ChannelArgs& args);
217
218 ~SubchannelList() override;
219
220 // The number of subchannels in the list.
size() const221 size_t size() const { return subchannels_.size(); }
222
223 // Resets connection backoff of all subchannels.
224 void ResetBackoffLocked();
225
226 void Orphan() override;
227
IsHappyEyeballsPassComplete() const228 bool IsHappyEyeballsPassComplete() const {
229 // Checking attempting_index_ here is just an optimization -- if
230 // we haven't actually tried all subchannels yet, then we don't
231 // need to iterate.
232 if (attempting_index_ < size()) return false;
233 for (const SubchannelData& sd : subchannels_) {
234 if (!sd.seen_transient_failure()) return false;
235 }
236 return true;
237 }
238
239 private:
240 // Returns true if all subchannels have seen their initial
241 // connectivity state notifications.
AllSubchannelsSeenInitialState() const242 bool AllSubchannelsSeenInitialState() const {
243 return num_subchannels_seen_initial_notification_ == size();
244 }
245
246 // Looks through subchannels_ starting from attempting_index_ to
247 // find the first one not currently in TRANSIENT_FAILURE, then
248 // triggers a connection attempt for that subchannel. If there are
249 // no more subchannels not in TRANSIENT_FAILURE, calls
250 // MaybeFinishHappyEyeballsPass().
251 void StartConnectingNextSubchannel();
252
253 // Checks to see if the initial Happy Eyeballs pass is complete --
254 // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
255 // If so, transitions to a mode where we try to connect to all subchannels
256 // in parallel and returns true.
257 void MaybeFinishHappyEyeballsPass();
258
259 // Backpointer to owning policy.
260 RefCountedPtr<PickFirst> policy_;
261
262 ChannelArgs args_;
263
264 // The list of subchannels.
265 std::vector<SubchannelData> subchannels_;
266
267 // Is this list shutting down? This may be true due to the shutdown of the
268 // policy itself or because a newer update has arrived while this one hadn't
269 // finished processing.
270 bool shutting_down_ = false;
271
272 size_t num_subchannels_seen_initial_notification_ = 0;
273
274 // The index into subchannels_ to which we are currently attempting
275 // to connect during the initial Happy Eyeballs pass. Once the
276 // initial pass is over, this will be equal to size().
277 size_t attempting_index_ = 0;
278 // Happy Eyeballs timer handle.
279 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
280 timer_handle_;
281
282 // After the initial Happy Eyeballs pass, the number of failures
283 // we've seen. Every size() failures, we trigger re-resolution.
284 size_t num_failures_ = 0;
285
286 // The status from the last subchannel that reported TRANSIENT_FAILURE.
287 absl::Status last_failure_;
288 };
289
290 class HealthWatcher final
291 : public SubchannelInterface::ConnectivityStateWatcherInterface {
292 public:
HealthWatcher(RefCountedPtr<PickFirst> policy)293 explicit HealthWatcher(RefCountedPtr<PickFirst> policy)
294 : policy_(std::move(policy)) {}
295
~HealthWatcher()296 ~HealthWatcher() override {
297 policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
298 }
299
300 void OnConnectivityStateChange(grpc_connectivity_state new_state,
301 absl::Status status) override;
302
interested_parties()303 grpc_pollset_set* interested_parties() override {
304 return policy_->interested_parties();
305 }
306
307 private:
308 RefCountedPtr<PickFirst> policy_;
309 };
310
311 class Picker final : public SubchannelPicker {
312 public:
Picker(RefCountedPtr<SubchannelInterface> subchannel)313 explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
314 : subchannel_(std::move(subchannel)) {}
315
Pick(PickArgs)316 PickResult Pick(PickArgs /*args*/) override {
317 return PickResult::Complete(subchannel_);
318 }
319
320 private:
321 RefCountedPtr<SubchannelInterface> subchannel_;
322 };
323
324 void ShutdownLocked() override;
325
326 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
327 RefCountedPtr<SubchannelPicker> picker);
328
329 void AttemptToConnectUsingLatestUpdateArgsLocked();
330
331 void UnsetSelectedSubchannel();
332
333 // When ExitIdleLocked() is called, we create a subchannel_list_ and start
334 // trying to connect, but we don't actually change state_ until the first
335 // subchannel reports CONNECTING. So in order to know if we're really
336 // idle, we need to check both state_ and subchannel_list_.
IsIdle() const337 bool IsIdle() const {
338 return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
339 }
340
341 // Whether we should enable health watching.
342 const bool enable_health_watch_;
343 // Whether we should omit our status message prefix.
344 const bool omit_status_message_prefix_;
345 // Connection Attempt Delay for Happy Eyeballs.
346 const Duration connection_attempt_delay_;
347
348 // Lateset update args.
349 UpdateArgs latest_update_args_;
350 // All our subchannels.
351 OrphanablePtr<SubchannelList> subchannel_list_;
352 // Latest pending subchannel list.
353 OrphanablePtr<SubchannelList> latest_pending_subchannel_list_;
354 // Selected subchannel in subchannel_list_.
355 SubchannelList::SubchannelData* selected_ = nullptr;
356 // Health watcher for the selected subchannel.
357 SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
358 nullptr;
359 SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
360 // Current connectivity state.
361 grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
362 // Are we shut down?
363 bool shutdown_ = false;
364 // Random bit generator used for shuffling addresses if configured
365 absl::BitGen bit_gen_;
366 };
367
PickFirst(Args args)368 PickFirst::PickFirst(Args args)
369 : LoadBalancingPolicy(std::move(args)),
370 enable_health_watch_(
371 channel_args()
372 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
373 .value_or(false)),
374 omit_status_message_prefix_(
375 channel_args()
376 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
377 .value_or(false)),
378 connection_attempt_delay_(Duration::Milliseconds(
379 Clamp(channel_args()
380 .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
381 .value_or(250),
382 100, 2000))) {
383 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
384 gpr_log(GPR_INFO, "Pick First %p created.", this);
385 }
386 }
387
~PickFirst()388 PickFirst::~PickFirst() {
389 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
390 gpr_log(GPR_INFO, "Destroying Pick First %p", this);
391 }
392 GPR_ASSERT(subchannel_list_ == nullptr);
393 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
394 }
395
ShutdownLocked()396 void PickFirst::ShutdownLocked() {
397 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
398 gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
399 }
400 shutdown_ = true;
401 UnsetSelectedSubchannel();
402 subchannel_list_.reset();
403 latest_pending_subchannel_list_.reset();
404 }
405
ExitIdleLocked()406 void PickFirst::ExitIdleLocked() {
407 if (shutdown_) return;
408 if (IsIdle()) {
409 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
410 gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
411 }
412 AttemptToConnectUsingLatestUpdateArgsLocked();
413 }
414 }
415
ResetBackoffLocked()416 void PickFirst::ResetBackoffLocked() {
417 if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
418 if (latest_pending_subchannel_list_ != nullptr) {
419 latest_pending_subchannel_list_->ResetBackoffLocked();
420 }
421 }
422
AttemptToConnectUsingLatestUpdateArgsLocked()423 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
424 // Create a subchannel list from latest_update_args_.
425 EndpointAddressesIterator* addresses = nullptr;
426 if (latest_update_args_.addresses.ok()) {
427 addresses = latest_update_args_.addresses->get();
428 }
429 // Replace latest_pending_subchannel_list_.
430 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
431 latest_pending_subchannel_list_ != nullptr) {
432 gpr_log(GPR_INFO,
433 "[PF %p] Shutting down previous pending subchannel list %p", this,
434 latest_pending_subchannel_list_.get());
435 }
436 latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
437 RefAsSubclass<PickFirst>(), addresses, latest_update_args_.args);
438 // Empty update or no valid subchannels. Put the channel in
439 // TRANSIENT_FAILURE and request re-resolution.
440 if (latest_pending_subchannel_list_->size() == 0) {
441 channel_control_helper()->RequestReresolution();
442 absl::Status status =
443 latest_update_args_.addresses.ok()
444 ? absl::UnavailableError(absl::StrCat(
445 "empty address list: ", latest_update_args_.resolution_note))
446 : latest_update_args_.addresses.status();
447 UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
448 MakeRefCounted<TransientFailurePicker>(status));
449 }
450 // If the new update is empty or we don't yet have a selected subchannel in
451 // the current list, replace the current subchannel list immediately.
452 if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) {
453 UnsetSelectedSubchannel();
454 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
455 subchannel_list_ != nullptr) {
456 gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p",
457 this, subchannel_list_.get());
458 }
459 subchannel_list_ = std::move(latest_pending_subchannel_list_);
460 }
461 }
462
GetAddressFamily(const grpc_resolved_address & address)463 absl::string_view GetAddressFamily(const grpc_resolved_address& address) {
464 const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
465 return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
466 };
467
468 // An endpoint list iterator that returns only entries for a specific
469 // address family, as indicated by the URI scheme.
470 class AddressFamilyIterator final {
471 public:
AddressFamilyIterator(absl::string_view scheme,size_t index)472 AddressFamilyIterator(absl::string_view scheme, size_t index)
473 : scheme_(scheme), index_(index) {}
474
Next(EndpointAddressesList & endpoints,std::vector<bool> * endpoints_moved)475 EndpointAddresses* Next(EndpointAddressesList& endpoints,
476 std::vector<bool>* endpoints_moved) {
477 for (; index_ < endpoints.size(); ++index_) {
478 if (!(*endpoints_moved)[index_] &&
479 GetAddressFamily(endpoints[index_].address()) == scheme_) {
480 (*endpoints_moved)[index_] = true;
481 return &endpoints[index_++];
482 }
483 }
484 return nullptr;
485 }
486
487 private:
488 absl::string_view scheme_;
489 size_t index_;
490 };
491
UpdateLocked(UpdateArgs args)492 absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
493 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
494 if (args.addresses.ok()) {
495 gpr_log(GPR_INFO, "Pick First %p received update", this);
496 } else {
497 gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
498 this, args.addresses.status().ToString().c_str());
499 }
500 }
501 // Set return status based on the address list.
502 absl::Status status;
503 if (!args.addresses.ok()) {
504 status = args.addresses.status();
505 } else {
506 EndpointAddressesList endpoints;
507 (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
508 endpoints.push_back(endpoint);
509 });
510 if (endpoints.empty()) {
511 status = absl::UnavailableError("address list must not be empty");
512 } else {
513 // Shuffle the list if needed.
514 auto config = static_cast<PickFirstConfig*>(args.config.get());
515 if (config->shuffle_addresses()) {
516 absl::c_shuffle(endpoints, bit_gen_);
517 }
518 // Flatten the list so that we have one address per endpoint.
519 // While we're iterating, also determine the desired address family
520 // order and the index of the first element of each family, for use in
521 // the interleaving below.
522 std::set<absl::string_view> address_families;
523 std::vector<AddressFamilyIterator> address_family_order;
524 EndpointAddressesList flattened_endpoints;
525 for (const auto& endpoint : endpoints) {
526 for (const auto& address : endpoint.addresses()) {
527 flattened_endpoints.emplace_back(address, endpoint.args());
528 absl::string_view scheme = GetAddressFamily(address);
529 bool inserted = address_families.insert(scheme).second;
530 if (inserted) {
531 address_family_order.emplace_back(scheme,
532 flattened_endpoints.size() - 1);
533 }
534 }
535 }
536 endpoints = std::move(flattened_endpoints);
537 // Interleave addresses as per RFC-8305 section 4.
538 EndpointAddressesList interleaved_endpoints;
539 interleaved_endpoints.reserve(endpoints.size());
540 std::vector<bool> endpoints_moved(endpoints.size());
541 size_t scheme_index = 0;
542 for (size_t i = 0; i < endpoints.size(); ++i) {
543 EndpointAddresses* endpoint;
544 do {
545 auto& iterator = address_family_order[scheme_index++ %
546 address_family_order.size()];
547 endpoint = iterator.Next(endpoints, &endpoints_moved);
548 } while (endpoint == nullptr);
549 interleaved_endpoints.emplace_back(std::move(*endpoint));
550 }
551 endpoints = std::move(interleaved_endpoints);
552 args.addresses =
553 std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
554 }
555 }
556 // If the update contains a resolver error and we have a previous update
557 // that was not a resolver error, keep using the previous addresses.
558 if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
559 args.addresses = std::move(latest_update_args_.addresses);
560 }
561 // Update latest_update_args_.
562 latest_update_args_ = std::move(args);
563 // If we are not in idle, start connection attempt immediately.
564 // Otherwise, we defer the attempt into ExitIdleLocked().
565 if (!IsIdle()) {
566 AttemptToConnectUsingLatestUpdateArgsLocked();
567 }
568 return status;
569 }
570
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)571 void PickFirst::UpdateState(grpc_connectivity_state state,
572 const absl::Status& status,
573 RefCountedPtr<SubchannelPicker> picker) {
574 state_ = state;
575 channel_control_helper()->UpdateState(state, status, std::move(picker));
576 }
577
UnsetSelectedSubchannel()578 void PickFirst::UnsetSelectedSubchannel() {
579 if (selected_ != nullptr && health_data_watcher_ != nullptr) {
580 selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
581 }
582 selected_ = nullptr;
583 health_watcher_ = nullptr;
584 health_data_watcher_ = nullptr;
585 }
586
587 //
588 // PickFirst::HealthWatcher
589 //
590
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)591 void PickFirst::HealthWatcher::OnConnectivityStateChange(
592 grpc_connectivity_state new_state, absl::Status status) {
593 if (policy_->health_watcher_ != this) return;
594 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
595 gpr_log(GPR_INFO, "[PF %p] health watch state update: %s (%s)",
596 policy_.get(), ConnectivityStateName(new_state),
597 status.ToString().c_str());
598 }
599 switch (new_state) {
600 case GRPC_CHANNEL_READY:
601 policy_->channel_control_helper()->UpdateState(
602 GRPC_CHANNEL_READY, absl::OkStatus(),
603 MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
604 break;
605 case GRPC_CHANNEL_IDLE:
606 // If the subchannel becomes disconnected, the health watcher
607 // might happen to see the change before the raw connectivity
608 // state watcher does. In this case, ignore it, since the raw
609 // connectivity state watcher will handle it shortly.
610 break;
611 case GRPC_CHANNEL_CONNECTING:
612 policy_->channel_control_helper()->UpdateState(
613 new_state, absl::OkStatus(),
614 MakeRefCounted<QueuePicker>(policy_->Ref()));
615 break;
616 case GRPC_CHANNEL_TRANSIENT_FAILURE:
617 policy_->channel_control_helper()->UpdateState(
618 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
619 MakeRefCounted<TransientFailurePicker>(status));
620 break;
621 case GRPC_CHANNEL_SHUTDOWN:
622 Crash("health watcher reported state SHUTDOWN");
623 }
624 }
625
626 //
627 // PickFirst::SubchannelList::SubchannelData
628 //
629
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)630 PickFirst::SubchannelList::SubchannelData::SubchannelData(
631 SubchannelList* subchannel_list, size_t index,
632 RefCountedPtr<SubchannelInterface> subchannel)
633 : subchannel_list_(subchannel_list),
634 index_(index),
635 subchannel_(std::move(subchannel)) {
636 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
637 gpr_log(GPR_INFO,
638 "[PF %p] subchannel list %p index %" PRIuPTR
639 " (subchannel %p): starting watch",
640 subchannel_list_->policy_.get(), subchannel_list_, index_,
641 subchannel_.get());
642 }
643 auto watcher = std::make_unique<Watcher>(
644 subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
645 pending_watcher_ = watcher.get();
646 subchannel_->WatchConnectivityState(std::move(watcher));
647 }
648
ShutdownLocked()649 void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
650 if (subchannel_ != nullptr) {
651 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
652 gpr_log(GPR_INFO,
653 "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
654 " (subchannel %p): cancelling watch and unreffing subchannel",
655 subchannel_list_->policy_.get(), subchannel_list_, index_,
656 subchannel_list_->size(), subchannel_.get());
657 }
658 subchannel_->CancelConnectivityStateWatch(pending_watcher_);
659 pending_watcher_ = nullptr;
660 subchannel_.reset();
661 }
662 }
663
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)664 void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
665 grpc_connectivity_state new_state, absl::Status status) {
666 PickFirst* p = subchannel_list_->policy_.get();
667 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
668 gpr_log(
669 GPR_INFO,
670 "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
671 " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
672 "status=%s, shutting_down=%d, pending_watcher=%p, "
673 "seen_transient_failure=%d, p->selected_=%p, "
674 "p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p",
675 p, subchannel_list_, index_, subchannel_list_->size(),
676 subchannel_.get(),
677 (connectivity_state_.has_value()
678 ? ConnectivityStateName(*connectivity_state_)
679 : "N/A"),
680 ConnectivityStateName(new_state), status.ToString().c_str(),
681 subchannel_list_->shutting_down_, pending_watcher_,
682 seen_transient_failure_, p->selected_, p->subchannel_list_.get(),
683 p->latest_pending_subchannel_list_.get());
684 }
685 if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
686 auto& stats_plugins =
687 subchannel_list_->policy_->channel_control_helper()
688 ->GetStatsPluginGroup();
689 // The notification must be for a subchannel in either the current or
690 // latest pending subchannel lists.
691 GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
692 subchannel_list_ == p->latest_pending_subchannel_list_.get());
693 GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
694 absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
695 connectivity_state_ = new_state;
696 connectivity_status_ = std::move(status);
697 // Handle updates for the currently selected subchannel.
698 if (p->selected_ == this) {
699 GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get());
700 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
701 gpr_log(GPR_INFO,
702 "Pick First %p selected subchannel connectivity changed to %s", p,
703 ConnectivityStateName(new_state));
704 }
705 // Any state change is considered to be a failure of the existing
706 // connection.
707 stats_plugins.AddCounter(
708 kMetricDisconnections, 1,
709 {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
710 // TODO(roth): We could check the connectivity states of all the
711 // subchannels here, just in case one of them happens to be READY,
712 // and we could switch to that rather than going IDLE.
713 // Request a re-resolution.
714 // TODO(qianchengz): We may want to request re-resolution in
715 // ExitIdleLocked().
716 p->channel_control_helper()->RequestReresolution();
717 // If there is a pending update, switch to the pending update.
718 if (p->latest_pending_subchannel_list_ != nullptr) {
719 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
720 gpr_log(GPR_INFO,
721 "Pick First %p promoting pending subchannel list %p to "
722 "replace %p",
723 p, p->latest_pending_subchannel_list_.get(),
724 p->subchannel_list_.get());
725 }
726 p->UnsetSelectedSubchannel();
727 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
728 // Set our state to that of the pending subchannel list.
729 if (p->subchannel_list_->IsHappyEyeballsPassComplete()) {
730 status = absl::UnavailableError(absl::StrCat(
731 "selected subchannel failed; switching to pending update; "
732 "last failure: ",
733 p->subchannel_list_->last_failure_.ToString()));
734 p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
735 MakeRefCounted<TransientFailurePicker>(status));
736 } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
737 p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
738 MakeRefCounted<QueuePicker>(nullptr));
739 }
740 return;
741 }
742 // Enter idle.
743 p->UnsetSelectedSubchannel();
744 p->subchannel_list_.reset();
745 p->UpdateState(
746 GRPC_CHANNEL_IDLE, absl::Status(),
747 MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
748 return;
749 }
750 // If we get here, there are two possible cases:
751 // 1. We do not currently have a selected subchannel, and the update is
752 // for a subchannel in p->subchannel_list_ that we're trying to
753 // connect to. The goal here is to find a subchannel that we can
754 // select.
755 // 2. We do currently have a selected subchannel, and the update is
756 // for a subchannel in p->latest_pending_subchannel_list_. The
757 // goal here is to find a subchannel from the update that we can
758 // select in place of the current one.
759 // If the subchannel is READY, use it.
760 if (new_state == GRPC_CHANNEL_READY) {
761 // We consider it a successful connection attempt only if the
762 // previous state was CONNECTING. In particular, we don't want to
763 // increment this counter if we got a new address list and found the
764 // existing connection already in state READY.
765 if (old_state == GRPC_CHANNEL_CONNECTING) {
766 stats_plugins.AddCounter(
767 kMetricConnectionAttemptsSucceeded, 1,
768 {subchannel_list_->policy_->channel_control_helper()->GetTarget()},
769 {});
770 }
771 ProcessUnselectedReadyLocked();
772 return;
773 }
774 // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
775 bool prev_seen_transient_failure = seen_transient_failure_;
776 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
777 seen_transient_failure_ = true;
778 subchannel_list_->last_failure_ = connectivity_status_;
779 }
780 // If this is the initial connectivity state update for this subchannel,
781 // increment the counter in the subchannel list.
782 if (!old_state.has_value()) {
783 ++subchannel_list_->num_subchannels_seen_initial_notification_;
784 }
785 // If we haven't yet seen the initial connectivity state notification
786 // for all subchannels, do nothing.
787 if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
788 // If we're still here and this is the initial connectivity state
789 // notification for this subchannel, that means it was the last one to
790 // see its initial notification. Start trying to connect, starting
791 // with the first subchannel.
792 if (!old_state.has_value()) {
793 subchannel_list_->StartConnectingNextSubchannel();
794 return;
795 }
796 // We've already started trying to connect. Any subchannel that
797 // reports TF is a connection attempt failure.
798 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
799 stats_plugins.AddCounter(
800 kMetricConnectionAttemptsFailed, 1,
801 {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
802 }
803 // Otherwise, process connectivity state change.
804 switch (*connectivity_state_) {
805 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
806 // If this is the first failure we've seen on this subchannel,
807 // then we're still in the Happy Eyeballs pass.
808 if (!prev_seen_transient_failure && seen_transient_failure_) {
809 // If a connection attempt fails before the timer fires, then
810 // cancel the timer and start connecting on the next subchannel.
811 if (index_ == subchannel_list_->attempting_index_) {
812 if (subchannel_list_->timer_handle_.has_value()) {
813 p->channel_control_helper()->GetEventEngine()->Cancel(
814 *subchannel_list_->timer_handle_);
815 }
816 ++subchannel_list_->attempting_index_;
817 subchannel_list_->StartConnectingNextSubchannel();
818 } else {
819 // If this was the last subchannel to fail, check if the Happy
820 // Eyeballs pass is complete.
821 subchannel_list_->MaybeFinishHappyEyeballsPass();
822 }
823 } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
824 // We're done with the initial Happy Eyeballs pass and in a mode
825 // where we're attempting to connect to every subchannel in
826 // parallel. We count the number of failed connection attempts,
827 // and when that is equal to the number of subchannels, request
828 // re-resolution and report TRANSIENT_FAILURE again, so that the
829 // caller has the most recent status message. Note that this
830 // isn't necessarily the same as saying that we've seen one
831 // failure for each subchannel in the list, because the backoff
832 // state may be different in each subchannel, so we may have seen
833 // one subchannel fail more than once and another subchannel not
834 // fail at all. But it's a good enough heuristic.
835 ++subchannel_list_->num_failures_;
836 if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
837 p->channel_control_helper()->RequestReresolution();
838 status = absl::UnavailableError(absl::StrCat(
839 (p->omit_status_message_prefix_
840 ? ""
841 : "failed to connect to all addresses; last error: "),
842 connectivity_status_.ToString()));
843 p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
844 MakeRefCounted<TransientFailurePicker>(status));
845 }
846 }
847 break;
848 }
849 case GRPC_CHANNEL_IDLE:
850 // If we've finished the first Happy Eyeballs pass, then we go
851 // into a mode where we immediately try to connect to every
852 // subchannel in parallel.
853 if (subchannel_list_->IsHappyEyeballsPassComplete()) {
854 subchannel_->RequestConnection();
855 }
856 break;
857 case GRPC_CHANNEL_CONNECTING:
858 // Only update connectivity state in case 1, and only if we're not
859 // already in TRANSIENT_FAILURE.
860 if (subchannel_list_ == p->subchannel_list_.get() &&
861 p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
862 p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
863 MakeRefCounted<QueuePicker>(nullptr));
864 }
865 break;
866 default:
867 // We handled READY above, and we should never see SHUTDOWN.
868 GPR_UNREACHABLE_CODE(break);
869 }
870 }
871
RequestConnectionWithTimer()872 void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
873 GPR_ASSERT(connectivity_state_.has_value());
874 if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
875 subchannel_->RequestConnection();
876 } else {
877 GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
878 }
879 // If this is not the last subchannel in the list, start the timer.
880 if (index_ != subchannel_list_->size() - 1) {
881 PickFirst* p = subchannel_list_->policy_.get();
882 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
883 gpr_log(GPR_INFO,
884 "Pick First %p subchannel list %p: starting Connection "
885 "Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR,
886 p, subchannel_list_, p->connection_attempt_delay_.millis(),
887 index_);
888 }
889 subchannel_list_->timer_handle_ =
890 p->channel_control_helper()->GetEventEngine()->RunAfter(
891 p->connection_attempt_delay_,
892 [subchannel_list =
893 subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
894 ApplicationCallbackExecCtx application_exec_ctx;
895 ExecCtx exec_ctx;
896 auto* sl = subchannel_list.get();
897 sl->policy_->work_serializer()->Run(
898 [subchannel_list = std::move(subchannel_list)]() {
899 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
900 gpr_log(GPR_INFO,
901 "Pick First %p subchannel list %p: Connection "
902 "Attempt Delay timer fired (shutting_down=%d, "
903 "selected=%p)",
904 subchannel_list->policy_.get(),
905 subchannel_list.get(),
906 subchannel_list->shutting_down_,
907 subchannel_list->policy_->selected_);
908 }
909 if (subchannel_list->shutting_down_) return;
910 if (subchannel_list->policy_->selected_ != nullptr) return;
911 ++subchannel_list->attempting_index_;
912 subchannel_list->StartConnectingNextSubchannel();
913 },
914 DEBUG_LOCATION);
915 });
916 }
917 }
918
ProcessUnselectedReadyLocked()919 void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
920 PickFirst* p = subchannel_list_->policy_.get();
921 // Cancel Happy Eyeballs timer, if any.
922 if (subchannel_list_->timer_handle_.has_value()) {
923 p->channel_control_helper()->GetEventEngine()->Cancel(
924 *subchannel_list_->timer_handle_);
925 }
926 // If we get here, there are two possible cases:
927 // 1. We do not currently have a selected subchannel, and the update is
928 // for a subchannel in p->subchannel_list_ that we're trying to
929 // connect to. The goal here is to find a subchannel that we can
930 // select.
931 // 2. We do currently have a selected subchannel, and the update is
932 // for a subchannel in p->latest_pending_subchannel_list_. The
933 // goal here is to find a subchannel from the update that we can
934 // select in place of the current one.
935 GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
936 subchannel_list_ == p->latest_pending_subchannel_list_.get());
937 // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
938 if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
939 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
940 gpr_log(GPR_INFO,
941 "Pick First %p promoting pending subchannel list %p to "
942 "replace %p",
943 p, p->latest_pending_subchannel_list_.get(),
944 p->subchannel_list_.get());
945 }
946 p->UnsetSelectedSubchannel();
947 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
948 }
949 // Cases 1 and 2.
950 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
951 gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
952 subchannel_.get());
953 }
954 p->selected_ = this;
955 // If health checking is enabled, start the health watch, but don't
956 // report a new picker -- we want to stay in CONNECTING while we wait
957 // for the health status notification.
958 // If health checking is NOT enabled, report READY.
959 if (p->enable_health_watch_) {
960 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
961 gpr_log(GPR_INFO, "[PF %p] starting health watch", p);
962 }
963 auto watcher = std::make_unique<HealthWatcher>(
964 p->RefAsSubclass<PickFirst>(DEBUG_LOCATION, "HealthWatcher"));
965 p->health_watcher_ = watcher.get();
966 auto health_data_watcher = MakeHealthCheckWatcher(
967 p->work_serializer(), subchannel_list_->args_, std::move(watcher));
968 p->health_data_watcher_ = health_data_watcher.get();
969 subchannel_->AddDataWatcher(std::move(health_data_watcher));
970 } else {
971 p->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
972 MakeRefCounted<Picker>(subchannel()->Ref()));
973 }
974 // Unref all other subchannels in the list.
975 for (size_t i = 0; i < subchannel_list_->size(); ++i) {
976 if (i != index_) {
977 subchannel_list_->subchannels_[i].ShutdownLocked();
978 }
979 }
980 }
981
982 //
983 // PickFirst::SubchannelList
984 //
985
SubchannelList(RefCountedPtr<PickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args)986 PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
987 EndpointAddressesIterator* addresses,
988 const ChannelArgs& args)
989 : InternallyRefCounted<SubchannelList>(
990 GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
991 : nullptr),
992 policy_(std::move(policy)),
993 args_(args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
994 .Remove(
995 GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {
996 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
997 gpr_log(GPR_INFO, "[PF %p] Creating subchannel list %p - channel args: %s",
998 policy_.get(), this, args_.ToString().c_str());
999 }
1000 if (addresses == nullptr) return;
1001 // Create a subchannel for each address.
1002 addresses->ForEach([&](const EndpointAddresses& address) {
1003 GPR_ASSERT(address.addresses().size() == 1);
1004 RefCountedPtr<SubchannelInterface> subchannel =
1005 policy_->channel_control_helper()->CreateSubchannel(
1006 address.address(), address.args(), args_);
1007 if (subchannel == nullptr) {
1008 // Subchannel could not be created.
1009 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1010 gpr_log(GPR_INFO,
1011 "[PF %p] could not create subchannel for address %s, ignoring",
1012 policy_.get(), address.ToString().c_str());
1013 }
1014 return;
1015 }
1016 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1017 gpr_log(GPR_INFO,
1018 "[PF %p] subchannel list %p index %" PRIuPTR
1019 ": Created subchannel %p for address %s",
1020 policy_.get(), this, subchannels_.size(), subchannel.get(),
1021 address.ToString().c_str());
1022 }
1023 subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
1024 });
1025 }
1026
~SubchannelList()1027 PickFirst::SubchannelList::~SubchannelList() {
1028 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1029 gpr_log(GPR_INFO, "[PF %p] Destroying subchannel_list %p", policy_.get(),
1030 this);
1031 }
1032 }
1033
Orphan()1034 void PickFirst::SubchannelList::Orphan() {
1035 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1036 gpr_log(GPR_INFO, "[PF %p] Shutting down subchannel_list %p", policy_.get(),
1037 this);
1038 }
1039 GPR_ASSERT(!shutting_down_);
1040 shutting_down_ = true;
1041 for (auto& sd : subchannels_) {
1042 sd.ShutdownLocked();
1043 }
1044 if (timer_handle_.has_value()) {
1045 policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1046 }
1047 Unref();
1048 }
1049
ResetBackoffLocked()1050 void PickFirst::SubchannelList::ResetBackoffLocked() {
1051 for (auto& sd : subchannels_) {
1052 sd.ResetBackoffLocked();
1053 }
1054 }
1055
StartConnectingNextSubchannel()1056 void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
1057 // Find the next subchannel not in state TRANSIENT_FAILURE.
1058 // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1059 // large recursion that could overflow the stack.
1060 for (; attempting_index_ < size(); ++attempting_index_) {
1061 SubchannelData* sc = &subchannels_[attempting_index_];
1062 GPR_ASSERT(sc->connectivity_state().has_value());
1063 if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1064 // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
1065 // connection attempt.
1066 sc->RequestConnectionWithTimer();
1067 return;
1068 }
1069 }
1070 // If we didn't find a subchannel to request a connection on, check to
1071 // see if the Happy Eyeballs pass is complete.
1072 MaybeFinishHappyEyeballsPass();
1073 }
1074
MaybeFinishHappyEyeballsPass()1075 void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
1076 // Make sure all subchannels have finished a connection attempt before
1077 // we consider the Happy Eyeballs pass complete.
1078 if (!IsHappyEyeballsPassComplete()) return;
1079 // We didn't find another subchannel not in state TRANSIENT_FAILURE,
1080 // so report TRANSIENT_FAILURE and switch to a mode in which we try to
1081 // connect to all addresses in parallel.
1082 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1083 gpr_log(GPR_INFO,
1084 "Pick First %p subchannel list %p failed to connect to "
1085 "all subchannels",
1086 policy_.get(), this);
1087 }
1088 // In case 2, swap to the new subchannel list. This means reporting
1089 // TRANSIENT_FAILURE and dropping the existing (working) connection,
1090 // but we can't ignore what the control plane has told us.
1091 if (policy_->latest_pending_subchannel_list_.get() == this) {
1092 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
1093 gpr_log(GPR_INFO,
1094 "Pick First %p promoting pending subchannel list %p to "
1095 "replace %p",
1096 policy_.get(), policy_->latest_pending_subchannel_list_.get(),
1097 this);
1098 }
1099 policy_->UnsetSelectedSubchannel();
1100 policy_->subchannel_list_ =
1101 std::move(policy_->latest_pending_subchannel_list_);
1102 }
1103 // If this is the current subchannel list (either because we were
1104 // in case 1 or because we were in case 2 and just promoted it to
1105 // be the current list), re-resolve and report new state.
1106 if (policy_->subchannel_list_.get() == this) {
1107 policy_->channel_control_helper()->RequestReresolution();
1108 absl::Status status = absl::UnavailableError(
1109 absl::StrCat((policy_->omit_status_message_prefix_
1110 ? ""
1111 : "failed to connect to all addresses; last error: "),
1112 last_failure_.ToString()));
1113 policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1114 MakeRefCounted<TransientFailurePicker>(status));
1115 }
1116 // We now transition into a mode where we try to connect to all
1117 // subchannels in parallel. For any subchannel currently in IDLE,
1118 // trigger a connection attempt. For any subchannel not currently in
1119 // IDLE, we will trigger a connection attempt when it does report IDLE.
1120 for (SubchannelData& sd : subchannels_) {
1121 if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
1122 sd.RequestConnection();
1123 }
1124 }
1125 }
1126
1127 //
1128 // factory
1129 //
1130
1131 class PickFirstFactory final : public LoadBalancingPolicyFactory {
1132 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1133 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1134 LoadBalancingPolicy::Args args) const override {
1135 return MakeOrphanable<PickFirst>(std::move(args));
1136 }
1137
name() const1138 absl::string_view name() const override { return kPickFirst; }
1139
1140 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1141 ParseLoadBalancingConfig(const Json& json) const override {
1142 return LoadFromJson<RefCountedPtr<PickFirstConfig>>(
1143 json, JsonArgs(), "errors validating pick_first LB policy config");
1144 }
1145 };
1146
1147 } // namespace
1148
RegisterPickFirstLbPolicy(CoreConfiguration::Builder * builder)1149 void RegisterPickFirstLbPolicy(CoreConfiguration::Builder* builder) {
1150 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1151 std::make_unique<PickFirstFactory>());
1152 }
1153
1154 } // namespace grpc_core
1155