xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_server.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/cpp/end2end/xds/xds_server.h"
18 
19 #include <deque>
20 #include <set>
21 #include <string>
22 #include <thread>
23 #include <vector>
24 
25 #include "absl/types/optional.h"
26 
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/address_utils/parse_address.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/gprpp/sync.h"
32 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
34 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
35 
36 namespace grpc {
37 namespace testing {
38 
39 //
40 // AdsServiceImpl
41 //
42 
SetResource(google::protobuf::Any resource,const std::string & type_url,const std::string & name)43 void AdsServiceImpl::SetResource(google::protobuf::Any resource,
44                                  const std::string& type_url,
45                                  const std::string& name) {
46   grpc_core::MutexLock lock(&ads_mu_);
47   ResourceTypeState& resource_type_state = resource_map_[type_url];
48   ++resource_type_state.resource_type_version;
49   ResourceState& resource_state = resource_type_state.resource_name_map[name];
50   resource_state.resource_type_version =
51       resource_type_state.resource_type_version;
52   resource_state.resource = std::move(resource);
53   gpr_log(GPR_INFO,
54           "ADS[%s]: Updating %s resource %s; resource_type_version now %u",
55           debug_label_.c_str(), type_url.c_str(), name.c_str(),
56           resource_type_state.resource_type_version);
57   for (SubscriptionState* subscription : resource_state.subscriptions) {
58     subscription->update_queue->emplace_back(type_url, name);
59   }
60 }
61 
UnsetResource(const std::string & type_url,const std::string & name)62 void AdsServiceImpl::UnsetResource(const std::string& type_url,
63                                    const std::string& name) {
64   grpc_core::MutexLock lock(&ads_mu_);
65   ResourceTypeState& resource_type_state = resource_map_[type_url];
66   ++resource_type_state.resource_type_version;
67   ResourceState& resource_state = resource_type_state.resource_name_map[name];
68   resource_state.resource_type_version =
69       resource_type_state.resource_type_version;
70   resource_state.resource.reset();
71   gpr_log(GPR_INFO,
72           "ADS[%s]: Unsetting %s resource %s; resource_type_version now %u",
73           debug_label_.c_str(), type_url.c_str(), name.c_str(),
74           resource_type_state.resource_type_version);
75   for (SubscriptionState* subscription : resource_state.subscriptions) {
76     subscription->update_queue->emplace_back(type_url, name);
77   }
78 }
79 
80 // Checks whether the client needs to receive a newer version of
81 // the resource.
ClientNeedsResourceUpdate(const ResourceTypeState & resource_type_state,const ResourceState & resource_state,int client_resource_type_version)82 bool AdsServiceImpl::ClientNeedsResourceUpdate(
83     const ResourceTypeState& resource_type_state,
84     const ResourceState& resource_state, int client_resource_type_version) {
85   return client_resource_type_version <
86              resource_type_state.resource_type_version &&
87          resource_state.resource_type_version <=
88              resource_type_state.resource_type_version;
89 }
90 
91 // Subscribes to a resource if not already subscribed:
92 // 1. Sets the update_queue field in subscription_state.
93 // 2. Adds subscription_state to resource_state->subscriptions.
MaybeSubscribe(const std::string & resource_type,const std::string & resource_name,SubscriptionState * subscription_state,ResourceState * resource_state,UpdateQueue * update_queue)94 bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
95                                     const std::string& resource_name,
96                                     SubscriptionState* subscription_state,
97                                     ResourceState* resource_state,
98                                     UpdateQueue* update_queue) {
99   // The update_queue will be null if we were not previously subscribed.
100   if (subscription_state->update_queue != nullptr) return false;
101   subscription_state->update_queue = update_queue;
102   resource_state->subscriptions.emplace(subscription_state);
103   gpr_log(GPR_INFO, "ADS[%s]: subscribe to resource type %s name %s state %p",
104           debug_label_.c_str(), resource_type.c_str(), resource_name.c_str(),
105           &subscription_state);
106   return true;
107 }
108 
109 // Removes subscriptions for resources no longer present in the
110 // current request.
ProcessUnsubscriptions(const std::string & resource_type,const std::set<std::string> & resources_in_current_request,SubscriptionNameMap * subscription_name_map,ResourceNameMap * resource_name_map)111 void AdsServiceImpl::ProcessUnsubscriptions(
112     const std::string& resource_type,
113     const std::set<std::string>& resources_in_current_request,
114     SubscriptionNameMap* subscription_name_map,
115     ResourceNameMap* resource_name_map) {
116   for (auto it = subscription_name_map->begin();
117        it != subscription_name_map->end();) {
118     const std::string& resource_name = it->first;
119     SubscriptionState& subscription_state = it->second;
120     if (resources_in_current_request.find(resource_name) !=
121         resources_in_current_request.end()) {
122       ++it;
123       continue;
124     }
125     gpr_log(GPR_INFO, "ADS[%s]: Unsubscribe to type=%s name=%s state=%p",
126             debug_label_.c_str(), resource_type.c_str(), resource_name.c_str(),
127             &subscription_state);
128     auto resource_it = resource_name_map->find(resource_name);
129     GPR_ASSERT(resource_it != resource_name_map->end());
130     auto& resource_state = resource_it->second;
131     resource_state.subscriptions.erase(&subscription_state);
132     if (resource_state.subscriptions.empty() &&
133         !resource_state.resource.has_value()) {
134       resource_name_map->erase(resource_it);
135     }
136     it = subscription_name_map->erase(it);
137   }
138 }
139 
Start()140 void AdsServiceImpl::Start() {
141   grpc_core::MutexLock lock(&ads_mu_);
142   ads_done_ = false;
143 }
144 
Shutdown()145 void AdsServiceImpl::Shutdown() {
146   {
147     grpc_core::MutexLock lock(&ads_mu_);
148     if (!ads_done_) {
149       ads_done_ = true;
150       ads_cond_.SignalAll();
151     }
152     resource_type_response_state_.clear();
153   }
154   gpr_log(GPR_INFO, "ADS[%s]: shut down", debug_label_.c_str());
155 }
156 
157 //
158 // LrsServiceImpl::ClientStats
159 //
160 
total_successful_requests() const161 uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
162   uint64_t sum = 0;
163   for (auto& p : locality_stats_) {
164     sum += p.second.total_successful_requests;
165   }
166   return sum;
167 }
168 
total_requests_in_progress() const169 uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
170   uint64_t sum = 0;
171   for (auto& p : locality_stats_) {
172     sum += p.second.total_requests_in_progress;
173   }
174   return sum;
175 }
176 
total_error_requests() const177 uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
178   uint64_t sum = 0;
179   for (auto& p : locality_stats_) {
180     sum += p.second.total_error_requests;
181   }
182   return sum;
183 }
184 
total_issued_requests() const185 uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
186   uint64_t sum = 0;
187   for (auto& p : locality_stats_) {
188     sum += p.second.total_issued_requests;
189   }
190   return sum;
191 }
192 
dropped_requests(const std::string & category) const193 uint64_t LrsServiceImpl::ClientStats::dropped_requests(
194     const std::string& category) const {
195   auto iter = dropped_requests_.find(category);
196   GPR_ASSERT(iter != dropped_requests_.end());
197   return iter->second;
198 }
199 
operator +=(const ClientStats & other)200 LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
201     const ClientStats& other) {
202   for (const auto& p : other.locality_stats_) {
203     locality_stats_[p.first] += p.second;
204   }
205   total_dropped_requests_ += other.total_dropped_requests_;
206   for (const auto& p : other.dropped_requests_) {
207     dropped_requests_[p.first] += p.second;
208   }
209   return *this;
210 }
211 
212 //
213 // LrsServiceImpl
214 //
215 
Start()216 void LrsServiceImpl::Start() {
217   {
218     grpc_core::MutexLock lock(&lrs_mu_);
219     lrs_done_ = false;
220   }
221   {
222     grpc_core::MutexLock lock(&load_report_mu_);
223     result_queue_.clear();
224   }
225 }
226 
Shutdown()227 void LrsServiceImpl::Shutdown() {
228   {
229     grpc_core::MutexLock lock(&lrs_mu_);
230     if (!lrs_done_) {
231       lrs_done_ = true;
232       lrs_cv_.SignalAll();
233     }
234   }
235   gpr_log(GPR_INFO, "LRS[%s]: shut down", debug_label_.c_str());
236 }
237 
WaitForLoadReport(absl::Duration timeout)238 std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
239     absl::Duration timeout) {
240   timeout *= grpc_test_slowdown_factor();
241   grpc_core::MutexLock lock(&load_report_mu_);
242   grpc_core::CondVar cv;
243   if (result_queue_.empty()) {
244     load_report_cond_ = &cv;
245     while (result_queue_.empty()) {
246       if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
247         gpr_log(GPR_ERROR, "timed out waiting for load report");
248         return {};
249       }
250     }
251     load_report_cond_ = nullptr;
252   }
253   std::vector<ClientStats> result = std::move(result_queue_.front());
254   result_queue_.pop_front();
255   return result;
256 }
257 
258 }  // namespace testing
259 }  // namespace grpc
260