xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/xds/xds_client.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "upb/reflection/def.hpp"
34 
35 #include <grpc/event_engine/event_engine.h>
36 
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_client_stats.h"
40 #include "src/core/ext/xds/xds_metrics.h"
41 #include "src/core/ext/xds/xds_resource_type.h"
42 #include "src/core/ext/xds/xds_transport.h"
43 #include "src/core/lib/debug/trace.h"
44 #include "src/core/lib/gprpp/dual_ref_counted.h"
45 #include "src/core/lib/gprpp/orphanable.h"
46 #include "src/core/lib/gprpp/ref_counted.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/gprpp/time.h"
50 #include "src/core/lib/gprpp/work_serializer.h"
51 #include "src/core/lib/uri/uri_parser.h"
52 
53 namespace grpc_core {
54 
// Forward declaration of the peer class in namespace testing that XdsClient
// declares as a friend.
55 namespace testing {
56 class XdsClientTestPeer;
57 }
58 
// Trace flags used by the xDS client code.  They are only declared here
// (extern); the definitions live in another translation unit.
59 extern TraceFlag grpc_xds_client_trace;
60 extern TraceFlag grpc_xds_client_refcount_trace;
61 
// XdsClient declares the client-side xDS API: starting/cancelling resource
// watches, registering drop/locality stats objects for load reporting, and
// resetting connection backoff.  Its private section holds the bookkeeping
// kept per xDS server (XdsChannel, LoadReportServer) and per authority
// (AuthorityState); most of that state is annotated as guarded by mu_.
62 class XdsClient : public DualRefCounted<XdsClient> {
63  public:
64   // The authority reported for old-style (non-xdstp) resource names.
65   static constexpr absl::string_view kOldStyleAuthority = "#old";
66 
     // Ref-counted handle that is passed to every watcher callback below.
     // NOTE(review): presumably keeps the next read on the ADS stream from
     // being issued until all refs are released -- confirm in the .cc file.
67   class ReadDelayHandle : public RefCounted<ReadDelayHandle> {
68    public:
       // Returns a null handle (no ReadDelayHandle object is created).
NoWait()69     static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }
70   };
71 
72   // Resource watcher interface.  Implemented by callers.
73   // Note: Most callers will not use this API directly but rather via a
74   // resource-type-specific wrapper API provided by the relevant
75   // XdsResourceType implementation.
     // All three callbacks are annotated as requiring work_serializer_.
76   class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
77    public:
       // Delivers resource data for the watched resource.
78     virtual void OnGenericResourceChanged(
79         std::shared_ptr<const XdsResourceType::ResourceData> resource,
80         RefCountedPtr<ReadDelayHandle> read_delay_handle)
81         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
       // Delivers an error status for the watched resource.
82     virtual void OnError(absl::Status status,
83                          RefCountedPtr<ReadDelayHandle> read_delay_handle)
84         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
       // Reports that the watched resource does not exist.
85     virtual void OnResourceDoesNotExist(
86         RefCountedPtr<ReadDelayHandle> read_delay_handle)
87         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
88   };
89 
     // Takes ownership of the bootstrap config, transport factory and metrics
     // reporter, and shares ownership of the EventEngine.  bootstrap must be
     // non-null (see bootstrap() below).  resource_request_timeout defaults
     // to 15 seconds.
90   XdsClient(
91       std::unique_ptr<XdsBootstrap> bootstrap,
92       OrphanablePtr<XdsTransportFactory> transport_factory,
93       std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
94       std::unique_ptr<XdsMetricsReporter> metrics_reporter,
95       std::string user_agent_name, std::string user_agent_version,
96       Duration resource_request_timeout = Duration::Seconds(15));
97   ~XdsClient() override;
98 
     // Returns the bootstrap config owned by this client.
bootstrap()99   const XdsBootstrap& bootstrap() const {
100     return *bootstrap_;  // ctor asserts that it is non-null
101   }
102 
      // Returns a non-owning pointer to the transport factory held by this
      // client.
