xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/xds/xds_client.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_client.h"
20 
21 #include <inttypes.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 #include <vector>
30 
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/strings/match.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/str_split.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/strings/strip.h"
38 #include "absl/types/optional.h"
39 #include "envoy/config/core/v3/base.upb.h"
40 #include "envoy/service/status/v3/csds.upb.h"
41 #include "google/protobuf/any.upb.h"
42 #include "google/protobuf/timestamp.upb.h"
43 #include "upb/base/string_view.h"
44 #include "upb/mem/arena.h"
45 
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/support/log.h>
48 
49 #include "src/core/ext/xds/upb_utils.h"
50 #include "src/core/ext/xds/xds_api.h"
51 #include "src/core/ext/xds/xds_bootstrap.h"
52 #include "src/core/ext/xds/xds_client_stats.h"
53 #include "src/core/lib/backoff/backoff.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/orphanable.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/iomgr/exec_ctx.h"
59 #include "src/core/lib/uri/uri_parser.h"
60 
// Retry backoff parameters.  The four values below are fed into the
// BackOff::Options built by the RetryableCall<> constructor: initial backoff
// (seconds), multiplier, max backoff (seconds) and jitter.
61 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
62 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
63 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
64 #define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in the visible part of this file; presumably a
// lower bound, in milliseconds, for the LRS reporting interval -- confirm at
// its use site.
65 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
66 
67 namespace grpc_core {
68 
69 using ::grpc_event_engine::experimental::EventEngine;
70 
// Gates the "[xds_client %p] ..." informational log statements in this file.
// Constructed with `false` as its first argument.
71 TraceFlag grpc_xds_client_trace(false, "xds_client");
// When enabled, XdsChannel passes a trace name to its DualRefCounted<> base
// (see the XdsChannel constructor).  Constructed with `false` as its first
// argument.
72 TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
73 
74 //
75 // Internal class declarations
76 //
77 
78 // An xds call wrapper that can restart a call upon failure. Holds a ref to
79 // the xds channel. The template parameter is the kind of wrapped xds call.
80 template <typename T>
81 class XdsClient::XdsChannel::RetryableCall final
82     : public InternallyRefCounted<RetryableCall<T>> {
83  public:
84   explicit RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel);
85 
86   // Disable thread-safety analysis because this method is called via
87   // OrphanablePtr<>, but there's no way to pass the lock annotation
88   // through there.
89   void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
90 
91   void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
92 
call() const93   T* call() const { return call_.get(); }
xds_channel() const94   XdsChannel* xds_channel() const { return xds_channel_.get(); }
95 
96   bool IsCurrentCallOnChannel() const;
97 
98  private:
99   void StartNewCallLocked();
100   void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
101 
102   void OnRetryTimer();
103 
104   // The wrapped xds call that talks to the xds server. It's instantiated
105   // every time we start a new call. It's null during call retry backoff.
106   OrphanablePtr<T> call_;
107   // The owning xds channel.
108   WeakRefCountedPtr<XdsChannel> xds_channel_;
109 
110   // Retry state.
111   BackOff backoff_;
112   absl::optional<EventEngine::TaskHandle> timer_handle_
113       ABSL_GUARDED_BY(&XdsClient::mu_);
114 
115   bool shutting_down_ = false;
116 };
117 
118 // Contains an ADS call to the xds server.
119 class XdsClient::XdsChannel::AdsCall final
120     : public InternallyRefCounted<AdsCall> {
121  public:
122   // The ctor and dtor should not be used directly.
123   explicit AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call);
124 
125   void Orphan() override;
126 
retryable_call() const127   RetryableCall<AdsCall>* retryable_call() const {
128     return retryable_call_.get();
129   }
xds_channel() const130   XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const131   XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const132   bool seen_response() const { return seen_response_; }
133 
134   void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
135                        bool delay_send)
136       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
137   void UnsubscribeLocked(const XdsResourceType* type,
138                          const XdsResourceName& name, bool delay_unsubscription)
139       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
140 
141   bool HasSubscribedResources() const;
142 
143  private:
144   class AdsReadDelayHandle;
145 
146   class AdsResponseParser final : public XdsApi::AdsResponseParserInterface {
147    public:
148     struct Result {
149       const XdsResourceType* type;
150       std::string type_url;
151       std::string version;
152       std::string nonce;
153       std::vector<std::string> errors;
154       std::map<std::string /*authority*/, std::set<XdsResourceKey>>
155           resources_seen;
156       uint64_t num_valid_resources = 0;
157       uint64_t num_invalid_resources = 0;
158       RefCountedPtr<ReadDelayHandle> read_delay_handle;
159     };
160 
AdsResponseParser(AdsCall * ads_call)161     explicit AdsResponseParser(AdsCall* ads_call) : ads_call_(ads_call) {}
162 
163     absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
164         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
165 
166     void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url,
167                        absl::string_view resource_name,
168                        absl::string_view serialized_resource) override
169         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
170 
171     void ResourceWrapperParsingFailed(size_t idx,
172                                       absl::string_view message) override;
173 
TakeResult()174     Result TakeResult() { return std::move(result_); }
175 
176    private:
xds_client() const177     XdsClient* xds_client() const { return ads_call_->xds_client(); }
178 
179     AdsCall* ads_call_;
180     const Timestamp update_time_ = Timestamp::Now();
181     Result result_;
182   };
183 
184   class ResourceTimer final : public InternallyRefCounted<ResourceTimer> {
185    public:
ResourceTimer(const XdsResourceType * type,const XdsResourceName & name)186     ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
187         : type_(type), name_(name) {}
188 
189     // Disable thread-safety analysis because this method is called via
190     // OrphanablePtr<>, but there's no way to pass the lock annotation
191     // through there.
Orphan()192     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
193       MaybeCancelTimer();
194       Unref(DEBUG_LOCATION, "Orphan");
195     }
196 
MarkSubscriptionSendStarted()197     void MarkSubscriptionSendStarted()
198         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
199       subscription_sent_ = true;
200     }
201 
MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)202     void MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)
203         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
204       if (subscription_sent_) MaybeStartTimer(std::move(ads_call));
205     }
206 
MarkSeen()207     void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
208       resource_seen_ = true;
209       MaybeCancelTimer();
210     }
211 
MaybeCancelTimer()212     void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
213       if (timer_handle_.has_value() &&
214           ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) {
215         timer_handle_.reset();
216       }
217     }
218 
219    private:
MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)220     void MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)
221         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
222       // Don't start timer if we've already either seen the resource or
223       // marked it as non-existing.
224       // Note: There are edge cases where we can have seen the resource
225       // before we have sent the initial subscription request, such as
226       // when we unsubscribe and then resubscribe to a given resource
227       // and then get a response containing that resource, all while a
228       // send_message op is in flight.
229       if (resource_seen_) return;
230       // Don't start timer if we haven't yet sent the initial subscription
231       // request for the resource.
232       if (!subscription_sent_) return;
233       // Don't start timer if it's already running.
234       if (timer_handle_.has_value()) return;
235       // Check if we already have a cached version of this resource
236       // (i.e., if this is the initial request for the resource after an
237       // ADS stream restart).  If so, we don't start the timer, because
238       // (a) we already have the resource and (b) the server may
239       // optimize by not resending the resource that we already have.
240       auto& authority_state =
241           ads_call->xds_client()->authority_state_map_[name_.authority];
242       ResourceState& state = authority_state.resource_map[type_][name_.key];
243       if (state.resource != nullptr) return;
244       // Start timer.
245       ads_call_ = std::move(ads_call);
246       timer_handle_ = ads_call_->xds_client()->engine()->RunAfter(
247           ads_call_->xds_client()->request_timeout_,
248           [self = Ref(DEBUG_LOCATION, "timer")]() {
249             ApplicationCallbackExecCtx callback_exec_ctx;
250             ExecCtx exec_ctx;
251             self->OnTimer();
252           });
253     }
254 
OnTimer()255     void OnTimer() {
256       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
257         gpr_log(GPR_INFO,
258                 "[xds_client %p] xds server %s: timeout obtaining resource "
259                 "{type=%s name=%s} from xds server",
260                 ads_call_->xds_client(),
261                 ads_call_->xds_channel()->server_.server_uri().c_str(),
262                 std::string(type_->type_url()).c_str(),
263                 XdsClient::ConstructFullXdsResourceName(
264                     name_.authority, type_->type_url(), name_.key)
265                     .c_str());
266       }
267       {
268         MutexLock lock(&ads_call_->xds_client()->mu_);
269         timer_handle_.reset();
270         resource_seen_ = true;
271         auto& authority_state =
272             ads_call_->xds_client()->authority_state_map_[name_.authority];
273         ResourceState& state = authority_state.resource_map[type_][name_.key];
274         state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
275         ads_call_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
276             state.watchers, ReadDelayHandle::NoWait());
277       }
278       ads_call_->xds_client()->work_serializer_.DrainQueue();
279       ads_call_.reset();
280     }
281 
282     const XdsResourceType* type_;
283     const XdsResourceName name_;
284 
285     RefCountedPtr<AdsCall> ads_call_;
286     // True if we have sent the initial subscription request for this
287     // resource on this ADS stream.
288     bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
289     // True if we have either (a) seen the resource in a response on this
290     // stream or (b) declared the resource to not exist due to the timer
291     // firing.
292     bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
293     absl::optional<EventEngine::TaskHandle> timer_handle_
294         ABSL_GUARDED_BY(&XdsClient::mu_);
295   };
296 
297   class StreamEventHandler final
298       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
299    public:
StreamEventHandler(RefCountedPtr<AdsCall> ads_call)300     explicit StreamEventHandler(RefCountedPtr<AdsCall> ads_call)
301         : ads_call_(std::move(ads_call)) {}
302 
OnRequestSent(bool ok)303     void OnRequestSent(bool ok) override { ads_call_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)304     void OnRecvMessage(absl::string_view payload) override {
305       ads_call_->OnRecvMessage(payload);
306     }
OnStatusReceived(absl::Status status)307     void OnStatusReceived(absl::Status status) override {
308       ads_call_->OnStatusReceived(std::move(status));
309     }
310 
311    private:
312     RefCountedPtr<AdsCall> ads_call_;
313   };
314 
315   struct ResourceTypeState {
316     // Nonce and status for this resource type.
317     std::string nonce;
318     absl::Status status;
319 
320     // Subscribed resources of this type.
321     std::map<std::string /*authority*/,
322              std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
323         subscribed_resources;
324   };
325 
326   void SendMessageLocked(const XdsResourceType* type)
327       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
328 
329   void OnRequestSent(bool ok);
330   void OnRecvMessage(absl::string_view payload);
331   void OnStatusReceived(absl::Status status);
332 
333   bool IsCurrentCallOnChannel() const;
334 
335   // Constructs a list of resource names of a given type for an ADS
336   // request.  Also starts the timer for each resource if needed.
337   std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
338       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
339 
340   // The owning RetryableCall<>.
341   RefCountedPtr<RetryableCall<AdsCall>> retryable_call_;
342 
343   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
344       streaming_call_;
345 
346   bool sent_initial_message_ = false;
347   bool seen_response_ = false;
348 
349   const XdsResourceType* send_message_pending_
350       ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
351 
352   // Resource types for which requests need to be sent.
353   std::set<const XdsResourceType*> buffered_requests_;
354 
355   // State for each resource type.
356   std::map<const XdsResourceType*, ResourceTypeState> state_map_;
357 };
358 
359 // Contains an LRS call to the xds server.
360 class XdsClient::XdsChannel::LrsCall final
361     : public InternallyRefCounted<LrsCall> {
362  public:
363   // The ctor and dtor should not be used directly.
364   explicit LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call);
365 
366   void Orphan() override;
367 
retryable_call()368   RetryableCall<LrsCall>* retryable_call() { return retryable_call_.get(); }
xds_channel() const369   XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const370   XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const371   bool seen_response() const { return seen_response_; }
372 
373  private:
374   class StreamEventHandler final
375       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
376    public:
StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)377     explicit StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)
378         : lrs_call_(std::move(lrs_call)) {}
379 
OnRequestSent(bool)380     void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
OnRecvMessage(absl::string_view payload)381     void OnRecvMessage(absl::string_view payload) override {
382       lrs_call_->OnRecvMessage(payload);
383     }
OnStatusReceived(absl::Status status)384     void OnStatusReceived(absl::Status status) override {
385       lrs_call_->OnStatusReceived(std::move(status));
386     }
387 
388    private:
389     RefCountedPtr<LrsCall> lrs_call_;
390   };
391 
392   // A repeating timer for a particular duration.
393   class Timer final : public InternallyRefCounted<Timer> {
394    public:
Timer(RefCountedPtr<LrsCall> lrs_call)395     explicit Timer(RefCountedPtr<LrsCall> lrs_call)
396         : lrs_call_(std::move(lrs_call)) {}
~Timer()397     ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }
398 
399     // Disable thread-safety analysis because this method is called via
400     // OrphanablePtr<>, but there's no way to pass the lock annotation
401     // through there.
402     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
403 
404     void ScheduleNextReportLocked()
405         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
406 
407    private:
IsCurrentTimerOnCall() const408     bool IsCurrentTimerOnCall() const {
409       return this == lrs_call_->timer_.get();
410     }
xds_client() const411     XdsClient* xds_client() const { return lrs_call_->xds_client(); }
412 
413     void OnNextReportTimer();
414 
415     // The owning LRS call.
416     RefCountedPtr<LrsCall> lrs_call_;
417 
418     absl::optional<EventEngine::TaskHandle> timer_handle_
419         ABSL_GUARDED_BY(&XdsClient::mu_);
420   };
421 
422   void MaybeScheduleNextReportLocked()
423       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
424 
425   void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
426 
427   void SendMessageLocked(std::string payload)
428       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
429 
430   void OnRequestSent();
431   void OnRecvMessage(absl::string_view payload);
432   void OnStatusReceived(absl::Status status);
433 
434   bool IsCurrentCallOnChannel() const;
435 
436   // The owning RetryableCall<>.
437   RefCountedPtr<RetryableCall<LrsCall>> retryable_call_;
438 
439   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
440       streaming_call_;
441 
442   bool seen_response_ = false;
443   bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
444 
445   // Load reporting state.
446   bool send_all_clusters_ = false;
447   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
448   Duration load_reporting_interval_;
449   bool last_report_counters_were_zero_ = false;
450   OrphanablePtr<Timer> timer_;
451 };
452 
453 //
454 // XdsClient::XdsChannel
455 //
456 
XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)457 XdsClient::XdsChannel::XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
458                                   const XdsBootstrap::XdsServer& server)
459     : DualRefCounted<XdsChannel>(
460           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsChannel"
461                                                                   : nullptr),
462       xds_client_(std::move(xds_client)),
463       server_(server) {
464   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
465     gpr_log(GPR_INFO, "[xds_client %p] creating channel %p for server %s",
466             xds_client_.get(), this, server.server_uri().c_str());
467   }
468   absl::Status status;
469   transport_ = xds_client_->transport_factory_->Create(
470       server,
471       [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
472           absl::Status status) {
473         self->OnConnectivityFailure(std::move(status));
474       },
475       &status);
476   GPR_ASSERT(transport_ != nullptr);
477   if (!status.ok()) SetChannelStatusLocked(std::move(status));
478 }
479 
~XdsChannel()480 XdsClient::XdsChannel::~XdsChannel() {
481   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
482     gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
483             xds_client(), this, server_.server_uri().c_str());
484   }
485   xds_client_.reset(DEBUG_LOCATION, "XdsChannel");
486 }
487 
488 // This method should only ever be called when holding the lock, but we can't
489 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
490 // called from DualRefCounted::Unref, which cannot have a lock annotation for
491 // a lock in this subclass.
Orphaned()492 void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
493   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
494     gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
495             xds_client(), this, server_.server_uri().c_str());
496   }
497   shutting_down_ = true;
498   transport_.reset();
499   // At this time, all strong refs are removed, remove from channel map to
500   // prevent subsequent subscription from trying to use this XdsChannel as
501   // it is shutting down.
502   xds_client_->xds_channel_map_.erase(server_.Key());
503   ads_call_.reset();
504   lrs_call_.reset();
505 }
506 
ResetBackoff()507 void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); }
508 
ads_call() const509 XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const {
510   return ads_call_->call();
511 }
512 
lrs_call() const513 XdsClient::XdsChannel::LrsCall* XdsClient::XdsChannel::lrs_call() const {
514   return lrs_call_->call();
515 }
516 
MaybeStartLrsCall()517 void XdsClient::XdsChannel::MaybeStartLrsCall() {
518   if (lrs_call_ != nullptr) return;
519   lrs_call_.reset(
520       new RetryableCall<LrsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+lrs")));
521 }
522 
StopLrsCallLocked()523 void XdsClient::XdsChannel::StopLrsCallLocked() {
524   xds_client_->xds_load_report_server_map_.erase(server_.Key());
525   lrs_call_.reset();
526 }
527 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)528 void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type,
529                                             const XdsResourceName& name) {
530   if (ads_call_ == nullptr) {
531     // Start the ADS call if this is the first request.
532     ads_call_.reset(
533         new RetryableCall<AdsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+ads")));
534     // Note: AdsCall's ctor will automatically subscribe to all
535     // resources that the XdsClient already has watchers for, so we can
536     // return here.
537     return;
538   }
539   // If the ADS call is in backoff state, we don't need to do anything now
540   // because when the call is restarted it will resend all necessary requests.
541   if (ads_call() == nullptr) return;
542   // Subscribe to this resource if the ADS call is active.
543   ads_call()->SubscribeLocked(type, name, /*delay_send=*/false);
544 }
545 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)546 void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
547                                               const XdsResourceName& name,
548                                               bool delay_unsubscription) {
549   if (ads_call_ != nullptr) {
550     auto* call = ads_call_->call();
551     if (call != nullptr) {
552       call->UnsubscribeLocked(type, name, delay_unsubscription);
553       if (!call->HasSubscribedResources()) {
554         ads_call_.reset();
555       }
556     }
557   }
558 }
559 
MaybeFallbackLocked(const std::string & authority,AuthorityState & authority_state)560 bool XdsClient::XdsChannel::MaybeFallbackLocked(
561     const std::string& authority, AuthorityState& authority_state) {
562   if (!xds_client_->HasUncachedResources(authority_state)) {
563     return false;
564   }
565   std::vector<const XdsBootstrap::XdsServer*> xds_servers;
566   if (authority != kOldStyleAuthority) {
567     xds_servers =
568         xds_client_->bootstrap().LookupAuthority(authority)->servers();
569   }
570   if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
571   for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();
572        ++i) {
573     authority_state.xds_channels.emplace_back(
574         xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
575     for (const auto& type_resource : authority_state.resource_map) {
576       for (const auto& key_state : type_resource.second) {
577         authority_state.xds_channels.back()->SubscribeLocked(
578             type_resource.first, {authority, key_state.first});
579       }
580     }
581     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
582       gpr_log(GPR_INFO,
583               "[xds_client %p] authority %s: added fallback server %s (%s)",
584               xds_client_.get(), authority.c_str(),
585               xds_servers[i]->server_uri().c_str(),
586               authority_state.xds_channels.back()->status().ToString().c_str());
587     }
588     if (authority_state.xds_channels.back()->status().ok()) return true;
589   }
590   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
591     gpr_log(GPR_INFO, "[xds_client %p] authority %s: No fallback server",
592             xds_client_.get(), authority.c_str());
593   }
594   return false;
595 }
596 
SetHealthyLocked()597 void XdsClient::XdsChannel::SetHealthyLocked() {
598   status_ = absl::OkStatus();
599   // Make this channel active iff:
600   // 1. Channel is on the list of authority channels
601   // 2. Channel is not the last channel on the list (i.e. not the active
602   // channel)
603   for (auto& authority : xds_client_->authority_state_map_) {
604     auto& channels = authority.second.xds_channels;
605     // Skip if channel is active.
606     if (channels.back() == this) continue;
607     auto channel_it = std::find(channels.begin(), channels.end(), this);
608     // Skip if this is not on the list
609     if (channel_it != channels.end()) {
610       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
611         gpr_log(GPR_INFO, "[xds_client %p] authority %s: Falling forward to %s",
612                 xds_client_.get(), authority.first.c_str(),
613                 server_.server_uri().c_str());
614       }
615       // Lower priority channels are no longer needed, connection is back!
616       channels.erase(channel_it + 1, channels.end());
617     }
618   }
619 }
620 
OnConnectivityFailure(absl::Status status)621 void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) {
622   {
623     MutexLock lock(&xds_client_->mu_);
624     SetChannelStatusLocked(std::move(status));
625   }
626   xds_client_->work_serializer_.DrainQueue();
627 }
628 
SetChannelStatusLocked(absl::Status status)629 void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
630   if (shutting_down_) return;
631   status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
632                                                     server_.server_uri(), ": ",
633                                                     status.message()));
634   gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(),
635           status.ToString().c_str());
636   // If the node ID is set, append that to the status message that we send to
637   // the watchers, so that it will appear in log messages visible to users.
638   const auto* node = xds_client_->bootstrap_->node();
639   if (node != nullptr) {
640     status = absl::Status(
641         status.code(),
642         absl::StrCat(status.message(),
643                      " (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
644   }
645   // Save status in channel, so that we can immediately generate an
646   // error for any new watchers that may be started.
647   status_ = status;
648   // Find all watchers for this channel.
649   std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
650   for (auto& a : xds_client_->authority_state_map_) {  // authority
651     if (a.second.xds_channels.empty() || a.second.xds_channels.back() != this ||
652         MaybeFallbackLocked(a.first, a.second)) {
653       continue;
654     }
655     for (const auto& t : a.second.resource_map) {  // type
656       for (const auto& r : t.second) {             // resource id
657         for (const auto& w : r.second.watchers) {  // watchers
658           watchers.insert(w.second);
659         }
660       }
661     }
662   }
663   if (!watchers.empty()) {
664     // Enqueue notification for the watchers.
665     xds_client_->work_serializer_.Schedule(
666         [watchers = std::move(watchers), status = std::move(status)]()
667             ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) {
668               for (const auto& watcher : watchers) {
669                 watcher->OnError(status, ReadDelayHandle::NoWait());
670               }
671             },
672         DEBUG_LOCATION);
673   }
674 }
675 
676 //
677 // XdsClient::XdsChannel::RetryableCall<>
678 //
679 
680 template <typename T>
RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel)681 XdsClient::XdsChannel::RetryableCall<T>::RetryableCall(
682     WeakRefCountedPtr<XdsChannel> xds_channel)
683     : xds_channel_(std::move(xds_channel)),
684       backoff_(BackOff::Options()
685                    .set_initial_backoff(Duration::Seconds(
686                        GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
687                    .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
688                    .set_jitter(GRPC_XDS_RECONNECT_JITTER)
689                    .set_max_backoff(Duration::Seconds(
690                        GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
691   StartNewCallLocked();
692 }
693 
694 template <typename T>
Orphan()695 void XdsClient::XdsChannel::RetryableCall<T>::Orphan() {
696   shutting_down_ = true;
697   call_.reset();
698   if (timer_handle_.has_value()) {
699     xds_channel()->xds_client()->engine()->Cancel(*timer_handle_);
700     timer_handle_.reset();
701   }
702   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
703 }
704 
705 template <typename T>
OnCallFinishedLocked()706 void XdsClient::XdsChannel::RetryableCall<T>::OnCallFinishedLocked() {
707   // If we saw a response on the current stream, reset backoff.
708   if (call_->seen_response()) backoff_.Reset();
709   call_.reset();
710   // Start retry timer.
711   StartRetryTimerLocked();
712 }
713 
714 template <typename T>
StartNewCallLocked()715 void XdsClient::XdsChannel::RetryableCall<T>::StartNewCallLocked() {
716   if (shutting_down_) return;
717   GPR_ASSERT(xds_channel_->transport_ != nullptr);
718   GPR_ASSERT(call_ == nullptr);
719   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
720     gpr_log(GPR_INFO,
721             "[xds_client %p] xds server %s: start new call from retryable "
722             "call %p",
723             xds_channel()->xds_client(),
724             xds_channel()->server_.server_uri().c_str(), this);
725   }
726   call_ = MakeOrphanable<T>(
727       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
728 }
729 
730 template <typename T>
StartRetryTimerLocked()731 void XdsClient::XdsChannel::RetryableCall<T>::StartRetryTimerLocked() {
732   if (shutting_down_) return;
733   const Timestamp next_attempt_time = backoff_.NextAttemptTime();
734   const Duration timeout =
735       std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
736   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
737     gpr_log(GPR_INFO,
738             "[xds_client %p] xds server %s: call attempt failed; "
739             "retry timer will fire in %" PRId64 "ms.",
740             xds_channel()->xds_client(),
741             xds_channel()->server_.server_uri().c_str(), timeout.millis());
742   }
743   timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter(
744       timeout,
745       [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
746         ApplicationCallbackExecCtx callback_exec_ctx;
747         ExecCtx exec_ctx;
748         self->OnRetryTimer();
749       });
750 }
751 
752 template <typename T>
OnRetryTimer()753 void XdsClient::XdsChannel::RetryableCall<T>::OnRetryTimer() {
754   MutexLock lock(&xds_channel_->xds_client()->mu_);
755   if (timer_handle_.has_value()) {
756     timer_handle_.reset();
757     if (shutting_down_) return;
758     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
759       gpr_log(GPR_INFO,
760               "[xds_client %p] xds server %s: retry timer fired (retryable "
761               "call: %p)",
762               xds_channel()->xds_client(),
763               xds_channel()->server_.server_uri().c_str(), this);
764     }
765     StartNewCallLocked();
766   }
767 }
768 
769 //
770 // XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle
771 //
772 
773 class XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle final
774     : public XdsClient::ReadDelayHandle {
775  public:
AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)776   explicit AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)
777       : ads_call_(std::move(ads_call)) {}
778 
~AdsReadDelayHandle()779   ~AdsReadDelayHandle() override {
780     MutexLock lock(&ads_call_->xds_client()->mu_);
781     auto call = ads_call_->streaming_call_.get();
782     if (call != nullptr) call->StartRecvMessage();
783   }
784 
785  private:
786   RefCountedPtr<AdsCall> ads_call_;
787 };
788 
789 //
790 // XdsClient::XdsChannel::AdsCall::AdsResponseParser
791 //
792 
793 absl::Status
ProcessAdsResponseFields(AdsResponseFields fields)794 XdsClient::XdsChannel::AdsCall::AdsResponseParser::ProcessAdsResponseFields(
795     AdsResponseFields fields) {
796   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
797     gpr_log(
798         GPR_INFO,
799         "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
800         "version=%s, nonce=%s, num_resources=%" PRIuPTR,
801         ads_call_->xds_client(),
802         ads_call_->xds_channel()->server_.server_uri().c_str(),
803         fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
804         fields.num_resources);
805   }
806   result_.type =
807       ads_call_->xds_client()->GetResourceTypeLocked(fields.type_url);
808   if (result_.type == nullptr) {
809     return absl::InvalidArgumentError(
810         absl::StrCat("unknown resource type ", fields.type_url));
811   }
812   result_.type_url = std::move(fields.type_url);
813   result_.version = std::move(fields.version);
814   result_.nonce = std::move(fields.nonce);
815   result_.read_delay_handle =
816       MakeRefCounted<AdsReadDelayHandle>(ads_call_->Ref());
817   return absl::OkStatus();
818 }
819 
820 namespace {
821 
822 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,Timestamp update_time)823 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
824     std::string serialized_proto, std::string version, Timestamp update_time) {
825   XdsApi::ResourceMetadata resource_metadata;
826   resource_metadata.serialized_proto = std::move(serialized_proto);
827   resource_metadata.update_time = update_time;
828   resource_metadata.version = std::move(version);
829   resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
830   return resource_metadata;
831 }
832 
833 // Update resource_metadata for NACK.
UpdateResourceMetadataNacked(const std::string & version,const std::string & details,Timestamp update_time,XdsApi::ResourceMetadata * resource_metadata)834 void UpdateResourceMetadataNacked(const std::string& version,
835                                   const std::string& details,
836                                   Timestamp update_time,
837                                   XdsApi::ResourceMetadata* resource_metadata) {
838   resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
839   resource_metadata->failed_version = version;
840   resource_metadata->failed_details = details;
841   resource_metadata->failed_update_time = update_time;
842 }
843 
844 }  // namespace
845 
// Processes a single resource from an ADS response.
//
// Validates the resource's type URL against the response's type URL,
// decodes the resource, and, if the client's cache has an entry for it,
// updates that entry and notifies its watchers.
//
// Parameters:
//   arena: upb arena handed to the decoder through the DecodeContext.
//   idx: index of the resource in the response; used in error messages.
//   type_url: type URL of this individual resource.
//   resource_name: name taken from the Resource wrapper.  May be empty, in
//     which case the name produced by decoding (if any) is used.
//   serialized_resource: serialized resource to decode.
//
// Validation failures are appended to result_.errors.  The valid/invalid
// counters in result_ are updated on the paths shown below; resources for
// which there is no cache entry are skipped without touching the counters.
void XdsClient::XdsChannel::AdsCall::AdsResponseParser::ParseResource(
    upb_Arena* arena, size_t idx, absl::string_view type_url,
    absl::string_view resource_name, absl::string_view serialized_resource) {
  // Prefix for all error messages about this resource.  Includes the
  // resource name only if it is already known.
  std::string error_prefix = absl::StrCat(
      "resource index ", idx, ": ",
      resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
  // Check the type_url of the resource.
  if (result_.type_url != type_url) {
    result_.errors.emplace_back(
        absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
                     "\" (should be \"", result_.type_url, "\")"));
    ++result_.num_invalid_resources;
    return;
  }
  // Parse the resource.
  XdsResourceType::DecodeContext context = {
      xds_client(), ads_call_->xds_channel()->server_, &grpc_xds_client_trace,
      xds_client()->def_pool_.ptr(), arena};
  XdsResourceType::DecodeResult decode_result =
      result_.type->Decode(context, serialized_resource);
  // If we didn't already have the resource name from the Resource
  // wrapper, try to get it from the decoding result.
  if (resource_name.empty()) {
    if (decode_result.name.has_value()) {
      resource_name = *decode_result.name;
      // Rebuild the prefix now that the name is known.
      error_prefix =
          absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
    } else {
      // We don't have any way of determining the resource name, so
      // there's nothing more we can do here.
      result_.errors.emplace_back(absl::StrCat(
          error_prefix, decode_result.resource.status().ToString()));
      ++result_.num_invalid_resources;
      return;
    }
  }
  // If decoding failed, make sure we include the error in the NACK.
  // Note that num_invalid_resources is not incremented here; for a decode
  // failure that only happens further down, once a cache entry is found.
  const absl::Status& decode_status = decode_result.resource.status();
  if (!decode_status.ok()) {
    result_.errors.emplace_back(
        absl::StrCat(error_prefix, decode_status.ToString()));
  }
  // Check the resource name.
  auto parsed_resource_name =
      xds_client()->ParseXdsResourceName(resource_name, result_.type);
  if (!parsed_resource_name.ok()) {
    result_.errors.emplace_back(
        absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
    ++result_.num_invalid_resources;
    return;
  }
  // Cancel resource-does-not-exist timer, if needed.
  // This looks the resource up in the ADS call's own subscription state
  // (type -> authority -> key) and marks its timer as seen.
  auto timer_it = ads_call_->state_map_.find(result_.type);
  if (timer_it != ads_call_->state_map_.end()) {
    auto it = timer_it->second.subscribed_resources.find(
        parsed_resource_name->authority);
    if (it != timer_it->second.subscribed_resources.end()) {
      auto res_it = it->second.find(parsed_resource_name->key);
      if (res_it != it->second.end()) {
        res_it->second->MarkSeen();
      }
    }
  }
  // Lookup the authority in the cache.
  auto authority_it =
      xds_client()->authority_state_map_.find(parsed_resource_name->authority);
  if (authority_it == xds_client()->authority_state_map_.end()) {
    return;  // Skip resource -- we don't have a subscription for it.
  }
  // Found authority, so look up type.
  AuthorityState& authority_state = authority_it->second;
  auto type_it = authority_state.resource_map.find(result_.type);
  if (type_it == authority_state.resource_map.end()) {
    return;  // Skip resource -- we don't have a subscription for it.
  }
  auto& type_map = type_it->second;
  // Found type, so look up resource key.
  auto it = type_map.find(parsed_resource_name->key);
  if (it == type_map.end()) {
    return;  // Skip resource -- we don't have a subscription for it.
  }
  ResourceState& resource_state = it->second;
  // If needed, record that we've seen this resource.
  if (result_.type->AllResourcesRequiredInSotW()) {
    result_.resources_seen[parsed_resource_name->authority].insert(
        parsed_resource_name->key);
  }
  // If we previously ignored the resource's deletion, log that we're
  // now re-adding it.
  if (resource_state.ignored_deletion) {
    gpr_log(GPR_INFO,
            "[xds_client %p] xds server %s: server returned new version of "
            "resource for which we previously ignored a deletion: type %s "
            "name %s",
            xds_client(),
            ads_call_->xds_channel()->server_.server_uri().c_str(),
            std::string(type_url).c_str(), std::string(resource_name).c_str());
    resource_state.ignored_deletion = false;
  }
  // Update resource state based on whether the resource is valid.
  // On a decode failure the watchers are told about the error and the
  // metadata is marked NACKED; resource_state.resource is not modified here.
  if (!decode_status.ok()) {
    xds_client()->NotifyWatchersOnErrorLocked(
        resource_state.watchers,
        absl::UnavailableError(
            absl::StrCat("invalid resource: ", decode_status.ToString())),
        result_.read_delay_handle);
    UpdateResourceMetadataNacked(result_.version, decode_status.ToString(),
                                 update_time_, &resource_state.meta);
    ++result_.num_invalid_resources;
    return;
  }
  // Resource is valid.
  ++result_.num_valid_resources;
  // If it didn't change, ignore it.
  // In that case neither the cached resource nor its metadata is updated,
  // and watchers are not notified.
  if (resource_state.resource != nullptr &&
      result_.type->ResourcesEqual(resource_state.resource.get(),
                                   decode_result.resource->get())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO,
              "[xds_client %p] %s resource %s identical to current, ignoring.",
              xds_client(), result_.type_url.c_str(),
              std::string(resource_name).c_str());
    }
    return;
  }
  // Update the resource state.
  resource_state.resource = std::move(*decode_result.resource);
  resource_state.meta = CreateResourceMetadataAcked(
      std::string(serialized_resource), result_.version, update_time_);
  // Notify watchers.
  // The callback is scheduled on the work serializer rather than run
  // inline.  The lambda captures copies of the watcher list, the resource
  // value and the read delay handle, so it does not refer back to
  // resource_state when it runs.
  auto& watchers_list = resource_state.watchers;
  xds_client()->work_serializer_.Schedule(
      [watchers_list, value = resource_state.resource,
       read_delay_handle = result_.read_delay_handle]()
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
            for (const auto& p : watchers_list) {
              p.first->OnGenericResourceChanged(value, read_delay_handle);
            }
          },
      DEBUG_LOCATION);
}
987 
988 void XdsClient::XdsChannel::AdsCall::AdsResponseParser::
ResourceWrapperParsingFailed(size_t idx,absl::string_view message)989     ResourceWrapperParsingFailed(size_t idx, absl::string_view message) {
990   result_.errors.emplace_back(
991       absl::StrCat("resource index ", idx, ": ", message));
992   ++result_.num_invalid_resources;
993 }
994 
995 //
996 // XdsClient::XdsChannel::AdsCall
997 //
998 
AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call)999 XdsClient::XdsChannel::AdsCall::AdsCall(
1000     RefCountedPtr<RetryableCall<AdsCall>> retryable_call)
1001     : InternallyRefCounted<AdsCall>(
1002           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "AdsCall"
1003                                                                   : nullptr),
1004       retryable_call_(std::move(retryable_call)) {
1005   GPR_ASSERT(xds_client() != nullptr);
1006   // Init the ADS call.
1007   const char* method =
1008       "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
1009       "StreamAggregatedResources";
1010   streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
1011       method, std::make_unique<StreamEventHandler>(
1012                   // Passing the initial ref here.  This ref will go away when
1013                   // the StreamEventHandler is destroyed.
1014                   RefCountedPtr<AdsCall>(this)));
1015   GPR_ASSERT(streaming_call_ != nullptr);
1016   // Start the call.
1017   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1018     gpr_log(GPR_INFO,
1019             "[xds_client %p] xds server %s: starting ADS call "
1020             "(ads_call: %p, streaming_call: %p)",
1021             xds_client(), xds_channel()->server_.server_uri().c_str(), this,
1022             streaming_call_.get());
1023   }
1024   // If this is a reconnect, add any necessary subscriptions from what's
1025   // already in the cache.
1026   for (auto& a : xds_client()->authority_state_map_) {
1027     const std::string& authority = a.first;
1028     auto it = std::find(a.second.xds_channels.begin(),
1029                         a.second.xds_channels.end(), xds_channel());
1030     // Skip authorities that are not using this xDS channel. The channel can be
1031     // anywhere in the list.
1032     if (it == a.second.xds_channels.end()) continue;
1033     for (const auto& t : a.second.resource_map) {
1034       const XdsResourceType* type = t.first;
1035       for (const auto& r : t.second) {
1036         const XdsResourceKey& resource_key = r.first;
1037         SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
1038       }
1039     }
1040   }
1041   // Send initial message if we added any subscriptions above.
1042   for (const auto& p : state_map_) {
1043     SendMessageLocked(p.first);
1044   }
1045   streaming_call_->StartRecvMessage();
1046 }
1047 
// Called when the owner gives up this ADS call.  Clears all per-type
// subscription state and releases the streaming call.
void XdsClient::XdsChannel::AdsCall::Orphan() {
  // Clear the state first: resetting streaming_call_ below may release the
  // initial ref, so members should not be touched after that point.
  state_map_.clear();
  // Note that the initial ref is held by the StreamEventHandler, which
  // will be destroyed when streaming_call_ is destroyed, which may not happen
  // here, since there may be other refs held to streaming_call_ by internal
  // callbacks.
  streaming_call_.reset();
}
1056 
SendMessageLocked(const XdsResourceType * type)1057 void XdsClient::XdsChannel::AdsCall::SendMessageLocked(
1058     const XdsResourceType* type)
1059     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
1060   // Buffer message sending if an existing message is in flight.
1061   if (send_message_pending_ != nullptr) {
1062     buffered_requests_.insert(type);
1063     return;
1064   }
1065   auto& state = state_map_[type];
1066   std::string serialized_message = xds_client()->api_.CreateAdsRequest(
1067       type->type_url(), xds_channel()->resource_type_version_map_[type],
1068       state.nonce, ResourceNamesForRequest(type), state.status,
1069       !sent_initial_message_);
1070   sent_initial_message_ = true;
1071   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1072     gpr_log(GPR_INFO,
1073             "[xds_client %p] xds server %s: sending ADS request: type=%s "
1074             "version=%s nonce=%s error=%s",
1075             xds_client(), xds_channel()->server_.server_uri().c_str(),
1076             std::string(type->type_url()).c_str(),
1077             xds_channel()->resource_type_version_map_[type].c_str(),
1078             state.nonce.c_str(), state.status.ToString().c_str());
1079   }
1080   state.status = absl::OkStatus();
1081   streaming_call_->SendMessage(std::move(serialized_message));
1082   send_message_pending_ = type;
1083 }
1084 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)1085 void XdsClient::XdsChannel::AdsCall::SubscribeLocked(
1086     const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
1087   auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
1088   if (state == nullptr) {
1089     state = MakeOrphanable<ResourceTimer>(type, name);
1090     if (!delay_send) SendMessageLocked(type);
1091   }
1092 }
1093 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)1094 void XdsClient::XdsChannel::AdsCall::UnsubscribeLocked(
1095     const XdsResourceType* type, const XdsResourceName& name,
1096     bool delay_unsubscription) {
1097   auto& type_state_map = state_map_[type];
1098   auto& authority_map = type_state_map.subscribed_resources[name.authority];
1099   authority_map.erase(name.key);
1100   if (authority_map.empty()) {
1101     type_state_map.subscribed_resources.erase(name.authority);
1102   }
1103   // Don't need to send unsubscription message if this was the last
1104   // resource we were subscribed to, since we'll be closing the stream
1105   // immediately in that case.
1106   if (!delay_unsubscription && HasSubscribedResources()) {
1107     SendMessageLocked(type);
1108   }
1109 }
1110 
HasSubscribedResources() const1111 bool XdsClient::XdsChannel::AdsCall::HasSubscribedResources() const {
1112   for (const auto& p : state_map_) {
1113     if (!p.second.subscribed_resources.empty()) return true;
1114   }
1115   return false;
1116 }
1117 
OnRequestSent(bool ok)1118 void XdsClient::XdsChannel::AdsCall::OnRequestSent(bool ok) {
1119   MutexLock lock(&xds_client()->mu_);
1120   // For each resource that was in the message we just sent, start the
1121   // resource timer if needed.
1122   if (ok) {
1123     auto& resource_type_state = state_map_[send_message_pending_];
1124     for (const auto& p : resource_type_state.subscribed_resources) {
1125       for (auto& q : p.second) {
1126         q.second->MaybeMarkSubscriptionSendComplete(
1127             Ref(DEBUG_LOCATION, "ResourceTimer"));
1128       }
1129     }
1130   }
1131   send_message_pending_ = nullptr;
1132   if (ok && IsCurrentCallOnChannel()) {
1133     // Continue to send another pending message if any.
1134     // TODO(roth): The current code to handle buffered messages has the
1135     // advantage of sending only the most recent list of resource names for
1136     // each resource type (no matter how many times that resource type has
1137     // been requested to send while the current message sending is still
1138     // pending). But its disadvantage is that we send the requests in fixed
1139     // order of resource types. We need to fix this if we are seeing some
1140     // resource type(s) starved due to frequent requests of other resource
1141     // type(s).
1142     auto it = buffered_requests_.begin();
1143     if (it != buffered_requests_.end()) {
1144       SendMessageLocked(*it);
1145       buffered_requests_.erase(it);
1146     }
1147   }
1148 }
1149 
// Callback invoked when a message is received on the ADS stream.
//
// Under the XdsClient mutex: ignores the message if this is no longer the
// channel's current ADS call; otherwise parses the response, records the
// nonce and any validation errors for the resource type, handles resources
// that were absent from the response (for types that require all resources
// in each response), updates the type's version when appropriate, and sends
// an ACK or NACK.  Resource update metrics are reported whether or not
// parsing succeeded.  After releasing the mutex, drains the work serializer
// queue.
//
// Parameters:
//   payload: the serialized ADS response.
void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
  // Needs to be destroyed after the mutex is released.
  // (The handle's destructor acquires the XdsClient mutex itself.)
  RefCountedPtr<ReadDelayHandle> read_delay_handle;
  {
    MutexLock lock(&xds_client()->mu_);
    // Note: this early return skips the DrainQueue() call at the end.
    if (!IsCurrentCallOnChannel()) return;
    // Parse and validate the response.
    AdsResponseParser parser(this);
    absl::Status status = xds_client()->api_.ParseAdsResponse(payload, &parser);
    // This includes a handle that will trigger an ADS read.
    AdsResponseParser::Result result = parser.TakeResult();
    // Move the handle out of the result so that it outlives the lock scope.
    read_delay_handle = std::move(result.read_delay_handle);
    if (!status.ok()) {
      // Ignore unparsable response.
      gpr_log(GPR_ERROR,
              "[xds_client %p] xds server %s: error parsing ADS response (%s) "
              "-- ignoring",
              xds_client(), xds_channel()->server_.server_uri().c_str(),
              status.ToString().c_str());
    } else {
      seen_response_ = true;
      xds_channel()->SetHealthyLocked();
      // Update nonce.
      auto& state = state_map_[result.type];
      state.nonce = result.nonce;
      // If we got an error, set state.status so that we'll NACK the update.
      if (!result.errors.empty()) {
        state.status = absl::UnavailableError(
            absl::StrCat("xDS response validation errors: [",
                         absl::StrJoin(result.errors, "; "), "]"));
        gpr_log(GPR_ERROR,
                "[xds_client %p] xds server %s: ADS response invalid for "
                "resource "
                "type %s version %s, will NACK: nonce=%s status=%s",
                xds_client(), xds_channel()->server_.server_uri().c_str(),
                result.type_url.c_str(), result.version.c_str(),
                state.nonce.c_str(), state.status.ToString().c_str());
      }
      // Delete resources not seen in update if needed.
      if (result.type->AllResourcesRequiredInSotW()) {
        for (auto& a : xds_client()->authority_state_map_) {
          const std::string& authority = a.first;
          AuthorityState& authority_state = a.second;
          // Skip authorities that are not using this xDS channel.
          // Only the last channel in the authority's list is compared here.
          // NOTE(review): back() assumes xds_channels is non-empty -- confirm
          // that authorities in the map always have at least one channel.
          if (authority_state.xds_channels.back() != xds_channel()) {
            continue;
          }
          // May be end() if no resource of this authority was seen at all.
          auto seen_authority_it = result.resources_seen.find(authority);
          // Find this resource type.
          auto type_it = authority_state.resource_map.find(result.type);
          if (type_it == authority_state.resource_map.end()) continue;
          // Iterate over resource ids.
          for (auto& r : type_it->second) {
            const XdsResourceKey& resource_key = r.first;
            ResourceState& resource_state = r.second;
            if (seen_authority_it == result.resources_seen.end() ||
                seen_authority_it->second.find(resource_key) ==
                    seen_authority_it->second.end()) {
              // If the resource was newly requested but has not yet been
              // received, we don't want to generate an error for the
              // watchers, because this ADS response may be in reaction to an
              // earlier request that did not yet request the new resource, so
              // its absence from the response does not necessarily indicate
              // that the resource does not exist.  For that case, we rely on
              // the request timeout instead.
              if (resource_state.resource == nullptr) continue;
              if (xds_channel()->server_.IgnoreResourceDeletion()) {
                // Keep the cached resource; log only the first time the
                // deletion is ignored.
                if (!resource_state.ignored_deletion) {
                  gpr_log(GPR_ERROR,
                          "[xds_client %p] xds server %s: ignoring deletion "
                          "for resource type %s name %s",
                          xds_client(),
                          xds_channel()->server_.server_uri().c_str(),
                          result.type_url.c_str(),
                          XdsClient::ConstructFullXdsResourceName(
                              authority, result.type_url.c_str(), resource_key)
                              .c_str());
                  resource_state.ignored_deletion = true;
                }
              } else {
                // Drop the cached resource and tell the watchers that it
                // does not exist.
                resource_state.resource.reset();
                resource_state.meta.client_status =
                    XdsApi::ResourceMetadata::DOES_NOT_EXIST;
                xds_client()->NotifyWatchersOnResourceDoesNotExist(
                    resource_state.watchers, read_delay_handle);
              }
            }
          }
        }
      }
      // If we had valid resources or the update was empty, update the version.
      if (result.num_valid_resources > 0 || result.errors.empty()) {
        xds_channel()->resource_type_version_map_[result.type] =
            std::move(result.version);
      }
      // Send ACK or NACK.
      SendMessageLocked(result.type);
    }
    // Update metrics.
    // This runs for unparsable responses as well.
    if (xds_client()->metrics_reporter_ != nullptr) {
      xds_client()->metrics_reporter_->ReportResourceUpdates(
          xds_channel()->server_.server_uri(), result.type_url,
          result.num_valid_resources, result.num_invalid_resources);
    }
  }
  // Run any callbacks scheduled on the work serializer, now that the mutex
  // has been released.
  xds_client()->work_serializer_.DrainQueue();
}
1257 
OnStatusReceived(absl::Status status)1258 void XdsClient::XdsChannel::AdsCall::OnStatusReceived(absl::Status status) {
1259   {
1260     MutexLock lock(&xds_client()->mu_);
1261     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1262       gpr_log(GPR_INFO,
1263               "[xds_client %p] xds server %s: ADS call status received "
1264               "(xds_channel=%p, ads_call=%p, streaming_call=%p): %s",
1265               xds_client(), xds_channel()->server_.server_uri().c_str(),
1266               xds_channel(), this, streaming_call_.get(),
1267               status.ToString().c_str());
1268     }
1269     // Cancel any does-not-exist timers that may be pending.
1270     for (const auto& p : state_map_) {
1271       for (const auto& q : p.second.subscribed_resources) {
1272         for (auto& r : q.second) {
1273           r.second->MaybeCancelTimer();
1274         }
1275       }
1276     }
1277     // Ignore status from a stale call.
1278     if (IsCurrentCallOnChannel()) {
1279       // Try to restart the call.
1280       retryable_call_->OnCallFinishedLocked();
1281       // If we didn't receive a response on the stream, report the
1282       // stream failure as a connectivity failure, which will report the
1283       // error to all watchers of resources on this channel.
1284       if (!seen_response_) {
1285         xds_channel()->SetChannelStatusLocked(absl::UnavailableError(
1286             absl::StrCat("xDS call failed with no responses received; status: ",
1287                          status.ToString())));
1288       }
1289     }
1290   }
1291   xds_client()->work_serializer_.DrainQueue();
1292 }
1293 
IsCurrentCallOnChannel() const1294 bool XdsClient::XdsChannel::AdsCall::IsCurrentCallOnChannel() const {
1295   // If the retryable ADS call is null (which only happens when the xds
1296   // channel is shutting down), all the ADS calls are stale.
1297   if (xds_channel()->ads_call_ == nullptr) return false;
1298   return this == xds_channel()->ads_call_->call();
1299 }
1300 
1301 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1302 XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest(
1303     const XdsResourceType* type) {
1304   std::vector<std::string> resource_names;
1305   auto it = state_map_.find(type);
1306   if (it != state_map_.end()) {
1307     for (auto& a : it->second.subscribed_resources) {
1308       const std::string& authority = a.first;
1309       for (auto& p : a.second) {
1310         const XdsResourceKey& resource_key = p.first;
1311         resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1312             authority, type->type_url(), resource_key));
1313         OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1314         resource_timer->MarkSubscriptionSendStarted();
1315       }
1316     }
1317   }
1318   return resource_names;
1319 }
1320 
1321 //
1322 // XdsClient::XdsChannel::LrsCall::Timer
1323 //
1324 
Orphan()1325 void XdsClient::XdsChannel::LrsCall::Timer::Orphan() {
1326   if (timer_handle_.has_value()) {
1327     xds_client()->engine()->Cancel(*timer_handle_);
1328     timer_handle_.reset();
1329   }
1330   Unref(DEBUG_LOCATION, "Orphan");
1331 }
1332 
ScheduleNextReportLocked()1333 void XdsClient::XdsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
1334   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1335     gpr_log(GPR_INFO,
1336             "[xds_client %p] xds server %s: scheduling next load report in %s",
1337             xds_client(),
1338             lrs_call_->xds_channel()->server_.server_uri().c_str(),
1339             lrs_call_->load_reporting_interval_.ToString().c_str());
1340   }
1341   timer_handle_ = xds_client()->engine()->RunAfter(
1342       lrs_call_->load_reporting_interval_,
1343       [self = Ref(DEBUG_LOCATION, "timer")]() {
1344         ApplicationCallbackExecCtx callback_exec_ctx;
1345         ExecCtx exec_ctx;
1346         self->OnNextReportTimer();
1347       });
1348 }
1349 
OnNextReportTimer()1350 void XdsClient::XdsChannel::LrsCall::Timer::OnNextReportTimer() {
1351   MutexLock lock(&xds_client()->mu_);
1352   timer_handle_.reset();
1353   if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
1354 }
1355 
1356 //
1357 // XdsClient::XdsChannel::LrsCall
1358 //
1359 
LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call)1360 XdsClient::XdsChannel::LrsCall::LrsCall(
1361     RefCountedPtr<RetryableCall<LrsCall>> retryable_call)
1362     : InternallyRefCounted<LrsCall>(
1363           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "LrsCall"
1364                                                                   : nullptr),
1365       retryable_call_(std::move(retryable_call)) {
1366   // Init the LRS call. Note that the call will progress every time there's
1367   // activity in xds_client()->interested_parties_, which is comprised of
1368   // the polling entities from client_channel.
1369   GPR_ASSERT(xds_client() != nullptr);
1370   const char* method =
1371       "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
1372   streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
1373       method, std::make_unique<StreamEventHandler>(
1374                   // Passing the initial ref here.  This ref will go away when
1375                   // the StreamEventHandler is destroyed.
1376                   RefCountedPtr<LrsCall>(this)));
1377   GPR_ASSERT(streaming_call_ != nullptr);
1378   // Start the call.
1379   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1380     gpr_log(GPR_INFO,
1381             "[xds_client %p] xds server %s: starting LRS call (lrs_call=%p, "
1382             "streaming_call=%p)",
1383             xds_client(), xds_channel()->server_.server_uri().c_str(), this,
1384             streaming_call_.get());
1385   }
1386   // Send the initial request.
1387   std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest();
1388   SendMessageLocked(std::move(serialized_payload));
1389   // Read initial response.
1390   streaming_call_->StartRecvMessage();
1391 }
1392 
// Called when the owner gives up this LRS call.  Releases the report timer
// and the streaming call.
void XdsClient::XdsChannel::LrsCall::Orphan() {
  // Release the timer first: resetting streaming_call_ below may release
  // the initial ref, so members should not be touched after that point.
  timer_.reset();
  // Note that the initial ref is held by the StreamEventHandler, which
  // will be destroyed when streaming_call_ is destroyed, which may not happen
  // here, since there may be other refs held to streaming_call_ by internal
  // callbacks.
  streaming_call_.reset();
}
1401 
MaybeScheduleNextReportLocked()1402 void XdsClient::XdsChannel::LrsCall::MaybeScheduleNextReportLocked() {
1403   // If there are no more registered stats to report, cancel the call.
1404   auto it = xds_client()->xds_load_report_server_map_.find(
1405       xds_channel()->server_.Key());
1406   if (it == xds_client()->xds_load_report_server_map_.end() ||
1407       it->second.load_report_map.empty()) {
1408     it->second.xds_channel->StopLrsCallLocked();
1409     return;
1410   }
1411   // Don't start if the previous send_message op hasn't completed yet.
1412   // If this happens, we'll be called again from OnRequestSent().
1413   if (send_message_pending_) return;
1414   // Don't start if no LRS response has arrived.
1415   if (!seen_response()) return;
1416   // If there is no timer, create one.
1417   // This happens on the initial response and whenever the interval changes.
1418   if (timer_ == nullptr) {
1419     timer_ = MakeOrphanable<Timer>(Ref(DEBUG_LOCATION, "LRS timer"));
1420   }
1421   // Schedule the next load report.
1422   timer_->ScheduleNextReportLocked();
1423 }
1424 
1425 namespace {
1426 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1427 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1428   for (const auto& p : snapshot) {
1429     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1430     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1431     for (const auto& q : cluster_snapshot.locality_stats) {
1432       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1433       if (!locality_snapshot.IsZero()) return false;
1434     }
1435   }
1436   return true;
1437 }
1438 
1439 }  // namespace
1440 
SendReportLocked()1441 void XdsClient::XdsChannel::LrsCall::SendReportLocked() {
1442   // Construct snapshot from all reported stats.
1443   XdsApi::ClusterLoadReportMap snapshot =
1444       xds_client()->BuildLoadReportSnapshotLocked(
1445           xds_channel()->server_, send_all_clusters_, cluster_names_);
1446   // Skip client load report if the counters were all zero in the last
1447   // report and they are still zero in this one.
1448   const bool old_val = last_report_counters_were_zero_;
1449   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1450   if (old_val && last_report_counters_were_zero_) {
1451     MaybeScheduleNextReportLocked();
1452     return;
1453   }
1454   // Send a request that contains the snapshot.
1455   std::string serialized_payload =
1456       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1457   SendMessageLocked(std::move(serialized_payload));
1458 }
1459 
SendMessageLocked(std::string payload)1460 void XdsClient::XdsChannel::LrsCall::SendMessageLocked(std::string payload) {
1461   send_message_pending_ = true;
1462   streaming_call_->SendMessage(std::move(payload));
1463 }
1464 
OnRequestSent()1465 void XdsClient::XdsChannel::LrsCall::OnRequestSent() {
1466   MutexLock lock(&xds_client()->mu_);
1467   send_message_pending_ = false;
1468   if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
1469 }
1470 
OnRecvMessage(absl::string_view payload)1471 void XdsClient::XdsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
1472   MutexLock lock(&xds_client()->mu_);
1473   // If we're no longer the current call, ignore the result.
1474   if (!IsCurrentCallOnChannel()) return;
1475   // Start recv after any code branch
1476   auto cleanup = absl::MakeCleanup(
1477       [call = streaming_call_.get()]() { call->StartRecvMessage(); });
1478   // Parse the response.
1479   bool send_all_clusters = false;
1480   std::set<std::string> new_cluster_names;
1481   Duration new_load_reporting_interval;
1482   absl::Status status = xds_client()->api_.ParseLrsResponse(
1483       payload, &send_all_clusters, &new_cluster_names,
1484       &new_load_reporting_interval);
1485   if (!status.ok()) {
1486     gpr_log(GPR_ERROR,
1487             "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1488             xds_client(), xds_channel()->server_.server_uri().c_str(),
1489             status.ToString().c_str());
1490     return;
1491   }
1492   seen_response_ = true;
1493   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1494     gpr_log(
1495         GPR_INFO,
1496         "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1497         " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1498         "ms",
1499         xds_client(), xds_channel()->server_.server_uri().c_str(),
1500         new_cluster_names.size(), send_all_clusters,
1501         new_load_reporting_interval.millis());
1502     size_t i = 0;
1503     for (const auto& name : new_cluster_names) {
1504       gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1505               xds_client(), i++, name.c_str());
1506     }
1507   }
1508   if (new_load_reporting_interval <
1509       Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
1510     new_load_reporting_interval =
1511         Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1512     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1513       gpr_log(GPR_INFO,
1514               "[xds_client %p] xds server %s: increased load_report_interval "
1515               "to minimum value %dms",
1516               xds_client(), xds_channel()->server_.server_uri().c_str(),
1517               GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1518     }
1519   }
1520   // Ignore identical update.
1521   if (send_all_clusters == send_all_clusters_ &&
1522       cluster_names_ == new_cluster_names &&
1523       load_reporting_interval_ == new_load_reporting_interval) {
1524     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1525       gpr_log(GPR_INFO,
1526               "[xds_client %p] xds server %s: incoming LRS response identical "
1527               "to current, ignoring.",
1528               xds_client(), xds_channel()->server_.server_uri().c_str());
1529     }
1530     return;
1531   }
1532   // If the interval has changed, we'll need to restart the timer below.
1533   const bool restart_timer =
1534       load_reporting_interval_ != new_load_reporting_interval;
1535   // Record the new config.
1536   send_all_clusters_ = send_all_clusters;
1537   cluster_names_ = std::move(new_cluster_names);
1538   load_reporting_interval_ = new_load_reporting_interval;
1539   // Restart timer if needed.
1540   if (restart_timer) {
1541     timer_.reset();
1542     MaybeScheduleNextReportLocked();
1543   }
1544 }
1545 
OnStatusReceived(absl::Status status)1546 void XdsClient::XdsChannel::LrsCall::OnStatusReceived(absl::Status status) {
1547   MutexLock lock(&xds_client()->mu_);
1548   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1549     gpr_log(GPR_INFO,
1550             "[xds_client %p] xds server %s: LRS call status received "
1551             "(xds_channel=%p, lrs_call=%p, streaming_call=%p): %s",
1552             xds_client(), xds_channel()->server_.server_uri().c_str(),
1553             xds_channel(), this, streaming_call_.get(),
1554             status.ToString().c_str());
1555   }
1556   // Ignore status from a stale call.
1557   if (IsCurrentCallOnChannel()) {
1558     // Try to restart the call.
1559     retryable_call_->OnCallFinishedLocked();
1560   }
1561 }
1562 
IsCurrentCallOnChannel() const1563 bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const {
1564   // If the retryable LRS call is null (which only happens when the xds
1565   // channel is shutting down), all the LRS calls are stale.
1566   if (xds_channel()->lrs_call_ == nullptr) return false;
1567   return this == xds_channel()->lrs_call_->call();
1568 }
1569 
1570 //
1571 // XdsClient
1572 //
1573 
// Namespace-scope definition of the static constexpr data member
// XdsClient::kOldStyleAuthority.  Under pre-C++17 rules such a definition is
// needed whenever the member is odr-used; from C++17 on the in-class
// declaration is implicitly inline and this line is redundant but harmless.
1574 constexpr absl::string_view XdsClient::kOldStyleAuthority;
1575 
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,OrphanablePtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::unique_ptr<XdsMetricsReporter> metrics_reporter,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1576 XdsClient::XdsClient(
1577     std::unique_ptr<XdsBootstrap> bootstrap,
1578     OrphanablePtr<XdsTransportFactory> transport_factory,
1579     std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1580     std::unique_ptr<XdsMetricsReporter> metrics_reporter,
1581     std::string user_agent_name, std::string user_agent_version,
1582     Duration resource_request_timeout)
1583     : DualRefCounted<XdsClient>(
1584           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1585                                                                   : nullptr),
1586       bootstrap_(std::move(bootstrap)),
1587       transport_factory_(std::move(transport_factory)),
1588       request_timeout_(resource_request_timeout),
1589       xds_federation_enabled_(XdsFederationEnabled()),
1590       api_(this, &grpc_xds_client_trace, bootstrap_->node(), &def_pool_,
1591            std::move(user_agent_name), std::move(user_agent_version)),
1592       work_serializer_(engine),
1593       engine_(std::move(engine)),
1594       metrics_reporter_(std::move(metrics_reporter)) {
1595   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1596     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1597   }
1598   GPR_ASSERT(bootstrap_ != nullptr);
1599   if (bootstrap_->node() != nullptr) {
1600     gpr_log(GPR_INFO, "[xds_client %p] xDS node ID: %s", this,
1601             bootstrap_->node()->id().c_str());
1602   }
1603 }
1604 
~XdsClient()1605 XdsClient::~XdsClient() {
1606   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1607     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1608   }
1609 }
1610 
Orphaned()1611 void XdsClient::Orphaned() {
1612   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1613     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1614   }
1615   MutexLock lock(&mu_);
1616   shutting_down_ = true;
1617   // Clear cache and any remaining watchers that may not have been cancelled.
1618   authority_state_map_.clear();
1619   invalid_watchers_.clear();
1620   // We may still be sending lingering queued load report data, so don't
1621   // just clear the load reporting map, but we do want to clear the refs
1622   // we're holding to the XdsChannel objects, to make sure that
1623   // everything shuts down properly.
1624   for (auto& p : xds_load_report_server_map_) {
1625     p.second.xds_channel.reset(DEBUG_LOCATION, "XdsClient::Orphan()");
1626   }
1627 }
1628 
GetOrCreateXdsChannelLocked(const XdsBootstrap::XdsServer & server,const char * reason)1629 RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
1630     const XdsBootstrap::XdsServer& server, const char* reason) {
1631   std::string key = server.Key();
1632   auto it = xds_channel_map_.find(key);
1633   if (it != xds_channel_map_.end()) {
1634     return it->second->Ref(DEBUG_LOCATION, reason);
1635   }
1636   // Channel not found, so create a new one.
1637   auto xds_channel =
1638       MakeRefCounted<XdsChannel>(WeakRef(DEBUG_LOCATION, "XdsChannel"), server);
1639   xds_channel_map_[std::move(key)] = xds_channel.get();
1640   return xds_channel;
1641 }
1642 
HasUncachedResources(const AuthorityState & authority_state)1643 bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
1644   for (const auto& type_resource : authority_state.resource_map) {
1645     for (const auto& key_state : type_resource.second) {
1646       if (key_state.second.meta.client_status ==
1647           XdsApi::ResourceMetadata::REQUESTED) {
1648         return true;
1649       }
1650     }
1651   }
1652   return false;
1653 }
1654 
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1655 void XdsClient::WatchResource(const XdsResourceType* type,
1656                               absl::string_view name,
1657                               RefCountedPtr<ResourceWatcherInterface> watcher) {
1658   ResourceWatcherInterface* w = watcher.get();
1659   // Lambda for handling failure cases.
1660   auto fail = [&](absl::Status status) mutable {
1661     {
1662       MutexLock lock(&mu_);
1663       MaybeRegisterResourceTypeLocked(type);
1664       invalid_watchers_[w] = watcher;
1665     }
1666     work_serializer_.Run(
1667         [watcher = std::move(watcher), status = std::move(status)]()
1668             ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1669               watcher->OnError(status, ReadDelayHandle::NoWait());
1670             },
1671         DEBUG_LOCATION);
1672   };
1673   auto resource_name = ParseXdsResourceName(name, type);
1674   if (!resource_name.ok()) {
1675     fail(absl::UnavailableError(
1676         absl::StrCat("Unable to parse resource name ", name)));
1677     return;
1678   }
1679   // Find server to use.
1680   std::vector<const XdsBootstrap::XdsServer*> xds_servers;
1681   if (resource_name->authority != kOldStyleAuthority) {
1682     auto* authority =
1683         bootstrap_->LookupAuthority(std::string(resource_name->authority));
1684     if (authority == nullptr) {
1685       fail(absl::UnavailableError(
1686           absl::StrCat("authority \"", resource_name->authority,
1687                        "\" not present in bootstrap config")));
1688       return;
1689     }
1690     xds_servers = authority->servers();
1691   }
1692   if (xds_servers.empty()) xds_servers = bootstrap_->servers();
1693   {
1694     MutexLock lock(&mu_);
1695     MaybeRegisterResourceTypeLocked(type);
1696 
1697     AuthorityState& authority_state =
1698         authority_state_map_[resource_name->authority];
1699     auto it_is_new = authority_state.resource_map[type].emplace(
1700         resource_name->key, ResourceState());
1701     bool first_watcher_for_resource = it_is_new.second;
1702     ResourceState& resource_state = it_is_new.first->second;
1703     resource_state.watchers[w] = watcher;
1704     if (first_watcher_for_resource) {
1705       // We try to add new channels in 2 cases:
1706       // - This is the first resource for this authority (i.e., the list
1707       //   of channels is empty).
1708       // - The last channel in the list is failing.  That failure may not
1709       //   have previously triggered fallback if there were no uncached
1710       //   resources, but we've just added a new uncached resource,
1711       //   so we need to trigger fallback now.
1712       //
1713       // Note that when we add a channel, it might already be failing
1714       // due to being used in a different authority.  So we keep going
1715       // until either we add one that isn't failing or we've added them all.
1716       if (authority_state.xds_channels.empty() ||
1717           !authority_state.xds_channels.back()->status().ok()) {
1718         for (size_t i = authority_state.xds_channels.size();
1719              i < xds_servers.size(); ++i) {
1720           authority_state.xds_channels.emplace_back(
1721               GetOrCreateXdsChannelLocked(*xds_servers[i], "start watch"));
1722           if (authority_state.xds_channels.back()->status().ok()) {
1723             break;
1724           }
1725         }
1726       }
1727       for (const auto& channel : authority_state.xds_channels) {
1728         channel->SubscribeLocked(type, *resource_name);
1729       }
1730     } else {
1731       // If we already have a cached value for the resource, notify the new
1732       // watcher immediately.
1733       if (resource_state.resource != nullptr) {
1734         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1735           gpr_log(GPR_INFO,
1736                   "[xds_client %p] returning cached listener data for %s", this,
1737                   std::string(name).c_str());
1738         }
1739         work_serializer_.Schedule(
1740             [watcher, value = resource_state.resource]()
1741                 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1742                   watcher->OnGenericResourceChanged(value,
1743                                                     ReadDelayHandle::NoWait());
1744                 },
1745             DEBUG_LOCATION);
1746       } else if (resource_state.meta.client_status ==
1747                  XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
1748         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1749           gpr_log(GPR_INFO,
1750                   "[xds_client %p] reporting cached does-not-exist for %s",
1751                   this, std::string(name).c_str());
1752         }
1753         work_serializer_.Schedule(
1754             [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1755               watcher->OnResourceDoesNotExist(ReadDelayHandle::NoWait());
1756             },
1757             DEBUG_LOCATION);
1758       } else if (resource_state.meta.client_status ==
1759                  XdsApi::ResourceMetadata::NACKED) {
1760         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1761           gpr_log(
1762               GPR_INFO,
1763               "[xds_client %p] reporting cached validation failure for %s: %s",
1764               this, std::string(name).c_str(),
1765               resource_state.meta.failed_details.c_str());
1766         }
1767         std::string details = resource_state.meta.failed_details;
1768         const auto* node = bootstrap_->node();
1769         if (node != nullptr) {
1770           absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(),
1771                           ")");
1772         }
1773         work_serializer_.Schedule(
1774             [watcher, details = std::move(details)]()
1775                 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1776                   watcher->OnError(absl::UnavailableError(absl::StrCat(
1777                                        "invalid resource: ", details)),
1778                                    ReadDelayHandle::NoWait());
1779                 },
1780             DEBUG_LOCATION);
1781       }
1782     }
1783     absl::Status channel_status = authority_state.xds_channels.back()->status();
1784     if (!channel_status.ok()) {
1785       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1786         gpr_log(GPR_INFO,
1787                 "[xds_client %p] returning cached channel error for %s: %s",
1788                 this, std::string(name).c_str(),
1789                 channel_status.ToString().c_str());
1790       }
1791       work_serializer_.Schedule(
1792           [watcher = std::move(watcher), status = std::move(channel_status)]()
1793               ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable {
1794                 watcher->OnError(std::move(status), ReadDelayHandle::NoWait());
1795               },
1796           DEBUG_LOCATION);
1797     }
1798   }
1799   work_serializer_.DrainQueue();
1800 }
1801 
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1802 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1803                                     absl::string_view name,
1804                                     ResourceWatcherInterface* watcher,
1805                                     bool delay_unsubscription) {
1806   auto resource_name = ParseXdsResourceName(name, type);
1807   MutexLock lock(&mu_);
1808   // We cannot be sure whether the watcher is in invalid_watchers_ or in
1809   // authority_state_map_, so we check both, just to be safe.
1810   invalid_watchers_.erase(watcher);
1811   // Find authority.
1812   if (!resource_name.ok()) return;
1813   auto authority_it = authority_state_map_.find(resource_name->authority);
1814   if (authority_it == authority_state_map_.end()) return;
1815   AuthorityState& authority_state = authority_it->second;
1816   // Find type map.
1817   auto type_it = authority_state.resource_map.find(type);
1818   if (type_it == authority_state.resource_map.end()) return;
1819   auto& type_map = type_it->second;
1820   // Find resource key.
1821   auto resource_it = type_map.find(resource_name->key);
1822   if (resource_it == type_map.end()) return;
1823   ResourceState& resource_state = resource_it->second;
1824   // Remove watcher.
1825   resource_state.watchers.erase(watcher);
1826   // Clean up empty map entries, if any.
1827   if (resource_state.watchers.empty()) {
1828     if (resource_state.ignored_deletion) {
1829       gpr_log(GPR_INFO,
1830               "[xds_client %p] unsubscribing from a resource for which we "
1831               "previously ignored a deletion: type %s name %s",
1832               this, std::string(type->type_url()).c_str(),
1833               std::string(name).c_str());
1834     }
1835     for (const auto& xds_channel : authority_state.xds_channels) {
1836       xds_channel->UnsubscribeLocked(type, *resource_name,
1837                                      delay_unsubscription);
1838     }
1839     type_map.erase(resource_it);
1840     if (type_map.empty()) {
1841       authority_state.resource_map.erase(type_it);
1842       if (authority_state.resource_map.empty()) {
1843         authority_state.xds_channels.clear();
1844       }
1845     }
1846   }
1847 }
1848 
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1849 void XdsClient::MaybeRegisterResourceTypeLocked(
1850     const XdsResourceType* resource_type) {
1851   auto it = resource_types_.find(resource_type->type_url());
1852   if (it != resource_types_.end()) {
1853     GPR_ASSERT(it->second == resource_type);
1854     return;
1855   }
1856   resource_types_.emplace(resource_type->type_url(), resource_type);
1857   resource_type->InitUpbSymtab(this, def_pool_.ptr());
1858 }
1859 
GetResourceTypeLocked(absl::string_view resource_type)1860 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1861     absl::string_view resource_type) {
1862   auto it = resource_types_.find(resource_type);
1863   if (it != resource_types_.end()) return it->second;
1864   return nullptr;
1865 }
1866 
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1867 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1868     absl::string_view name, const XdsResourceType* type) {
1869   // Old-style names use the empty string for authority.
1870   // authority is set to kOldStyleAuthority to indicate that it's an
1871   // old-style name.
1872   if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1873     return XdsResourceName{std::string(kOldStyleAuthority),
1874                            {std::string(name), {}}};
1875   }
1876   // New style name.  Parse URI.
1877   auto uri = URI::Parse(name);
1878   if (!uri.ok()) return uri.status();
1879   // Split the resource type off of the path to get the id.
1880   std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1881       absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1882   if (type->type_url() != path_parts.first) {
1883     return absl::InvalidArgumentError(
1884         "xdstp URI path must indicate valid xDS resource type");
1885   }
1886   // Canonicalize order of query params.
1887   std::vector<URI::QueryParam> query_params;
1888   for (const auto& p : uri->query_parameter_map()) {
1889     query_params.emplace_back(
1890         URI::QueryParam{std::string(p.first), std::string(p.second)});
1891   }
1892   return XdsResourceName{
1893       uri->authority(),
1894       {std::string(path_parts.second), std::move(query_params)}};
1895 }
1896 
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1897 std::string XdsClient::ConstructFullXdsResourceName(
1898     absl::string_view authority, absl::string_view resource_type,
1899     const XdsResourceKey& key) {
1900   if (authority != kOldStyleAuthority) {
1901     auto uri = URI::Create("xdstp", std::string(authority),
1902                            absl::StrCat("/", resource_type, "/", key.id),
1903                            key.query_params, /*fragment=*/"");
1904     GPR_ASSERT(uri.ok());
1905     return uri->ToString();
1906   }
1907   // Old-style name.
1908   return key.id;
1909 }
1910 
AddClusterDropStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name)1911 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1912     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1913     absl::string_view eds_service_name) {
1914   auto key =
1915       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1916   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1917   {
1918     MutexLock lock(&mu_);
1919     // We jump through some hoops here to make sure that the
1920     // absl::string_views stored in the XdsClusterDropStats object point
1921     // to the strings in the xds_load_report_server_map_ keys, so that
1922     // they have the same lifetime.
1923     auto server_it = xds_load_report_server_map_
1924                          .emplace(xds_server.Key(), LoadReportServer())
1925                          .first;
1926     if (server_it->second.xds_channel == nullptr) {
1927       server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
1928           xds_server, "load report map (drop stats)");
1929     }
1930     auto load_report_it = server_it->second.load_report_map
1931                               .emplace(std::move(key), LoadReportState())
1932                               .first;
1933     LoadReportState& load_report_state = load_report_it->second;
1934     if (load_report_state.drop_stats != nullptr) {
1935       cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1936     }
1937     if (cluster_drop_stats == nullptr) {
1938       if (load_report_state.drop_stats != nullptr) {
1939         load_report_state.deleted_drop_stats +=
1940             load_report_state.drop_stats->GetSnapshotAndReset();
1941       }
1942       cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1943           Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*xds_server*/,
1944           load_report_it->first.first /*cluster_name*/,
1945           load_report_it->first.second /*eds_service_name*/);
1946       load_report_state.drop_stats = cluster_drop_stats.get();
1947     }
1948     server_it->second.xds_channel->MaybeStartLrsCall();
1949   }
1950   work_serializer_.DrainQueue();
1951   return cluster_drop_stats;
1952 }
1953 
RemoveClusterDropStats(absl::string_view xds_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1954 void XdsClient::RemoveClusterDropStats(
1955     absl::string_view xds_server_key, absl::string_view cluster_name,
1956     absl::string_view eds_service_name,
1957     XdsClusterDropStats* cluster_drop_stats) {
1958   MutexLock lock(&mu_);
1959   auto server_it = xds_load_report_server_map_.find(xds_server_key);
1960   if (server_it == xds_load_report_server_map_.end()) return;
1961   auto load_report_it = server_it->second.load_report_map.find(
1962       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1963   if (load_report_it == server_it->second.load_report_map.end()) return;
1964   LoadReportState& load_report_state = load_report_it->second;
1965   if (load_report_state.drop_stats == cluster_drop_stats) {
1966     // Record final snapshot in deleted_drop_stats, which will be
1967     // added to the next load report.
1968     load_report_state.deleted_drop_stats +=
1969         load_report_state.drop_stats->GetSnapshotAndReset();
1970     load_report_state.drop_stats = nullptr;
1971   }
1972 }
1973 
AddClusterLocalityStats(const XdsBootstrap::XdsServer & xds_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1974 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1975     const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
1976     absl::string_view eds_service_name,
1977     RefCountedPtr<XdsLocalityName> locality) {
1978   auto key =
1979       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1980   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
1981   {
1982     MutexLock lock(&mu_);
1983     // We jump through some hoops here to make sure that the
1984     // absl::string_views stored in the XdsClusterDropStats object point
1985     // to the strings in the xds_load_report_server_map_ keys, so that
1986     // they have the same lifetime.
1987     auto server_it = xds_load_report_server_map_
1988                          .emplace(xds_server.Key(), LoadReportServer())
1989                          .first;
1990     if (server_it->second.xds_channel == nullptr) {
1991       server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
1992           xds_server, "load report map (locality stats)");
1993     }
1994     auto load_report_it = server_it->second.load_report_map
1995                               .emplace(std::move(key), LoadReportState())
1996                               .first;
1997     LoadReportState& load_report_state = load_report_it->second;
1998     LoadReportState::LocalityState& locality_state =
1999         load_report_state.locality_stats[locality];
2000     if (locality_state.locality_stats != nullptr) {
2001       cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2002     }
2003     if (cluster_locality_stats == nullptr) {
2004       if (locality_state.locality_stats != nullptr) {
2005         locality_state.deleted_locality_stats +=
2006             locality_state.locality_stats->GetSnapshotAndReset();
2007       }
2008       cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2009           Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*xds_server*/,
2010           load_report_it->first.first /*cluster_name*/,
2011           load_report_it->first.second /*eds_service_name*/,
2012           std::move(locality));
2013       locality_state.locality_stats = cluster_locality_stats.get();
2014     }
2015     server_it->second.xds_channel->MaybeStartLrsCall();
2016   }
2017   work_serializer_.DrainQueue();
2018   return cluster_locality_stats;
2019 }
2020 
RemoveClusterLocalityStats(absl::string_view xds_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2021 void XdsClient::RemoveClusterLocalityStats(
2022     absl::string_view xds_server_key, absl::string_view cluster_name,
2023     absl::string_view eds_service_name,
2024     const RefCountedPtr<XdsLocalityName>& locality,
2025     XdsClusterLocalityStats* cluster_locality_stats) {
2026   MutexLock lock(&mu_);
2027   auto server_it = xds_load_report_server_map_.find(xds_server_key);
2028   if (server_it == xds_load_report_server_map_.end()) return;
2029   auto load_report_it = server_it->second.load_report_map.find(
2030       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2031   if (load_report_it == server_it->second.load_report_map.end()) return;
2032   LoadReportState& load_report_state = load_report_it->second;
2033   auto locality_it = load_report_state.locality_stats.find(locality);
2034   if (locality_it == load_report_state.locality_stats.end()) return;
2035   LoadReportState::LocalityState& locality_state = locality_it->second;
2036   if (locality_state.locality_stats == cluster_locality_stats) {
2037     // Record final snapshot in deleted_locality_stats, which will be
2038     // added to the next load report.
2039     locality_state.deleted_locality_stats +=
2040         locality_state.locality_stats->GetSnapshotAndReset();
2041     locality_state.locality_stats = nullptr;
2042   }
2043 }
2044 
ResetBackoff()2045 void XdsClient::ResetBackoff() {
2046   MutexLock lock(&mu_);
2047   for (auto& p : xds_channel_map_) {
2048     p.second->ResetBackoff();
2049   }
2050 }
2051 
NotifyWatchersOnErrorLocked(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,absl::Status status,RefCountedPtr<ReadDelayHandle> read_delay_handle)2052 void XdsClient::NotifyWatchersOnErrorLocked(
2053     const std::map<ResourceWatcherInterface*,
2054                    RefCountedPtr<ResourceWatcherInterface>>& watchers,
2055     absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle) {
2056   const auto* node = bootstrap_->node();
2057   if (node != nullptr) {
2058     status = absl::Status(
2059         status.code(),
2060         absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
2061   }
2062   work_serializer_.Schedule(
2063       [watchers, status = std::move(status),
2064        read_delay_handle = std::move(read_delay_handle)]()
2065           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
2066             for (const auto& p : watchers) {
2067               p.first->OnError(status, read_delay_handle);
2068             }
2069           },
2070       DEBUG_LOCATION);
2071 }
2072 
NotifyWatchersOnResourceDoesNotExist(const std::map<ResourceWatcherInterface *,RefCountedPtr<ResourceWatcherInterface>> & watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)2073 void XdsClient::NotifyWatchersOnResourceDoesNotExist(
2074     const std::map<ResourceWatcherInterface*,
2075                    RefCountedPtr<ResourceWatcherInterface>>& watchers,
2076     RefCountedPtr<ReadDelayHandle> read_delay_handle) {
2077   work_serializer_.Schedule(
2078       [watchers, read_delay_handle = std::move(read_delay_handle)]()
2079           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
2080             for (const auto& p : watchers) {
2081               p.first->OnResourceDoesNotExist(read_delay_handle);
2082             }
2083           },
2084       DEBUG_LOCATION);
2085 }
2086 
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & xds_server,bool send_all_clusters,const std::set<std::string> & clusters)2087 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2088     const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
2089     const std::set<std::string>& clusters) {
2090   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2091     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2092   }
2093   XdsApi::ClusterLoadReportMap snapshot_map;
2094   auto server_it = xds_load_report_server_map_.find(xds_server.Key());
2095   if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
2096   auto& load_report_map = server_it->second.load_report_map;
2097   for (auto load_report_it = load_report_map.begin();
2098        load_report_it != load_report_map.end();) {
2099     // Cluster key is cluster and EDS service name.
2100     const auto& cluster_key = load_report_it->first;
2101     LoadReportState& load_report = load_report_it->second;
2102     // If the CDS response for a cluster indicates to use LRS but the
2103     // LRS server does not say that it wants reports for this cluster,
2104     // then we'll have stats objects here whose data we're not going to
2105     // include in the load report.  However, we still need to clear out
2106     // the data from the stats objects, so that if the LRS server starts
2107     // asking for the data in the future, we don't incorrectly include
2108     // data from previous reporting intervals in that future report.
2109     const bool record_stats =
2110         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2111     XdsApi::ClusterLoadReport snapshot;
2112     // Aggregate drop stats.
2113     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2114     if (load_report.drop_stats != nullptr) {
2115       snapshot.dropped_requests +=
2116           load_report.drop_stats->GetSnapshotAndReset();
2117       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2118         gpr_log(GPR_INFO,
2119                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2120                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2121                 load_report.drop_stats);
2122       }
2123     }
2124     // Aggregate locality stats.
2125     for (auto it = load_report.locality_stats.begin();
2126          it != load_report.locality_stats.end();) {
2127       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2128       auto& locality_state = it->second;
2129       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2130           snapshot.locality_stats[locality_name];
2131       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2132       if (locality_state.locality_stats != nullptr) {
2133         locality_snapshot +=
2134             locality_state.locality_stats->GetSnapshotAndReset();
2135         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2136           gpr_log(GPR_INFO,
2137                   "[xds_client %p] cluster=%s eds_service_name=%s "
2138                   "locality=%s locality_stats=%p",
2139                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2140                   locality_name->human_readable_string().c_str(),
2141                   locality_state.locality_stats);
2142         }
2143       }
2144       // If the only thing left in this entry was final snapshots from
2145       // deleted locality stats objects, remove the entry.
2146       if (locality_state.locality_stats == nullptr) {
2147         it = load_report.locality_stats.erase(it);
2148       } else {
2149         ++it;
2150       }
2151     }
2152     // Compute load report interval.
2153     const Timestamp now = Timestamp::Now();
2154     snapshot.load_report_interval = now - load_report.last_report_time;
2155     load_report.last_report_time = now;
2156     // Record snapshot.
2157     if (record_stats) {
2158       snapshot_map[cluster_key] = std::move(snapshot);
2159     }
2160     // If the only thing left in this entry was final snapshots from
2161     // deleted stats objects, remove the entry.
2162     if (load_report.locality_stats.empty() &&
2163         load_report.drop_stats == nullptr) {
2164       load_report_it = load_report_map.erase(load_report_it);
2165     } else {
2166       ++load_report_it;
2167     }
2168   }
2169   return snapshot_map;
2170 }
2171 
2172 namespace {
2173 
EncodeTimestamp(Timestamp value,upb_Arena * arena)2174 google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {
2175   google_protobuf_Timestamp* timestamp = google_protobuf_Timestamp_new(arena);
2176   gpr_timespec timespec = value.as_timespec(GPR_CLOCK_REALTIME);
2177   google_protobuf_Timestamp_set_seconds(timestamp, timespec.tv_sec);
2178   google_protobuf_Timestamp_set_nanos(timestamp, timespec.tv_nsec);
2179   return timestamp;
2180 }
2181 
FillGenericXdsConfig(const XdsApi::ResourceMetadata & metadata,upb_StringView type_url,upb_StringView resource_name,upb_Arena * arena,envoy_service_status_v3_ClientConfig_GenericXdsConfig * entry)2182 void FillGenericXdsConfig(
2183     const XdsApi::ResourceMetadata& metadata, upb_StringView type_url,
2184     upb_StringView resource_name, upb_Arena* arena,
2185     envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) {
2186   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_type_url(entry,
2187                                                                      type_url);
2188   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_name(entry,
2189                                                                  resource_name);
2190   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_client_status(
2191       entry, metadata.client_status);
2192   if (!metadata.serialized_proto.empty()) {
2193     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_version_info(
2194         entry, StdStringToUpbString(metadata.version));
2195     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_last_updated(
2196         entry, EncodeTimestamp(metadata.update_time, arena));
2197     auto* any_field =
2198         envoy_service_status_v3_ClientConfig_GenericXdsConfig_mutable_xds_config(
2199             entry, arena);
2200     google_protobuf_Any_set_type_url(any_field, type_url);
2201     google_protobuf_Any_set_value(
2202         any_field, StdStringToUpbString(metadata.serialized_proto));
2203   }
2204   if (metadata.client_status == XdsApi::ResourceMetadata::NACKED) {
2205     auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena);
2206     envoy_admin_v3_UpdateFailureState_set_details(
2207         update_failure_state, StdStringToUpbString(metadata.failed_details));
2208     envoy_admin_v3_UpdateFailureState_set_version_info(
2209         update_failure_state, StdStringToUpbString(metadata.failed_version));
2210     envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
2211         update_failure_state,
2212         EncodeTimestamp(metadata.failed_update_time, arena));
2213     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state(
2214         entry, update_failure_state);
2215   }
2216 }
2217 
2218 }  // namespace
2219 
DumpClientConfig(std::set<std::string> * string_pool,upb_Arena * arena,envoy_service_status_v3_ClientConfig * client_config)2220 void XdsClient::DumpClientConfig(
2221     std::set<std::string>* string_pool, upb_Arena* arena,
2222     envoy_service_status_v3_ClientConfig* client_config) {
2223   // Assemble config dump messages
2224   // Fill-in the node information
2225   auto* node =
2226       envoy_service_status_v3_ClientConfig_mutable_node(client_config, arena);
2227   api_.PopulateNode(node, arena);
2228   // Dump each resource.
2229   for (const auto& a : authority_state_map_) {  // authority
2230     const std::string& authority = a.first;
2231     for (const auto& t : a.second.resource_map) {  // type
2232       const XdsResourceType* type = t.first;
2233       auto it =
2234           string_pool
2235               ->emplace(absl::StrCat("type.googleapis.com/", type->type_url()))
2236               .first;
2237       upb_StringView type_url = StdStringToUpbString(*it);
2238       for (const auto& r : t.second) {  // resource id
2239         auto it2 = string_pool
2240                        ->emplace(ConstructFullXdsResourceName(
2241                            authority, type->type_url(), r.first))
2242                        .first;
2243         upb_StringView resource_name = StdStringToUpbString(*it2);
2244         envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry =
2245             envoy_service_status_v3_ClientConfig_add_generic_xds_configs(
2246                 client_config, arena);
2247         FillGenericXdsConfig(r.second.meta, type_url, resource_name, arena,
2248                              entry);
2249       }
2250     }
2251   }
2252 }
2253 
2254 namespace {
2255 
CacheStateForEntry(const XdsApi::ResourceMetadata & metadata,bool resource_cached)2256 absl::string_view CacheStateForEntry(const XdsApi::ResourceMetadata& metadata,
2257                                      bool resource_cached) {
2258   switch (metadata.client_status) {
2259     case XdsApi::ResourceMetadata::REQUESTED:
2260       return "requested";
2261     case XdsApi::ResourceMetadata::DOES_NOT_EXIST:
2262       return "does_not_exist";
2263     case XdsApi::ResourceMetadata::ACKED:
2264       return "acked";
2265     case XdsApi::ResourceMetadata::NACKED:
2266       return resource_cached ? "nacked_but_cached" : "nacked";
2267   }
2268   Crash("unknown resource state");
2269 }
2270 
2271 }  // namespace
2272 
ReportResourceCounts(absl::FunctionRef<void (const ResourceCountLabels &,uint64_t)> func)2273 void XdsClient::ReportResourceCounts(
2274     absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) {
2275   ResourceCountLabels labels;
2276   for (const auto& a : authority_state_map_) {  // authority
2277     labels.xds_authority = a.first;
2278     for (const auto& t : a.second.resource_map) {  // type
2279       labels.resource_type = t.first->type_url();
2280       // Count the number of entries in each state.
2281       std::map<absl::string_view, uint64_t> counts;
2282       for (const auto& r : t.second) {  // resource id
2283         absl::string_view cache_state =
2284             CacheStateForEntry(r.second.meta, r.second.resource != nullptr);
2285         ++counts[cache_state];
2286       }
2287       // Report the count for each state.
2288       for (const auto& c : counts) {
2289         labels.cache_state = c.first;
2290         func(labels, c.second);
2291       }
2292     }
2293   }
2294 }
2295 
ReportServerConnections(absl::FunctionRef<void (absl::string_view,bool)> func)2296 void XdsClient::ReportServerConnections(
2297     absl::FunctionRef<void(absl::string_view, bool)> func) {
2298   for (const auto& p : xds_channel_map_) {
2299     func(p.second->server_uri(), p.second->status().ok());
2300   }
2301 }
2302 
2303 }  // namespace grpc_core
2304