1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <stddef.h>
20 #include <stdint.h>
21
22 #include <atomic>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 #include "absl/types/variant.h"
36
37 #include <grpc/impl/connectivity_state.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/ext/xds/xds_bootstrap.h"
42 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
43 #include "src/core/ext/xds/xds_client.h"
44 #include "src/core/ext/xds/xds_client_grpc.h"
45 #include "src/core/ext/xds/xds_client_stats.h"
46 #include "src/core/ext/xds/xds_endpoint.h"
47 #include "src/core/lib/channel/call_tracer.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/config/core_configuration.h"
50 #include "src/core/lib/debug/trace.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/match.h"
53 #include "src/core/lib/gprpp/orphanable.h"
54 #include "src/core/lib/gprpp/ref_counted.h"
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/gprpp/ref_counted_string.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/gprpp/validation_errors.h"
59 #include "src/core/lib/iomgr/pollset_set.h"
60 #include "src/core/lib/iomgr/resolved_address.h"
61 #include "src/core/lib/json/json.h"
62 #include "src/core/lib/json/json_args.h"
63 #include "src/core/lib/json/json_object_loader.h"
64 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
65 #include "src/core/lib/transport/connectivity_state.h"
66 #include "src/core/load_balancing/backend_metric_data.h"
67 #include "src/core/load_balancing/child_policy_handler.h"
68 #include "src/core/load_balancing/delegating_helper.h"
69 #include "src/core/load_balancing/lb_policy.h"
70 #include "src/core/load_balancing/lb_policy_factory.h"
71 #include "src/core/load_balancing/lb_policy_registry.h"
72 #include "src/core/load_balancing/subchannel_interface.h"
73 #include "src/core/load_balancing/xds/xds_channel_args.h"
74 #include "src/core/resolver/endpoint_addresses.h"
75 #include "src/core/resolver/xds/xds_dependency_manager.h"
76
77 namespace grpc_core {
78
// Trace flag registered under the name "xds_cluster_impl_lb"; every trace
// log in this file is guarded by GRPC_TRACE_FLAG_ENABLED on this flag.
// NOTE(review): the leading `false` is presumably the default-enabled
// state — confirm against TraceFlag's declaration.
TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
80
81 namespace {
82
// Shorthand for the dependency manager's aggregated xDS config. An instance
// is read from the channel args in XdsClusterImplLb::UpdateLocked().
using XdsConfig = XdsDependencyManager::XdsConfig;
84
85 //
86 // global circuit breaker atomic map
87 //
88
89 class CircuitBreakerCallCounterMap final {
90 public:
91 using Key =
92 std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
93
94 class CallCounter final : public RefCounted<CallCounter> {
95 public:
CallCounter(Key key)96 explicit CallCounter(Key key) : key_(std::move(key)) {}
97 ~CallCounter() override;
98
Load()99 uint32_t Load() {
100 return concurrent_requests_.load(std::memory_order_seq_cst);
101 }
Increment()102 uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
Decrement()103 void Decrement() { concurrent_requests_.fetch_sub(1); }
104
105 private:
106 Key key_;
107 std::atomic<uint32_t> concurrent_requests_{0};
108 };
109
110 RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
111 const std::string& eds_service_name);
112
113 private:
114 Mutex mu_;
115 std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
116 };
117
// The single global counter registry. It is heap-allocated and not deleted
// anywhere in the code visible here, so CallCounter destructors can always
// reach it. NOTE(review): presumably intentional, to sidestep static
// destruction-order issues — confirm.
CircuitBreakerCallCounterMap* const g_call_counter_map =
    new CircuitBreakerCallCounterMap;
120
121 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)122 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
123 const std::string& eds_service_name) {
124 Key key(cluster, eds_service_name);
125 RefCountedPtr<CallCounter> result;
126 MutexLock lock(&mu_);
127 auto it = map_.find(key);
128 if (it == map_.end()) {
129 it = map_.insert({key, nullptr}).first;
130 } else {
131 result = it->second->RefIfNonZero();
132 }
133 if (result == nullptr) {
134 result = MakeRefCounted<CallCounter>(std::move(key));
135 it->second = result.get();
136 }
137 return result;
138 }
139
~CallCounter()140 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
141 MutexLock lock(&g_call_counter_map->mu_);
142 auto it = g_call_counter_map->map_.find(key_);
143 if (it != g_call_counter_map->map_.end() && it->second == this) {
144 g_call_counter_map->map_.erase(it);
145 }
146 }
147
148 //
149 // LB policy
150 //
151
// Policy name returned by name() on both XdsClusterImplLbConfig and
// XdsClusterImplLb.
constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
153
154 // Config for xDS Cluster Impl LB policy.
155 class XdsClusterImplLbConfig final : public LoadBalancingPolicy::Config {
156 public:
157 XdsClusterImplLbConfig() = default;
158
159 XdsClusterImplLbConfig(const XdsClusterImplLbConfig&) = delete;
160 XdsClusterImplLbConfig& operator=(const XdsClusterImplLbConfig&) = delete;
161
162 XdsClusterImplLbConfig(XdsClusterImplLbConfig&& other) = delete;
163 XdsClusterImplLbConfig& operator=(XdsClusterImplLbConfig&& other) = delete;
164
name() const165 absl::string_view name() const override { return kXdsClusterImpl; }
166
cluster_name() const167 const std::string& cluster_name() const { return cluster_name_; }
child_policy() const168 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
169 return child_policy_;
170 }
171
172 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
173 void JsonPostLoad(const Json& json, const JsonArgs& args,
174 ValidationErrors* errors);
175
176 private:
177 std::string cluster_name_;
178 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
179 };
180
181 // xDS Cluster Impl LB policy.
// xDS Cluster Impl LB policy.
//
// Delegates to a child policy (see CreateChildPolicyLocked()) and wraps the
// child's picker in a Picker that applies EDS-configured drops and the
// max_concurrent_requests circuit breaker, and attaches per-call load
// reporting. Data members are destroyed in reverse declaration order.
class XdsClusterImplLb final : public LoadBalancingPolicy {
 public:
  XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);

  absl::string_view name() const override { return kXdsClusterImpl; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Subchannel wrapper created by Helper::CreateSubchannel(). Picker::Pick()
  // reads its locality data and then unwraps it before returning the pick.
  class StatsSubchannelWrapper final : public DelegatingSubchannel {
   public:
    // If load reporting is enabled and we have an XdsClusterLocalityStats
    // object, that object already contains the locality label. We
    // need to store the locality label directly only in the case where
    // load reporting is disabled.
    using LocalityData = absl::variant<
        RefCountedStringValue /*locality*/,
        RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>;

    StatsSubchannelWrapper(
        RefCountedPtr<SubchannelInterface> wrapped_subchannel,
        LocalityData locality_data)
        : DelegatingSubchannel(std::move(wrapped_subchannel)),
          locality_data_(std::move(locality_data)) {}

    // Locality label, from whichever alternative locality_data_ holds.
    RefCountedStringValue locality() const {
      return Match(
          locality_data_,
          [](RefCountedStringValue locality) { return locality; },
          [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
            return locality_stats->locality_name()->human_readable_string();
          });
    }

    // Locality stats object, or nullptr when only the label is stored.
    XdsClusterLocalityStats* locality_stats() const {
      return Match(
          locality_data_,
          [](const RefCountedStringValue&) {
            return static_cast<XdsClusterLocalityStats*>(nullptr);
          },
          [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
            return locality_stats.get();
          });
    }

   private:
    LocalityData locality_data_;
  };

  // A picker that wraps the picker from the child to perform drops.
  // It copies the policy's config-derived state when it is constructed.
  class Picker final : public SubchannelPicker {
   public:
    Picker(XdsClusterImplLb* xds_cluster_impl_lb,
           RefCountedPtr<SubchannelPicker> picker);

    PickResult Pick(PickArgs args) override;

   private:
    class SubchannelCallTracker;

    RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
    uint32_t max_concurrent_requests_;
    RefCountedStringValue service_telemetry_label_;
    RefCountedStringValue namespace_telemetry_label_;
    RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
    RefCountedPtr<XdsClusterDropStats> drop_stats_;
    // Child picker; Pick() tolerates it being null.
    RefCountedPtr<SubchannelPicker> picker_;
  };

  // Channel control helper given to the child policy. It is constructed from
  // a ref to this policy.
  class Helper final
      : public ParentOwningDelegatingChannelControlHelper<XdsClusterImplLb> {
   public:
    explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
        : ParentOwningDelegatingChannelControlHelper(
              std::move(xds_cluster_impl_policy)) {}

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        const grpc_resolved_address& address,
        const ChannelArgs& per_address_args, const ChannelArgs& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     RefCountedPtr<SubchannelPicker> picker) override;
  };

  ~XdsClusterImplLb() override;

  void ShutdownLocked() override;

  // Drops the child policy, the child's latest picker, and the drop stats.
  void ResetState();
  // Calls ResetState(), then reports TRANSIENT_FAILURE with `status`.
  void ReportTransientFailure(absl::Status status);

  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const ChannelArgs& args);
  absl::Status UpdateChildPolicyLocked(
      absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
      std::string resolution_note, const ChannelArgs& args);

  // Yields nullptr when the channel is not using XdsCredentials.
  absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
  MaybeCreateCertificateProviderLocked(
      const XdsClusterResource& cluster_resource) const;

  void MaybeUpdatePickerLocked();

  // Current config from the resolver.
  RefCountedPtr<XdsClusterImplLbConfig> config_;
  std::shared_ptr<const XdsClusterResource> cluster_resource_;
  RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;

  // Current concurrent number of requests.
  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;

  // Internal state.
  bool shutting_down_ = false;

  // The xds client.
  RefCountedPtr<GrpcXdsClient> xds_client_;

  // The stats for client-side load reporting.
  RefCountedPtr<XdsClusterDropStats> drop_stats_;

  OrphanablePtr<LoadBalancingPolicy> child_policy_;

  // Latest state and picker reported by the child policy.
  grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
  absl::Status status_;
  RefCountedPtr<SubchannelPicker> picker_;
};
310
311 //
312 // XdsClusterImplLb::Picker::SubchannelCallTracker
313 //
314
315 class XdsClusterImplLb::Picker::SubchannelCallTracker final
316 : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
317 public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<XdsClusterLocalityStats> locality_stats,RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)318 SubchannelCallTracker(
319 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
320 original_subchannel_call_tracker,
321 RefCountedPtr<XdsClusterLocalityStats> locality_stats,
322 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
323 : original_subchannel_call_tracker_(
324 std::move(original_subchannel_call_tracker)),
325 locality_stats_(std::move(locality_stats)),
326 call_counter_(std::move(call_counter)) {}
327
~SubchannelCallTracker()328 ~SubchannelCallTracker() override {
329 locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
330 call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
331 GPR_DEBUG_ASSERT(!started_);
332 }
333
Start()334 void Start() override {
335 // Increment number of calls in flight.
336 call_counter_->Increment();
337 // Record a call started.
338 if (locality_stats_ != nullptr) {
339 locality_stats_->AddCallStarted();
340 }
341 // Delegate if needed.
342 if (original_subchannel_call_tracker_ != nullptr) {
343 original_subchannel_call_tracker_->Start();
344 }
345 #ifndef NDEBUG
346 started_ = true;
347 #endif
348 }
349
Finish(FinishArgs args)350 void Finish(FinishArgs args) override {
351 // Delegate if needed.
352 if (original_subchannel_call_tracker_ != nullptr) {
353 original_subchannel_call_tracker_->Finish(args);
354 }
355 // Record call completion for load reporting.
356 if (locality_stats_ != nullptr) {
357 auto* backend_metric_data =
358 args.backend_metric_accessor->GetBackendMetricData();
359 const std::map<absl::string_view, double>* named_metrics = nullptr;
360 if (backend_metric_data != nullptr) {
361 named_metrics = &backend_metric_data->named_metrics;
362 }
363 locality_stats_->AddCallFinished(named_metrics, !args.status.ok());
364 }
365 // Decrement number of calls in flight.
366 call_counter_->Decrement();
367 #ifndef NDEBUG
368 started_ = false;
369 #endif
370 }
371
372 private:
373 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
374 original_subchannel_call_tracker_;
375 RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
376 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
377 #ifndef NDEBUG
378 bool started_ = false;
379 #endif
380 };
381
382 //
383 // XdsClusterImplLb::Picker
384 //
385
// Copies the policy's current call counter, circuit-breaker limit, telemetry
// labels, drop config, and drop stats into the picker. Pick() then reads only
// these members and never goes back to the policy.
XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
                                 RefCountedPtr<SubchannelPicker> picker)
    : call_counter_(xds_cluster_impl_lb->call_counter_),
      max_concurrent_requests_(
          xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
      service_telemetry_label_(
          xds_cluster_impl_lb->cluster_resource_->service_telemetry_label),
      namespace_telemetry_label_(
          xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label),
      drop_config_(xds_cluster_impl_lb->drop_config_),
      drop_stats_(xds_cluster_impl_lb->drop_stats_),
      picker_(std::move(picker)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
    gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
            xds_cluster_impl_lb, this);
  }
}
403
Pick(LoadBalancingPolicy::PickArgs args)404 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
405 LoadBalancingPolicy::PickArgs args) {
406 auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
407 auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
408 if (call_attempt_tracer != nullptr) {
409 call_attempt_tracer->SetOptionalLabel(
410 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName,
411 service_telemetry_label_);
412 call_attempt_tracer->SetOptionalLabel(
413 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
414 kXdsServiceNamespace,
415 namespace_telemetry_label_);
416 }
417 // Handle EDS drops.
418 const std::string* drop_category;
419 if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {
420 if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
421 return PickResult::Drop(absl::UnavailableError(
422 absl::StrCat("EDS-configured drop: ", *drop_category)));
423 }
424 // Check if we exceeded the max concurrent requests circuit breaking limit.
425 // Note: We check the value here, but we don't actually increment the
426 // counter for the current request until the channel calls the subchannel
427 // call tracker's Start() method. This means that we may wind up
428 // allowing more concurrent requests than the configured limit.
429 if (call_counter_->Load() >= max_concurrent_requests_) {
430 if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
431 return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
432 }
433 // If we're not dropping the call, we should always have a child picker.
434 if (picker_ == nullptr) { // Should never happen.
435 return PickResult::Fail(absl::InternalError(
436 "xds_cluster_impl picker not given any child picker"));
437 }
438 // Not dropping, so delegate to child picker.
439 PickResult result = picker_->Pick(args);
440 auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
441 if (complete_pick != nullptr) {
442 auto* subchannel_wrapper =
443 static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
444 // Add locality label to per-call metrics if needed.
445 if (call_attempt_tracer != nullptr) {
446 call_attempt_tracer->SetOptionalLabel(
447 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality,
448 subchannel_wrapper->locality());
449 }
450 // Handle load reporting.
451 RefCountedPtr<XdsClusterLocalityStats> locality_stats;
452 if (subchannel_wrapper->locality_stats() != nullptr) {
453 locality_stats = subchannel_wrapper->locality_stats()->Ref(
454 DEBUG_LOCATION, "SubchannelCallTracker");
455 }
456 // Unwrap subchannel to pass back up the stack.
457 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
458 // Inject subchannel call tracker to record call completion.
459 complete_pick->subchannel_call_tracker =
460 std::make_unique<SubchannelCallTracker>(
461 std::move(complete_pick->subchannel_call_tracker),
462 std::move(locality_stats),
463 call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
464 } else {
465 // TODO(roth): We should ideally also record call failures here in the case
466 // where a pick fails. This is challenging, because we don't know which
467 // picks are for wait_for_ready RPCs or how many times we'll return a
468 // failure for the same wait_for_ready RPC.
469 }
470 return result;
471 }
472
473 //
474 // XdsClusterImplLb
475 //
476
XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,Args args)477 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,
478 Args args)
479 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
480 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
481 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
482 this, xds_client_.get());
483 }
484 }
485
~XdsClusterImplLb()486 XdsClusterImplLb::~XdsClusterImplLb() {
487 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
488 gpr_log(GPR_INFO,
489 "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
490 this);
491 }
492 }
493
// Shuts the policy down. It marks the policy as shutting down, tears down
// the child-related state via ResetState(), and then releases the xDS client.
void XdsClusterImplLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
    gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
  }
  // Set before ResetState(): Helper::CreateSubchannel() and
  // Helper::UpdateState() check this flag and bail out, so any helper
  // callbacks made from here on are ignored.
  shutting_down_ = true;
  ResetState();
  xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
}
502
// Releases the child policy, the child's latest picker, and the drop stats.
// Called from ShutdownLocked() and ReportTransientFailure(). config_,
// cluster_resource_, and call_counter_ are left as they are.
void XdsClusterImplLb::ResetState() {
  // Remove our interested_parties pollset_set from the child policy's
  // pollset_set, undoing the add done in CreateChildPolicyLocked().
  if (child_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
  drop_stats_.reset();
}
516
ReportTransientFailure(absl::Status status)517 void XdsClusterImplLb::ReportTransientFailure(absl::Status status) {
518 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
519 gpr_log(GPR_INFO,
520 "[xds_cluster_impl_lb %p] reporting TRANSIENT_FAILURE: %s", this,
521 status.ToString().c_str());
522 }
523 ResetState();
524 channel_control_helper()->UpdateState(
525 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
526 MakeRefCounted<TransientFailurePicker>(status));
527 }
528
ExitIdleLocked()529 void XdsClusterImplLb::ExitIdleLocked() {
530 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
531 }
532
ResetBackoffLocked()533 void XdsClusterImplLb::ResetBackoffLocked() {
534 // The XdsClient will have its backoff reset by the xds resolver, so we
535 // don't need to do it here.
536 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
537 }
538
GetEdsResourceName(const XdsClusterResource & cluster_resource)539 std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
540 auto* eds = absl::get_if<XdsClusterResource::Eds>(&cluster_resource.type);
541 if (eds == nullptr) return "";
542 return eds->eds_service_name;
543 }
544
UpdateLocked(UpdateArgs args)545 absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
546 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
547 gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
548 }
549 // Grab new LB policy config.
550 auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
551 // Cluster name should never change, because the cds policy will assign a
552 // different priority child name if that happens, which means that this
553 // policy instance will get replaced instead of being updated.
554 if (config_ != nullptr) {
555 GPR_ASSERT(config_->cluster_name() == new_config->cluster_name());
556 }
557 // Get xDS config.
558 auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
559 if (new_xds_config == nullptr) {
560 // Should never happen.
561 absl::Status status = absl::InternalError(
562 "xDS config not passed to xds_cluster_impl LB policy");
563 ReportTransientFailure(status);
564 return status;
565 }
566 auto it = new_xds_config->clusters.find(new_config->cluster_name());
567 if (it == new_xds_config->clusters.end() || !it->second.ok() ||
568 it->second->cluster == nullptr) {
569 // Should never happen.
570 absl::Status status = absl::InternalError(absl::StrCat(
571 "xDS config has no entry for cluster ", new_config->cluster_name()));
572 ReportTransientFailure(status);
573 return status;
574 }
575 auto& new_cluster_config = *it->second;
576 auto* endpoint_config =
577 absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
578 &new_cluster_config.children);
579 if (endpoint_config == nullptr) {
580 // Should never happen.
581 absl::Status status = absl::InternalError(
582 absl::StrCat("cluster config for ", new_config->cluster_name(),
583 " has no endpoint config"));
584 ReportTransientFailure(status);
585 return status;
586 }
587 auto xds_cert_provider =
588 MaybeCreateCertificateProviderLocked(*new_cluster_config.cluster);
589 if (!xds_cert_provider.ok()) {
590 // Should never happen.
591 ReportTransientFailure(xds_cert_provider.status());
592 return xds_cert_provider.status();
593 }
594 if (*xds_cert_provider != nullptr) {
595 args.args = args.args.SetObject(std::move(*xds_cert_provider));
596 }
597 // Now we've verified the new config is good.
598 // Get new and old (if any) EDS service name.
599 std::string new_eds_service_name =
600 GetEdsResourceName(*new_cluster_config.cluster);
601 std::string old_eds_service_name =
602 cluster_resource_ == nullptr ? ""
603 : GetEdsResourceName(*cluster_resource_);
604 // Update drop stats if needed.
605 // Note: We need a drop stats object whenever load reporting is enabled,
606 // even if we have no EDS drop config, because we also use it when
607 // reporting circuit breaker drops.
608 if (!new_cluster_config.cluster->lrs_load_reporting_server.has_value()) {
609 drop_stats_.reset();
610 } else if (cluster_resource_ == nullptr ||
611 old_eds_service_name != new_eds_service_name ||
612 cluster_resource_->lrs_load_reporting_server !=
613 new_cluster_config.cluster->lrs_load_reporting_server) {
614 drop_stats_ = xds_client_->AddClusterDropStats(
615 *new_cluster_config.cluster->lrs_load_reporting_server,
616 new_config->cluster_name(), new_eds_service_name);
617 if (drop_stats_ == nullptr) {
618 gpr_log(
619 GPR_ERROR,
620 "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
621 "LRS server %s, cluster %s, EDS service name %s, load "
622 "reporting for drops will not be done.",
623 this,
624 new_cluster_config.cluster->lrs_load_reporting_server->server_uri()
625 .c_str(),
626 new_config->cluster_name().c_str(), new_eds_service_name.c_str());
627 }
628 }
629 // Update call counter if needed.
630 if (cluster_resource_ == nullptr ||
631 old_eds_service_name != new_eds_service_name) {
632 call_counter_ = g_call_counter_map->GetOrCreate(new_config->cluster_name(),
633 new_eds_service_name);
634 }
635 // Update config state, now that we're done comparing old and new fields.
636 config_ = std::move(new_config);
637 cluster_resource_ = new_cluster_config.cluster;
638 drop_config_ = endpoint_config->endpoints != nullptr
639 ? endpoint_config->endpoints->drop_config
640 : nullptr;
641 // Update picker in case some dependent config field changed.
642 MaybeUpdatePickerLocked();
643 // Update child policy.
644 return UpdateChildPolicyLocked(std::move(args.addresses),
645 std::move(args.resolution_note), args.args);
646 }
647
648 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
MaybeCreateCertificateProviderLocked(const XdsClusterResource & cluster_resource) const649 XdsClusterImplLb::MaybeCreateCertificateProviderLocked(
650 const XdsClusterResource& cluster_resource) const {
651 // If the channel is not using XdsCreds, do nothing.
652 auto channel_credentials = channel_control_helper()->GetChannelCredentials();
653 if (channel_credentials == nullptr ||
654 channel_credentials->type() != XdsCredentials::Type()) {
655 return nullptr;
656 }
657 // Configure root cert.
658 absl::string_view root_provider_instance_name =
659 cluster_resource.common_tls_context.certificate_validation_context
660 .ca_certificate_provider_instance.instance_name;
661 absl::string_view root_cert_name =
662 cluster_resource.common_tls_context.certificate_validation_context
663 .ca_certificate_provider_instance.certificate_name;
664 RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
665 if (!root_provider_instance_name.empty()) {
666 root_cert_provider =
667 xds_client_->certificate_provider_store()
668 .CreateOrGetCertificateProvider(root_provider_instance_name);
669 if (root_cert_provider == nullptr) {
670 return absl::InternalError(
671 absl::StrCat("Certificate provider instance name: \"",
672 root_provider_instance_name, "\" not recognized."));
673 }
674 }
675 // Configure identity cert.
676 absl::string_view identity_provider_instance_name =
677 cluster_resource.common_tls_context.tls_certificate_provider_instance
678 .instance_name;
679 absl::string_view identity_cert_name =
680 cluster_resource.common_tls_context.tls_certificate_provider_instance
681 .certificate_name;
682 RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
683 if (!identity_provider_instance_name.empty()) {
684 identity_cert_provider =
685 xds_client_->certificate_provider_store()
686 .CreateOrGetCertificateProvider(identity_provider_instance_name);
687 if (identity_cert_provider == nullptr) {
688 return absl::InternalError(
689 absl::StrCat("Certificate provider instance name: \"",
690 identity_provider_instance_name, "\" not recognized."));
691 }
692 }
693 // Configure SAN matchers.
694 const std::vector<StringMatcher>& san_matchers =
695 cluster_resource.common_tls_context.certificate_validation_context
696 .match_subject_alt_names;
697 // Create xds cert provider.
698 return MakeRefCounted<XdsCertificateProvider>(
699 root_cert_provider, root_cert_name, identity_cert_provider,
700 identity_cert_name, san_matchers);
701 }
702
MaybeUpdatePickerLocked()703 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
704 // If we're dropping all calls, report READY, regardless of what (or
705 // whether) the child has reported.
706 if (drop_config_ != nullptr && drop_config_->drop_all()) {
707 auto drop_picker = MakeRefCounted<Picker>(this, picker_);
708 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
709 gpr_log(GPR_INFO,
710 "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
711 "state=READY picker=%p",
712 this, drop_picker.get());
713 }
714 channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
715 std::move(drop_picker));
716 return;
717 }
718 // Otherwise, update only if we have a child picker.
719 if (picker_ != nullptr) {
720 auto drop_picker = MakeRefCounted<Picker>(this, picker_);
721 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
722 gpr_log(GPR_INFO,
723 "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
724 "status=(%s) picker=%p",
725 this, ConnectivityStateName(state_), status_.ToString().c_str(),
726 drop_picker.get());
727 }
728 channel_control_helper()->UpdateState(state_, status_,
729 std::move(drop_picker));
730 }
731 }
732
CreateChildPolicyLocked(const ChannelArgs & args)733 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
734 const ChannelArgs& args) {
735 LoadBalancingPolicy::Args lb_policy_args;
736 lb_policy_args.work_serializer = work_serializer();
737 lb_policy_args.args = args;
738 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
739 RefAsSubclass<XdsClusterImplLb>(DEBUG_LOCATION, "Helper"));
740 OrphanablePtr<LoadBalancingPolicy> lb_policy =
741 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
742 &grpc_xds_cluster_impl_lb_trace);
743 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
744 gpr_log(GPR_INFO,
745 "[xds_cluster_impl_lb %p] Created new child policy handler %p",
746 this, lb_policy.get());
747 }
748 // Add our interested_parties pollset_set to that of the newly created
749 // child policy. This will make the child policy progress upon activity on
750 // this policy, which in turn is tied to the application's call.
751 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
752 interested_parties());
753 return lb_policy;
754 }
755
UpdateChildPolicyLocked(absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,std::string resolution_note,const ChannelArgs & args)756 absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
757 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
758 std::string resolution_note, const ChannelArgs& args) {
759 // Create policy if needed.
760 if (child_policy_ == nullptr) {
761 child_policy_ = CreateChildPolicyLocked(args);
762 }
763 // Construct update args.
764 UpdateArgs update_args;
765 update_args.addresses = std::move(addresses);
766 update_args.resolution_note = std::move(resolution_note);
767 update_args.config = config_->child_policy();
768 update_args.args =
769 args.Set(GRPC_ARG_XDS_CLUSTER_NAME, config_->cluster_name());
770 // Update the policy.
771 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
772 gpr_log(GPR_INFO,
773 "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
774 child_policy_.get());
775 }
776 return child_policy_->UpdateLocked(std::move(update_args));
777 }
778
779 //
780 // XdsClusterImplLb::Helper
781 //
782
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)783 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
784 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
785 const ChannelArgs& args) {
786 if (parent()->shutting_down_) return nullptr;
787 // Wrap the subchannel so that we pass along the locality label and
788 // (if load reporting is enabled) the locality stats object, which
789 // will be used by the picker.
790 auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
791 RefCountedPtr<XdsClusterLocalityStats> locality_stats;
792 if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
793 locality_stats = parent()->xds_client_->AddClusterLocalityStats(
794 parent()->cluster_resource_->lrs_load_reporting_server.value(),
795 parent()->config_->cluster_name(),
796 GetEdsResourceName(*parent()->cluster_resource_), locality_name);
797 if (locality_stats == nullptr) {
798 gpr_log(GPR_ERROR,
799 "[xds_cluster_impl_lb %p] Failed to get locality stats object "
800 "for LRS server %s, cluster %s, EDS service name %s; load "
801 "reports will not be generated",
802 parent(),
803 parent()
804 ->cluster_resource_->lrs_load_reporting_server->server_uri()
805 .c_str(),
806 parent()->config_->cluster_name().c_str(),
807 GetEdsResourceName(*parent()->cluster_resource_).c_str());
808 }
809 }
810 StatsSubchannelWrapper::LocalityData locality_data;
811 if (locality_stats != nullptr) {
812 locality_data = std::move(locality_stats);
813 } else {
814 locality_data = locality_name->human_readable_string();
815 }
816 return MakeRefCounted<StatsSubchannelWrapper>(
817 parent()->channel_control_helper()->CreateSubchannel(
818 address, per_address_args, args),
819 std::move(locality_data));
820 }
821
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)822 void XdsClusterImplLb::Helper::UpdateState(
823 grpc_connectivity_state state, const absl::Status& status,
824 RefCountedPtr<SubchannelPicker> picker) {
825 if (parent()->shutting_down_) return;
826 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
827 gpr_log(GPR_INFO,
828 "[xds_cluster_impl_lb %p] child connectivity state update: "
829 "state=%s (%s) picker=%p",
830 parent(), ConnectivityStateName(state), status.ToString().c_str(),
831 picker.get());
832 }
833 // Save the state and picker.
834 parent()->state_ = state;
835 parent()->status_ = status;
836 parent()->picker_ = std::move(picker);
837 // Wrap the picker and return it to the channel.
838 parent()->MaybeUpdatePickerLocked();
839 }
840
841 //
842 // factory
843 //
844
JsonLoader(const JsonArgs &)845 const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
846 static const auto* loader =
847 JsonObjectLoader<XdsClusterImplLbConfig>()
848 // Note: Some fields require custom processing, so they are
849 // handled in JsonPostLoad() instead.
850 .Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
851 .Finish();
852 return loader;
853 }
854
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)855 void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
856 ValidationErrors* errors) {
857 // Parse "childPolicy" field.
858 ValidationErrors::ScopedField field(errors, ".childPolicy");
859 auto it = json.object().find("childPolicy");
860 if (it == json.object().end()) {
861 errors->AddError("field not present");
862 } else {
863 auto lb_config =
864 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
865 it->second);
866 if (!lb_config.ok()) {
867 errors->AddError(lb_config.status().message());
868 } else {
869 child_policy_ = std::move(*lb_config);
870 }
871 }
872 }
873
874 class XdsClusterImplLbFactory final : public LoadBalancingPolicyFactory {
875 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const876 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
877 LoadBalancingPolicy::Args args) const override {
878 auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
879 "XdsClusterImplLb");
880 if (xds_client == nullptr) {
881 gpr_log(GPR_ERROR,
882 "XdsClient not present in channel args -- cannot instantiate "
883 "xds_cluster_impl LB policy");
884 return nullptr;
885 }
886 return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
887 std::move(args));
888 }
889
name() const890 absl::string_view name() const override { return kXdsClusterImpl; }
891
892 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const893 ParseLoadBalancingConfig(const Json& json) const override {
894 return LoadFromJson<RefCountedPtr<XdsClusterImplLbConfig>>(
895 json, JsonArgs(),
896 "errors validating xds_cluster_impl LB policy config");
897 }
898 };
899
900 } // namespace
901
RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder * builder)902 void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder) {
903 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
904 std::make_unique<XdsClusterImplLbFactory>());
905 }
906
907 } // namespace grpc_core
908