transport_factory()103   XdsTransportFactory* transport_factory() const {
104     return transport_factory_.get();
105   }
106 
107   // Start and cancel watch for a resource.
108   //
109   // The XdsClient takes ownership of the watcher, but the caller may
110   // keep a raw pointer to the watcher, which may be used only for
111   // cancellation.  (Because the caller does not own the watcher, the
112   // pointer must not be used for any other purpose.)
113   // If the caller is going to start a new watch after cancelling the
114   // old one, it should set delay_unsubscription to true.
115   //
116   // The resource type object must be a global singleton, since the first
117   // time the XdsClient sees a particular resource type object, it will
118   // store the pointer to that object as the authoritative implementation for
119   // its type URLs.  The resource type object must outlive the XdsClient object,
120   // and it is illegal to start a subsequent watch for the same type URLs using
121   // a different resource type object.
122   //
123   // Note: Most callers will not use this API directly but rather via a
124   // resource-type-specific wrapper API provided by the relevant
125   // XdsResourceType implementation.
126   void WatchResource(const XdsResourceType* type, absl::string_view name,
127                      RefCountedPtr<ResourceWatcherInterface> watcher);
      // NOTE(review): listener_name is presumably the same resource name that
      // WatchResource() calls `name`; the parameter name looks like a
      // leftover -- confirm and consider renaming for consistency.
128   void CancelResourceWatch(const XdsResourceType* type,
129                            absl::string_view listener_name,
130                            ResourceWatcherInterface* watcher,
131                            bool delay_unsubscription = false);
132 
133   // Adds and removes drop stats for cluster_name and eds_service_name.
      // NOTE(review): the Remove* methods take xds_server as a string rather
      // than an XdsServer; presumably this is the "XdsServer key" used by
      // xds_load_report_server_map_ -- confirm against the callers.
134   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
135       const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
136       absl::string_view eds_service_name);
137   void RemoveClusterDropStats(absl::string_view xds_server,
138                               absl::string_view cluster_name,
139                               absl::string_view eds_service_name,
140                               XdsClusterDropStats* cluster_drop_stats);
141 
142   // Adds and removes locality stats for cluster_name and eds_service_name
143   // for the specified locality.
144   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
145       const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
146       absl::string_view eds_service_name,
147       RefCountedPtr<XdsLocalityName> locality);
148   void RemoveClusterLocalityStats(
149       absl::string_view xds_server, absl::string_view cluster_name,
150       absl::string_view eds_service_name,
151       const RefCountedPtr<XdsLocalityName>& locality,
152       XdsClusterLocalityStats* cluster_locality_stats);
153 
154   // Resets connection backoff state.
155   void ResetBackoff();
156 
      // Returns a non-owning pointer to the EventEngine held in engine_.
engine()157   grpc_event_engine::experimental::EventEngine* engine() {
158     return engine_.get();
159   }
160 
161  protected:
      // Overrides the base-class Orphaned() hook.
162   void Orphaned() override;
163 
      // Exposes the client mutex so that subclasses can hold it when calling
      // the mu_-requiring methods declared below.
mu()164   Mutex* mu() ABSL_LOCK_RETURNED(&mu_) { return &mu_; }
165 
166   // Dumps the active xDS config to the provided
167   // envoy.service.status.v3.ClientConfig message including the config status
168   // (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED).
169   void DumpClientConfig(std::set<std::string>* string_pool, upb_Arena* arena,
170                         envoy_service_status_v3_ClientConfig* client_config)
171       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
172 
173   // Invokes func once for each combination of labels to report the
174   // resource count for those labels.
175   struct ResourceCountLabels {
176     absl::string_view xds_authority;
177     absl::string_view resource_type;
178     absl::string_view cache_state;
179   };
180   void ReportResourceCounts(
181       absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func)
182       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
183 
184   // Invokes func once for each xDS server to report whether the
185   // connection to that server is working.
186   void ReportServerConnections(
187       absl::FunctionRef<void(absl::string_view /*xds_server*/, bool)> func)
188       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
189 
190  private:
191   friend testing::XdsClientTestPeer;
192 
      // Identifies a resource within an authority.  operator< orders by id
      // first and by query_params second, which lets the type serve as the
      // std::map key in AuthorityState::resource_map.
193   struct XdsResourceKey {
194     std::string id;
195     std::vector<URI::QueryParam> query_params;
196 
197     bool operator<(const XdsResourceKey& other) const {
198       int c = id.compare(other.id);
199       if (c != 0) return c < 0;
200       return query_params < other.query_params;
201     }
202   };
203 
      // Forward declaration; XdsChannel refers to this type before it is
      // defined further down.
