1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 18 #define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <map> 23 #include <memory> 24 #include <set> 25 #include <string> 26 #include <utility> 27 #include <vector> 28 29 #include "absl/base/thread_annotations.h" 30 #include "absl/status/status.h" 31 #include "absl/status/statusor.h" 32 #include "absl/strings/string_view.h" 33 #include "upb/reflection/def.hpp" 34 35 #include <grpc/event_engine/event_engine.h> 36 37 #include "src/core/ext/xds/xds_api.h" 38 #include "src/core/ext/xds/xds_bootstrap.h" 39 #include "src/core/ext/xds/xds_client_stats.h" 40 #include "src/core/ext/xds/xds_metrics.h" 41 #include "src/core/ext/xds/xds_resource_type.h" 42 #include "src/core/ext/xds/xds_transport.h" 43 #include "src/core/lib/debug/trace.h" 44 #include "src/core/lib/gprpp/dual_ref_counted.h" 45 #include "src/core/lib/gprpp/orphanable.h" 46 #include "src/core/lib/gprpp/ref_counted.h" 47 #include "src/core/lib/gprpp/ref_counted_ptr.h" 48 #include "src/core/lib/gprpp/sync.h" 49 #include "src/core/lib/gprpp/time.h" 50 #include "src/core/lib/gprpp/work_serializer.h" 51 #include "src/core/lib/uri/uri_parser.h" 52 53 namespace grpc_core { 54 55 namespace testing { 56 class XdsClientTestPeer; 57 } 58 59 extern TraceFlag grpc_xds_client_trace; 60 extern TraceFlag grpc_xds_client_refcount_trace; 61 62 class XdsClient : public DualRefCounted<XdsClient> { 63 public: 64 // The authority reported for old-style (non-xdstp) resource names. 65 static constexpr absl::string_view kOldStyleAuthority = "#old"; 66 67 class ReadDelayHandle : public RefCounted<ReadDelayHandle> { 68 public: NoWait()69 static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; } 70 }; 71 72 // Resource watcher interface. Implemented by callers. 73 // Note: Most callers will not use this API directly but rather via a 74 // resource-type-specific wrapper API provided by the relevant 75 // XdsResourceType implementation. 76 class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> { 77 public: 78 virtual void OnGenericResourceChanged( 79 std::shared_ptr<const XdsResourceType::ResourceData> resource, 80 RefCountedPtr<ReadDelayHandle> read_delay_handle) 81 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 82 virtual void OnError(absl::Status status, 83 RefCountedPtr<ReadDelayHandle> read_delay_handle) 84 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 85 virtual void OnResourceDoesNotExist( 86 RefCountedPtr<ReadDelayHandle> read_delay_handle) 87 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 88 }; 89 90 XdsClient( 91 std::unique_ptr<XdsBootstrap> bootstrap, 92 OrphanablePtr<XdsTransportFactory> transport_factory, 93 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine, 94 std::unique_ptr<XdsMetricsReporter> metrics_reporter, 95 std::string user_agent_name, std::string user_agent_version, 96 Duration resource_request_timeout = Duration::Seconds(15)); 97 ~XdsClient() override; 98 bootstrap()99 const XdsBootstrap& bootstrap() const { 100 return *bootstrap_; // ctor asserts that it is non-null 101 } 102 transport_factory()103 XdsTransportFactory* transport_factory() const { 104 return transport_factory_.get(); 105 } 106 107 // Start and cancel watch for a resource. 108 // 109 // The XdsClient takes ownership of the watcher, but the caller may 110 // keep a raw pointer to the watcher, which may be used only for 111 // cancellation. (Because the caller does not own the watcher, the 112 // pointer must not be used for any other purpose.) 113 // If the caller is going to start a new watch after cancelling the 114 // old one, it should set delay_unsubscription to true. 115 // 116 // The resource type object must be a global singleton, since the first 117 // time the XdsClient sees a particular resource type object, it will 118 // store the pointer to that object as the authoritative implementation for 119 // its type URLs. The resource type object must outlive the XdsClient object, 120 // and it is illegal to start a subsequent watch for the same type URLs using 121 // a different resource type object. 122 // 123 // Note: Most callers will not use this API directly but rather via a 124 // resource-type-specific wrapper API provided by the relevant 125 // XdsResourceType implementation. 126 void WatchResource(const XdsResourceType* type, absl::string_view name, 127 RefCountedPtr<ResourceWatcherInterface> watcher); 128 void CancelResourceWatch(const XdsResourceType* type, 129 absl::string_view listener_name, 130 ResourceWatcherInterface* watcher, 131 bool delay_unsubscription = false); 132 133 // Adds and removes drop stats for cluster_name and eds_service_name. 134 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( 135 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, 136 absl::string_view eds_service_name); 137 void RemoveClusterDropStats(absl::string_view xds_server, 138 absl::string_view cluster_name, 139 absl::string_view eds_service_name, 140 XdsClusterDropStats* cluster_drop_stats); 141 142 // Adds and removes locality stats for cluster_name and eds_service_name 143 // for the specified locality. 144 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( 145 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, 146 absl::string_view eds_service_name, 147 RefCountedPtr<XdsLocalityName> locality); 148 void RemoveClusterLocalityStats( 149 absl::string_view xds_server, absl::string_view cluster_name, 150 absl::string_view eds_service_name, 151 const RefCountedPtr<XdsLocalityName>& locality, 152 XdsClusterLocalityStats* cluster_locality_stats); 153 154 // Resets connection backoff state. 155 void ResetBackoff(); 156 engine()157 grpc_event_engine::experimental::EventEngine* engine() { 158 return engine_.get(); 159 } 160 161 protected: 162 void Orphaned() override; 163 mu()164 Mutex* mu() ABSL_LOCK_RETURNED(&mu_) { return &mu_; } 165 166 // Dumps the active xDS config to the provided 167 // envoy.service.status.v3.ClientConfig message including the config status 168 // (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED). 169 void DumpClientConfig(std::set<std::string>* string_pool, upb_Arena* arena, 170 envoy_service_status_v3_ClientConfig* client_config) 171 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 172 173 // Invokes func once for each combination of labels to report the 174 // resource count for those labels. 175 struct ResourceCountLabels { 176 absl::string_view xds_authority; 177 absl::string_view resource_type; 178 absl::string_view cache_state; 179 }; 180 void ReportResourceCounts( 181 absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) 182 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 183 184 // Invokes func once for each xDS server to report whether the 185 // connection to that server is working. 186 void ReportServerConnections( 187 absl::FunctionRef<void(absl::string_view /*xds_server*/, bool)> func) 188 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 189 190 private: 191 friend testing::XdsClientTestPeer; 192 193 struct XdsResourceKey { 194 std::string id; 195 std::vector<URI::QueryParam> query_params; 196 197 bool operator<(const XdsResourceKey& other) const { 198 int c = id.compare(other.id); 199 if (c != 0) return c < 0; 200 return query_params < other.query_params; 201 } 202 }; 203 204 struct AuthorityState; 205 206 struct XdsResourceName { 207 std::string authority; 208 XdsResourceKey key; 209 }; 210 211 // Contains a channel to the xds server and all the data related to the 212 // channel. Holds a ref to the xds client object. 213 class XdsChannel final : public DualRefCounted<XdsChannel> { 214 public: 215 template <typename T> 216 class RetryableCall; 217 218 class AdsCall; 219 class LrsCall; 220 221 XdsChannel(WeakRefCountedPtr<XdsClient> xds_client, 222 const XdsBootstrap::XdsServer& server); 223 ~XdsChannel() override; 224 xds_client()225 XdsClient* xds_client() const { return xds_client_.get(); } 226 AdsCall* ads_call() const; 227 LrsCall* lrs_call() const; 228 229 void ResetBackoff(); 230 231 void MaybeStartLrsCall(); 232 void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 233 234 // Returns non-OK if there has been an error since the last time the 235 // ADS stream saw a response. status()236 const absl::Status& status() const { return status_; } 237 238 void SubscribeLocked(const XdsResourceType* type, 239 const XdsResourceName& name) 240 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 241 void UnsubscribeLocked(const XdsResourceType* type, 242 const XdsResourceName& name, 243 bool delay_unsubscription) 244 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 245 server_uri()246 absl::string_view server_uri() const { return server_.server_uri(); } 247 248 private: 249 // Attempts to find a suitable Xds fallback server. Returns true if 250 // a connection to a suitable server had been established. 251 bool MaybeFallbackLocked(const std::string& authority, 252 XdsClient::AuthorityState& authority_state) 253 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 254 void SetHealthyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 255 void Orphaned() override; 256 257 void OnConnectivityFailure(absl::Status status); 258 259 // Enqueues error notifications to watchers. Caller must drain 260 // XdsClient::work_serializer_ after releasing the lock. 261 void SetChannelStatusLocked(absl::Status status) 262 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 263 264 // The owning xds client. 265 WeakRefCountedPtr<XdsClient> xds_client_; 266 267 const XdsBootstrap::XdsServer& server_; // Owned by bootstrap. 268 269 OrphanablePtr<XdsTransportFactory::XdsTransport> transport_; 270 271 bool shutting_down_ = false; 272 273 // The retryable ADS and LRS calls. 274 OrphanablePtr<RetryableCall<AdsCall>> ads_call_; 275 OrphanablePtr<RetryableCall<LrsCall>> lrs_call_; 276 277 // Stores the most recent accepted resource version for each resource type. 278 std::map<const XdsResourceType*, std::string /*version*/> 279 resource_type_version_map_; 280 281 absl::Status status_; 282 }; 283 284 struct ResourceState { 285 std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> 286 watchers; 287 // The latest data seen for the resource. 288 std::shared_ptr<const XdsResourceType::ResourceData> resource; 289 XdsApi::ResourceMetadata meta; 290 bool ignored_deletion = false; 291 }; 292 293 struct AuthorityState { 294 std::vector<RefCountedPtr<XdsChannel>> xds_channels; 295 std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>> 296 resource_map; 297 }; 298 299 struct LoadReportState { 300 struct LocalityState { 301 XdsClusterLocalityStats* locality_stats = nullptr; 302 XdsClusterLocalityStats::Snapshot deleted_locality_stats; 303 }; 304 305 XdsClusterDropStats* drop_stats = nullptr; 306 XdsClusterDropStats::Snapshot deleted_drop_stats; 307 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 308 XdsLocalityName::Less> 309 locality_stats; 310 Timestamp last_report_time = Timestamp::Now(); 311 }; 312 313 // Load report data. 314 using LoadReportMap = std::map< 315 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 316 LoadReportState>; 317 318 struct LoadReportServer { 319 RefCountedPtr<XdsChannel> xds_channel; 320 LoadReportMap load_report_map; 321 }; 322 323 // Sends an error notification to a specific set of watchers. 324 void NotifyWatchersOnErrorLocked( 325 const std::map<ResourceWatcherInterface*, 326 RefCountedPtr<ResourceWatcherInterface>>& watchers, 327 absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle); 328 // Sends a resource-does-not-exist notification to a specific set of watchers. 329 void NotifyWatchersOnResourceDoesNotExist( 330 const std::map<ResourceWatcherInterface*, 331 RefCountedPtr<ResourceWatcherInterface>>& watchers, 332 RefCountedPtr<ReadDelayHandle> read_delay_handle); 333 334 void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type) 335 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 336 337 // Gets the type for resource_type, or null if the type is unknown. 338 const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type) 339 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 340 341 absl::StatusOr<XdsResourceName> ParseXdsResourceName( 342 absl::string_view name, const XdsResourceType* type); 343 static std::string ConstructFullXdsResourceName( 344 absl::string_view authority, absl::string_view resource_type, 345 const XdsResourceKey& key); 346 347 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( 348 const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, 349 const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 350 RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked( 351 const XdsBootstrap::XdsServer& server, const char* reason) 352 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 353 bool HasUncachedResources(const AuthorityState& authority_state); 354 355 std::unique_ptr<XdsBootstrap> bootstrap_; 356 OrphanablePtr<XdsTransportFactory> transport_factory_; 357 const Duration request_timeout_; 358 const bool xds_federation_enabled_; 359 XdsApi api_; 360 WorkSerializer work_serializer_; 361 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; 362 std::unique_ptr<XdsMetricsReporter> metrics_reporter_; 363 364 Mutex mu_; 365 366 // Stores resource type objects seen by type URL. 367 std::map<absl::string_view /*resource_type*/, const XdsResourceType*> 368 resource_types_ ABSL_GUARDED_BY(mu_); 369 upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_); 370 371 // Map of existing xDS server channels. 372 std::map<std::string /*XdsServer key*/, XdsChannel*> xds_channel_map_ 373 ABSL_GUARDED_BY(mu_); 374 375 std::map<std::string /*authority*/, AuthorityState> authority_state_map_ 376 ABSL_GUARDED_BY(mu_); 377 378 std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>> 379 xds_load_report_server_map_ ABSL_GUARDED_BY(mu_); 380 381 // Stores started watchers whose resource name was not parsed successfully, 382 // waiting to be cancelled or reset in Orphan(). 383 std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> 384 invalid_watchers_ ABSL_GUARDED_BY(mu_); 385 386 bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; 387 }; 388 389 } // namespace grpc_core 390 391 #endif // GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 392