1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/cpp/end2end/xds/xds_server.h"
18
19 #include <deque>
20 #include <set>
21 #include <string>
22 #include <thread>
23 #include <vector>
24
25 #include "absl/types/optional.h"
26
27 #include <grpc/support/log.h>
28
29 #include "src/core/lib/address_utils/parse_address.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/gprpp/sync.h"
32 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
34 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
35
36 namespace grpc {
37 namespace testing {
38
39 //
40 // AdsServiceImpl
41 //
42
SetResource(google::protobuf::Any resource,const std::string & type_url,const std::string & name)43 void AdsServiceImpl::SetResource(google::protobuf::Any resource,
44 const std::string& type_url,
45 const std::string& name) {
46 grpc_core::MutexLock lock(&ads_mu_);
47 ResourceTypeState& resource_type_state = resource_map_[type_url];
48 ++resource_type_state.resource_type_version;
49 ResourceState& resource_state = resource_type_state.resource_name_map[name];
50 resource_state.resource_type_version =
51 resource_type_state.resource_type_version;
52 resource_state.resource = std::move(resource);
53 gpr_log(GPR_INFO,
54 "ADS[%s]: Updating %s resource %s; resource_type_version now %u",
55 debug_label_.c_str(), type_url.c_str(), name.c_str(),
56 resource_type_state.resource_type_version);
57 for (SubscriptionState* subscription : resource_state.subscriptions) {
58 subscription->update_queue->emplace_back(type_url, name);
59 }
60 }
61
UnsetResource(const std::string & type_url,const std::string & name)62 void AdsServiceImpl::UnsetResource(const std::string& type_url,
63 const std::string& name) {
64 grpc_core::MutexLock lock(&ads_mu_);
65 ResourceTypeState& resource_type_state = resource_map_[type_url];
66 ++resource_type_state.resource_type_version;
67 ResourceState& resource_state = resource_type_state.resource_name_map[name];
68 resource_state.resource_type_version =
69 resource_type_state.resource_type_version;
70 resource_state.resource.reset();
71 gpr_log(GPR_INFO,
72 "ADS[%s]: Unsetting %s resource %s; resource_type_version now %u",
73 debug_label_.c_str(), type_url.c_str(), name.c_str(),
74 resource_type_state.resource_type_version);
75 for (SubscriptionState* subscription : resource_state.subscriptions) {
76 subscription->update_queue->emplace_back(type_url, name);
77 }
78 }
79
80 // Checks whether the client needs to receive a newer version of
81 // the resource.
ClientNeedsResourceUpdate(const ResourceTypeState & resource_type_state,const ResourceState & resource_state,int client_resource_type_version)82 bool AdsServiceImpl::ClientNeedsResourceUpdate(
83 const ResourceTypeState& resource_type_state,
84 const ResourceState& resource_state, int client_resource_type_version) {
85 return client_resource_type_version <
86 resource_type_state.resource_type_version &&
87 resource_state.resource_type_version <=
88 resource_type_state.resource_type_version;
89 }
90
91 // Subscribes to a resource if not already subscribed:
92 // 1. Sets the update_queue field in subscription_state.
93 // 2. Adds subscription_state to resource_state->subscriptions.
MaybeSubscribe(const std::string & resource_type,const std::string & resource_name,SubscriptionState * subscription_state,ResourceState * resource_state,UpdateQueue * update_queue)94 bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
95 const std::string& resource_name,
96 SubscriptionState* subscription_state,
97 ResourceState* resource_state,
98 UpdateQueue* update_queue) {
99 // The update_queue will be null if we were not previously subscribed.
100 if (subscription_state->update_queue != nullptr) return false;
101 subscription_state->update_queue = update_queue;
102 resource_state->subscriptions.emplace(subscription_state);
103 gpr_log(GPR_INFO, "ADS[%s]: subscribe to resource type %s name %s state %p",
104 debug_label_.c_str(), resource_type.c_str(), resource_name.c_str(),
105 &subscription_state);
106 return true;
107 }
108
109 // Removes subscriptions for resources no longer present in the
110 // current request.
ProcessUnsubscriptions(const std::string & resource_type,const std::set<std::string> & resources_in_current_request,SubscriptionNameMap * subscription_name_map,ResourceNameMap * resource_name_map)111 void AdsServiceImpl::ProcessUnsubscriptions(
112 const std::string& resource_type,
113 const std::set<std::string>& resources_in_current_request,
114 SubscriptionNameMap* subscription_name_map,
115 ResourceNameMap* resource_name_map) {
116 for (auto it = subscription_name_map->begin();
117 it != subscription_name_map->end();) {
118 const std::string& resource_name = it->first;
119 SubscriptionState& subscription_state = it->second;
120 if (resources_in_current_request.find(resource_name) !=
121 resources_in_current_request.end()) {
122 ++it;
123 continue;
124 }
125 gpr_log(GPR_INFO, "ADS[%s]: Unsubscribe to type=%s name=%s state=%p",
126 debug_label_.c_str(), resource_type.c_str(), resource_name.c_str(),
127 &subscription_state);
128 auto resource_it = resource_name_map->find(resource_name);
129 GPR_ASSERT(resource_it != resource_name_map->end());
130 auto& resource_state = resource_it->second;
131 resource_state.subscriptions.erase(&subscription_state);
132 if (resource_state.subscriptions.empty() &&
133 !resource_state.resource.has_value()) {
134 resource_name_map->erase(resource_it);
135 }
136 it = subscription_name_map->erase(it);
137 }
138 }
139
Start()140 void AdsServiceImpl::Start() {
141 grpc_core::MutexLock lock(&ads_mu_);
142 ads_done_ = false;
143 }
144
Shutdown()145 void AdsServiceImpl::Shutdown() {
146 {
147 grpc_core::MutexLock lock(&ads_mu_);
148 if (!ads_done_) {
149 ads_done_ = true;
150 ads_cond_.SignalAll();
151 }
152 resource_type_response_state_.clear();
153 }
154 gpr_log(GPR_INFO, "ADS[%s]: shut down", debug_label_.c_str());
155 }
156
157 //
158 // LrsServiceImpl::ClientStats
159 //
160
total_successful_requests() const161 uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
162 uint64_t sum = 0;
163 for (auto& p : locality_stats_) {
164 sum += p.second.total_successful_requests;
165 }
166 return sum;
167 }
168
total_requests_in_progress() const169 uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
170 uint64_t sum = 0;
171 for (auto& p : locality_stats_) {
172 sum += p.second.total_requests_in_progress;
173 }
174 return sum;
175 }
176
total_error_requests() const177 uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
178 uint64_t sum = 0;
179 for (auto& p : locality_stats_) {
180 sum += p.second.total_error_requests;
181 }
182 return sum;
183 }
184
total_issued_requests() const185 uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
186 uint64_t sum = 0;
187 for (auto& p : locality_stats_) {
188 sum += p.second.total_issued_requests;
189 }
190 return sum;
191 }
192
dropped_requests(const std::string & category) const193 uint64_t LrsServiceImpl::ClientStats::dropped_requests(
194 const std::string& category) const {
195 auto iter = dropped_requests_.find(category);
196 GPR_ASSERT(iter != dropped_requests_.end());
197 return iter->second;
198 }
199
operator +=(const ClientStats & other)200 LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
201 const ClientStats& other) {
202 for (const auto& p : other.locality_stats_) {
203 locality_stats_[p.first] += p.second;
204 }
205 total_dropped_requests_ += other.total_dropped_requests_;
206 for (const auto& p : other.dropped_requests_) {
207 dropped_requests_[p.first] += p.second;
208 }
209 return *this;
210 }
211
212 //
213 // LrsServiceImpl
214 //
215
Start()216 void LrsServiceImpl::Start() {
217 {
218 grpc_core::MutexLock lock(&lrs_mu_);
219 lrs_done_ = false;
220 }
221 {
222 grpc_core::MutexLock lock(&load_report_mu_);
223 result_queue_.clear();
224 }
225 }
226
Shutdown()227 void LrsServiceImpl::Shutdown() {
228 {
229 grpc_core::MutexLock lock(&lrs_mu_);
230 if (!lrs_done_) {
231 lrs_done_ = true;
232 lrs_cv_.SignalAll();
233 }
234 }
235 gpr_log(GPR_INFO, "LRS[%s]: shut down", debug_label_.c_str());
236 }
237
WaitForLoadReport(absl::Duration timeout)238 std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
239 absl::Duration timeout) {
240 timeout *= grpc_test_slowdown_factor();
241 grpc_core::MutexLock lock(&load_report_mu_);
242 grpc_core::CondVar cv;
243 if (result_queue_.empty()) {
244 load_report_cond_ = &cv;
245 while (result_queue_.empty()) {
246 if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
247 gpr_log(GPR_ERROR, "timed out waiting for load report");
248 return {};
249 }
250 }
251 load_report_cond_ = nullptr;
252 }
253 std::vector<ClientStats> result = std::move(result_queue_.front());
254 result_queue_.pop_front();
255 return result;
256 }
257
258 } // namespace testing
259 } // namespace grpc
260