204   struct AuthorityState;
205 
      // A resource name split into its authority and key (the result type of
      // ParseXdsResourceName()).
206   struct XdsResourceName {
207     std::string authority;
208     XdsResourceKey key;
209   };
210 
211   // Contains a channel to the xds server and all the data related to the
212   // channel.  Holds a ref to the xds client object.
213   class XdsChannel final : public DualRefCounted<XdsChannel> {
214    public:
        // Nested helper types; only forward-declared in this header.
215     template <typename T>
216     class RetryableCall;
217 
218     class AdsCall;
219     class LrsCall;
220 
221     XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
222                const XdsBootstrap::XdsServer& server);
223     ~XdsChannel() override;
224 
        // Returns a non-owning pointer to the owning client.
xds_client()225     XdsClient* xds_client() const { return xds_client_.get(); }
226     AdsCall* ads_call() const;
227     LrsCall* lrs_call() const;
228 
229     void ResetBackoff();
230 
231     void MaybeStartLrsCall();
232     void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
233 
234     // Returns non-OK if there has been an error since the last time the
235     // ADS stream saw a response.
status()236     const absl::Status& status() const { return status_; }
237 
        // Subscribe to / unsubscribe from a resource on this channel.  Both
        // are annotated as requiring XdsClient::mu_.
238     void SubscribeLocked(const XdsResourceType* type,
239                          const XdsResourceName& name)
240         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
241     void UnsubscribeLocked(const XdsResourceType* type,
242                            const XdsResourceName& name,
243                            bool delay_unsubscription)
244         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
245 
        // Returns the URI of the server this channel was created for.
server_uri()246     absl::string_view server_uri() const { return server_.server_uri(); }
247 
248    private:
249     // Attempts to find a suitable Xds fallback server. Returns true if
250     // a connection to a suitable server had been established.
251     bool MaybeFallbackLocked(const std::string& authority,
252                              XdsClient::AuthorityState& authority_state)
253         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
254     void SetHealthyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
255     void Orphaned() override;
256 
257     void OnConnectivityFailure(absl::Status status);
258 
259     // Enqueues error notifications to watchers.  Caller must drain
260     // XdsClient::work_serializer_ after releasing the lock.
261     void SetChannelStatusLocked(absl::Status status)
262         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
263 
264     // The owning xds client.
265     WeakRefCountedPtr<XdsClient> xds_client_;
266 
267     const XdsBootstrap::XdsServer& server_;  // Owned by bootstrap.
268 
269     OrphanablePtr<XdsTransportFactory::XdsTransport> transport_;
270 
271     bool shutting_down_ = false;
272 
273     // The retryable ADS and LRS calls.
274     OrphanablePtr<RetryableCall<AdsCall>> ads_call_;
275     OrphanablePtr<RetryableCall<LrsCall>> lrs_call_;
276 
277     // Stores the most recent accepted resource version for each resource type.
278     std::map<const XdsResourceType*, std::string /*version*/>
279         resource_type_version_map_;
280 
        // The value returned by status().
281     absl::Status status_;
282   };
283 
      // State kept for a single resource: its watchers plus the latest data
      // and metadata.
284   struct ResourceState {
        // Keyed by the raw watcher pointer; presumably so that
        // CancelResourceWatch() can find the entry from the pointer the
        // caller kept -- confirm in the .cc file.
285     std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
286         watchers;
287     // The latest data seen for the resource.
288     std::shared_ptr<const XdsResourceType::ResourceData> resource;
289     XdsApi::ResourceMetadata meta;
        // NOTE(review): presumably set when a deletion sent by the server was
        // ignored -- confirm in the .cc file.
290     bool ignored_deletion = false;
291   };
292 
      // State kept for a single authority: the channels associated with it
      // and its resources, grouped by resource type and then by key.
293   struct AuthorityState {
294     std::vector<RefCountedPtr<XdsChannel>> xds_channels;
295     std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
296         resource_map;
297   };
298 
      // Load-report state for one (cluster_name, eds_service_name) pair; see
      // LoadReportMap below.
      // NOTE(review): the deleted_* snapshots presumably hold data from stats
      // objects that were removed before being reported -- confirm.
