1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_client.h"
20
21 #include <inttypes.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 #include <vector>
30
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/strings/match.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/str_split.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/strings/strip.h"
38 #include "absl/types/optional.h"
39 #include "envoy/config/core/v3/base.upb.h"
40 #include "envoy/service/status/v3/csds.upb.h"
41 #include "google/protobuf/any.upb.h"
42 #include "google/protobuf/timestamp.upb.h"
43 #include "upb/base/string_view.h"
44 #include "upb/mem/arena.h"
45
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/support/log.h>
48
49 #include "src/core/ext/xds/upb_utils.h"
50 #include "src/core/ext/xds/xds_api.h"
51 #include "src/core/ext/xds/xds_bootstrap.h"
52 #include "src/core/ext/xds/xds_client_stats.h"
53 #include "src/core/lib/backoff/backoff.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/orphanable.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/iomgr/exec_ctx.h"
59 #include "src/core/lib/uri/uri_parser.h"
60
// Backoff parameters used when restarting a failed xDS stream; RetryableCall
// (below) feeds these into BackOff::Options for both ADS and LRS calls.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Lower bound for LRS load reporting, in milliseconds.
// NOTE(review): not used in the visible part of this file; presumably it
// clamps the reporting interval requested by the LRS server — confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
66
67 namespace grpc_core {
68
using ::grpc_event_engine::experimental::EventEngine;

// Gates the verbose xds_client logging in this file (checked via
// GRPC_TRACE_FLAG_ENABLED).
TraceFlag grpc_xds_client_trace(false, "xds_client");
// When enabled, XdsChannel passes a refcount trace name ("XdsChannel") to its
// DualRefCounted base (see the XdsChannel constructor).
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
73
74 //
75 // Internal class declarations
76 //
77
// An xds call wrapper that can restart a call upon failure. Holds a weak ref
// to the xds channel. The template parameter is the kind of wrapped xds call
// (this file instantiates it with AdsCall and LrsCall).
template <typename T>
class XdsClient::XdsChannel::RetryableCall final
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Starts the first call immediately.
  explicit RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel);

  // Disable thread-safety analysis because this method is called via
  // OrphanablePtr<>, but there's no way to pass the lock annotation
  // through there.
  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

  // Invoked when the current call ends: resets the backoff if the call saw a
  // response, drops the call and schedules a retry timer.
  void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // The current call, or null while waiting for the retry timer.
  T* call() const { return call_.get(); }
  XdsChannel* xds_channel() const { return xds_channel_.get(); }

  bool IsCurrentCallOnChannel() const;

 private:
  // Creates a new call unless shutting down.
  // NOTE(review): not lock-annotated, presumably because the constructor also
  // calls it; callers are expected to hold XdsClient::mu_ — confirm.
  void StartNewCallLocked();
  // Schedules OnRetryTimer() after the next backoff delay.
  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

  // Retry-timer callback; acquires XdsClient::mu_ itself.
  void OnRetryTimer();

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> call_;
  // The owning xds channel (weak ref).
  WeakRefCountedPtr<XdsChannel> xds_channel_;

  // Retry state.
  BackOff backoff_;
  // Handle of the pending retry timer, if any.
  absl::optional<EventEngine::TaskHandle> timer_handle_
      ABSL_GUARDED_BY(&XdsClient::mu_);

  // Set by Orphan(); once true, no new call or retry timer is started.
  bool shutting_down_ = false;
};
117
118 // Contains an ADS call to the xds server.
119 class XdsClient::XdsChannel::AdsCall final
120 : public InternallyRefCounted<AdsCall> {
121 public:
122 // The ctor and dtor should not be used directly.
123 explicit AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call);
124
125 void Orphan() override;
126
retryable_call() const127 RetryableCall<AdsCall>* retryable_call() const {
128 return retryable_call_.get();
129 }
xds_channel() const130 XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const131 XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const132 bool seen_response() const { return seen_response_; }
133
134 void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
135 bool delay_send)
136 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
137 void UnsubscribeLocked(const XdsResourceType* type,
138 const XdsResourceName& name, bool delay_unsubscription)
139 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
140
141 bool HasSubscribedResources() const;
142
143 private:
144 class AdsReadDelayHandle;
145
146 class AdsResponseParser final : public XdsApi::AdsResponseParserInterface {
147 public:
148 struct Result {
149 const XdsResourceType* type;
150 std::string type_url;
151 std::string version;
152 std::string nonce;
153 std::vector<std::string> errors;
154 std::map<std::string /*authority*/, std::set<XdsResourceKey>>
155 resources_seen;
156 uint64_t num_valid_resources = 0;
157 uint64_t num_invalid_resources = 0;
158 RefCountedPtr<ReadDelayHandle> read_delay_handle;
159 };
160
AdsResponseParser(AdsCall * ads_call)161 explicit AdsResponseParser(AdsCall* ads_call) : ads_call_(ads_call) {}
162
163 absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
164 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
165
166 void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url,
167 absl::string_view resource_name,
168 absl::string_view serialized_resource) override
169 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
170
171 void ResourceWrapperParsingFailed(size_t idx,
172 absl::string_view message) override;
173
TakeResult()174 Result TakeResult() { return std::move(result_); }
175
176 private:
xds_client() const177 XdsClient* xds_client() const { return ads_call_->xds_client(); }
178
179 AdsCall* ads_call_;
180 const Timestamp update_time_ = Timestamp::Now();
181 Result result_;
182 };
183
184 class ResourceTimer final : public InternallyRefCounted<ResourceTimer> {
185 public:
ResourceTimer(const XdsResourceType * type,const XdsResourceName & name)186 ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
187 : type_(type), name_(name) {}
188
189 // Disable thread-safety analysis because this method is called via
190 // OrphanablePtr<>, but there's no way to pass the lock annotation
191 // through there.
Orphan()192 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
193 MaybeCancelTimer();
194 Unref(DEBUG_LOCATION, "Orphan");
195 }
196
MarkSubscriptionSendStarted()197 void MarkSubscriptionSendStarted()
198 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
199 subscription_sent_ = true;
200 }
201
MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)202 void MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)
203 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
204 if (subscription_sent_) MaybeStartTimer(std::move(ads_call));
205 }
206
MarkSeen()207 void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
208 resource_seen_ = true;
209 MaybeCancelTimer();
210 }
211
MaybeCancelTimer()212 void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
213 if (timer_handle_.has_value() &&
214 ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) {
215 timer_handle_.reset();
216 }
217 }
218
219 private:
MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)220 void MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)
221 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
222 // Don't start timer if we've already either seen the resource or
223 // marked it as non-existing.
224 // Note: There are edge cases where we can have seen the resource
225 // before we have sent the initial subscription request, such as
226 // when we unsubscribe and then resubscribe to a given resource
227 // and then get a response containing that resource, all while a
228 // send_message op is in flight.
229 if (resource_seen_) return;
230 // Don't start timer if we haven't yet sent the initial subscription
231 // request for the resource.
232 if (!subscription_sent_) return;
233 // Don't start timer if it's already running.
234 if (timer_handle_.has_value()) return;
235 // Check if we already have a cached version of this resource
236 // (i.e., if this is the initial request for the resource after an
237 // ADS stream restart). If so, we don't start the timer, because
238 // (a) we already have the resource and (b) the server may
239 // optimize by not resending the resource that we already have.
240 auto& authority_state =
241 ads_call->xds_client()->authority_state_map_[name_.authority];
242 ResourceState& state = authority_state.resource_map[type_][name_.key];
243 if (state.resource != nullptr) return;
244 // Start timer.
245 ads_call_ = std::move(ads_call);
246 timer_handle_ = ads_call_->xds_client()->engine()->RunAfter(
247 ads_call_->xds_client()->request_timeout_,
248 [self = Ref(DEBUG_LOCATION, "timer")]() {
249 ApplicationCallbackExecCtx callback_exec_ctx;
250 ExecCtx exec_ctx;
251 self->OnTimer();
252 });
253 }
254
OnTimer()255 void OnTimer() {
256 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
257 gpr_log(GPR_INFO,
258 "[xds_client %p] xds server %s: timeout obtaining resource "
259 "{type=%s name=%s} from xds server",
260 ads_call_->xds_client(),
261 ads_call_->xds_channel()->server_.server_uri().c_str(),
262 std::string(type_->type_url()).c_str(),
263 XdsClient::ConstructFullXdsResourceName(
264 name_.authority, type_->type_url(), name_.key)
265 .c_str());
266 }
267 {
268 MutexLock lock(&ads_call_->xds_client()->mu_);
269 timer_handle_.reset();
270 resource_seen_ = true;
271 auto& authority_state =
272 ads_call_->xds_client()->authority_state_map_[name_.authority];
273 ResourceState& state = authority_state.resource_map[type_][name_.key];
274 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
275 ads_call_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
276 state.watchers, ReadDelayHandle::NoWait());
277 }
278 ads_call_->xds_client()->work_serializer_.DrainQueue();
279 ads_call_.reset();
280 }
281
282 const XdsResourceType* type_;
283 const XdsResourceName name_;
284
285 RefCountedPtr<AdsCall> ads_call_;
286 // True if we have sent the initial subscription request for this
287 // resource on this ADS stream.
288 bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
289 // True if we have either (a) seen the resource in a response on this
290 // stream or (b) declared the resource to not exist due to the timer
291 // firing.
292 bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
293 absl::optional<EventEngine::TaskHandle> timer_handle_
294 ABSL_GUARDED_BY(&XdsClient::mu_);
295 };
296
297 class StreamEventHandler final
298 : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
299 public:
StreamEventHandler(RefCountedPtr<AdsCall> ads_call)300 explicit StreamEventHandler(RefCountedPtr<AdsCall> ads_call)
301 : ads_call_(std::move(ads_call)) {}
302
OnRequestSent(bool ok)303 void OnRequestSent(bool ok) override { ads_call_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)304 void OnRecvMessage(absl::string_view payload) override {
305 ads_call_->OnRecvMessage(payload);
306 }
OnStatusReceived(absl::Status status)307 void OnStatusReceived(absl::Status status) override {
308 ads_call_->OnStatusReceived(std::move(status));
309 }
310
311 private:
312 RefCountedPtr<AdsCall> ads_call_;
313 };
314
315 struct ResourceTypeState {
316 // Nonce and status for this resource type.
317 std::string nonce;
318 absl::Status status;
319
320 // Subscribed resources of this type.
321 std::map<std::string /*authority*/,
322 std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
323 subscribed_resources;
324 };
325
326 void SendMessageLocked(const XdsResourceType* type)
327 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
328
329 void OnRequestSent(bool ok);
330 void OnRecvMessage(absl::string_view payload);
331 void OnStatusReceived(absl::Status status);
332
333 bool IsCurrentCallOnChannel() const;
334
335 // Constructs a list of resource names of a given type for an ADS
336 // request. Also starts the timer for each resource if needed.
337 std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
338 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
339
340 // The owning RetryableCall<>.
341 RefCountedPtr<RetryableCall<AdsCall>> retryable_call_;
342
343 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
344 streaming_call_;
345
346 bool sent_initial_message_ = false;
347 bool seen_response_ = false;
348
349 const XdsResourceType* send_message_pending_
350 ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
351
352 // Resource types for which requests need to be sent.
353 std::set<const XdsResourceType*> buffered_requests_;
354
355 // State for each resource type.
356 std::map<const XdsResourceType*, ResourceTypeState> state_map_;
357 };
358
359 // Contains an LRS call to the xds server.
360 class XdsClient::XdsChannel::LrsCall final
361 : public InternallyRefCounted<LrsCall> {
362 public:
363 // The ctor and dtor should not be used directly.
364 explicit LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call);
365
366 void Orphan() override;
367
retryable_call()368 RetryableCall<LrsCall>* retryable_call() { return retryable_call_.get(); }
xds_channel() const369 XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const370 XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const371 bool seen_response() const { return seen_response_; }
372
373 private:
374 class StreamEventHandler final
375 : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
376 public:
StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)377 explicit StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)
378 : lrs_call_(std::move(lrs_call)) {}
379
OnRequestSent(bool)380 void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
OnRecvMessage(absl::string_view payload)381 void OnRecvMessage(absl::string_view payload) override {
382 lrs_call_->OnRecvMessage(payload);
383 }
OnStatusReceived(absl::Status status)384 void OnStatusReceived(absl::Status status) override {
385 lrs_call_->OnStatusReceived(std::move(status));
386 }
387
388 private:
389 RefCountedPtr<LrsCall> lrs_call_;
390 };
391
392 // A repeating timer for a particular duration.
393 class Timer final : public InternallyRefCounted<Timer> {
394 public:
Timer(RefCountedPtr<LrsCall> lrs_call)395 explicit Timer(RefCountedPtr<LrsCall> lrs_call)
396 : lrs_call_(std::move(lrs_call)) {}
~Timer()397 ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }
398
399 // Disable thread-safety analysis because this method is called via
400 // OrphanablePtr<>, but there's no way to pass the lock annotation
401 // through there.
402 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
403
404 void ScheduleNextReportLocked()
405 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
406
407 private:
IsCurrentTimerOnCall() const408 bool IsCurrentTimerOnCall() const {
409 return this == lrs_call_->timer_.get();
410 }
xds_client() const411 XdsClient* xds_client() const { return lrs_call_->xds_client(); }
412
413 void OnNextReportTimer();
414
415 // The owning LRS call.
416 RefCountedPtr<LrsCall> lrs_call_;
417
418 absl::optional<EventEngine::TaskHandle> timer_handle_
419 ABSL_GUARDED_BY(&XdsClient::mu_);
420 };
421
422 void MaybeScheduleNextReportLocked()
423 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
424
425 void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
426
427 void SendMessageLocked(std::string payload)
428 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
429
430 void OnRequestSent();
431 void OnRecvMessage(absl::string_view payload);
432 void OnStatusReceived(absl::Status status);
433
434 bool IsCurrentCallOnChannel() const;
435
436 // The owning RetryableCall<>.
437 RefCountedPtr<RetryableCall<LrsCall>> retryable_call_;
438
439 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
440 streaming_call_;
441
442 bool seen_response_ = false;
443 bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
444
445 // Load reporting state.
446 bool send_all_clusters_ = false;
447 std::set<std::string> cluster_names_; // Asked for by the LRS server.
448 Duration load_reporting_interval_;
449 bool last_report_counters_were_zero_ = false;
450 OrphanablePtr<Timer> timer_;
451 };
452
453 //
454 // XdsClient::XdsChannel
455 //
456
XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)457 XdsClient::XdsChannel::XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
458 const XdsBootstrap::XdsServer& server)
459 : DualRefCounted<XdsChannel>(
460 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsChannel"
461 : nullptr),
462 xds_client_(std::move(xds_client)),
463 server_(server) {
464 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
465 gpr_log(GPR_INFO, "[xds_client %p] creating channel %p for server %s",
466 xds_client_.get(), this, server.server_uri().c_str());
467 }
468 absl::Status status;
469 transport_ = xds_client_->transport_factory_->Create(
470 server,
471 [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
472 absl::Status status) {
473 self->OnConnectivityFailure(std::move(status));
474 },
475 &status);
476 GPR_ASSERT(transport_ != nullptr);
477 if (!status.ok()) SetChannelStatusLocked(std::move(status));
478 }
479
~XdsChannel()480 XdsClient::XdsChannel::~XdsChannel() {
481 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
482 gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
483 xds_client(), this, server_.server_uri().c_str());
484 }
485 xds_client_.reset(DEBUG_LOCATION, "XdsChannel");
486 }
487
// Called once all strong refs to this channel have been released.
// This method should only ever be called when holding the lock, but we can't
// use an ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphaned() will be
// called from DualRefCounted::Unref, which cannot have a lock annotation for
// a lock in this subclass.
void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
            xds_client(), this, server_.server_uri().c_str());
  }
  // From here on SetChannelStatusLocked() ignores status updates for this
  // channel.
  shutting_down_ = true;
  transport_.reset();
  // At this time, all strong refs are removed, remove from channel map to
  // prevent subsequent subscription from trying to use this XdsChannel as
  // it is shutting down.
  xds_client_->xds_channel_map_.erase(server_.Key());
  // Resetting the retryable calls orphans them; RetryableCall::Orphan() in
  // turn destroys any active ADS/LRS call and cancels pending retry timers.
  ads_call_.reset();
  lrs_call_.reset();
}
506
ResetBackoff()507 void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); }
508
ads_call() const509 XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const {
510 return ads_call_->call();
511 }
512
lrs_call() const513 XdsClient::XdsChannel::LrsCall* XdsClient::XdsChannel::lrs_call() const {
514 return lrs_call_->call();
515 }
516
MaybeStartLrsCall()517 void XdsClient::XdsChannel::MaybeStartLrsCall() {
518 if (lrs_call_ != nullptr) return;
519 lrs_call_.reset(
520 new RetryableCall<LrsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+lrs")));
521 }
522
StopLrsCallLocked()523 void XdsClient::XdsChannel::StopLrsCallLocked() {
524 xds_client_->xds_load_report_server_map_.erase(server_.Key());
525 lrs_call_.reset();
526 }
527
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)528 void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type,
529 const XdsResourceName& name) {
530 if (ads_call_ == nullptr) {
531 // Start the ADS call if this is the first request.
532 ads_call_.reset(
533 new RetryableCall<AdsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+ads")));
534 // Note: AdsCall's ctor will automatically subscribe to all
535 // resources that the XdsClient already has watchers for, so we can
536 // return here.
537 return;
538 }
539 // If the ADS call is in backoff state, we don't need to do anything now
540 // because when the call is restarted it will resend all necessary requests.
541 if (ads_call() == nullptr) return;
542 // Subscribe to this resource if the ADS call is active.
543 ads_call()->SubscribeLocked(type, name, /*delay_send=*/false);
544 }
545
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)546 void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
547 const XdsResourceName& name,
548 bool delay_unsubscription) {
549 if (ads_call_ != nullptr) {
550 auto* call = ads_call_->call();
551 if (call != nullptr) {
552 call->UnsubscribeLocked(type, name, delay_unsubscription);
553 if (!call->HasSubscribedResources()) {
554 ads_call_.reset();
555 }
556 }
557 }
558 }
559
// Tries to fail over `authority` to a lower-priority xDS server. It is invoked
// from SetChannelStatusLocked() for authorities whose active (last) channel
// is this one.
//
// It does nothing (returns false) unless HasUncachedResources() reports that
// the authority still has resources without a cached value. Otherwise it walks
// the authority's server list (or the top-level bootstrap servers), skipping
// the servers already backed by a channel. For each remaining server it
// appends a channel and subscribes it to every resource in the authority's
// resource_map. It stops at the first new channel whose status is OK.
//
// Returns true if a healthy fallback channel was added, false otherwise.
bool XdsClient::XdsChannel::MaybeFallbackLocked(
    const std::string& authority, AuthorityState& authority_state) {
  if (!xds_client_->HasUncachedResources(authority_state)) {
    return false;
  }
  std::vector<const XdsBootstrap::XdsServer*> xds_servers;
  if (authority != kOldStyleAuthority) {
    // NOTE(review): assumes LookupAuthority() is non-null for any authority
    // present in authority_state_map_ — confirm.
    xds_servers =
        xds_client_->bootstrap().LookupAuthority(authority)->servers();
  }
  // No authority-specific servers: fall back to the top-level server list.
  if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
  // Servers [0, xds_channels.size()) already have channels; try the rest in
  // list order. back() is re-read on each use instead of cached.
  for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();
       ++i) {
    authority_state.xds_channels.emplace_back(
        xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
    for (const auto& type_resource : authority_state.resource_map) {
      for (const auto& key_state : type_resource.second) {
        authority_state.xds_channels.back()->SubscribeLocked(
            type_resource.first, {authority, key_state.first});
      }
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO,
              "[xds_client %p] authority %s: added fallback server %s (%s)",
              xds_client_.get(), authority.c_str(),
              xds_servers[i]->server_uri().c_str(),
              authority_state.xds_channels.back()->status().ToString().c_str());
    }
    if (authority_state.xds_channels.back()->status().ok()) return true;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] authority %s: No fallback server",
            xds_client_.get(), authority.c_str());
  }
  return false;
}
596
SetHealthyLocked()597 void XdsClient::XdsChannel::SetHealthyLocked() {
598 status_ = absl::OkStatus();
599 // Make this channel active iff:
600 // 1. Channel is on the list of authority channels
601 // 2. Channel is not the last channel on the list (i.e. not the active
602 // channel)
603 for (auto& authority : xds_client_->authority_state_map_) {
604 auto& channels = authority.second.xds_channels;
605 // Skip if channel is active.
606 if (channels.back() == this) continue;
607 auto channel_it = std::find(channels.begin(), channels.end(), this);
608 // Skip if this is not on the list
609 if (channel_it != channels.end()) {
610 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
611 gpr_log(GPR_INFO, "[xds_client %p] authority %s: Falling forward to %s",
612 xds_client_.get(), authority.first.c_str(),
613 server_.server_uri().c_str());
614 }
615 // Lower priority channels are no longer needed, connection is back!
616 channels.erase(channel_it + 1, channels.end());
617 }
618 }
619 }
620
OnConnectivityFailure(absl::Status status)621 void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) {
622 {
623 MutexLock lock(&xds_client_->mu_);
624 SetChannelStatusLocked(std::move(status));
625 }
626 xds_client_->work_serializer_.DrainQueue();
627 }
628
SetChannelStatusLocked(absl::Status status)629 void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
630 if (shutting_down_) return;
631 status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
632 server_.server_uri(), ": ",
633 status.message()));
634 gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(),
635 status.ToString().c_str());
636 // If the node ID is set, append that to the status message that we send to
637 // the watchers, so that it will appear in log messages visible to users.
638 const auto* node = xds_client_->bootstrap_->node();
639 if (node != nullptr) {
640 status = absl::Status(
641 status.code(),
642 absl::StrCat(status.message(),
643 " (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
644 }
645 // Save status in channel, so that we can immediately generate an
646 // error for any new watchers that may be started.
647 status_ = status;
648 // Find all watchers for this channel.
649 std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
650 for (auto& a : xds_client_->authority_state_map_) { // authority
651 if (a.second.xds_channels.empty() || a.second.xds_channels.back() != this ||
652 MaybeFallbackLocked(a.first, a.second)) {
653 continue;
654 }
655 for (const auto& t : a.second.resource_map) { // type
656 for (const auto& r : t.second) { // resource id
657 for (const auto& w : r.second.watchers) { // watchers
658 watchers.insert(w.second);
659 }
660 }
661 }
662 }
663 if (!watchers.empty()) {
664 // Enqueue notification for the watchers.
665 xds_client_->work_serializer_.Schedule(
666 [watchers = std::move(watchers), status = std::move(status)]()
667 ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) {
668 for (const auto& watcher : watchers) {
669 watcher->OnError(status, ReadDelayHandle::NoWait());
670 }
671 },
672 DEBUG_LOCATION);
673 }
674 }
675
676 //
677 // XdsClient::XdsChannel::RetryableCall<>
678 //
679
680 template <typename T>
RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel)681 XdsClient::XdsChannel::RetryableCall<T>::RetryableCall(
682 WeakRefCountedPtr<XdsChannel> xds_channel)
683 : xds_channel_(std::move(xds_channel)),
684 backoff_(BackOff::Options()
685 .set_initial_backoff(Duration::Seconds(
686 GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
687 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
688 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
689 .set_max_backoff(Duration::Seconds(
690 GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
691 StartNewCallLocked();
692 }
693
694 template <typename T>
Orphan()695 void XdsClient::XdsChannel::RetryableCall<T>::Orphan() {
696 shutting_down_ = true;
697 call_.reset();
698 if (timer_handle_.has_value()) {
699 xds_channel()->xds_client()->engine()->Cancel(*timer_handle_);
700 timer_handle_.reset();
701 }
702 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
703 }
704
705 template <typename T>
OnCallFinishedLocked()706 void XdsClient::XdsChannel::RetryableCall<T>::OnCallFinishedLocked() {
707 // If we saw a response on the current stream, reset backoff.
708 if (call_->seen_response()) backoff_.Reset();
709 call_.reset();
710 // Start retry timer.
711 StartRetryTimerLocked();
712 }
713
714 template <typename T>
StartNewCallLocked()715 void XdsClient::XdsChannel::RetryableCall<T>::StartNewCallLocked() {
716 if (shutting_down_) return;
717 GPR_ASSERT(xds_channel_->transport_ != nullptr);
718 GPR_ASSERT(call_ == nullptr);
719 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
720 gpr_log(GPR_INFO,
721 "[xds_client %p] xds server %s: start new call from retryable "
722 "call %p",
723 xds_channel()->xds_client(),
724 xds_channel()->server_.server_uri().c_str(), this);
725 }
726 call_ = MakeOrphanable<T>(
727 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
728 }
729
730 template <typename T>
StartRetryTimerLocked()731 void XdsClient::XdsChannel::RetryableCall<T>::StartRetryTimerLocked() {
732 if (shutting_down_) return;
733 const Timestamp next_attempt_time = backoff_.NextAttemptTime();
734 const Duration timeout =
735 std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
736 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
737 gpr_log(GPR_INFO,
738 "[xds_client %p] xds server %s: call attempt failed; "
739 "retry timer will fire in %" PRId64 "ms.",
740 xds_channel()->xds_client(),
741 xds_channel()->server_.server_uri().c_str(), timeout.millis());
742 }
743 timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter(
744 timeout,
745 [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
746 ApplicationCallbackExecCtx callback_exec_ctx;
747 ExecCtx exec_ctx;
748 self->OnRetryTimer();
749 });
750 }
751
752 template <typename T>
OnRetryTimer()753 void XdsClient::XdsChannel::RetryableCall<T>::OnRetryTimer() {
754 MutexLock lock(&xds_channel_->xds_client()->mu_);
755 if (timer_handle_.has_value()) {
756 timer_handle_.reset();
757 if (shutting_down_) return;
758 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
759 gpr_log(GPR_INFO,
760 "[xds_client %p] xds server %s: retry timer fired (retryable "
761 "call: %p)",
762 xds_channel()->xds_client(),
763 xds_channel()->server_.server_uri().c_str(), this);
764 }
765 StartNewCallLocked();
766 }
767 }
768
769 //
770 // XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle
771 //
772
773 class XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle final
774 : public XdsClient::ReadDelayHandle {
775 public:
AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)776 explicit AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)
777 : ads_call_(std::move(ads_call)) {}
778
~AdsReadDelayHandle()779 ~AdsReadDelayHandle() override {
780 MutexLock lock(&ads_call_->xds_client()->mu_);
781 auto call = ads_call_->streaming_call_.get();
782 if (call != nullptr) call->StartRecvMessage();
783 }
784
785 private:
786 RefCountedPtr<AdsCall> ads_call_;
787 };
788
789 //
790 // XdsClient::XdsChannel::AdsCall::AdsResponseParser
791 //
792
793 absl::Status
ProcessAdsResponseFields(AdsResponseFields fields)794 XdsClient::XdsChannel::AdsCall::AdsResponseParser::ProcessAdsResponseFields(
795 AdsResponseFields fields) {
796 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
797 gpr_log(
798 GPR_INFO,
799 "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
800 "version=%s, nonce=%s, num_resources=%" PRIuPTR,
801 ads_call_->xds_client(),
802 ads_call_->xds_channel()->server_.server_uri().c_str(),
803 fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
804 fields.num_resources);
805 }
806 result_.type =
807 ads_call_->xds_client()->GetResourceTypeLocked(fields.type_url);
808 if (result_.type == nullptr) {
809 return absl::InvalidArgumentError(
810 absl::StrCat("unknown resource type ", fields.type_url));
811 }
812 result_.type_url = std::move(fields.type_url);
813 result_.version = std::move(fields.version);
814 result_.nonce = std::move(fields.nonce);
815 result_.read_delay_handle =
816 MakeRefCounted<AdsReadDelayHandle>(ads_call_->Ref());
817 return absl::OkStatus();
818 }
819
820 namespace {
821
822 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,Timestamp update_time)823 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
824 std::string serialized_proto, std::string version, Timestamp update_time) {
825 XdsApi::ResourceMetadata resource_metadata;
826 resource_metadata.serialized_proto = std::move(serialized_proto);
827 resource_metadata.update_time = update_time;
828 resource_metadata.version = std::move(version);
829 resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
830 return resource_metadata;
831 }
832
833 // Update resource_metadata for NACK.
UpdateResourceMetadataNacked(const std::string & version,const std::string & details,Timestamp update_time,XdsApi::ResourceMetadata * resource_metadata)834 void UpdateResourceMetadataNacked(const std::string& version,
835 const std::string& details,
836 Timestamp update_time,
837 XdsApi::ResourceMetadata* resource_metadata) {
838 resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
839 resource_metadata->failed_version = version;
840 resource_metadata->failed_details = details;
841 resource_metadata->failed_update_time = update_time;
842 }
843
844 } // namespace
845
ParseResource(upb_Arena * arena,size_t idx,absl::string_view type_url,absl::string_view resource_name,absl::string_view serialized_resource)846 void XdsClient::XdsChannel::AdsCall::AdsResponseParser::ParseResource(
847 upb_Arena* arena, size_t idx, absl::string_view type_url,
848 absl::string_view resource_name, absl::string_view serialized_resource) {
849 std::string error_prefix = absl::StrCat(
850 "resource index ", idx, ": ",
851 resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
852 // Check the type_url of the resource.
853 if (result_.type_url != type_url) {
854 result_.errors.emplace_back(
855 absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
856 "\" (should be \"", result_.type_url, "\")"));
857 ++result_.num_invalid_resources;
858 return;
859 }
860 // Parse the resource.
861 XdsResourceType::DecodeContext context = {
862 xds_client(), ads_call_->xds_channel()->server_, &grpc_xds_client_trace,
863 xds_client()->def_pool_.ptr(), arena};
864 XdsResourceType::DecodeResult decode_result =
865 result_.type->Decode(context, serialized_resource);
866 // If we didn't already have the resource name from the Resource
867 // wrapper, try to get it from the decoding result.
868 if (resource_name.empty()) {
869 if (decode_result.name.has_value()) {
870 resource_name = *decode_result.name;
871 error_prefix =
872 absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
873 } else {
874 // We don't have any way of determining the resource name, so
875 // there's nothing more we can do here.
876 result_.errors.emplace_back(absl::StrCat(
877 error_prefix, decode_result.resource.status().ToString()));
878 ++result_.num_invalid_resources;
879 return;
880 }
881 }
882 // If decoding failed, make sure we include the error in the NACK.
883 const absl::Status& decode_status = decode_result.resource.status();
884 if (!decode_status.ok()) {
885 result_.errors.emplace_back(
886 absl::StrCat(error_prefix, decode_status.ToString()));
887 }
888 // Check the resource name.
889 auto parsed_resource_name =
890 xds_client()->ParseXdsResourceName(resource_name, result_.type);
891 if (!parsed_resource_name.ok()) {
892 result_.errors.emplace_back(
893 absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
894 ++result_.num_invalid_resources;
895 return;
896 }
897 // Cancel resource-does-not-exist timer, if needed.
898 auto timer_it = ads_call_->state_map_.find(result_.type);
899 if (timer_it != ads_call_->state_map_.end()) {
900 auto it = timer_it->second.subscribed_resources.find(
901 parsed_resource_name->authority);
902 if (it != timer_it->second.subscribed_resources.end()) {
903 auto res_it = it->second.find(parsed_resource_name->key);
904 if (res_it != it->second.end()) {
905 res_it->second->MarkSeen();
906 }
907 }
908 }
909 // Lookup the authority in the cache.
910 auto authority_it =
911 xds_client()->authority_state_map_.find(parsed_resource_name->authority);
912 if (authority_it == xds_client()->authority_state_map_.end()) {
913 return; // Skip resource -- we don't have a subscription for it.
914 }
915 // Found authority, so look up type.
916 AuthorityState& authority_state = authority_it->second;
917 auto type_it = authority_state.resource_map.find(result_.type);
918 if (type_it == authority_state.resource_map.end()) {
919 return; // Skip resource -- we don't have a subscription for it.
920 }
921 auto& type_map = type_it->second;
922 // Found type, so look up resource key.
923 auto it = type_map.find(parsed_resource_name->key);
924 if (it == type_map.end()) {
925 return; // Skip resource -- we don't have a subscription for it.
926 }
927 ResourceState& resource_state = it->second;
928 // If needed, record that we've seen this resource.
929 if (result_.type->AllResourcesRequiredInSotW()) {
930 result_.resources_seen[parsed_resource_name->authority].insert(
931 parsed_resource_name->key);
932 }
933 // If we previously ignored the resource's deletion, log that we're
934 // now re-adding it.
935 if (resource_state.ignored_deletion) {
936 gpr_log(GPR_INFO,
937 "[xds_client %p] xds server %s: server returned new version of "
938 "resource for which we previously ignored a deletion: type %s "
939 "name %s",
940 xds_client(),
941 ads_call_->xds_channel()->server_.server_uri().c_str(),
942 std::string(type_url).c_str(), std::string(resource_name).c_str());
943 resource_state.ignored_deletion = false;
944 }
945 // Update resource state based on whether the resource is valid.
946 if (!decode_status.ok()) {
947 xds_client()->NotifyWatchersOnErrorLocked(
948 resource_state.watchers,
949 absl::UnavailableError(
950 absl::StrCat("invalid resource: ", decode_status.ToString())),
951 result_.read_delay_handle);
952 UpdateResourceMetadataNacked(result_.version, decode_status.ToString(),
953 update_time_, &resource_state.meta);
954 ++result_.num_invalid_resources;
955 return;
956 }
957 // Resource is valid.
958 ++result_.num_valid_resources;
959 // If it didn't change, ignore it.
960 if (resource_state.resource != nullptr &&
961 result_.type->ResourcesEqual(resource_state.resource.get(),
962 decode_result.resource->get())) {
963 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
964 gpr_log(GPR_INFO,
965 "[xds_client %p] %s resource %s identical to current, ignoring.",
966 xds_client(), result_.type_url.c_str(),
967 std::string(resource_name).c_str());
968 }
969 return;
970 }
971 // Update the resource state.
972 resource_state.resource = std::move(*decode_result.resource);
973 resource_state.meta = CreateResourceMetadataAcked(
974 std::string(serialized_resource), result_.version, update_time_);
975 // Notify watchers.
976 auto& watchers_list = resource_state.watchers;
977 xds_client()->work_serializer_.Schedule(
978 [watchers_list, value = resource_state.resource,
979 read_delay_handle = result_.read_delay_handle]()
980 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
981 for (const auto& p : watchers_list) {
982 p.first->OnGenericResourceChanged(value, read_delay_handle);
983 }
984 },
985 DEBUG_LOCATION);
986 }
987
988 void XdsClient::XdsChannel::AdsCall::AdsResponseParser::
ResourceWrapperParsingFailed(size_t idx,absl::string_view message)989 ResourceWrapperParsingFailed(size_t idx, absl::string_view message) {
990 result_.errors.emplace_back(
991 absl::StrCat("resource index ", idx, ": ", message));
992 ++result_.num_invalid_resources;
993 }
994
995 //
996 // XdsClient::XdsChannel::AdsCall
997 //
998
AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call)999 XdsClient::XdsChannel::AdsCall::AdsCall(
1000 RefCountedPtr<RetryableCall<AdsCall>> retryable_call)
1001 : InternallyRefCounted<AdsCall>(
1002 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "AdsCall"
1003 : nullptr),
1004 retryable_call_(std::move(retryable_call)) {
1005 GPR_ASSERT(xds_client() != nullptr);
1006 // Init the ADS call.
1007 const char* method =
1008 "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
1009 "StreamAggregatedResources";
1010 streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
1011 method, std::make_unique<StreamEventHandler>(
1012 // Passing the initial ref here. This ref will go away when
1013 // the StreamEventHandler is destroyed.
1014 RefCountedPtr<AdsCall>(this)));
1015 GPR_ASSERT(streaming_call_ != nullptr);
1016 // Start the call.
1017 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1018 gpr_log(GPR_INFO,
1019 "[xds_client %p] xds server %s: starting ADS call "
1020 "(ads_call: %p, streaming_call: %p)",
1021 xds_client(), xds_channel()->server_.server_uri().c_str(), this,
1022 streaming_call_.get());
1023 }
1024 // If this is a reconnect, add any necessary subscriptions from what's
1025 // already in the cache.
1026 for (auto& a : xds_client()->authority_state_map_) {
1027 const std::string& authority = a.first;
1028 auto it = std::find(a.second.xds_channels.begin(),
1029 a.second.xds_channels.end(), xds_channel());
1030 // Skip authorities that are not using this xDS channel. The channel can be
1031 // anywhere in the list.
1032 if (it == a.second.xds_channels.end()) continue;
1033 for (const auto& t : a.second.resource_map) {
1034 const XdsResourceType* type = t.first;
1035 for (const auto& r : t.second) {
1036 const XdsResourceKey& resource_key = r.first;
1037 SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
1038 }
1039 }
1040 }
1041 // Send initial message if we added any subscriptions above.
1042 for (const auto& p : state_map_) {
1043 SendMessageLocked(p.first);
1044 }
1045 streaming_call_->StartRecvMessage();
1046 }
1047
Orphan()1048 void XdsClient::XdsChannel::AdsCall::Orphan() {
1049 state_map_.clear();
1050 // Note that the initial ref is held by the StreamEventHandler, which
1051 // will be destroyed when streaming_call_ is destroyed, which may not happen
1052 // here, since there may be other refs held to streaming_call_ by internal
1053 // callbacks.
1054 streaming_call_.reset();
1055 }
1056
SendMessageLocked(const XdsResourceType * type)1057 void XdsClient::XdsChannel::AdsCall::SendMessageLocked(
1058 const XdsResourceType* type)
1059 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
1060 // Buffer message sending if an existing message is in flight.
1061 if (send_message_pending_ != nullptr) {
1062 buffered_requests_.insert(type);
1063 return;
1064 }
1065 auto& state = state_map_[type];
1066 std::string serialized_message = xds_client()->api_.CreateAdsRequest(
1067 type->type_url(), xds_channel()->resource_type_version_map_[type],
1068 state.nonce, ResourceNamesForRequest(type), state.status,
1069 !sent_initial_message_);
1070 sent_initial_message_ = true;
1071 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1072 gpr_log(GPR_INFO,
1073 "[xds_client %p] xds server %s: sending ADS request: type=%s "
1074 "version=%s nonce=%s error=%s",
1075 xds_client(), xds_channel()->server_.server_uri().c_str(),
1076 std::string(type->type_url()).c_str(),
1077 xds_channel()->resource_type_version_map_[type].c_str(),
1078 state.nonce.c_str(), state.status.ToString().c_str());
1079 }
1080 state.status = absl::OkStatus();
1081 streaming_call_->SendMessage(std::move(serialized_message));
1082 send_message_pending_ = type;
1083 }
1084
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)1085 void XdsClient::XdsChannel::AdsCall::SubscribeLocked(
1086 const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
1087 auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
1088 if (state == nullptr) {
1089 state = MakeOrphanable<ResourceTimer>(type, name);
1090 if (!delay_send) SendMessageLocked(type);
1091 }
1092 }
1093
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)1094 void XdsClient::XdsChannel::AdsCall::UnsubscribeLocked(
1095 const XdsResourceType* type, const XdsResourceName& name,
1096 bool delay_unsubscription) {
1097 auto& type_state_map = state_map_[type];
1098 auto& authority_map = type_state_map.subscribed_resources[name.authority];
1099 authority_map.erase(name.key);
1100 if (authority_map.empty()) {
1101 type_state_map.subscribed_resources.erase(name.authority);
1102 }
1103 // Don't need to send unsubscription message if this was the last
1104 // resource we were subscribed to, since we'll be closing the stream
1105 // immediately in that case.
1106 if (!delay_unsubscription && HasSubscribedResources()) {
1107 SendMessageLocked(type);
1108 }
1109 }
1110
HasSubscribedResources() const1111 bool XdsClient::XdsChannel::AdsCall::HasSubscribedResources() const {
1112 for (const auto& p : state_map_) {
1113 if (!p.second.subscribed_resources.empty()) return true;
1114 }
1115 return false;
1116 }
1117
OnRequestSent(bool ok)1118 void XdsClient::XdsChannel::AdsCall::OnRequestSent(bool ok) {
1119 MutexLock lock(&xds_client()->mu_);
1120 // For each resource that was in the message we just sent, start the
1121 // resource timer if needed.
1122 if (ok) {
1123 auto& resource_type_state = state_map_[send_message_pending_];
1124 for (const auto& p : resource_type_state.subscribed_resources) {
1125 for (auto& q : p.second) {
1126 q.second->MaybeMarkSubscriptionSendComplete(
1127 Ref(DEBUG_LOCATION, "ResourceTimer"));
1128 }
1129 }
1130 }
1131 send_message_pending_ = nullptr;
1132 if (ok && IsCurrentCallOnChannel()) {
1133 // Continue to send another pending message if any.
1134 // TODO(roth): The current code to handle buffered messages has the
1135 // advantage of sending only the most recent list of resource names for
1136 // each resource type (no matter how many times that resource type has
1137 // been requested to send while the current message sending is still
1138 // pending). But its disadvantage is that we send the requests in fixed
1139 // order of resource types. We need to fix this if we are seeing some
1140 // resource type(s) starved due to frequent requests of other resource
1141 // type(s).
1142 auto it = buffered_requests_.begin();
1143 if (it != buffered_requests_.end()) {
1144 SendMessageLocked(*it);
1145 buffered_requests_.erase(it);
1146 }
1147 }
1148 }
1149
OnRecvMessage(absl::string_view payload)1150 void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
1151 // Needs to be destroyed after the mutex is released.
1152 RefCountedPtr<ReadDelayHandle> read_delay_handle;
1153 {
1154 MutexLock lock(&xds_client()->mu_);
1155 if (!IsCurrentCallOnChannel()) return;
1156 // Parse and validate the response.
1157 AdsResponseParser parser(this);
1158 absl::Status status = xds_client()->api_.ParseAdsResponse(payload, &parser);
1159 // This includes a handle that will trigger an ADS read.
1160 AdsResponseParser::Result result = parser.TakeResult();
1161 read_delay_handle = std::move(result.read_delay_handle);
1162 if (!status.ok()) {
1163 // Ignore unparsable response.
1164 gpr_log(GPR_ERROR,
1165 "[xds_client %p] xds server %s: error parsing ADS response (%s) "
1166 "-- ignoring",
1167 xds_client(), xds_channel()->server_.server_uri().c_str(),
1168 status.ToString().c_str());
1169 } else {
1170 seen_response_ = true;
1171 xds_channel()->SetHealthyLocked();
1172 // Update nonce.
1173 auto& state = state_map_[result.type];
1174 state.nonce = result.nonce;
1175 // If we got an error, set state.status so that we'll NACK the update.
1176 if (!result.errors.empty()) {
1177 state.status = absl::UnavailableError(
1178 absl::StrCat("xDS response validation errors: [",
1179 absl::StrJoin(result.errors, "; "), "]"));
1180 gpr_log(GPR_ERROR,
1181 "[xds_client %p] xds server %s: ADS response invalid for "
1182 "resource "
1183 "type %s version %s, will NACK: nonce=%s status=%s",
1184 xds_client(), xds_channel()->server_.server_uri().c_str(),
1185 result.type_url.c_str(), result.version.c_str(),
1186 state.nonce.c_str(), state.status.ToString().c_str());
1187 }
1188 // Delete resources not seen in update if needed.
1189 if (result.type->AllResourcesRequiredInSotW()) {
1190 for (auto& a : xds_client()->authority_state_map_) {
1191 const std::string& authority = a.first;
1192 AuthorityState& authority_state = a.second;
1193 // Skip authorities that are not using this xDS channel.
1194 if (authority_state.xds_channels.back() != xds_channel()) {
1195 continue;
1196 }
1197 auto seen_authority_it = result.resources_seen.find(authority);
1198 // Find this resource type.
1199 auto type_it = authority_state.resource_map.find(result.type);
1200 if (type_it == authority_state.resource_map.end()) continue;
1201 // Iterate over resource ids.
1202 for (auto& r : type_it->second) {
1203 const XdsResourceKey& resource_key = r.first;
1204 ResourceState& resource_state = r.second;
1205 if (seen_authority_it == result.resources_seen.end() ||
1206 seen_authority_it->second.find(resource_key) ==
1207 seen_authority_it->second.end()) {
1208 // If the resource was newly requested but has not yet been
1209 // received, we don't want to generate an error for the
1210 // watchers, because this ADS response may be in reaction to an
1211 // earlier request that did not yet request the new resource, so
1212 // its absence from the response does not necessarily indicate
1213 // that the resource does not exist. For that case, we rely on
1214 // the request timeout instead.
1215 if (resource_state.resource == nullptr) continue;
1216 if (xds_channel()->server_.IgnoreResourceDeletion()) {
1217 if (!resource_state.ignored_deletion) {
1218 gpr_log(GPR_ERROR,
1219 "[xds_client %p] xds server %s: ignoring deletion "
1220 "for resource type %s name %s",
1221 xds_client(),
1222 xds_channel()->server_.server_uri().c_str(),
1223 result.type_url.c_str(),
1224 XdsClient::ConstructFullXdsResourceName(
1225 authority, result.type_url.c_str(), resource_key)
1226 .c_str());
1227 resource_state.ignored_deletion = true;
1228 }
1229 } else {
1230 resource_state.resource.reset();
1231 resource_state.meta.client_status =
1232 XdsApi::ResourceMetadata::DOES_NOT_EXIST;
1233 xds_client()->NotifyWatchersOnResourceDoesNotExist(
1234 resource_state.watchers, read_delay_handle);
1235 }
1236 }
1237 }
1238 }
1239 }
1240 // If we had valid resources or the update was empty, update the version.
1241 if (result.num_valid_resources > 0 || result.errors.empty()) {
1242 xds_channel()->resource_type_version_map_[result.type] =
1243 std::move(result.version);
1244 }
1245 // Send ACK or NACK.
1246 SendMessageLocked(result.type);
1247 }
1248 // Update metrics.
1249 if (xds_client()->metrics_reporter_ != nullptr) {
1250 xds_client()->metrics_reporter_->ReportResourceUpdates(
1251 xds_channel()->server_.server_uri(), result.type_url,
1252 result.num_valid_resources, result.num_invalid_resources);
1253 }
1254 }
1255 xds_client()->work_serializer_.DrainQueue();
1256 }
1257
OnStatusReceived(absl::Status status)1258 void XdsClient::XdsChannel::AdsCall::OnStatusReceived(absl::Status status) {
1259 {
1260 MutexLock lock(&xds_client()->mu_);
1261 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1262 gpr_log(GPR_INFO,
1263 "[xds_client %p] xds server %s: ADS call status received "
1264 "(xds_channel=%p, ads_call=%p, streaming_call=%p): %s",
1265 xds_client(), xds_channel()->server_.server_uri().c_str(),
1266 xds_channel(), this, streaming_call_.get(),
1267 status.ToString().c_str());
1268 }
1269 // Cancel any does-not-exist timers that may be pending.
1270 for (const auto& p : state_map_) {
1271 for (const auto& q : p.second.subscribed_resources) {
1272 for (auto& r : q.second) {
1273 r.second->MaybeCancelTimer();
1274 }
1275 }
1276 }
1277 // Ignore status from a stale call.
1278 if (IsCurrentCallOnChannel()) {
1279 // Try to restart the call.
1280 retryable_call_->OnCallFinishedLocked();
1281 // If we didn't receive a response on the stream, report the
1282 // stream failure as a connectivity failure, which will report the
1283 // error to all watchers of resources on this channel.
1284 if (!seen_response_) {
1285 xds_channel()->SetChannelStatusLocked(absl::UnavailableError(
1286 absl::StrCat("xDS call failed with no responses received; status: ",
1287 status.ToString())));
1288 }
1289 }
1290 }
1291 xds_client()->work_serializer_.DrainQueue();
1292 }
1293
IsCurrentCallOnChannel() const1294 bool XdsClient::XdsChannel::AdsCall::IsCurrentCallOnChannel() const {
1295 // If the retryable ADS call is null (which only happens when the xds
1296 // channel is shutting down), all the ADS calls are stale.
1297 if (xds_channel()->ads_call_ == nullptr) return false;
1298 return this == xds_channel()->ads_call_->call();
1299 }
1300
1301 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1302 XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest(
1303 const XdsResourceType* type) {
1304 std::vector<std::string> resource_names;
1305 auto it = state_map_.find(type);
1306 if (it != state_map_.end()) {
1307 for (auto& a : it->second.subscribed_resources) {
1308 const std::string& authority = a.first;
1309 for (auto& p : a.second) {
1310 const XdsResourceKey& resource_key = p.first;
1311 resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1312 authority, type->type_url(), resource_key));
1313 OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1314 resource_timer->MarkSubscriptionSendStarted();
1315 }
1316 }
1317 }
1318 return resource_names;
1319 }
1320
1321 //
1322 // XdsClient::XdsChannel::LrsCall::Timer
1323 //
1324
Orphan()1325 void XdsClient::XdsChannel::LrsCall::Timer::Orphan() {
1326 if (timer_handle_.has_value()) {
1327 xds_client()->engine()->Cancel(*timer_handle_);
1328 timer_handle_.reset();
1329 }
1330 Unref(DEBUG_LOCATION, "Orphan");
1331 }
1332
ScheduleNextReportLocked()1333 void XdsClient::XdsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
1334 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1335 gpr_log(GPR_INFO,
1336 "[xds_client %p] xds server %s: scheduling next load report in %s",
1337 xds_client(),
1338 lrs_call_->xds_channel()->server_.server_uri().c_str(),
1339 lrs_call_->load_reporting_interval_.ToString().c_str());
1340 }
1341 timer_handle_ = xds_client()->engine()->RunAfter(
1342 lrs_call_->load_reporting_interval_,
1343 [self = Ref(DEBUG_LOCATION, "timer")]() {
1344 ApplicationCallbackExecCtx callback_exec_ctx;
1345 ExecCtx exec_ctx;
1346 self->OnNextReportTimer();
1347 });
1348 }
1349
OnNextReportTimer()1350 void XdsClient::XdsChannel::LrsCall::Timer::OnNextReportTimer() {
1351 MutexLock lock(&xds_client()->mu_);
1352 timer_handle_.reset();
1353 if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
1354 }
1355
1356 //
1357 // XdsClient::XdsChannel::LrsCall
1358 //
1359
LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call)1360 XdsClient::XdsChannel::LrsCall::LrsCall(
1361 RefCountedPtr<RetryableCall<LrsCall>> retryable_call)
1362 : InternallyRefCounted<LrsCall>(
1363 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "LrsCall"
1364 : nullptr),
1365 retryable_call_(std::move(retryable_call)) {
1366 // Init the LRS call. Note that the call will progress every time there's
1367 // activity in xds_client()->interested_parties_, which is comprised of
1368 // the polling entities from client_channel.
1369 GPR_ASSERT(xds_client() != nullptr);
1370 const char* method =
1371 "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
1372 streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
1373 method, std::make_unique<StreamEventHandler>(
1374 // Passing the initial ref here. This ref will go away when
1375 // the StreamEventHandler is destroyed.
1376 RefCountedPtr<LrsCall>(this)));
1377 GPR_ASSERT(streaming_call_ != nullptr);
1378 // Start the call.
1379 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1380 gpr_log(GPR_INFO,
1381 "[xds_client %p] xds server %s: starting LRS call (lrs_call=%p, "
1382 "streaming_call=%p)",
1383 xds_client(), xds_channel()->server_.server_uri().c_str(), this,
1384 streaming_call_.get());
1385 }
1386 // Send the initial request.
1387 std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest();
1388 SendMessageLocked(std::move(serialized_payload));
1389 // Read initial response.
1390 streaming_call_->StartRecvMessage();
1391 }
1392
Orphan()1393 void XdsClient::XdsChannel::LrsCall::Orphan() {
1394 timer_.reset();
1395 // Note that the initial ref is held by the StreamEventHandler, which
1396 // will be destroyed when streaming_call_ is destroyed, which may not happen
1397 // here, since there may be other refs held to streaming_call_ by internal
1398 // callbacks.
1399 streaming_call_.reset();
1400 }
1401
MaybeScheduleNextReportLocked()1402 void XdsClient::XdsChannel::LrsCall::MaybeScheduleNextReportLocked() {
1403 // If there are no more registered stats to report, cancel the call.
1404 auto it = xds_client()->xds_load_report_server_map_.find(
1405 xds_channel()->server_.Key());
1406 if (it == xds_client()->xds_load_report_server_map_.end() ||
1407 it->second.load_report_map.empty()) {
1408 it->second.xds_channel->StopLrsCallLocked();
1409 return;
1410 }
1411 // Don't start if the previous send_message op hasn't completed yet.
1412 // If this happens, we'll be called again from OnRequestSent().
1413 if (send_message_pending_) return;
1414 // Don't start if no LRS response has arrived.
1415 if (!seen_response()) return;
1416 // If there is no timer, create one.
1417 // This happens on the initial response and whenever the interval changes.
1418 if (timer_ == nullptr) {
1419 timer_ = MakeOrphanable<Timer>(Ref(DEBUG_LOCATION, "LRS timer"));
1420 }
1421 // Schedule the next load report.
1422 timer_->ScheduleNextReportLocked();
1423 }
1424
1425 namespace {
1426
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1427 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1428 for (const auto& p : snapshot) {
1429 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1430 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1431 for (const auto& q : cluster_snapshot.locality_stats) {
1432 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1433 if (!locality_snapshot.IsZero()) return false;
1434 }
1435 }
1436 return true;
1437 }
1438
1439 } // namespace
1440
SendReportLocked()1441 void XdsClient::XdsChannel::LrsCall::SendReportLocked() {
1442 // Construct snapshot from all reported stats.
1443 XdsApi::ClusterLoadReportMap snapshot =
1444 xds_client()->BuildLoadReportSnapshotLocked(
1445 xds_channel()->server_, send_all_clusters_, cluster_names_);
1446 // Skip client load report if the counters were all zero in the last
1447 // report and they are still zero in this one.
1448 const bool old_val = last_report_counters_were_zero_;
1449 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1450 if (old_val && last_report_counters_were_zero_) {
1451 MaybeScheduleNextReportLocked();
1452 return;
1453 }
1454 // Send a request that contains the snapshot.
1455 std::string serialized_payload =
1456 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1457 SendMessageLocked(std::move(serialized_payload));
1458 }
1459
SendMessageLocked(std::string payload)1460 void XdsClient::XdsChannel::LrsCall::SendMessageLocked(std::string payload) {
1461 send_message_pending_ = true;
1462 streaming_call_->SendMessage(std::move(payload));
1463 }
1464
OnRequestSent()1465 void XdsClient::XdsChannel::LrsCall::OnRequestSent() {
1466 MutexLock lock(&xds_client()->mu_);
1467 send_message_pending_ = false;
1468 if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
1469 }
1470
OnRecvMessage(absl::string_view payload)1471 void XdsClient::XdsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
1472 MutexLock lock(&xds_client()->mu_);
1473 // If we're no longer the current call, ignore the result.
1474 if (!IsCurrentCallOnChannel()) return;
1475 // Start recv after any code branch
1476 auto cleanup = absl::MakeCleanup(
1477 [call = streaming_call_.get()]() { call->StartRecvMessage(); });
1478 // Parse the response.
1479 bool send_all_clusters = false;
1480 std::set<std::string> new_cluster_names;
1481 Duration new_load_reporting_interval;
1482 absl::Status status = xds_client()->api_.ParseLrsResponse(
1483 payload, &send_all_clusters, &new_cluster_names,
1484 &new_load_reporting_interval);
1485 if (!status.ok()) {
1486 gpr_log(GPR_ERROR,
1487 "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1488 xds_client(), xds_channel()->server_.server_uri().c_str(),
1489 status.ToString().c_str());
1490 return;
1491 }
1492 seen_response_ = true;
1493 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1494 gpr_log(
1495 GPR_INFO,
1496 "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1497 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1498 "ms",
1499 xds_client(), xds_channel()->server_.server_uri().c_str(),
1500 new_cluster_names.size(), send_all_clusters,
1501 new_load_reporting_interval.millis());
1502 size_t i = 0;
1503 for (const auto& name : new_cluster_names) {
1504 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1505 xds_client(), i++, name.c_str());
1506 }
1507 }
1508 if (new_load_reporting_interval <
1509 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
1510 new_load_reporting_interval =
1511 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1512 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1513 gpr_log(GPR_INFO,
1514 "[xds_client %p] xds server %s: increased load_report_interval "
1515 "to minimum value %dms",
1516 xds_client(), xds_channel()->server_.server_uri().c_str(),
1517 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1518 }
1519 }
1520 // Ignore identical update.
1521 if (send_all_clusters == send_all_clusters_ &&
1522 cluster_names_ == new_cluster_names &&
1523 load_reporting_interval_ == new_load_reporting_interval) {
1524 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1525 gpr_log(GPR_INFO,
1526 "[xds_client %p] xds server %s: incoming LRS response identical "
1527 "to current, ignoring.",
1528 xds_client(), xds_channel()->server_.server_uri().c_str());
1529 }
1530 return;
1531 }
1532 // If the interval has changed, we'll need to restart the timer below.
1533 const bool restart_timer =
1534 load_reporting_interval_ != new_load_reporting_interval;
1535 // Record the new config.
1536 send_all_clusters_ = send_all_clusters;
1537 cluster_names_ = std::move(new_cluster_names);
1538 load_reporting_interval_ = new_load_reporting_interval;
1539 // Restart timer if needed.
1540 if (restart_timer) {
1541 timer_.reset();
1542 MaybeScheduleNextReportLocked();
1543 }
1544 }
1545
OnStatusReceived(absl::Status status)1546 void XdsClient::XdsChannel::LrsCall::OnStatusReceived(absl::Status status) {
1547 MutexLock lock(&xds_client()->mu_);
1548 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1549 gpr_log(GPR_INFO,
1550 "[xds_client %p] xds server %s: LRS call status received "
1551 "(xds_channel=%p, lrs_call=%p, streaming_call=%p): %s",
1552 xds_client(), xds_channel()->server_.server_uri().c_str(),
1553 xds_channel(), this, streaming_call_.get(),
1554 status.ToString().c_str());
1555 }
1556 // Ignore status from a stale call.
1557 if (IsCurrentCallOnChannel()) {
1558 // Try to restart the call.
1559 retryable_call_->OnCallFinishedLocked();
1560 }
1561 }
1562
IsCurrentCallOnChannel() const1563 bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const {
1564 // If the retryable LRS call is null (which only happens when the xds
1565 // channel is shutting down), all the LRS calls are stale.
1566 if (xds_channel()->lrs_call_ == nullptr) return false;
1567 return this == xds_channel()->lrs_call_->call();
1568 }
1569
1570 //
1571 // XdsClient
1572 //
1573
// Out-of-line definition of the static constexpr data member. There is no
// initializer here, so its value comes from the in-class declaration. Before
// C++17 this definition is required if the member is ODR-used. From C++17 on,
// static constexpr members are implicitly inline and this is redundant but
// harmless.
constexpr absl::string_view XdsClient::kOldStyleAuthority;
1575
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,OrphanablePtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::unique_ptr<XdsMetricsReporter> metrics_reporter,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1576 XdsClient::XdsClient(
1577 std::unique_ptr<XdsBootstrap> bootstrap,
1578 OrphanablePtr<XdsTransportFactory> transport_factory,
1579 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1580 std::unique_ptr<XdsMetricsReporter> metrics_reporter,
1581 std::string user_agent_name, std::string user_agent_version,
1582 Duration resource_request_timeout)
1583 : DualRefCounted<XdsClient>(
1584 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1585 : nullptr),
1586 bootstrap_(std::move(bootstrap)),
1587 transport_factory_(std::move(transport_factory)),
1588 request_timeout_(resource_request_timeout),
1589 xds_federation_enabled_(XdsFederationEnabled()),
1590 api_(this, &grpc_xds_client_trace, bootstrap_->node(), &def_pool_,
1591 std::move(user_agent_name), std::move(user_agent_version)),
1592 work_serializer_(engine),
1593 engine_(std::move(engine)),
1594 metrics_reporter_(std::move(metrics_reporter)) {
1595 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1596 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1597 }
1598 GPR_ASSERT(bootstrap_ != nullptr);
1599 if (bootstrap_->node() != nullptr) {
1600 gpr_log(GPR_INFO, "[xds_client %p] xDS node ID: %s", this,
1601 bootstrap_->node()->id().c_str());
1602 }
1603 }
1604
~XdsClient()1605 XdsClient::~XdsClient() {
1606 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1607 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1608 }
1609 }
1610
Orphaned()1611 void XdsClient::Orphaned() {
1612 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1613 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1614 }
1615 MutexLock lock(&mu_);
1616 shutting_down_ = true;
1617 // Clear cache and any remaining watchers that may not have been cancelled.
1618 authority_state_map_.clear();
1619 invalid_watchers_.clear();
1620 // We may still be sending lingering queued load report data, so don't
1621 // just clear the load reporting map, but we do want to clear the refs
1622 // we're holding to the XdsChannel objects, to make sure that
1623 // everything shuts down properly.
1624 for (auto& p : xds_load_report_server_map_) {
1625 p.second.xds_channel.reset(DEBUG_LOCATION, "XdsClient::Orphan()");
1626 }
1627 }
1628
GetOrCreateXdsChannelLocked(const XdsBootstrap::XdsServer & server,const char * reason)1629 RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
1630 const XdsBootstrap::XdsServer& server, const char* reason) {
1631 std::string key = server.Key();
1632 auto it = xds_channel_map_.find(key);
1633 if (it != xds_channel_map_.end()) {
1634 return it->second->Ref(DEBUG_LOCATION, reason);
1635 }
1636 // Channel not found, so create a new one.
1637 auto xds_channel =
1638 MakeRefCounted<XdsChannel>(WeakRef(DEBUG_LOCATION, "XdsChannel"), server);
1639 xds_channel_map_[std::move(key)] = xds_channel.get();
1640 return xds_channel;
1641 }
1642
HasUncachedResources(const AuthorityState & authority_state)1643 bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
1644 for (const auto& type_resource : authority_state.resource_map) {
1645 for (const auto& key_state : type_resource.second) {
1646 if (key_state.second.meta.client_status ==
1647 XdsApi::ResourceMetadata::REQUESTED) {
1648 return true;
1649 }
1650 }
1651 }
1652 return false;
1653 }
1654
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1655 void XdsClient::WatchResource(const XdsResourceType* type,
1656 absl::string_view name,
1657 RefCountedPtr<ResourceWatcherInterface> watcher) {
1658 ResourceWatcherInterface* w = watcher.get();
1659 // Lambda for handling failure cases.
1660 auto fail = [&](absl::Status status) mutable {
1661 {
1662 MutexLock lock(&mu_);
1663 MaybeRegisterResourceTypeLocked(type);
1664 invalid_watchers_[w] = watcher;
1665 }
1666 work_serializer_.Run(
1667 [watcher = std::move(watcher), status = std::move(status)]()
1668 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1669 watcher->OnError(status, ReadDelayHandle::NoWait());
1670 },
1671 DEBUG_LOCATION);
1672 };
1673 auto resource_name = ParseXdsResourceName(name, type);
1674 if (!resource_name.ok()) {
1675 fail(absl::UnavailableError(
1676 absl::StrCat("Unable to parse resource name ", name)));
1677 return;
1678 }
1679 // Find server to use.
1680 std::vector<const XdsBootstrap::XdsServer*> xds_servers;
1681 if (resource_name->authority != kOldStyleAuthority) {
1682 auto* authority =
1683 bootstrap_->LookupAuthority(std::string(resource_name->authority));
1684 if (authority == nullptr) {
1685 fail(absl::UnavailableError(
1686 absl::StrCat("authority \"", resource_name->authority,
1687 "\" not present in bootstrap config")));
1688 return;
1689 }
1690 xds_servers = authority->servers();
1691 }
1692 if (xds_servers.empty()) xds_servers = bootstrap_->servers();
1693 {
1694 MutexLock lock(&mu_);
1695 MaybeRegisterResourceTypeLocked(type);
1696
1697 AuthorityState& authority_state =
1698 authority_state_map_[resource_name->authority];
1699 auto it_is_new = authority_state.resource_map[type].emplace(
1700 resource_name->key, ResourceState());
1701 bool first_watcher_for_resource = it_is_new.second;
1702 ResourceState& resource_state = it_is_new.first->second;
1703 resource_state.watchers[w] = watcher;
1704 if (first_watcher_for_resource) {
1705 // We try to add new channels in 2 cases:
1706 // - This is the first resource for this authority (i.e., the list
1707 // of channels is empty).
1708 // - The last channel in the list is failing. That failure may not
1709 // have previously triggered fallback if there were no uncached
1710 // resources, but we've just added a new uncached resource,
1711 // so we need to trigger fallback now.
1712 //
1713 // Note that when we add a channel, it might already be failing
1714 // due to being used in a different authority. So we keep going
1715 // until either we add one that isn't failing or we've added them all.
1716 if (authority_state.xds_channels.empty() ||
1717 !authority_state.xds_channels.back()->status().ok()) {
1718 for (size_t i = authority_state.xds_channels.size();
1719 i < xds_servers.size(); ++i) {
1720 authority_state.xds_channels.emplace_back(
1721 GetOrCreateXdsChannelLocked(*xds_servers[i], "start watch"));
1722 if (authority_state.xds_channels.back()->status().ok()) {
1723 break;
1724 }
1725 }
1726 }
1727 for (const auto& channel : authority_state.xds_channels) {
1728 channel->SubscribeLocked(type, *resource_name);
1729 }
1730 } else {
1731 // If we already have a cached value for the resource, notify the new
1732 // watcher immediately.
1733 if (resource_state.resource != nullptr) {
1734 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1735 gpr_log(GPR_INFO,
1736 "[xds_client %p] returning cached listener data for %s", this,
1737 std::string(name).c_str());
1738 }
1739 work_serializer_.Schedule(
1740 [watcher, value = resource_state.resource]()
1741 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1742 watcher->OnGenericResourceChanged(value,
1743 ReadDelayHandle::NoWait());
1744 },
1745 DEBUG_LOCATION);
1746 } else if (resource_state.meta.client_status ==
1747 XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
1748 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1749 gpr_log(GPR_INFO,
1750 "[xds_client %p] reporting cached does-not-exist for %s",
1751 this, std::string(name).c_str());
1752 }
1753 work_serializer_.Schedule(
1754 [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1755 watcher->OnResourceDoesNotExist(ReadDelayHandle::NoWait());
1756 },
1757 DEBUG_LOCATION);
1758 } else if (resource_state.meta.client_status ==
1759 XdsApi::ResourceMetadata::NACKED) {
1760 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1761 gpr_log(
1762 GPR_INFO,
1763 "[xds_client %p] reporting cached validation failure for %s: %s",
1764 this, std::string(name).c_str(),
1765 resource_state.meta.failed_details.c_str());
1766 }
1767 std::string details = resource_state.meta.failed_details;
1768 const auto* node = bootstrap_->node();
1769 if (node != nullptr) {
1770 absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(),
1771 ")");
1772 }
1773 work_serializer_.Schedule(
1774 [watcher, details = std::move(details)]()
1775 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1776 watcher->OnError(absl::UnavailableError(absl::StrCat(
1777 "invalid resource: ", details)),
1778 ReadDelayHandle::NoWait());
1779 },
1780 DEBUG_LOCATION);
1781 }
1782 }
1783 absl::Status channel_status = authority_state.xds_channels.back()->status();
1784 if (!channel_status.ok()) {
1785 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1786 gpr_log(GPR_INFO,
1787 "[xds_client %p] returning cached channel error for %s: %s",
1788 this, std::string(name).c_str(),
1789 channel_status.ToString().c_str());
1790 }
1791 work_serializer_.Schedule(
1792 [watcher = std::move(watcher), status = std::move(channel_status)]()
1793 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable {
1794 watcher->OnError(std::move(status), ReadDelayHandle::NoWait());
1795 },
1796 DEBUG_LOCATION);
1797 }
1798 }
1799 work_serializer_.DrainQueue();
1800 }
1801
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1802 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1803 absl::string_view name,
1804 ResourceWatcherInterface* watcher,
1805 bool delay_unsubscription) {
1806 auto resource_name = ParseXdsResourceName(name, type);
1807 MutexLock lock(&mu_);
1808 // We cannot be sure whether the watcher is in invalid_watchers_ or in
1809 // authority_state_map_, so we check both, just to be safe.
1810 invalid_watchers_.erase(watcher);
1811 // Find authority.
1812 if (!resource_name.ok()) return;
1813 auto authority_it = authority_state_map_.find(resource_name->authority);
1814 if (authority_it == authority_state_map_.end()) return;
1815 AuthorityState& authority_state = authority_it->second;
1816 // Find type map.
1817 auto type_it = authority_state.resource_map.find(type);
1818 if (type_it == authority_state.resource_map.end()) return;
1819 auto& type_map = type_it->second;
1820 // Find resource key.
1821 auto resource_it = type_map.find(resource_name->key);
1822 if (resource_it == type_map.end()) return;
1823 ResourceState& resource_state = resource_it->second;
1824 // Remove watcher.
1825 resource_state.watchers.erase(watcher);
1826 // Clean up empty map entries, if any.
1827 if (resource_state.watchers.empty()) {
1828 if (resource_state.ignored_deletion) {
1829 gpr_log(GPR_INFO,
1830 "[xds_client %p] unsubscribing from a resource for which we "
1831 "previously ignored a deletion: type %s name %s",
1832 this, std::string(type->type_url()).c_str(),
1833 std::string(name).c_str());
1834 }
1835 for (const auto& xds_channel : authority_state.xds_channels) {
1836 xds_channel->UnsubscribeLocked(type, *resource_name,
1837 delay_unsubscription);
1838 }
1839 type_map.erase(resource_it);
1840 if (type_map.empty()) {
1841 authority_state.resource_map.erase(type_it);
1842 if (authority_state.resource_map.empty()) {
1843 authority_state.xds_channels.clear();
1844 }
1845 }
1846 }
1847 }
1848
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1849 void XdsClient::MaybeRegisterResourceTypeLocked(
1850 const XdsResourceType* resource_type) {
1851 auto it = resource_types_.find(resource_type->type_url());
1852 if (it != resource_types_.end()) {
1853 GPR_ASSERT(it->second == resource_type);
1854 return;
1855 }
1856 resource_types_.emplace(resource_type->type_url(), resource_type);
1857 resource_type->InitUpbSymtab(this, def_pool_.ptr());
1858 }
1859
GetResourceTypeLocked(absl::string_view resource_type)1860 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1861 absl::string_view resource_type) {
1862 auto it = resource_types_.find(resource_type);
1863 if (it != resource_types_.end()) return it->second;
1864 return nullptr;
1865 }
1866
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1867 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1868 absl::string_view name, const XdsResourceType* type) {
1869 // Old-style names use the empty string for authority.
1870 // authority is set to kOldStyleAuthority to indicate that it's an
1871 // old-style name.
1872 if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1873 return XdsResourceName{std::string(kOldStyleAuthority),
1874 {std::string(name), {}}};
1875 }
1876 // New style name. Parse URI.
1877 auto uri = URI::Parse(name);
1878 if (!uri.ok()) return uri.status();
1879 // Split the resource type off of the path to get the id.
1880 std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1881 absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1882 if (type->type_url() != path_parts.first) {
1883 return absl::InvalidArgumentError(
1884 "xdstp URI path must indicate valid xDS resource type");
1885 }
1886 // Canonicalize order of query params.
1887 std::vector<URI::QueryParam> query_params;
1888 for (const auto& p : uri->query_parameter_map()) {
1889 query_params.emplace_back(
1890 URI::QueryParam{std::string(p.first), std::string(p.second)});
1891 }
1892 return XdsResourceName{
1893 uri->authority(),
1894 {std::string(path_parts.second), std::move(query_params)}};
1895 }
1896
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1897 std::string XdsClient::ConstructFullXdsResourceName(
1898 absl::string_view authority, absl::string_view resource_type,
1899 const XdsResourceKey& key) {
1900 if (authority != kOldStyleAuthority) {
1901 auto uri = URI::Create("xdstp", std::string(authority),
1902 absl::StrCat("/", resource_type, "/", key.id),
1903 key.query_params, /*fragment=*/"");
1904 GPR_ASSERT(uri.ok());
1905 return uri->ToString();
1906 }
1907 // Old-style name.
1908 return key.id;
1909 }
1910
AddClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name)1911 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1912 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1913 absl::string_view eds_service_name) {
1914 auto key =
1915 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1916 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1917 {
1918 MutexLock lock(&mu_);
1919 // We jump through some hoops here to make sure that the
1920 // absl::string_views stored in the XdsClusterDropStats object point
1921 // to the strings in the xds_load_report_server_map_ keys, so that
1922 // they have the same lifetime.
1923 auto server_it = xds_load_report_server_map_
1924 .emplace(xds_server.Key(), LoadReportServer())
1925 .first;
1926 if (server_it->second.xds_channel == nullptr) {
1927 server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
1928 xds_server, "load report map (drop stats)");
1929 }
1930 auto load_report_it = server_it->second.load_report_map
1931 .emplace(std::move(key), LoadReportState())
1932 .first;
1933 LoadReportState& load_report_state = load_report_it->second;
1934 if (load_report_state.drop_stats != nullptr) {
1935 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1936 }
1937 if (cluster_drop_stats == nullptr) {
1938 if (load_report_state.drop_stats != nullptr) {
1939 load_report_state.deleted_drop_stats +=
1940 load_report_state.drop_stats->GetSnapshotAndReset();
1941 }
1942 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1943 Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*xds_server*/,
1944 load_report_it->first.first /*cluster_name*/,
1945 load_report_it->first.second /*eds_service_name*/);
1946 load_report_state.drop_stats = cluster_drop_stats.get();
1947 }
1948 server_it->second.xds_channel->MaybeStartLrsCall();
1949 }
1950 work_serializer_.DrainQueue();
1951 return cluster_drop_stats;
1952 }
1953
RemoveClusterDropStats(absl::string_view xds_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1954 void XdsClient::RemoveClusterDropStats(
1955 absl::string_view xds_server_key, absl::string_view cluster_name,
1956 absl::string_view eds_service_name,
1957 XdsClusterDropStats* cluster_drop_stats) {
1958 MutexLock lock(&mu_);
1959 auto server_it = xds_load_report_server_map_.find(xds_server_key);
1960 if (server_it == xds_load_report_server_map_.end()) return;
1961 auto load_report_it = server_it->second.load_report_map.find(
1962 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1963 if (load_report_it == server_it->second.load_report_map.end()) return;
1964 LoadReportState& load_report_state = load_report_it->second;
1965 if (load_report_state.drop_stats == cluster_drop_stats) {
1966 // Record final snapshot in deleted_drop_stats, which will be
1967 // added to the next load report.
1968 load_report_state.deleted_drop_stats +=
1969 load_report_state.drop_stats->GetSnapshotAndReset();
1970 load_report_state.drop_stats = nullptr;
1971 }
1972 }
1973
AddClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1974 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1975 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1976 absl::string_view eds_service_name,
1977 RefCountedPtr<XdsLocalityName> locality) {
1978 auto key =
1979 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1980 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
1981 {
1982 MutexLock lock(&mu_);
1983 // We jump through some hoops here to make sure that the
1984 // absl::string_views stored in the XdsClusterDropStats object point
1985 // to the strings in the xds_load_report_server_map_ keys, so that
1986 // they have the same lifetime.
1987 auto server_it = xds_load_report_server_map_
1988 .emplace(xds_server.Key(), LoadReportServer())
1989 .first;
1990 if (server_it->second.xds_channel == nullptr) {
1991 server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
1992 xds_server, "load report map (locality stats)");
1993 }
1994 auto load_report_it = server_it->second.load_report_map
1995 .emplace(std::move(key), LoadReportState())
1996 .first;
1997 LoadReportState& load_report_state = load_report_it->second;
1998 LoadReportState::LocalityState& locality_state =
1999 load_report_state.locality_stats[locality];
2000 if (locality_state.locality_stats != nullptr) {
2001 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2002 }
2003 if (cluster_locality_stats == nullptr) {
2004 if (locality_state.locality_stats != nullptr) {
2005 locality_state.deleted_locality_stats +=
2006 locality_state.locality_stats->GetSnapshotAndReset();
2007 }
2008 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2009 Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*xds_server*/,
2010 load_report_it->first.first /*cluster_name*/,
2011 load_report_it->first.second /*eds_service_name*/,
2012 std::move(locality));
2013 locality_state.locality_stats = cluster_locality_stats.get();
2014 }
2015 server_it->second.xds_channel->MaybeStartLrsCall();
2016 }
2017 work_serializer_.DrainQueue();
2018 return cluster_locality_stats;
2019 }
2020
RemoveClusterLocalityStats(absl::string_view xds_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2021 void XdsClient::RemoveClusterLocalityStats(
2022 absl::string_view xds_server_key, absl::string_view cluster_name,
2023 absl::string_view eds_service_name,
2024 const RefCountedPtr<XdsLocalityName>& locality,
2025 XdsClusterLocalityStats* cluster_locality_stats) {
2026 MutexLock lock(&mu_);
2027 auto server_it = xds_load_report_server_map_.find(xds_server_key);
2028 if (server_it == xds_load_report_server_map_.end()) return;
2029 auto load_report_it = server_it->second.load_report_map.find(
2030 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2031 if (load_report_it == server_it->second.load_report_map.end()) return;
2032 LoadReportState& load_report_state = load_report_it->second;
2033 auto locality_it = load_report_state.locality_stats.find(locality);
2034 if (locality_it == load_report_state.locality_stats.end()) return;
2035 LoadReportState::LocalityState& locality_state = locality_it->second;
2036 if (locality_state.locality_stats == cluster_locality_stats) {
2037 // Record final snapshot in deleted_locality_stats, which will be
2038 // added to the next load report.
2039 locality_state.deleted_locality_stats +=
2040 locality_state.locality_stats->GetSnapshotAndReset();
2041 locality_state.locality_stats = nullptr;
2042 }
2043 }
2044
ResetBackoff()2045 void XdsClient::ResetBackoff() {
2046 MutexLock lock(&mu_);
2047 for (auto& p : xds_channel_map_) {
2048 p.second->ResetBackoff();
2049 }
2050 }
2051
NotifyWatchersOnErrorLocked(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,absl::Status status,RefCountedPtr<ReadDelayHandle> read_delay_handle)2052 void XdsClient::NotifyWatchersOnErrorLocked(
2053 const std::map<ResourceWatcherInterface*,
2054 RefCountedPtr<ResourceWatcherInterface>>& watchers,
2055 absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle) {
2056 const auto* node = bootstrap_->node();
2057 if (node != nullptr) {
2058 status = absl::Status(
2059 status.code(),
2060 absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
2061 }
2062 work_serializer_.Schedule(
2063 [watchers, status = std::move(status),
2064 read_delay_handle = std::move(read_delay_handle)]()
2065 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
2066 for (const auto& p : watchers) {
2067 p.first->OnError(status, read_delay_handle);
2068 }
2069 },
2070 DEBUG_LOCATION);
2071 }
2072
NotifyWatchersOnResourceDoesNotExist(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)2073 void XdsClient::NotifyWatchersOnResourceDoesNotExist(
2074 const std::map<ResourceWatcherInterface*,
2075 RefCountedPtr<ResourceWatcherInterface>>& watchers,
2076 RefCountedPtr<ReadDelayHandle> read_delay_handle) {
2077 work_serializer_.Schedule(
2078 [watchers, read_delay_handle = std::move(read_delay_handle)]()
2079 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
2080 for (const auto& p : watchers) {
2081 p.first->OnResourceDoesNotExist(read_delay_handle);
2082 }
2083 },
2084 DEBUG_LOCATION);
2085 }
2086
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & xds_server,bool send_all_clusters,const std::set<std::string> & clusters)2087 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2088 const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
2089 const std::set<std::string>& clusters) {
2090 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2091 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2092 }
2093 XdsApi::ClusterLoadReportMap snapshot_map;
2094 auto server_it = xds_load_report_server_map_.find(xds_server.Key());
2095 if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
2096 auto& load_report_map = server_it->second.load_report_map;
2097 for (auto load_report_it = load_report_map.begin();
2098 load_report_it != load_report_map.end();) {
2099 // Cluster key is cluster and EDS service name.
2100 const auto& cluster_key = load_report_it->first;
2101 LoadReportState& load_report = load_report_it->second;
2102 // If the CDS response for a cluster indicates to use LRS but the
2103 // LRS server does not say that it wants reports for this cluster,
2104 // then we'll have stats objects here whose data we're not going to
2105 // include in the load report. However, we still need to clear out
2106 // the data from the stats objects, so that if the LRS server starts
2107 // asking for the data in the future, we don't incorrectly include
2108 // data from previous reporting intervals in that future report.
2109 const bool record_stats =
2110 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2111 XdsApi::ClusterLoadReport snapshot;
2112 // Aggregate drop stats.
2113 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2114 if (load_report.drop_stats != nullptr) {
2115 snapshot.dropped_requests +=
2116 load_report.drop_stats->GetSnapshotAndReset();
2117 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2118 gpr_log(GPR_INFO,
2119 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2120 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2121 load_report.drop_stats);
2122 }
2123 }
2124 // Aggregate locality stats.
2125 for (auto it = load_report.locality_stats.begin();
2126 it != load_report.locality_stats.end();) {
2127 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2128 auto& locality_state = it->second;
2129 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2130 snapshot.locality_stats[locality_name];
2131 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2132 if (locality_state.locality_stats != nullptr) {
2133 locality_snapshot +=
2134 locality_state.locality_stats->GetSnapshotAndReset();
2135 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2136 gpr_log(GPR_INFO,
2137 "[xds_client %p] cluster=%s eds_service_name=%s "
2138 "locality=%s locality_stats=%p",
2139 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2140 locality_name->human_readable_string().c_str(),
2141 locality_state.locality_stats);
2142 }
2143 }
2144 // If the only thing left in this entry was final snapshots from
2145 // deleted locality stats objects, remove the entry.
2146 if (locality_state.locality_stats == nullptr) {
2147 it = load_report.locality_stats.erase(it);
2148 } else {
2149 ++it;
2150 }
2151 }
2152 // Compute load report interval.
2153 const Timestamp now = Timestamp::Now();
2154 snapshot.load_report_interval = now - load_report.last_report_time;
2155 load_report.last_report_time = now;
2156 // Record snapshot.
2157 if (record_stats) {
2158 snapshot_map[cluster_key] = std::move(snapshot);
2159 }
2160 // If the only thing left in this entry was final snapshots from
2161 // deleted stats objects, remove the entry.
2162 if (load_report.locality_stats.empty() &&
2163 load_report.drop_stats == nullptr) {
2164 load_report_it = load_report_map.erase(load_report_it);
2165 } else {
2166 ++load_report_it;
2167 }
2168 }
2169 return snapshot_map;
2170 }
2171
2172 namespace {
2173
EncodeTimestamp(Timestamp value,upb_Arena * arena)2174 google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {
2175 google_protobuf_Timestamp* timestamp = google_protobuf_Timestamp_new(arena);
2176 gpr_timespec timespec = value.as_timespec(GPR_CLOCK_REALTIME);
2177 google_protobuf_Timestamp_set_seconds(timestamp, timespec.tv_sec);
2178 google_protobuf_Timestamp_set_nanos(timestamp, timespec.tv_nsec);
2179 return timestamp;
2180 }
2181
FillGenericXdsConfig(const XdsApi::ResourceMetadata & metadata,upb_StringView type_url,upb_StringView resource_name,upb_Arena * arena,envoy_service_status_v3_ClientConfig_GenericXdsConfig * entry)2182 void FillGenericXdsConfig(
2183 const XdsApi::ResourceMetadata& metadata, upb_StringView type_url,
2184 upb_StringView resource_name, upb_Arena* arena,
2185 envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) {
2186 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_type_url(entry,
2187 type_url);
2188 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_name(entry,
2189 resource_name);
2190 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_client_status(
2191 entry, metadata.client_status);
2192 if (!metadata.serialized_proto.empty()) {
2193 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_version_info(
2194 entry, StdStringToUpbString(metadata.version));
2195 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_last_updated(
2196 entry, EncodeTimestamp(metadata.update_time, arena));
2197 auto* any_field =
2198 envoy_service_status_v3_ClientConfig_GenericXdsConfig_mutable_xds_config(
2199 entry, arena);
2200 google_protobuf_Any_set_type_url(any_field, type_url);
2201 google_protobuf_Any_set_value(
2202 any_field, StdStringToUpbString(metadata.serialized_proto));
2203 }
2204 if (metadata.client_status == XdsApi::ResourceMetadata::NACKED) {
2205 auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena);
2206 envoy_admin_v3_UpdateFailureState_set_details(
2207 update_failure_state, StdStringToUpbString(metadata.failed_details));
2208 envoy_admin_v3_UpdateFailureState_set_version_info(
2209 update_failure_state, StdStringToUpbString(metadata.failed_version));
2210 envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
2211 update_failure_state,
2212 EncodeTimestamp(metadata.failed_update_time, arena));
2213 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state(
2214 entry, update_failure_state);
2215 }
2216 }
2217
2218 } // namespace
2219
DumpClientConfig(std::set<std::string> * string_pool,upb_Arena * arena,envoy_service_status_v3_ClientConfig * client_config)2220 void XdsClient::DumpClientConfig(
2221 std::set<std::string>* string_pool, upb_Arena* arena,
2222 envoy_service_status_v3_ClientConfig* client_config) {
2223 // Assemble config dump messages
2224 // Fill-in the node information
2225 auto* node =
2226 envoy_service_status_v3_ClientConfig_mutable_node(client_config, arena);
2227 api_.PopulateNode(node, arena);
2228 // Dump each resource.
2229 for (const auto& a : authority_state_map_) { // authority
2230 const std::string& authority = a.first;
2231 for (const auto& t : a.second.resource_map) { // type
2232 const XdsResourceType* type = t.first;
2233 auto it =
2234 string_pool
2235 ->emplace(absl::StrCat("type.googleapis.com/", type->type_url()))
2236 .first;
2237 upb_StringView type_url = StdStringToUpbString(*it);
2238 for (const auto& r : t.second) { // resource id
2239 auto it2 = string_pool
2240 ->emplace(ConstructFullXdsResourceName(
2241 authority, type->type_url(), r.first))
2242 .first;
2243 upb_StringView resource_name = StdStringToUpbString(*it2);
2244 envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry =
2245 envoy_service_status_v3_ClientConfig_add_generic_xds_configs(
2246 client_config, arena);
2247 FillGenericXdsConfig(r.second.meta, type_url, resource_name, arena,
2248 entry);
2249 }
2250 }
2251 }
2252 }
2253
2254 namespace {
2255
CacheStateForEntry(const XdsApi::ResourceMetadata & metadata,bool resource_cached)2256 absl::string_view CacheStateForEntry(const XdsApi::ResourceMetadata& metadata,
2257 bool resource_cached) {
2258 switch (metadata.client_status) {
2259 case XdsApi::ResourceMetadata::REQUESTED:
2260 return "requested";
2261 case XdsApi::ResourceMetadata::DOES_NOT_EXIST:
2262 return "does_not_exist";
2263 case XdsApi::ResourceMetadata::ACKED:
2264 return "acked";
2265 case XdsApi::ResourceMetadata::NACKED:
2266 return resource_cached ? "nacked_but_cached" : "nacked";
2267 }
2268 Crash("unknown resource state");
2269 }
2270
2271 } // namespace
2272
ReportResourceCounts(absl::FunctionRef<void (const ResourceCountLabels &,uint64_t)> func)2273 void XdsClient::ReportResourceCounts(
2274 absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) {
2275 ResourceCountLabels labels;
2276 for (const auto& a : authority_state_map_) { // authority
2277 labels.xds_authority = a.first;
2278 for (const auto& t : a.second.resource_map) { // type
2279 labels.resource_type = t.first->type_url();
2280 // Count the number of entries in each state.
2281 std::map<absl::string_view, uint64_t> counts;
2282 for (const auto& r : t.second) { // resource id
2283 absl::string_view cache_state =
2284 CacheStateForEntry(r.second.meta, r.second.resource != nullptr);
2285 ++counts[cache_state];
2286 }
2287 // Report the count for each state.
2288 for (const auto& c : counts) {
2289 labels.cache_state = c.first;
2290 func(labels, c.second);
2291 }
2292 }
2293 }
2294 }
2295
ReportServerConnections(absl::FunctionRef<void (absl::string_view,bool)> func)2296 void XdsClient::ReportServerConnections(
2297 absl::FunctionRef<void(absl::string_view, bool)> func) {
2298 for (const auto& p : xds_channel_map_) {
2299 func(p.second->server_uri(), p.second->status().ok());
2300 }
2301 }
2302
2303 } // namespace grpc_core
2304