xref: /aosp_15_r20/external/grpc-grpc/src/core/resolver/xds/xds_dependency_manager.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/resolver/xds/xds_dependency_manager.h"
20 
21 #include <set>
22 
23 #include "absl/strings/str_join.h"
24 
25 #include "src/core/ext/xds/xds_routing.h"
26 #include "src/core/lib/config/core_configuration.h"
27 #include "src/core/lib/gprpp/match.h"
28 #include "src/core/load_balancing/xds/xds_channel_args.h"
29 #include "src/core/resolver/fake/fake_resolver.h"
30 #include "src/core/resolver/xds/xds_resolver_trace.h"
31 
32 namespace grpc_core {
33 
namespace {

// Upper bound on the depth of the aggregate cluster dependency graph.
constexpr int kMaxXdsAggregateClusterRecursionDepth{16};

}  // namespace
40 
41 //
42 // XdsDependencyManager::XdsConfig::ClusterConfig
43 //
44 
ClusterConfig(std::shared_ptr<const XdsClusterResource> cluster,std::shared_ptr<const XdsEndpointResource> endpoints,std::string resolution_note)45 XdsDependencyManager::XdsConfig::ClusterConfig::ClusterConfig(
46     std::shared_ptr<const XdsClusterResource> cluster,
47     std::shared_ptr<const XdsEndpointResource> endpoints,
48     std::string resolution_note)
49     : cluster(std::move(cluster)),
50       children(absl::in_place_type_t<EndpointConfig>(), std::move(endpoints),
51                std::move(resolution_note)) {}
52 
ClusterConfig(std::shared_ptr<const XdsClusterResource> cluster,std::vector<absl::string_view> leaf_clusters)53 XdsDependencyManager::XdsConfig::ClusterConfig::ClusterConfig(
54     std::shared_ptr<const XdsClusterResource> cluster,
55     std::vector<absl::string_view> leaf_clusters)
56     : cluster(std::move(cluster)),
57       children(absl::in_place_type_t<AggregateConfig>(),
58                std::move(leaf_clusters)) {}
59 
60 //
61 // XdsDependencyManager::XdsConfig
62 //
63 
ToString() const64 std::string XdsDependencyManager::XdsConfig::ToString() const {
65   std::vector<std::string> parts = {
66       "{\n  listener: {",     listener->ToString(),
67       "}\n  route_config: {", route_config->ToString(),
68       "}\n  virtual_host: {", virtual_host->ToString(),
69       "}\n  clusters: {\n"};
70   for (const auto& p : clusters) {
71     parts.push_back(absl::StrCat("    \"", p.first, "\": "));
72     if (!p.second.ok()) {
73       parts.push_back(p.second.status().ToString());
74       parts.push_back("\n");
75     } else {
76       parts.push_back(
77           absl::StrCat("      {\n"
78                        "        cluster: {",
79                        p.second->cluster->ToString(), "}\n"));
80       Match(
81           p.second->children,
82           [&](const ClusterConfig::EndpointConfig& endpoint_config) {
83             parts.push_back(
84                 absl::StrCat("        endpoints: {",
85                              endpoint_config.endpoints == nullptr
86                                  ? "<null>"
87                                  : endpoint_config.endpoints->ToString(),
88                              "}\n"
89                              "        resolution_note: \"",
90                              endpoint_config.resolution_note, "\"\n"));
91           },
92           [&](const ClusterConfig::AggregateConfig& aggregate_config) {
93             parts.push_back(absl::StrCat(
94                 "        leaf_clusters: [",
95                 absl::StrJoin(aggregate_config.leaf_clusters, ", "), "]\n"));
96           });
97       parts.push_back(
98           "      }\n"
99           "    ]\n");
100     }
101   }
102   parts.push_back("  }\n}");
103   return absl::StrJoin(parts, "");
104 }
105 
106 //
107 // XdsDependencyManager::ListenerWatcher
108 //
109 
110 class XdsDependencyManager::ListenerWatcher final
111     : public XdsListenerResourceType::WatcherInterface {
112  public:
ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)113   explicit ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)
114       : dependency_mgr_(std::move(dependency_mgr)) {}
115 
OnResourceChanged(std::shared_ptr<const XdsListenerResource> listener,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)116   void OnResourceChanged(
117       std::shared_ptr<const XdsListenerResource> listener,
118       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
119     dependency_mgr_->work_serializer_->Run(
120         [dependency_mgr = dependency_mgr_, listener = std::move(listener),
121          read_delay_handle = std::move(read_delay_handle)]() mutable {
122           dependency_mgr->OnListenerUpdate(std::move(listener));
123         },
124         DEBUG_LOCATION);
125   }
126 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)127   void OnError(
128       absl::Status status,
129       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
130     dependency_mgr_->work_serializer_->Run(
131         [dependency_mgr = dependency_mgr_, status = std::move(status),
132          read_delay_handle = std::move(read_delay_handle)]() mutable {
133           dependency_mgr->OnError(dependency_mgr->listener_resource_name_,
134                                   std::move(status));
135         },
136         DEBUG_LOCATION);
137   }
138 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)139   void OnResourceDoesNotExist(
140       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
141     dependency_mgr_->work_serializer_->Run(
142         [dependency_mgr = dependency_mgr_,
143          read_delay_handle = std::move(read_delay_handle)]() {
144           dependency_mgr->OnResourceDoesNotExist(
145               absl::StrCat(dependency_mgr->listener_resource_name_,
146                            ": xDS listener resource does not exist"));
147         },
148         DEBUG_LOCATION);
149   }
150 
151  private:
152   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
153 };
154 
155 //
156 // XdsDependencyManager::RouteConfigWatcher
157 //
158 
159 class XdsDependencyManager::RouteConfigWatcher final
160     : public XdsRouteConfigResourceType::WatcherInterface {
161  public:
RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)162   RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
163                      std::string name)
164       : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
165 
OnResourceChanged(std::shared_ptr<const XdsRouteConfigResource> route_config,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)166   void OnResourceChanged(
167       std::shared_ptr<const XdsRouteConfigResource> route_config,
168       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
169     dependency_mgr_->work_serializer_->Run(
170         [self = RefAsSubclass<RouteConfigWatcher>(),
171          route_config = std::move(route_config),
172          read_delay_handle = std::move(read_delay_handle)]() mutable {
173           self->dependency_mgr_->OnRouteConfigUpdate(self->name_,
174                                                      std::move(route_config));
175         },
176         DEBUG_LOCATION);
177   }
178 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)179   void OnError(
180       absl::Status status,
181       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
182     dependency_mgr_->work_serializer_->Run(
183         [self = RefAsSubclass<RouteConfigWatcher>(), status = std::move(status),
184          read_delay_handle = std::move(read_delay_handle)]() mutable {
185           self->dependency_mgr_->OnError(self->name_, std::move(status));
186         },
187         DEBUG_LOCATION);
188   }
189 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)190   void OnResourceDoesNotExist(
191       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
192     dependency_mgr_->work_serializer_->Run(
193         [self = RefAsSubclass<RouteConfigWatcher>(),
194          read_delay_handle = std::move(read_delay_handle)]() {
195           self->dependency_mgr_->OnResourceDoesNotExist(absl::StrCat(
196               self->name_,
197               ": xDS route configuration resource does not exist"));
198         },
199         DEBUG_LOCATION);
200   }
201 
202  private:
203   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
204   std::string name_;
205 };
206 
207 //
208 // XdsDependencyManager::ClusterWatcher
209 //
210 
211 class XdsDependencyManager::ClusterWatcher final
212     : public XdsClusterResourceType::WatcherInterface {
213  public:
ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)214   ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
215                  absl::string_view name)
216       : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
217 
OnResourceChanged(std::shared_ptr<const XdsClusterResource> cluster,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)218   void OnResourceChanged(
219       std::shared_ptr<const XdsClusterResource> cluster,
220       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
221     dependency_mgr_->work_serializer_->Run(
222         [self = RefAsSubclass<ClusterWatcher>(), cluster = std::move(cluster),
223          read_delay_handle = std::move(read_delay_handle)]() mutable {
224           self->dependency_mgr_->OnClusterUpdate(self->name_,
225                                                  std::move(cluster));
226         },
227         DEBUG_LOCATION);
228   }
229 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)230   void OnError(
231       absl::Status status,
232       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
233     dependency_mgr_->work_serializer_->Run(
234         [self = RefAsSubclass<ClusterWatcher>(), status = std::move(status),
235          read_delay_handle = std::move(read_delay_handle)]() mutable {
236           self->dependency_mgr_->OnClusterError(self->name_, std::move(status));
237         },
238         DEBUG_LOCATION);
239   }
240 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)241   void OnResourceDoesNotExist(
242       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
243     dependency_mgr_->work_serializer_->Run(
244         [self = RefAsSubclass<ClusterWatcher>(),
245          read_delay_handle = std::move(read_delay_handle)]() {
246           self->dependency_mgr_->OnClusterDoesNotExist(self->name_);
247         },
248         DEBUG_LOCATION);
249   }
250 
251  private:
252   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
253   std::string name_;
254 };
255 
256 //
257 // XdsDependencyManager::EndpointWatcher
258 //
259 
260 class XdsDependencyManager::EndpointWatcher final
261     : public XdsEndpointResourceType::WatcherInterface {
262  public:
EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)263   EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
264                   absl::string_view name)
265       : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
266 
OnResourceChanged(std::shared_ptr<const XdsEndpointResource> endpoint,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)267   void OnResourceChanged(
268       std::shared_ptr<const XdsEndpointResource> endpoint,
269       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
270     dependency_mgr_->work_serializer_->Run(
271         [self = RefAsSubclass<EndpointWatcher>(),
272          endpoint = std::move(endpoint),
273          read_delay_handle = std::move(read_delay_handle)]() mutable {
274           self->dependency_mgr_->OnEndpointUpdate(self->name_,
275                                                   std::move(endpoint));
276         },
277         DEBUG_LOCATION);
278   }
279 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)280   void OnError(
281       absl::Status status,
282       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
283     dependency_mgr_->work_serializer_->Run(
284         [self = RefAsSubclass<EndpointWatcher>(), status = std::move(status),
285          read_delay_handle = std::move(read_delay_handle)]() mutable {
286           self->dependency_mgr_->OnEndpointError(self->name_,
287                                                  std::move(status));
288         },
289         DEBUG_LOCATION);
290   }
291 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)292   void OnResourceDoesNotExist(
293       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
294     dependency_mgr_->work_serializer_->Run(
295         [self = RefAsSubclass<EndpointWatcher>(),
296          read_delay_handle = std::move(read_delay_handle)]() {
297           self->dependency_mgr_->OnEndpointDoesNotExist(self->name_);
298         },
299         DEBUG_LOCATION);
300   }
301 
302  private:
303   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
304   std::string name_;
305 };
306 
307 //
308 // XdsDependencyManager::DnsResultHandler
309 //
310 
311 class XdsDependencyManager::DnsResultHandler final
312     : public Resolver::ResultHandler {
313  public:
DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)314   DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,
315                    std::string name)
316       : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
317 
ReportResult(Resolver::Result result)318   void ReportResult(Resolver::Result result) override {
319     dependency_mgr_->work_serializer_->Run(
320         [dependency_mgr = dependency_mgr_, name = name_,
321          result = std::move(result)]() mutable {
322           dependency_mgr->OnDnsResult(name, std::move(result));
323         },
324         DEBUG_LOCATION);
325   }
326 
327  private:
328   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
329   std::string name_;
330 };
331 
332 //
333 // XdsDependencyManager::ClusterSubscription
334 //
335 
Orphaned()336 void XdsDependencyManager::ClusterSubscription::Orphaned() {
337   dependency_mgr_->work_serializer_->Run(
338       [self = WeakRef()]() {
339         self->dependency_mgr_->OnClusterSubscriptionUnref(self->cluster_name_,
340                                                           self.get());
341       },
342       DEBUG_LOCATION);
343 }
344 
345 //
346 // XdsDependencyManager
347 //
348 
XdsDependencyManager(RefCountedPtr<GrpcXdsClient> xds_client,std::shared_ptr<WorkSerializer> work_serializer,std::unique_ptr<Watcher> watcher,std::string data_plane_authority,std::string listener_resource_name,ChannelArgs args,grpc_pollset_set * interested_parties)349 XdsDependencyManager::XdsDependencyManager(
350     RefCountedPtr<GrpcXdsClient> xds_client,
351     std::shared_ptr<WorkSerializer> work_serializer,
352     std::unique_ptr<Watcher> watcher, std::string data_plane_authority,
353     std::string listener_resource_name, ChannelArgs args,
354     grpc_pollset_set* interested_parties)
355     : xds_client_(std::move(xds_client)),
356       work_serializer_(std::move(work_serializer)),
357       watcher_(std::move(watcher)),
358       data_plane_authority_(std::move(data_plane_authority)),
359       listener_resource_name_(std::move(listener_resource_name)),
360       args_(std::move(args)),
361       interested_parties_(interested_parties) {
362   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
363     gpr_log(GPR_INFO,
364             "[XdsDependencyManager %p] starting watch for listener %s", this,
365             listener_resource_name_.c_str());
366   }
367   auto listener_watcher = MakeRefCounted<ListenerWatcher>(Ref());
368   listener_watcher_ = listener_watcher.get();
369   XdsListenerResourceType::StartWatch(
370       xds_client_.get(), listener_resource_name_, std::move(listener_watcher));
371 }
372 
Orphan()373 void XdsDependencyManager::Orphan() {
374   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
375     gpr_log(GPR_INFO, "[XdsDependencyManager %p] shutting down", this);
376   }
377   if (listener_watcher_ != nullptr) {
378     XdsListenerResourceType::CancelWatch(
379         xds_client_.get(), listener_resource_name_, listener_watcher_,
380         /*delay_unsubscription=*/false);
381   }
382   if (route_config_watcher_ != nullptr) {
383     XdsRouteConfigResourceType::CancelWatch(
384         xds_client_.get(), route_config_name_, route_config_watcher_,
385         /*delay_unsubscription=*/false);
386   }
387   for (const auto& p : cluster_watchers_) {
388     XdsClusterResourceType::CancelWatch(xds_client_.get(), p.first,
389                                         p.second.watcher,
390                                         /*delay_unsubscription=*/false);
391   }
392   for (const auto& p : endpoint_watchers_) {
393     XdsEndpointResourceType::CancelWatch(xds_client_.get(), p.first,
394                                          p.second.watcher,
395                                          /*delay_unsubscription=*/false);
396   }
397   cluster_subscriptions_.clear();
398   xds_client_.reset();
399   for (auto& p : dns_resolvers_) {
400     p.second.resolver.reset();
401   }
402   Unref();
403 }
404 
OnListenerUpdate(std::shared_ptr<const XdsListenerResource> listener)405 void XdsDependencyManager::OnListenerUpdate(
406     std::shared_ptr<const XdsListenerResource> listener) {
407   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
408     gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Listener update",
409             this);
410   }
411   if (xds_client_ == nullptr) return;
412   const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
413       &listener->listener);
414   if (hcm == nullptr) {
415     return OnError(listener_resource_name_,
416                    absl::UnavailableError("not an API listener"));
417   }
418   current_listener_ = std::move(listener);
419   Match(
420       hcm->route_config,
421       // RDS resource name
422       [&](const std::string& rds_name) {
423         // If the RDS name changed, update the RDS watcher.
424         // Note that this will be true on the initial update, because
425         // route_config_name_ will be empty.
426         if (route_config_name_ != rds_name) {
427           // If we already had a watch (i.e., if the previous config had
428           // a different RDS name), stop the previous watch.
429           // There will be no previous watch if either (a) this is the
430           // initial resource update or (b) the previous Listener had an
431           // inlined RouteConfig.
432           if (route_config_watcher_ != nullptr) {
433             XdsRouteConfigResourceType::CancelWatch(
434                 xds_client_.get(), route_config_name_, route_config_watcher_,
435                 /*delay_unsubscription=*/true);
436             route_config_watcher_ = nullptr;
437           }
438           // Start watch for the new RDS resource name.
439           route_config_name_ = rds_name;
440           if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
441             gpr_log(
442                 GPR_INFO,
443                 "[XdsDependencyManager %p] starting watch for route config %s",
444                 this, route_config_name_.c_str());
445           }
446           auto watcher =
447               MakeRefCounted<RouteConfigWatcher>(Ref(), route_config_name_);
448           route_config_watcher_ = watcher.get();
449           XdsRouteConfigResourceType::StartWatch(
450               xds_client_.get(), route_config_name_, std::move(watcher));
451         } else {
452           // RDS resource name has not changed, so no watch needs to be
453           // updated, but we still need to propagate any changes in the
454           // HCM config (e.g., the list of HTTP filters).
455           MaybeReportUpdate();
456         }
457       },
458       // inlined RouteConfig
459       [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
460         // If the previous update specified an RDS resource instead of
461         // having an inlined RouteConfig, we need to cancel the RDS watch.
462         if (route_config_watcher_ != nullptr) {
463           XdsRouteConfigResourceType::CancelWatch(
464               xds_client_.get(), route_config_name_, route_config_watcher_);
465           route_config_watcher_ = nullptr;
466           route_config_name_.clear();
467         }
468         OnRouteConfigUpdate("", route_config);
469       });
470 }
471 
472 namespace {
473 
474 class XdsVirtualHostListIterator final
475     : public XdsRouting::VirtualHostListIterator {
476  public:
XdsVirtualHostListIterator(const std::vector<XdsRouteConfigResource::VirtualHost> * virtual_hosts)477   explicit XdsVirtualHostListIterator(
478       const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
479       : virtual_hosts_(virtual_hosts) {}
480 
Size() const481   size_t Size() const override { return virtual_hosts_->size(); }
482 
GetDomainsForVirtualHost(size_t index) const483   const std::vector<std::string>& GetDomainsForVirtualHost(
484       size_t index) const override {
485     return (*virtual_hosts_)[index].domains;
486   }
487 
488  private:
489   const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
490 };
491 
492 // Gets the set of clusters referenced in the specified virtual host.
GetClustersFromVirtualHost(const XdsRouteConfigResource::VirtualHost & virtual_host)493 absl::flat_hash_set<absl::string_view> GetClustersFromVirtualHost(
494     const XdsRouteConfigResource::VirtualHost& virtual_host) {
495   absl::flat_hash_set<absl::string_view> clusters;
496   for (auto& route : virtual_host.routes) {
497     auto* route_action =
498         absl::get_if<XdsRouteConfigResource::Route::RouteAction>(&route.action);
499     if (route_action == nullptr) continue;
500     Match(
501         route_action->action,
502         // cluster name
503         [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
504                 cluster_name) { clusters.insert(cluster_name.cluster_name); },
505         // WeightedClusters
506         [&](const std::vector<
507             XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
508                 weighted_clusters) {
509           for (const auto& weighted_cluster : weighted_clusters) {
510             clusters.insert(weighted_cluster.name);
511           }
512         },
513         // ClusterSpecifierPlugin
514         [&](const XdsRouteConfigResource::Route::RouteAction::
515                 ClusterSpecifierPluginName&) {
516           // Clusters are determined dynamically in this case, so we
517           // can't add any clusters here.
518         });
519   }
520   return clusters;
521 }
522 
523 }  // namespace
524 
OnRouteConfigUpdate(const std::string & name,std::shared_ptr<const XdsRouteConfigResource> route_config)525 void XdsDependencyManager::OnRouteConfigUpdate(
526     const std::string& name,
527     std::shared_ptr<const XdsRouteConfigResource> route_config) {
528   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
529     gpr_log(GPR_INFO,
530             "[XdsDependencyManager %p] received RouteConfig update for %s",
531             this, name.empty() ? "<inline>" : name.c_str());
532   }
533   if (xds_client_ == nullptr) return;
534   // Ignore updates for stale names.
535   if (name.empty()) {
536     if (!route_config_name_.empty()) return;
537   } else {
538     if (name != route_config_name_) return;
539   }
540   // Find the relevant VirtualHost from the RouteConfiguration.
541   // If the resource doesn't have the right vhost, fail without updating
542   // our data.
543   auto vhost_index = XdsRouting::FindVirtualHostForDomain(
544       XdsVirtualHostListIterator(&route_config->virtual_hosts),
545       data_plane_authority_);
546   if (!vhost_index.has_value()) {
547     OnError(route_config_name_.empty() ? listener_resource_name_
548                                        : route_config_name_,
549             absl::UnavailableError(
550                 absl::StrCat("could not find VirtualHost for ",
551                              data_plane_authority_, " in RouteConfiguration")));
552     return;
553   }
554   // Update our data.
555   current_route_config_ = std::move(route_config);
556   current_virtual_host_ = &current_route_config_->virtual_hosts[*vhost_index];
557   clusters_from_route_config_ =
558       GetClustersFromVirtualHost(*current_virtual_host_);
559   MaybeReportUpdate();
560 }
561 
OnError(std::string context,absl::Status status)562 void XdsDependencyManager::OnError(std::string context, absl::Status status) {
563   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
564     gpr_log(GPR_INFO,
565             "[XdsDependencyManager %p] received Listener or RouteConfig "
566             "error: %s %s",
567             this, context.c_str(), status.ToString().c_str());
568   }
569   if (xds_client_ == nullptr) return;
570   if (current_virtual_host_ != nullptr) return;
571   watcher_->OnError(context, std::move(status));
572 }
573 
OnResourceDoesNotExist(std::string context)574 void XdsDependencyManager::OnResourceDoesNotExist(std::string context) {
575   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
576     gpr_log(GPR_INFO, "[XdsDependencyManager %p] %s", this, context.c_str());
577   }
578   if (xds_client_ == nullptr) return;
579   current_virtual_host_ = nullptr;
580   watcher_->OnResourceDoesNotExist(std::move(context));
581 }
582 
OnClusterUpdate(const std::string & name,std::shared_ptr<const XdsClusterResource> cluster)583 void XdsDependencyManager::OnClusterUpdate(
584     const std::string& name,
585     std::shared_ptr<const XdsClusterResource> cluster) {
586   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
587     gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster update: %s",
588             this, name.c_str());
589   }
590   if (xds_client_ == nullptr) return;
591   auto it = cluster_watchers_.find(name);
592   if (it == cluster_watchers_.end()) return;
593   it->second.update = std::move(cluster);
594   MaybeReportUpdate();
595 }
596 
OnClusterError(const std::string & name,absl::Status status)597 void XdsDependencyManager::OnClusterError(const std::string& name,
598                                           absl::Status status) {
599   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
600     gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster error: %s %s",
601             this, name.c_str(), status.ToString().c_str());
602   }
603   if (xds_client_ == nullptr) return;
604   auto it = cluster_watchers_.find(name);
605   if (it == cluster_watchers_.end()) return;
606   if (it->second.update.value_or(nullptr) == nullptr) {
607     it->second.update =
608         absl::Status(status.code(), absl::StrCat(name, ": ", status.message()));
609   }
610   MaybeReportUpdate();
611 }
612 
OnClusterDoesNotExist(const std::string & name)613 void XdsDependencyManager::OnClusterDoesNotExist(const std::string& name) {
614   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
615     gpr_log(GPR_INFO, "[XdsDependencyManager %p] Cluster does not exist: %s",
616             this, name.c_str());
617   }
618   if (xds_client_ == nullptr) return;
619   auto it = cluster_watchers_.find(name);
620   if (it == cluster_watchers_.end()) return;
621   it->second.update = absl::UnavailableError(
622       absl::StrCat("CDS resource ", name, " does not exist"));
623   MaybeReportUpdate();
624 }
625 
OnEndpointUpdate(const std::string & name,std::shared_ptr<const XdsEndpointResource> endpoint)626 void XdsDependencyManager::OnEndpointUpdate(
627     const std::string& name,
628     std::shared_ptr<const XdsEndpointResource> endpoint) {
629   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
630     gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Endpoint update: %s",
631             this, name.c_str());
632   }
633   if (xds_client_ == nullptr) return;
634   auto it = endpoint_watchers_.find(name);
635   if (it == endpoint_watchers_.end()) return;
636   if (endpoint->priorities.empty()) {
637     it->second.update.resolution_note =
638         absl::StrCat("EDS resource ", name, " contains no localities");
639   } else {
640     std::set<absl::string_view> empty_localities;
641     for (const auto& priority : endpoint->priorities) {
642       for (const auto& p : priority.localities) {
643         if (p.second.endpoints.empty()) {
644           empty_localities.insert(
645               p.first->human_readable_string().as_string_view());
646         }
647       }
648     }
649     if (!empty_localities.empty()) {
650       it->second.update.resolution_note =
651           absl::StrCat("EDS resource ", name, " contains empty localities: [",
652                        absl::StrJoin(empty_localities, "; "), "]");
653     }
654   }
655   it->second.update.endpoints = std::move(endpoint);
656   MaybeReportUpdate();
657 }
658 
OnEndpointError(const std::string & name,absl::Status status)659 void XdsDependencyManager::OnEndpointError(const std::string& name,
660                                            absl::Status status) {
661   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
662     gpr_log(GPR_INFO,
663             "[XdsDependencyManager %p] received Endpoint error: %s %s", this,
664             name.c_str(), status.ToString().c_str());
665   }
666   if (xds_client_ == nullptr) return;
667   auto it = endpoint_watchers_.find(name);
668   if (it == endpoint_watchers_.end()) return;
669   if (it->second.update.endpoints == nullptr) {
670     it->second.update.resolution_note =
671         absl::StrCat("EDS resource ", name, ": ", status.ToString());
672     MaybeReportUpdate();
673   }
674 }
675 
OnEndpointDoesNotExist(const std::string & name)676 void XdsDependencyManager::OnEndpointDoesNotExist(const std::string& name) {
677   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
678     gpr_log(GPR_INFO, "[XdsDependencyManager %p] Endpoint does not exist: %s",
679             this, name.c_str());
680   }
681   if (xds_client_ == nullptr) return;
682   auto it = endpoint_watchers_.find(name);
683   if (it == endpoint_watchers_.end()) return;
684   it->second.update.endpoints.reset();
685   it->second.update.resolution_note =
686       absl::StrCat("EDS resource ", name, " does not exist");
687   MaybeReportUpdate();
688 }
689 
OnDnsResult(const std::string & dns_name,Resolver::Result result)690 void XdsDependencyManager::OnDnsResult(const std::string& dns_name,
691                                        Resolver::Result result) {
692   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
693     gpr_log(GPR_INFO, "[XdsDependencyManager %p] received DNS update: %s", this,
694             dns_name.c_str());
695   }
696   if (xds_client_ == nullptr) return;
697   auto it = dns_resolvers_.find(dns_name);
698   if (it == dns_resolvers_.end()) return;
699   PopulateDnsUpdate(dns_name, std::move(result), &it->second);
700   MaybeReportUpdate();
701 }
702 
PopulateDnsUpdate(const std::string & dns_name,Resolver::Result result,DnsState * dns_state)703 void XdsDependencyManager::PopulateDnsUpdate(const std::string& dns_name,
704                                              Resolver::Result result,
705                                              DnsState* dns_state) {
706   // Convert resolver result to EDS update.
707   XdsEndpointResource::Priority::Locality locality;
708   locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
709   locality.lb_weight = 1;
710   if (result.addresses.ok()) {
711     locality.endpoints = std::move(*result.addresses);
712     dns_state->update.resolution_note = std::move(result.resolution_note);
713   } else if (result.resolution_note.empty()) {
714     dns_state->update.resolution_note =
715         absl::StrCat("DNS resolution failed for ", dns_name, ": ",
716                      result.addresses.status().ToString());
717   }
718   XdsEndpointResource::Priority priority;
719   priority.localities.emplace(locality.name.get(), std::move(locality));
720   auto resource = std::make_shared<XdsEndpointResource>();
721   resource->priorities.emplace_back(std::move(priority));
722   dns_state->update.endpoints = std::move(resource);
723 }
724 
PopulateClusterConfigMap(absl::string_view name,int depth,absl::flat_hash_map<std::string,absl::StatusOr<XdsConfig::ClusterConfig>> * cluster_config_map,std::set<absl::string_view> * eds_resources_seen,std::set<absl::string_view> * dns_names_seen,absl::StatusOr<std::vector<absl::string_view>> * leaf_clusters)725 bool XdsDependencyManager::PopulateClusterConfigMap(
726     absl::string_view name, int depth,
727     absl::flat_hash_map<std::string, absl::StatusOr<XdsConfig::ClusterConfig>>*
728         cluster_config_map,
729     std::set<absl::string_view>* eds_resources_seen,
730     std::set<absl::string_view>* dns_names_seen,
731     absl::StatusOr<std::vector<absl::string_view>>* leaf_clusters) {
732   if (depth > 0) GPR_ASSERT(leaf_clusters != nullptr);
733   if (depth == kMaxXdsAggregateClusterRecursionDepth) {
734     *leaf_clusters =
735         absl::UnavailableError("aggregate cluster graph exceeds max depth");
736     return true;
737   }
738   // Don't process the cluster again if we've already seen it in some
739   // other branch of the recursion tree.  We populate it with a non-OK
740   // status here, since we need an entry in the map to avoid incorrectly
741   // stopping the CDS watch, but we'll overwrite this below if we actually
742   // have the data for the cluster.
743   auto p = cluster_config_map->emplace(
744       name, absl::InternalError("cluster data not yet available"));
745   if (!p.second) return true;
746   auto& cluster_config = p.first->second;
747   auto& state = cluster_watchers_[name];
748   // Create a new watcher if needed.
749   if (state.watcher == nullptr) {
750     auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
751     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
752       gpr_log(GPR_INFO,
753               "[XdsDependencyManager %p] starting watch for cluster %s", this,
754               std::string(name).c_str());
755     }
756     state.watcher = watcher.get();
757     XdsClusterResourceType::StartWatch(xds_client_.get(), name,
758                                        std::move(watcher));
759     return false;
760   }
761   // If there was an error fetching the CDS resource, report the error.
762   if (!state.update.ok()) {
763     cluster_config = state.update.status();
764     return true;
765   }
766   // If we don't have the resource yet, we can't return a config yet.
767   if (*state.update == nullptr) return false;
768   // Populate endpoint info based on cluster type.
769   return Match(
770       (*state.update)->type,
771       // EDS cluster.
772       [&](const XdsClusterResource::Eds& eds) {
773         absl::string_view eds_resource_name =
774             eds.eds_service_name.empty() ? name : eds.eds_service_name;
775         eds_resources_seen->insert(eds_resource_name);
776         // Start EDS watch if needed.
777         auto& eds_state = endpoint_watchers_[eds_resource_name];
778         if (eds_state.watcher == nullptr) {
779           if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
780             gpr_log(GPR_INFO,
781                     "[XdsDependencyManager %p] starting watch for endpoint %s",
782                     this, std::string(eds_resource_name).c_str());
783           }
784           auto watcher =
785               MakeRefCounted<EndpointWatcher>(Ref(), eds_resource_name);
786           eds_state.watcher = watcher.get();
787           XdsEndpointResourceType::StartWatch(
788               xds_client_.get(), eds_resource_name, std::move(watcher));
789           return false;
790         }
791         // Check if EDS resource has been returned.
792         if (eds_state.update.endpoints == nullptr &&
793             eds_state.update.resolution_note.empty()) {
794           return false;
795         }
796         // Populate cluster config.
797         cluster_config.emplace(*state.update, eds_state.update.endpoints,
798                                eds_state.update.resolution_note);
799         if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
800         return true;
801       },
802       // LOGICAL_DNS cluster.
803       [&](const XdsClusterResource::LogicalDns& logical_dns) {
804         dns_names_seen->insert(logical_dns.hostname);
805         // Start DNS resolver if needed.
806         auto& dns_state = dns_resolvers_[logical_dns.hostname];
807         if (dns_state.resolver == nullptr) {
808           if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
809             gpr_log(GPR_INFO,
810                     "[XdsDependencyManager %p] starting DNS resolver for %s",
811                     this, logical_dns.hostname.c_str());
812           }
813           auto* fake_resolver_response_generator = args_.GetPointer<
814               FakeResolverResponseGenerator>(
815               GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
816           ChannelArgs args = args_;
817           std::string target;
818           if (fake_resolver_response_generator != nullptr) {
819             target = absl::StrCat("fake:", logical_dns.hostname);
820             args = args.SetObject(fake_resolver_response_generator->Ref());
821           } else {
822             target = absl::StrCat("dns:", logical_dns.hostname);
823           }
824           dns_state.resolver =
825               CoreConfiguration::Get().resolver_registry().CreateResolver(
826                   target, args, interested_parties_, work_serializer_,
827                   std::make_unique<DnsResultHandler>(Ref(),
828                                                      logical_dns.hostname));
829           if (dns_state.resolver == nullptr) {
830             Resolver::Result result;
831             result.addresses.emplace();  // Empty list.
832             result.resolution_note = absl::StrCat(
833                 "failed to create DNS resolver for ", logical_dns.hostname);
834             PopulateDnsUpdate(logical_dns.hostname, std::move(result),
835                               &dns_state);
836           } else {
837             dns_state.resolver->StartLocked();
838             return false;
839           }
840         }
841         // Check if result has been returned.
842         if (dns_state.update.endpoints == nullptr &&
843             dns_state.update.resolution_note.empty()) {
844           return false;
845         }
846         // Populate cluster config.
847         cluster_config.emplace(*state.update, dns_state.update.endpoints,
848                                dns_state.update.resolution_note);
849         if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
850         return true;
851       },
852       // Aggregate cluster.  Recursively expand to child clusters.
853       [&](const XdsClusterResource::Aggregate& aggregate) {
854         // Grab a ref to the CDS resource for the aggregate cluster here,
855         // since our reference into cluster_watchers_ will be invalidated
856         // when we recursively call ourselves and add entries to the
857         // map for underlying clusters.
858         auto cluster_resource = *state.update;
859         // Recursively expand leaf clusters.
860         absl::StatusOr<std::vector<absl::string_view>> child_leaf_clusters;
861         child_leaf_clusters.emplace();
862         bool have_all_resources = true;
863         for (const std::string& child_name :
864              aggregate.prioritized_cluster_names) {
865           have_all_resources &= PopulateClusterConfigMap(
866               child_name, depth + 1, cluster_config_map, eds_resources_seen,
867               dns_names_seen, &child_leaf_clusters);
868           if (!child_leaf_clusters.ok()) break;
869         }
870         // Note that we cannot use the cluster_config reference we
871         // created above, because it may have been invalidated by map
872         // insertions when we recursively called ourselves, so we have
873         // to do the lookup in cluster_config_map again.
874         auto& aggregate_cluster_config = (*cluster_config_map)[name];
875         // If we exceeded max recursion depth, report an error for the
876         // cluster, and propagate the error up if needed.
877         if (!child_leaf_clusters.ok()) {
878           aggregate_cluster_config = child_leaf_clusters.status();
879           if (leaf_clusters != nullptr) {
880             *leaf_clusters = child_leaf_clusters.status();
881           }
882           return true;
883         }
884         // If needed, propagate leaf cluster list up the tree.
885         if (leaf_clusters != nullptr) {
886           (*leaf_clusters)
887               ->insert((*leaf_clusters)->end(), child_leaf_clusters->begin(),
888                        child_leaf_clusters->end());
889         }
890         // If there are no leaf clusters, report an error for the cluster.
891         if (have_all_resources && child_leaf_clusters->empty()) {
892           aggregate_cluster_config = absl::UnavailableError(
893               absl::StrCat("aggregate cluster dependency graph for ", name,
894                            " has no leaf clusters"));
895           return true;
896         }
897         // Populate cluster config.
898         // Note that we do this even for aggregate clusters that are not
899         // at the root of the tree, because we need to make sure the list
900         // of underlying cluster names stays alive so that the leaf cluster
901         // list of the root aggregate cluster can point to those strings.
902         aggregate_cluster_config.emplace(std::move(cluster_resource),
903                                          std::move(*child_leaf_clusters));
904         return have_all_resources;
905       });
906 }
907 
908 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
GetClusterSubscription(absl::string_view cluster_name)909 XdsDependencyManager::GetClusterSubscription(absl::string_view cluster_name) {
910   auto it = cluster_subscriptions_.find(cluster_name);
911   if (it != cluster_subscriptions_.end()) {
912     auto subscription = it->second->RefIfNonZero();
913     if (subscription != nullptr) return subscription;
914   }
915   auto subscription = MakeRefCounted<ClusterSubscription>(cluster_name, Ref());
916   cluster_subscriptions_.emplace(subscription->cluster_name(),
917                                  subscription->WeakRef());
918   // If the cluster is not already subscribed to by virtue of being
919   // referenced in the route config, then trigger the CDS watch.
920   if (!clusters_from_route_config_.contains(cluster_name)) {
921     MaybeReportUpdate();
922   }
923   return subscription;
924 }
925 
OnClusterSubscriptionUnref(absl::string_view cluster_name,ClusterSubscription * subscription)926 void XdsDependencyManager::OnClusterSubscriptionUnref(
927     absl::string_view cluster_name, ClusterSubscription* subscription) {
928   auto it = cluster_subscriptions_.find(cluster_name);
929   // Shouldn't happen, but ignore if it does.
930   if (it == cluster_subscriptions_.end()) return;
931   // Do nothing if the subscription has already been replaced.
932   if (it->second != subscription) return;
933   // Remove the entry.
934   cluster_subscriptions_.erase(it);
935   // If this cluster is not already subscribed to by virtue of being
936   // referenced in the route config, then update watches and generate a
937   // new update.
938   if (!clusters_from_route_config_.contains(cluster_name)) {
939     MaybeReportUpdate();
940   }
941 }
942 
MaybeReportUpdate()943 void XdsDependencyManager::MaybeReportUpdate() {
944   // Populate Listener and RouteConfig fields.
945   if (current_virtual_host_ == nullptr) return;
946   auto config = MakeRefCounted<XdsConfig>();
947   config->listener = current_listener_;
948   config->route_config = current_route_config_;
949   config->virtual_host = current_virtual_host_;
950   // Determine the set of clusters we should be watching.
951   std::set<absl::string_view> clusters_to_watch;
952   for (const absl::string_view& cluster : clusters_from_route_config_) {
953     clusters_to_watch.insert(cluster);
954   }
955   for (const auto& p : cluster_subscriptions_) {
956     clusters_to_watch.insert(p.first);
957   }
958   // Populate Cluster map.
959   // We traverse the entire graph even if we don't yet have all of the
960   // resources we need to ensure that the right set of watches are active.
961   std::set<absl::string_view> eds_resources_seen;
962   std::set<absl::string_view> dns_names_seen;
963   bool have_all_resources = true;
964   for (const absl::string_view& cluster : clusters_to_watch) {
965     have_all_resources &= PopulateClusterConfigMap(
966         cluster, 0, &config->clusters, &eds_resources_seen, &dns_names_seen);
967   }
968   // Remove entries in cluster_watchers_ for any clusters not in
969   // config->clusters.
970   for (auto it = cluster_watchers_.begin(); it != cluster_watchers_.end();) {
971     const std::string& cluster_name = it->first;
972     if (config->clusters.contains(cluster_name)) {
973       ++it;
974       continue;
975     }
976     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
977       gpr_log(GPR_INFO,
978               "[XdsDependencyManager %p] cancelling watch for cluster %s", this,
979               cluster_name.c_str());
980     }
981     XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name,
982                                         it->second.watcher,
983                                         /*delay_unsubscription=*/false);
984     cluster_watchers_.erase(it++);
985   }
986   // Remove entries in endpoint_watchers_ for any EDS resources not in
987   // eds_resources_seen.
988   for (auto it = endpoint_watchers_.begin(); it != endpoint_watchers_.end();) {
989     const std::string& eds_resource_name = it->first;
990     if (eds_resources_seen.find(eds_resource_name) !=
991         eds_resources_seen.end()) {
992       ++it;
993       continue;
994     }
995     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
996       gpr_log(GPR_INFO,
997               "[XdsDependencyManager %p] cancelling watch for EDS resource %s",
998               this, eds_resource_name.c_str());
999     }
1000     XdsEndpointResourceType::CancelWatch(xds_client_.get(), eds_resource_name,
1001                                          it->second.watcher,
1002                                          /*delay_unsubscription=*/false);
1003     endpoint_watchers_.erase(it++);
1004   }
1005   // Remove entries in dns_resolvers_ for any DNS name not in
1006   // eds_resources_seen.
1007   for (auto it = dns_resolvers_.begin(); it != dns_resolvers_.end();) {
1008     const std::string& dns_name = it->first;
1009     if (dns_names_seen.find(dns_name) != dns_names_seen.end()) {
1010       ++it;
1011       continue;
1012     }
1013     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1014       gpr_log(GPR_INFO,
1015               "[XdsDependencyManager %p] shutting down DNS resolver for %s",
1016               this, dns_name.c_str());
1017     }
1018     dns_resolvers_.erase(it++);
1019   }
1020   // If we have all the data we need, then send an update.
1021   if (!have_all_resources) {
1022     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1023       gpr_log(GPR_INFO,
1024               "[XdsDependencyManager %p] missing data -- NOT returning config",
1025               this);
1026     }
1027     return;
1028   }
1029   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
1030     gpr_log(GPR_INFO, "[XdsDependencyManager %p] returning config: %s", this,
1031             config->ToString().c_str());
1032   }
1033   watcher_->OnUpdate(std::move(config));
1034 }
1035 
1036 }  // namespace grpc_core
1037