1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/resolver/xds/xds_dependency_manager.h"
20
21 #include <set>
22
23 #include "absl/strings/str_join.h"
24
25 #include "src/core/ext/xds/xds_routing.h"
26 #include "src/core/lib/config/core_configuration.h"
27 #include "src/core/lib/gprpp/match.h"
28 #include "src/core/load_balancing/xds/xds_channel_args.h"
29 #include "src/core/resolver/fake/fake_resolver.h"
30 #include "src/core/resolver/xds/xds_resolver_trace.h"
31
32 namespace grpc_core {
33
34 namespace {
35
// Upper bound on how deep the aggregate cluster dependency graph may be
// followed.
constexpr int kMaxXdsAggregateClusterRecursionDepth{16};
38
39 } // namespace
40
41 //
42 // XdsDependencyManager::XdsConfig::ClusterConfig
43 //
44
ClusterConfig(std::shared_ptr<const XdsClusterResource> cluster,std::shared_ptr<const XdsEndpointResource> endpoints,std::string resolution_note)45 XdsDependencyManager::XdsConfig::ClusterConfig::ClusterConfig(
46 std::shared_ptr<const XdsClusterResource> cluster,
47 std::shared_ptr<const XdsEndpointResource> endpoints,
48 std::string resolution_note)
49 : cluster(std::move(cluster)),
50 children(absl::in_place_type_t<EndpointConfig>(), std::move(endpoints),
51 std::move(resolution_note)) {}
52
ClusterConfig(std::shared_ptr<const XdsClusterResource> cluster,std::vector<absl::string_view> leaf_clusters)53 XdsDependencyManager::XdsConfig::ClusterConfig::ClusterConfig(
54 std::shared_ptr<const XdsClusterResource> cluster,
55 std::vector<absl::string_view> leaf_clusters)
56 : cluster(std::move(cluster)),
57 children(absl::in_place_type_t<AggregateConfig>(),
58 std::move(leaf_clusters)) {}
59
60 //
61 // XdsDependencyManager::XdsConfig
62 //
63
ToString() const64 std::string XdsDependencyManager::XdsConfig::ToString() const {
65 std::vector<std::string> parts = {
66 "{\n listener: {", listener->ToString(),
67 "}\n route_config: {", route_config->ToString(),
68 "}\n virtual_host: {", virtual_host->ToString(),
69 "}\n clusters: {\n"};
70 for (const auto& p : clusters) {
71 parts.push_back(absl::StrCat(" \"", p.first, "\": "));
72 if (!p.second.ok()) {
73 parts.push_back(p.second.status().ToString());
74 parts.push_back("\n");
75 } else {
76 parts.push_back(
77 absl::StrCat(" {\n"
78 " cluster: {",
79 p.second->cluster->ToString(), "}\n"));
80 Match(
81 p.second->children,
82 [&](const ClusterConfig::EndpointConfig& endpoint_config) {
83 parts.push_back(
84 absl::StrCat(" endpoints: {",
85 endpoint_config.endpoints == nullptr
86 ? "<null>"
87 : endpoint_config.endpoints->ToString(),
88 "}\n"
89 " resolution_note: \"",
90 endpoint_config.resolution_note, "\"\n"));
91 },
92 [&](const ClusterConfig::AggregateConfig& aggregate_config) {
93 parts.push_back(absl::StrCat(
94 " leaf_clusters: [",
95 absl::StrJoin(aggregate_config.leaf_clusters, ", "), "]\n"));
96 });
97 parts.push_back(
98 " }\n"
99 " ]\n");
100 }
101 }
102 parts.push_back(" }\n}");
103 return absl::StrJoin(parts, "");
104 }
105
106 //
107 // XdsDependencyManager::ListenerWatcher
108 //
109
110 class XdsDependencyManager::ListenerWatcher final
111 : public XdsListenerResourceType::WatcherInterface {
112 public:
ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)113 explicit ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)
114 : dependency_mgr_(std::move(dependency_mgr)) {}
115
OnResourceChanged(std::shared_ptr<const XdsListenerResource> listener,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)116 void OnResourceChanged(
117 std::shared_ptr<const XdsListenerResource> listener,
118 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
119 dependency_mgr_->work_serializer_->Run(
120 [dependency_mgr = dependency_mgr_, listener = std::move(listener),
121 read_delay_handle = std::move(read_delay_handle)]() mutable {
122 dependency_mgr->OnListenerUpdate(std::move(listener));
123 },
124 DEBUG_LOCATION);
125 }
126
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)127 void OnError(
128 absl::Status status,
129 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
130 dependency_mgr_->work_serializer_->Run(
131 [dependency_mgr = dependency_mgr_, status = std::move(status),
132 read_delay_handle = std::move(read_delay_handle)]() mutable {
133 dependency_mgr->OnError(dependency_mgr->listener_resource_name_,
134 std::move(status));
135 },
136 DEBUG_LOCATION);
137 }
138
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)139 void OnResourceDoesNotExist(
140 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
141 dependency_mgr_->work_serializer_->Run(
142 [dependency_mgr = dependency_mgr_,
143 read_delay_handle = std::move(read_delay_handle)]() {
144 dependency_mgr->OnResourceDoesNotExist(
145 absl::StrCat(dependency_mgr->listener_resource_name_,
146 ": xDS listener resource does not exist"));
147 },
148 DEBUG_LOCATION);
149 }
150
151 private:
152 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
153 };
154
155 //
156 // XdsDependencyManager::RouteConfigWatcher
157 //
158
159 class XdsDependencyManager::RouteConfigWatcher final
160 : public XdsRouteConfigResourceType::WatcherInterface {
161 public:
RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)162 RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
163 std::string name)
164 : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
165
OnResourceChanged(std::shared_ptr<const XdsRouteConfigResource> route_config,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)166 void OnResourceChanged(
167 std::shared_ptr<const XdsRouteConfigResource> route_config,
168 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
169 dependency_mgr_->work_serializer_->Run(
170 [self = RefAsSubclass<RouteConfigWatcher>(),
171 route_config = std::move(route_config),
172 read_delay_handle = std::move(read_delay_handle)]() mutable {
173 self->dependency_mgr_->OnRouteConfigUpdate(self->name_,
174 std::move(route_config));
175 },
176 DEBUG_LOCATION);
177 }
178
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)179 void OnError(
180 absl::Status status,
181 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
182 dependency_mgr_->work_serializer_->Run(
183 [self = RefAsSubclass<RouteConfigWatcher>(), status = std::move(status),
184 read_delay_handle = std::move(read_delay_handle)]() mutable {
185 self->dependency_mgr_->OnError(self->name_, std::move(status));
186 },
187 DEBUG_LOCATION);
188 }
189
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)190 void OnResourceDoesNotExist(
191 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
192 dependency_mgr_->work_serializer_->Run(
193 [self = RefAsSubclass<RouteConfigWatcher>(),
194 read_delay_handle = std::move(read_delay_handle)]() {
195 self->dependency_mgr_->OnResourceDoesNotExist(absl::StrCat(
196 self->name_,
197 ": xDS route configuration resource does not exist"));
198 },
199 DEBUG_LOCATION);
200 }
201
202 private:
203 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
204 std::string name_;
205 };
206
207 //
208 // XdsDependencyManager::ClusterWatcher
209 //
210
211 class XdsDependencyManager::ClusterWatcher final
212 : public XdsClusterResourceType::WatcherInterface {
213 public:
ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)214 ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
215 absl::string_view name)
216 : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
217
OnResourceChanged(std::shared_ptr<const XdsClusterResource> cluster,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)218 void OnResourceChanged(
219 std::shared_ptr<const XdsClusterResource> cluster,
220 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
221 dependency_mgr_->work_serializer_->Run(
222 [self = RefAsSubclass<ClusterWatcher>(), cluster = std::move(cluster),
223 read_delay_handle = std::move(read_delay_handle)]() mutable {
224 self->dependency_mgr_->OnClusterUpdate(self->name_,
225 std::move(cluster));
226 },
227 DEBUG_LOCATION);
228 }
229
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)230 void OnError(
231 absl::Status status,
232 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
233 dependency_mgr_->work_serializer_->Run(
234 [self = RefAsSubclass<ClusterWatcher>(), status = std::move(status),
235 read_delay_handle = std::move(read_delay_handle)]() mutable {
236 self->dependency_mgr_->OnClusterError(self->name_, std::move(status));
237 },
238 DEBUG_LOCATION);
239 }
240
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)241 void OnResourceDoesNotExist(
242 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
243 dependency_mgr_->work_serializer_->Run(
244 [self = RefAsSubclass<ClusterWatcher>(),
245 read_delay_handle = std::move(read_delay_handle)]() {
246 self->dependency_mgr_->OnClusterDoesNotExist(self->name_);
247 },
248 DEBUG_LOCATION);
249 }
250
251 private:
252 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
253 std::string name_;
254 };
255
256 //
257 // XdsDependencyManager::EndpointWatcher
258 //
259
260 class XdsDependencyManager::EndpointWatcher final
261 : public XdsEndpointResourceType::WatcherInterface {
262 public:
EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)263 EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
264 absl::string_view name)
265 : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
266
OnResourceChanged(std::shared_ptr<const XdsEndpointResource> endpoint,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)267 void OnResourceChanged(
268 std::shared_ptr<const XdsEndpointResource> endpoint,
269 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
270 dependency_mgr_->work_serializer_->Run(
271 [self = RefAsSubclass<EndpointWatcher>(),
272 endpoint = std::move(endpoint),
273 read_delay_handle = std::move(read_delay_handle)]() mutable {
274 self->dependency_mgr_->OnEndpointUpdate(self->name_,
275 std::move(endpoint));
276 },
277 DEBUG_LOCATION);
278 }
279
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)280 void OnError(
281 absl::Status status,
282 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
283 dependency_mgr_->work_serializer_->Run(
284 [self = RefAsSubclass<EndpointWatcher>(), status = std::move(status),
285 read_delay_handle = std::move(read_delay_handle)]() mutable {
286 self->dependency_mgr_->OnEndpointError(self->name_,
287 std::move(status));
288 },
289 DEBUG_LOCATION);
290 }
291
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)292 void OnResourceDoesNotExist(
293 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
294 dependency_mgr_->work_serializer_->Run(
295 [self = RefAsSubclass<EndpointWatcher>(),
296 read_delay_handle = std::move(read_delay_handle)]() {
297 self->dependency_mgr_->OnEndpointDoesNotExist(self->name_);
298 },
299 DEBUG_LOCATION);
300 }
301
302 private:
303 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
304 std::string name_;
305 };
306
307 //
308 // XdsDependencyManager::DnsResultHandler
309 //
310
311 class XdsDependencyManager::DnsResultHandler final
312 : public Resolver::ResultHandler {
313 public:
DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)314 DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,
315 std::string name)
316 : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
317
ReportResult(Resolver::Result result)318 void ReportResult(Resolver::Result result) override {
319 dependency_mgr_->work_serializer_->Run(
320 [dependency_mgr = dependency_mgr_, name = name_,
321 result = std::move(result)]() mutable {
322 dependency_mgr->OnDnsResult(name, std::move(result));
323 },
324 DEBUG_LOCATION);
325 }
326
327 private:
328 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
329 std::string name_;
330 };
331
332 //
333 // XdsDependencyManager::ClusterSubscription
334 //
335
Orphaned()336 void XdsDependencyManager::ClusterSubscription::Orphaned() {
337 dependency_mgr_->work_serializer_->Run(
338 [self = WeakRef()]() {
339 self->dependency_mgr_->OnClusterSubscriptionUnref(self->cluster_name_,
340 self.get());
341 },
342 DEBUG_LOCATION);
343 }
344
345 //
346 // XdsDependencyManager
347 //
348
XdsDependencyManager(RefCountedPtr<GrpcXdsClient> xds_client,std::shared_ptr<WorkSerializer> work_serializer,std::unique_ptr<Watcher> watcher,std::string data_plane_authority,std::string listener_resource_name,ChannelArgs args,grpc_pollset_set * interested_parties)349 XdsDependencyManager::XdsDependencyManager(
350 RefCountedPtr<GrpcXdsClient> xds_client,
351 std::shared_ptr<WorkSerializer> work_serializer,
352 std::unique_ptr<Watcher> watcher, std::string data_plane_authority,
353 std::string listener_resource_name, ChannelArgs args,
354 grpc_pollset_set* interested_parties)
355 : xds_client_(std::move(xds_client)),
356 work_serializer_(std::move(work_serializer)),
357 watcher_(std::move(watcher)),
358 data_plane_authority_(std::move(data_plane_authority)),
359 listener_resource_name_(std::move(listener_resource_name)),
360 args_(std::move(args)),
361 interested_parties_(interested_parties) {
362 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
363 gpr_log(GPR_INFO,
364 "[XdsDependencyManager %p] starting watch for listener %s", this,
365 listener_resource_name_.c_str());
366 }
367 auto listener_watcher = MakeRefCounted<ListenerWatcher>(Ref());
368 listener_watcher_ = listener_watcher.get();
369 XdsListenerResourceType::StartWatch(
370 xds_client_.get(), listener_resource_name_, std::move(listener_watcher));
371 }
372
Orphan()373 void XdsDependencyManager::Orphan() {
374 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
375 gpr_log(GPR_INFO, "[XdsDependencyManager %p] shutting down", this);
376 }
377 if (listener_watcher_ != nullptr) {
378 XdsListenerResourceType::CancelWatch(
379 xds_client_.get(), listener_resource_name_, listener_watcher_,
380 /*delay_unsubscription=*/false);
381 }
382 if (route_config_watcher_ != nullptr) {
383 XdsRouteConfigResourceType::CancelWatch(
384 xds_client_.get(), route_config_name_, route_config_watcher_,
385 /*delay_unsubscription=*/false);
386 }
387 for (const auto& p : cluster_watchers_) {
388 XdsClusterResourceType::CancelWatch(xds_client_.get(), p.first,
389 p.second.watcher,
390 /*delay_unsubscription=*/false);
391 }
392 for (const auto& p : endpoint_watchers_) {
393 XdsEndpointResourceType::CancelWatch(xds_client_.get(), p.first,
394 p.second.watcher,
395 /*delay_unsubscription=*/false);
396 }
397 cluster_subscriptions_.clear();
398 xds_client_.reset();
399 for (auto& p : dns_resolvers_) {
400 p.second.resolver.reset();
401 }
402 Unref();
403 }
404
OnListenerUpdate(std::shared_ptr<const XdsListenerResource> listener)405 void XdsDependencyManager::OnListenerUpdate(
406 std::shared_ptr<const XdsListenerResource> listener) {
407 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
408 gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Listener update",
409 this);
410 }
411 if (xds_client_ == nullptr) return;
412 const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
413 &listener->listener);
414 if (hcm == nullptr) {
415 return OnError(listener_resource_name_,
416 absl::UnavailableError("not an API listener"));
417 }
418 current_listener_ = std::move(listener);
419 Match(
420 hcm->route_config,
421 // RDS resource name
422 [&](const std::string& rds_name) {
423 // If the RDS name changed, update the RDS watcher.
424 // Note that this will be true on the initial update, because
425 // route_config_name_ will be empty.
426 if (route_config_name_ != rds_name) {
427 // If we already had a watch (i.e., if the previous config had
428 // a different RDS name), stop the previous watch.
429 // There will be no previous watch if either (a) this is the
430 // initial resource update or (b) the previous Listener had an
431 // inlined RouteConfig.
432 if (route_config_watcher_ != nullptr) {
433 XdsRouteConfigResourceType::CancelWatch(
434 xds_client_.get(), route_config_name_, route_config_watcher_,
435 /*delay_unsubscription=*/true);
436 route_config_watcher_ = nullptr;
437 }
438 // Start watch for the new RDS resource name.
439 route_config_name_ = rds_name;
440 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
441 gpr_log(
442 GPR_INFO,
443 "[XdsDependencyManager %p] starting watch for route config %s",
444 this, route_config_name_.c_str());
445 }
446 auto watcher =
447 MakeRefCounted<RouteConfigWatcher>(Ref(), route_config_name_);
448 route_config_watcher_ = watcher.get();
449 XdsRouteConfigResourceType::StartWatch(
450 xds_client_.get(), route_config_name_, std::move(watcher));
451 } else {
452 // RDS resource name has not changed, so no watch needs to be
453 // updated, but we still need to propagate any changes in the
454 // HCM config (e.g., the list of HTTP filters).
455 MaybeReportUpdate();
456 }
457 },
458 // inlined RouteConfig
459 [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
460 // If the previous update specified an RDS resource instead of
461 // having an inlined RouteConfig, we need to cancel the RDS watch.
462 if (route_config_watcher_ != nullptr) {
463 XdsRouteConfigResourceType::CancelWatch(
464 xds_client_.get(), route_config_name_, route_config_watcher_);
465 route_config_watcher_ = nullptr;
466 route_config_name_.clear();
467 }
468 OnRouteConfigUpdate("", route_config);
469 });
470 }
471
472 namespace {
473
474 class XdsVirtualHostListIterator final
475 : public XdsRouting::VirtualHostListIterator {
476 public:
XdsVirtualHostListIterator(const std::vector<XdsRouteConfigResource::VirtualHost> * virtual_hosts)477 explicit XdsVirtualHostListIterator(
478 const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
479 : virtual_hosts_(virtual_hosts) {}
480
Size() const481 size_t Size() const override { return virtual_hosts_->size(); }
482
GetDomainsForVirtualHost(size_t index) const483 const std::vector<std::string>& GetDomainsForVirtualHost(
484 size_t index) const override {
485 return (*virtual_hosts_)[index].domains;
486 }
487
488 private:
489 const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
490 };
491
492 // Gets the set of clusters referenced in the specified virtual host.
GetClustersFromVirtualHost(const XdsRouteConfigResource::VirtualHost & virtual_host)493 absl::flat_hash_set<absl::string_view> GetClustersFromVirtualHost(
494 const XdsRouteConfigResource::VirtualHost& virtual_host) {
495 absl::flat_hash_set<absl::string_view> clusters;
496 for (auto& route : virtual_host.routes) {
497 auto* route_action =
498 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(&route.action);
499 if (route_action == nullptr) continue;
500 Match(
501 route_action->action,
502 // cluster name
503 [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
504 cluster_name) { clusters.insert(cluster_name.cluster_name); },
505 // WeightedClusters
506 [&](const std::vector<
507 XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
508 weighted_clusters) {
509 for (const auto& weighted_cluster : weighted_clusters) {
510 clusters.insert(weighted_cluster.name);
511 }
512 },
513 // ClusterSpecifierPlugin
514 [&](const XdsRouteConfigResource::Route::RouteAction::
515 ClusterSpecifierPluginName&) {
516 // Clusters are determined dynamically in this case, so we
517 // can't add any clusters here.
518 });
519 }
520 return clusters;
521 }
522
523 } // namespace
524
OnRouteConfigUpdate(const std::string & name,std::shared_ptr<const XdsRouteConfigResource> route_config)525 void XdsDependencyManager::OnRouteConfigUpdate(
526 const std::string& name,
527 std::shared_ptr<const XdsRouteConfigResource> route_config) {
528 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
529 gpr_log(GPR_INFO,
530 "[XdsDependencyManager %p] received RouteConfig update for %s",
531 this, name.empty() ? "<inline>" : name.c_str());
532 }
533 if (xds_client_ == nullptr) return;
534 // Ignore updates for stale names.
535 if (name.empty()) {
536 if (!route_config_name_.empty()) return;
537 } else {
538 if (name != route_config_name_) return;
539 }
540 // Find the relevant VirtualHost from the RouteConfiguration.
541 // If the resource doesn't have the right vhost, fail without updating
542 // our data.
543 auto vhost_index = XdsRouting::FindVirtualHostForDomain(
544 XdsVirtualHostListIterator(&route_config->virtual_hosts),
545 data_plane_authority_);
546 if (!vhost_index.has_value()) {
547 OnError(route_config_name_.empty() ? listener_resource_name_
548 : route_config_name_,
549 absl::UnavailableError(
550 absl::StrCat("could not find VirtualHost for ",
551 data_plane_authority_, " in RouteConfiguration")));
552 return;
553 }
554 // Update our data.
555 current_route_config_ = std::move(route_config);
556 current_virtual_host_ = ¤t_route_config_->virtual_hosts[*vhost_index];
557 clusters_from_route_config_ =
558 GetClustersFromVirtualHost(*current_virtual_host_);
559 MaybeReportUpdate();
560 }
561
OnError(std::string context,absl::Status status)562 void XdsDependencyManager::OnError(std::string context, absl::Status status) {
563 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
564 gpr_log(GPR_INFO,
565 "[XdsDependencyManager %p] received Listener or RouteConfig "
566 "error: %s %s",
567 this, context.c_str(), status.ToString().c_str());
568 }
569 if (xds_client_ == nullptr) return;
570 if (current_virtual_host_ != nullptr) return;
571 watcher_->OnError(context, std::move(status));
572 }
573
OnResourceDoesNotExist(std::string context)574 void XdsDependencyManager::OnResourceDoesNotExist(std::string context) {
575 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
576 gpr_log(GPR_INFO, "[XdsDependencyManager %p] %s", this, context.c_str());
577 }
578 if (xds_client_ == nullptr) return;
579 current_virtual_host_ = nullptr;
580 watcher_->OnResourceDoesNotExist(std::move(context));
581 }
582
OnClusterUpdate(const std::string & name,std::shared_ptr<const XdsClusterResource> cluster)583 void XdsDependencyManager::OnClusterUpdate(
584 const std::string& name,
585 std::shared_ptr<const XdsClusterResource> cluster) {
586 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
587 gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster update: %s",
588 this, name.c_str());
589 }
590 if (xds_client_ == nullptr) return;
591 auto it = cluster_watchers_.find(name);
592 if (it == cluster_watchers_.end()) return;
593 it->second.update = std::move(cluster);
594 MaybeReportUpdate();
595 }
596
OnClusterError(const std::string & name,absl::Status status)597 void XdsDependencyManager::OnClusterError(const std::string& name,
598 absl::Status status) {
599 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
600 gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster error: %s %s",
601 this, name.c_str(), status.ToString().c_str());
602 }
603 if (xds_client_ == nullptr) return;
604 auto it = cluster_watchers_.find(name);
605 if (it == cluster_watchers_.end()) return;
606 if (it->second.update.value_or(nullptr) == nullptr) {
607 it->second.update =
608 absl::Status(status.code(), absl::StrCat(name, ": ", status.message()));
609 }
610 MaybeReportUpdate();
611 }
612
OnClusterDoesNotExist(const std::string & name)613 void XdsDependencyManager::OnClusterDoesNotExist(const std::string& name) {
614 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
615 gpr_log(GPR_INFO, "[XdsDependencyManager %p] Cluster does not exist: %s",
616 this, name.c_str());
617 }
618 if (xds_client_ == nullptr) return;
619 auto it = cluster_watchers_.find(name);
620 if (it == cluster_watchers_.end()) return;
621 it->second.update = absl::UnavailableError(
622 absl::StrCat("CDS resource ", name, " does not exist"));
623 MaybeReportUpdate();
624 }
625
OnEndpointUpdate(const std::string & name,std::shared_ptr<const XdsEndpointResource> endpoint)626 void XdsDependencyManager::OnEndpointUpdate(
627 const std::string& name,
628 std::shared_ptr<const XdsEndpointResource> endpoint) {
629 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
630 gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Endpoint update: %s",
631 this, name.c_str());
632 }
633 if (xds_client_ == nullptr) return;
634 auto it = endpoint_watchers_.find(name);
635 if (it == endpoint_watchers_.end()) return;
636 if (endpoint->priorities.empty()) {
637 it->second.update.resolution_note =
638 absl::StrCat("EDS resource ", name, " contains no localities");
639 } else {
640 std::set<absl::string_view> empty_localities;
641 for (const auto& priority : endpoint->priorities) {
642 for (const auto& p : priority.localities) {
643 if (p.second.endpoints.empty()) {
644 empty_localities.insert(
645 p.first->human_readable_string().as_string_view());
646 }
647 }
648 }
649 if (!empty_localities.empty()) {
650 it->second.update.resolution_note =
651 absl::StrCat("EDS resource ", name, " contains empty localities: [",
652 absl::StrJoin(empty_localities, "; "), "]");
653 }
654 }
655 it->second.update.endpoints = std::move(endpoint);
656 MaybeReportUpdate();
657 }
658
OnEndpointError(const std::string & name,absl::Status status)659 void XdsDependencyManager::OnEndpointError(const std::string& name,
660 absl::Status status) {
661 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
662 gpr_log(GPR_INFO,
663 "[XdsDependencyManager %p] received Endpoint error: %s %s", this,
664 name.c_str(), status.ToString().c_str());
665 }
666 if (xds_client_ == nullptr) return;
667 auto it = endpoint_watchers_.find(name);
668 if (it == endpoint_watchers_.end()) return;
669 if (it->second.update.endpoints == nullptr) {
670 it->second.update.resolution_note =
671 absl::StrCat("EDS resource ", name, ": ", status.ToString());
672 MaybeReportUpdate();
673 }
674 }
675
OnEndpointDoesNotExist(const std::string & name)676 void XdsDependencyManager::OnEndpointDoesNotExist(const std::string& name) {
677 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
678 gpr_log(GPR_INFO, "[XdsDependencyManager %p] Endpoint does not exist: %s",
679 this, name.c_str());
680 }
681 if (xds_client_ == nullptr) return;
682 auto it = endpoint_watchers_.find(name);
683 if (it == endpoint_watchers_.end()) return;
684 it->second.update.endpoints.reset();
685 it->second.update.resolution_note =
686 absl::StrCat("EDS resource ", name, " does not exist");
687 MaybeReportUpdate();
688 }
689
OnDnsResult(const std::string & dns_name,Resolver::Result result)690 void XdsDependencyManager::OnDnsResult(const std::string& dns_name,
691 Resolver::Result result) {
692 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
693 gpr_log(GPR_INFO, "[XdsDependencyManager %p] received DNS update: %s", this,
694 dns_name.c_str());
695 }
696 if (xds_client_ == nullptr) return;
697 auto it = dns_resolvers_.find(dns_name);
698 if (it == dns_resolvers_.end()) return;
699 PopulateDnsUpdate(dns_name, std::move(result), &it->second);
700 MaybeReportUpdate();
701 }
702
PopulateDnsUpdate(const std::string & dns_name,Resolver::Result result,DnsState * dns_state)703 void XdsDependencyManager::PopulateDnsUpdate(const std::string& dns_name,
704 Resolver::Result result,
705 DnsState* dns_state) {
706 // Convert resolver result to EDS update.
707 XdsEndpointResource::Priority::Locality locality;
708 locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
709 locality.lb_weight = 1;
710 if (result.addresses.ok()) {
711 locality.endpoints = std::move(*result.addresses);
712 dns_state->update.resolution_note = std::move(result.resolution_note);
713 } else if (result.resolution_note.empty()) {
714 dns_state->update.resolution_note =
715 absl::StrCat("DNS resolution failed for ", dns_name, ": ",
716 result.addresses.status().ToString());
717 }
718 XdsEndpointResource::Priority priority;
719 priority.localities.emplace(locality.name.get(), std::move(locality));
720 auto resource = std::make_shared<XdsEndpointResource>();
721 resource->priorities.emplace_back(std::move(priority));
722 dns_state->update.endpoints = std::move(resource);
723 }
724
PopulateClusterConfigMap(absl::string_view name,int depth,absl::flat_hash_map<std::string,absl::StatusOr<XdsConfig::ClusterConfig>> * cluster_config_map,std::set<absl::string_view> * eds_resources_seen,std::set<absl::string_view> * dns_names_seen,absl::StatusOr<std::vector<absl::string_view>> * leaf_clusters)725 bool XdsDependencyManager::PopulateClusterConfigMap(
726 absl::string_view name, int depth,
727 absl::flat_hash_map<std::string, absl::StatusOr<XdsConfig::ClusterConfig>>*
728 cluster_config_map,
729 std::set<absl::string_view>* eds_resources_seen,
730 std::set<absl::string_view>* dns_names_seen,
731 absl::StatusOr<std::vector<absl::string_view>>* leaf_clusters) {
732 if (depth > 0) GPR_ASSERT(leaf_clusters != nullptr);
733 if (depth == kMaxXdsAggregateClusterRecursionDepth) {
734 *leaf_clusters =
735 absl::UnavailableError("aggregate cluster graph exceeds max depth");
736 return true;
737 }
738 // Don't process the cluster again if we've already seen it in some
739 // other branch of the recursion tree. We populate it with a non-OK
740 // status here, since we need an entry in the map to avoid incorrectly
741 // stopping the CDS watch, but we'll overwrite this below if we actually
742 // have the data for the cluster.
743 auto p = cluster_config_map->emplace(
744 name, absl::InternalError("cluster data not yet available"));
745 if (!p.second) return true;
746 auto& cluster_config = p.first->second;
747 auto& state = cluster_watchers_[name];
748 // Create a new watcher if needed.
749 if (state.watcher == nullptr) {
750 auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
751 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
752 gpr_log(GPR_INFO,
753 "[XdsDependencyManager %p] starting watch for cluster %s", this,
754 std::string(name).c_str());
755 }
756 state.watcher = watcher.get();
757 XdsClusterResourceType::StartWatch(xds_client_.get(), name,
758 std::move(watcher));
759 return false;
760 }
761 // If there was an error fetching the CDS resource, report the error.
762 if (!state.update.ok()) {
763 cluster_config = state.update.status();
764 return true;
765 }
766 // If we don't have the resource yet, we can't return a config yet.
767 if (*state.update == nullptr) return false;
768 // Populate endpoint info based on cluster type.
769 return Match(
770 (*state.update)->type,
771 // EDS cluster.
772 [&](const XdsClusterResource::Eds& eds) {
773 absl::string_view eds_resource_name =
774 eds.eds_service_name.empty() ? name : eds.eds_service_name;
775 eds_resources_seen->insert(eds_resource_name);
776 // Start EDS watch if needed.
777 auto& eds_state = endpoint_watchers_[eds_resource_name];
778 if (eds_state.watcher == nullptr) {
779 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
780 gpr_log(GPR_INFO,
781 "[XdsDependencyManager %p] starting watch for endpoint %s",
782 this, std::string(eds_resource_name).c_str());
783 }
784 auto watcher =
785 MakeRefCounted<EndpointWatcher>(Ref(), eds_resource_name);
786 eds_state.watcher = watcher.get();
787 XdsEndpointResourceType::StartWatch(
788 xds_client_.get(), eds_resource_name, std::move(watcher));
789 return false;
790 }
791 // Check if EDS resource has been returned.
792 if (eds_state.update.endpoints == nullptr &&
793 eds_state.update.resolution_note.empty()) {
794 return false;
795 }
796 // Populate cluster config.
797 cluster_config.emplace(*state.update, eds_state.update.endpoints,
798 eds_state.update.resolution_note);
799 if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
800 return true;
801 },
802 // LOGICAL_DNS cluster.
803 [&](const XdsClusterResource::LogicalDns& logical_dns) {
804 dns_names_seen->insert(logical_dns.hostname);
805 // Start DNS resolver if needed.
806 auto& dns_state = dns_resolvers_[logical_dns.hostname];
807 if (dns_state.resolver == nullptr) {
808 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
809 gpr_log(GPR_INFO,
810 "[XdsDependencyManager %p] starting DNS resolver for %s",
811 this, logical_dns.hostname.c_str());
812 }
813 auto* fake_resolver_response_generator = args_.GetPointer<
814 FakeResolverResponseGenerator>(
815 GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
816 ChannelArgs args = args_;
817 std::string target;
818 if (fake_resolver_response_generator != nullptr) {
819 target = absl::StrCat("fake:", logical_dns.hostname);
820 args = args.SetObject(fake_resolver_response_generator->Ref());
821 } else {
822 target = absl::StrCat("dns:", logical_dns.hostname);
823 }
824 dns_state.resolver =
825 CoreConfiguration::Get().resolver_registry().CreateResolver(
826 target, args, interested_parties_, work_serializer_,
827 std::make_unique<DnsResultHandler>(Ref(),
828 logical_dns.hostname));
829 if (dns_state.resolver == nullptr) {
830 Resolver::Result result;
831 result.addresses.emplace(); // Empty list.
832 result.resolution_note = absl::StrCat(
833 "failed to create DNS resolver for ", logical_dns.hostname);
834 PopulateDnsUpdate(logical_dns.hostname, std::move(result),
835 &dns_state);
836 } else {
837 dns_state.resolver->StartLocked();
838 return false;
839 }
840 }
841 // Check if result has been returned.
842 if (dns_state.update.endpoints == nullptr &&
843 dns_state.update.resolution_note.empty()) {
844 return false;
845 }
846 // Populate cluster config.
847 cluster_config.emplace(*state.update, dns_state.update.endpoints,
848 dns_state.update.resolution_note);
849 if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
850 return true;
851 },
852 // Aggregate cluster. Recursively expand to child clusters.
853 [&](const XdsClusterResource::Aggregate& aggregate) {
854 // Grab a ref to the CDS resource for the aggregate cluster here,
855 // since our reference into cluster_watchers_ will be invalidated
856 // when we recursively call ourselves and add entries to the
857 // map for underlying clusters.
858 auto cluster_resource = *state.update;
859 // Recursively expand leaf clusters.
860 absl::StatusOr<std::vector<absl::string_view>> child_leaf_clusters;
861 child_leaf_clusters.emplace();
862 bool have_all_resources = true;
863 for (const std::string& child_name :
864 aggregate.prioritized_cluster_names) {
865 have_all_resources &= PopulateClusterConfigMap(
866 child_name, depth + 1, cluster_config_map, eds_resources_seen,
867 dns_names_seen, &child_leaf_clusters);
868 if (!child_leaf_clusters.ok()) break;
869 }
870 // Note that we cannot use the cluster_config reference we
871 // created above, because it may have been invalidated by map
872 // insertions when we recursively called ourselves, so we have
873 // to do the lookup in cluster_config_map again.
874 auto& aggregate_cluster_config = (*cluster_config_map)[name];
875 // If we exceeded max recursion depth, report an error for the
876 // cluster, and propagate the error up if needed.
877 if (!child_leaf_clusters.ok()) {
878 aggregate_cluster_config = child_leaf_clusters.status();
879 if (leaf_clusters != nullptr) {
880 *leaf_clusters = child_leaf_clusters.status();
881 }
882 return true;
883 }
884 // If needed, propagate leaf cluster list up the tree.
885 if (leaf_clusters != nullptr) {
886 (*leaf_clusters)
887 ->insert((*leaf_clusters)->end(), child_leaf_clusters->begin(),
888 child_leaf_clusters->end());
889 }
890 // If there are no leaf clusters, report an error for the cluster.
891 if (have_all_resources && child_leaf_clusters->empty()) {
892 aggregate_cluster_config = absl::UnavailableError(
893 absl::StrCat("aggregate cluster dependency graph for ", name,
894 " has no leaf clusters"));
895 return true;
896 }
897 // Populate cluster config.
898 // Note that we do this even for aggregate clusters that are not
899 // at the root of the tree, because we need to make sure the list
900 // of underlying cluster names stays alive so that the leaf cluster
901 // list of the root aggregate cluster can point to those strings.
902 aggregate_cluster_config.emplace(std::move(cluster_resource),
903 std::move(*child_leaf_clusters));
904 return have_all_resources;
905 });
906 }
907
908 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
GetClusterSubscription(absl::string_view cluster_name)909 XdsDependencyManager::GetClusterSubscription(absl::string_view cluster_name) {
910 auto it = cluster_subscriptions_.find(cluster_name);
911 if (it != cluster_subscriptions_.end()) {
912 auto subscription = it->second->RefIfNonZero();
913 if (subscription != nullptr) return subscription;
914 }
915 auto subscription = MakeRefCounted<ClusterSubscription>(cluster_name, Ref());
916 cluster_subscriptions_.emplace(subscription->cluster_name(),
917 subscription->WeakRef());
918 // If the cluster is not already subscribed to by virtue of being
919 // referenced in the route config, then trigger the CDS watch.
920 if (!clusters_from_route_config_.contains(cluster_name)) {
921 MaybeReportUpdate();
922 }
923 return subscription;
924 }
925
OnClusterSubscriptionUnref(absl::string_view cluster_name,ClusterSubscription * subscription)926 void XdsDependencyManager::OnClusterSubscriptionUnref(
927 absl::string_view cluster_name, ClusterSubscription* subscription) {
928 auto it = cluster_subscriptions_.find(cluster_name);
929 // Shouldn't happen, but ignore if it does.
930 if (it == cluster_subscriptions_.end()) return;
931 // Do nothing if the subscription has already been replaced.
932 if (it->second != subscription) return;
933 // Remove the entry.
934 cluster_subscriptions_.erase(it);
935 // If this cluster is not already subscribed to by virtue of being
936 // referenced in the route config, then update watches and generate a
937 // new update.
938 if (!clusters_from_route_config_.contains(cluster_name)) {
939 MaybeReportUpdate();
940 }
941 }
942
MaybeReportUpdate()943 void XdsDependencyManager::MaybeReportUpdate() {
944 // Populate Listener and RouteConfig fields.
945 if (current_virtual_host_ == nullptr) return;
946 auto config = MakeRefCounted<XdsConfig>();
947 config->listener = current_listener_;
948 config->route_config = current_route_config_;
949 config->virtual_host = current_virtual_host_;
950 // Determine the set of clusters we should be watching.
951 std::set<absl::string_view> clusters_to_watch;
952 for (const absl::string_view& cluster : clusters_from_route_config_) {
953 clusters_to_watch.insert(cluster);
954 }
955 for (const auto& p : cluster_subscriptions_) {
956 clusters_to_watch.insert(p.first);
957 }
958 // Populate Cluster map.
959 // We traverse the entire graph even if we don't yet have all of the
960 // resources we need to ensure that the right set of watches are active.
961 std::set<absl::string_view> eds_resources_seen;
962 std::set<absl::string_view> dns_names_seen;
963 bool have_all_resources = true;
964 for (const absl::string_view& cluster : clusters_to_watch) {
965 have_all_resources &= PopulateClusterConfigMap(
966 cluster, 0, &config->clusters, &eds_resources_seen, &dns_names_seen);
967 }
968 // Remove entries in cluster_watchers_ for any clusters not in
969 // config->clusters.
970 for (auto it = cluster_watchers_.begin(); it != cluster_watchers_.end();) {
971 const std::string& cluster_name = it->first;
972 if (config->clusters.contains(cluster_name)) {
973 ++it;
974 continue;
975 }
976 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
977 gpr_log(GPR_INFO,
978 "[XdsDependencyManager %p] cancelling watch for cluster %s", this,
979 cluster_name.c_str());
980 }
981 XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name,
982 it->second.watcher,
983 /*delay_unsubscription=*/false);
984 cluster_watchers_.erase(it++);
985 }
986 // Remove entries in endpoint_watchers_ for any EDS resources not in
987 // eds_resources_seen.
988 for (auto it = endpoint_watchers_.begin(); it != endpoint_watchers_.end();) {
989 const std::string& eds_resource_name = it->first;
990 if (eds_resources_seen.find(eds_resource_name) !=
991 eds_resources_seen.end()) {
992 ++it;
993 continue;
994 }
995 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
996 gpr_log(GPR_INFO,
997 "[XdsDependencyManager %p] cancelling watch for EDS resource %s",
998 this, eds_resource_name.c_str());
999 }
1000 XdsEndpointResourceType::CancelWatch(xds_client_.get(), eds_resource_name,
1001 it->second.watcher,
1002 /*delay_unsubscription=*/false);
1003 endpoint_watchers_.erase(it++);
1004 }
1005 // Remove entries in dns_resolvers_ for any DNS name not in
1006 // eds_resources_seen.
1007 for (auto it = dns_resolvers_.begin(); it != dns_resolvers_.end();) {
1008 const std::string& dns_name = it->first;
1009 if (dns_names_seen.find(dns_name) != dns_names_seen.end()) {
1010 ++it;
1011 continue;
1012 }
1013 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1014 gpr_log(GPR_INFO,
1015 "[XdsDependencyManager %p] shutting down DNS resolver for %s",
1016 this, dns_name.c_str());
1017 }
1018 dns_resolvers_.erase(it++);
1019 }
1020 // If we have all the data we need, then send an update.
1021 if (!have_all_resources) {
1022 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1023 gpr_log(GPR_INFO,
1024 "[XdsDependencyManager %p] missing data -- NOT returning config",
1025 this);
1026 }
1027 return;
1028 }
1029 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1030 gpr_log(GPR_INFO, "[XdsDependencyManager %p] returning config: %s", this,
1031 config->ToString().c_str());
1032 }
1033 watcher_->OnUpdate(std::move(config));
1034 }
1035
1036 } // namespace grpc_core
1037