299   struct LoadReportState {
300     struct LocalityState {
301       XdsClusterLocalityStats* locality_stats = nullptr;
302       XdsClusterLocalityStats::Snapshot deleted_locality_stats;
303     };
304 
305     XdsClusterDropStats* drop_stats = nullptr;
306     XdsClusterDropStats::Snapshot deleted_drop_stats;
307     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
308              XdsLocalityName::Less>
309         locality_stats;
        // Initialized to the time at which this entry is constructed.
310     Timestamp last_report_time = Timestamp::Now();
311   };
312 
313   // Load report data.
314   using LoadReportMap = std::map<
315       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
316       LoadReportState>;
317 
      // Load-report state for one xDS server: its channel plus the
      // per-cluster map.
318   struct LoadReportServer {
319     RefCountedPtr<XdsChannel> xds_channel;
320     LoadReportMap load_report_map;
321   };
322 
323   // Sends an error notification to a specific set of watchers.
      // NOTE(review): named *Locked but carries no lock annotation -- confirm
      // which lock the caller is expected to hold.
324   void NotifyWatchersOnErrorLocked(
325       const std::map<ResourceWatcherInterface*,
326                      RefCountedPtr<ResourceWatcherInterface>>& watchers,
327       absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle);
328   // Sends a resource-does-not-exist notification to a specific set of watchers.
329   void NotifyWatchersOnResourceDoesNotExist(
330       const std::map<ResourceWatcherInterface*,
331                      RefCountedPtr<ResourceWatcherInterface>>& watchers,
332       RefCountedPtr<ReadDelayHandle> read_delay_handle);
333 
      // NOTE(review): the annotations from here on name mu_, whereas the
      // protected methods above name &mu_ -- consider making them uniform.
334   void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
335       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
336 
337   // Gets the type for resource_type, or null if the type is unknown.
338   const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
339       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
340 
      // Splits a resource name into authority and key; the StatusOr return
      // type carries a non-OK status on failure.
341   absl::StatusOr<XdsResourceName> ParseXdsResourceName(
342       absl::string_view name, const XdsResourceType* type);
      // Builds a resource name string from an authority, a resource type and
      // a key.
343   static std::string ConstructFullXdsResourceName(
344       absl::string_view authority, absl::string_view resource_type,
345       const XdsResourceKey& key);
346 
347   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
348       const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
349       const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
350   RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked(
351       const XdsBootstrap::XdsServer& server, const char* reason)
352       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
353   bool HasUncachedResources(const AuthorityState& authority_state);
354 
      // Members declared before mu_ carry no ABSL_GUARDED_BY annotation.
      // NOTE(review): request_timeout_ is presumably initialized from the
      // constructor's resource_request_timeout argument -- confirm.
355   std::unique_ptr<XdsBootstrap> bootstrap_;
356   OrphanablePtr<XdsTransportFactory> transport_factory_;
357   const Duration request_timeout_;
358   const bool xds_federation_enabled_;
359   XdsApi api_;
360   WorkSerializer work_serializer_;
361   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
362   std::unique_ptr<XdsMetricsReporter> metrics_reporter_;
363 
364   Mutex mu_;
365 
366   // Stores resource type objects seen by type URL.
      // NOTE(review): the keys are string views; presumably they point into
      // strings owned by the resource type objects, which must outlive the
      // client (see WatchResource()) -- confirm.
367   std::map<absl::string_view /*resource_type*/, const XdsResourceType*>
368       resource_types_ ABSL_GUARDED_BY(mu_);
369   upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_);
370 
371   // Map of existing xDS server channels.
      // NOTE(review): values are raw pointers; presumably non-owning, with
      // entries removed when a channel goes away -- confirm in the .cc file.
372   std::map<std::string /*XdsServer key*/, XdsChannel*> xds_channel_map_
373       ABSL_GUARDED_BY(mu_);
374 
375   std::map<std::string /*authority*/, AuthorityState> authority_state_map_
376       ABSL_GUARDED_BY(mu_);
377 
      // std::less<> is a transparent comparator, so lookups may use a
      // string_view key without building a std::string.
378   std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>>
379       xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
380 
381   // Stores started watchers whose resource name was not parsed successfully,
382   // waiting to be cancelled or reset in Orphaned().
383   std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
384       invalid_watchers_ ABSL_GUARDED_BY(mu_);
385 
386   bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
387 };
388 
389 }  // namespace grpc_core
390 
391 #endif  // GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
392