1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_endpoint.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <limits>
26 #include <set>
27 #include <vector>
28
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/types/optional.h"
34 #include "envoy/config/core/v3/address.upb.h"
35 #include "envoy/config/core/v3/base.upb.h"
36 #include "envoy/config/core/v3/health_check.upb.h"
37 #include "envoy/config/endpoint/v3/endpoint.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/type/v3/percent.upb.h"
41 #include "google/protobuf/wrappers.upb.h"
42 #include "upb/text/encode.h"
43
44 #include <grpc/support/log.h>
45
46 #include "src/core/ext/xds/upb_utils.h"
47 #include "src/core/ext/xds/xds_cluster.h"
48 #include "src/core/ext/xds/xds_health_status.h"
49 #include "src/core/ext/xds/xds_resource_type.h"
50 #include "src/core/lib/address_utils/parse_address.h"
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/channel/channel_args.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/gprpp/validation_errors.h"
55 #include "src/core/lib/iomgr/resolved_address.h"
56
57 namespace grpc_core {
58
59 //
60 // XdsEndpointResource
61 //
62
ToString() const63 std::string XdsEndpointResource::Priority::Locality::ToString() const {
64 std::vector<std::string> endpoint_strings;
65 for (const ServerAddress& endpoint : endpoints) {
66 endpoint_strings.emplace_back(endpoint.ToString());
67 }
68 return absl::StrCat("{name=", name->AsHumanReadableString(),
69 ", lb_weight=", lb_weight, ", endpoints=[",
70 absl::StrJoin(endpoint_strings, ", "), "]}");
71 }
72
operator ==(const Priority & other) const73 bool XdsEndpointResource::Priority::operator==(const Priority& other) const {
74 if (localities.size() != other.localities.size()) return false;
75 auto it1 = localities.begin();
76 auto it2 = other.localities.begin();
77 while (it1 != localities.end()) {
78 if (*it1->first != *it2->first) return false;
79 if (it1->second != it2->second) return false;
80 ++it1;
81 ++it2;
82 }
83 return true;
84 }
85
ToString() const86 std::string XdsEndpointResource::Priority::ToString() const {
87 std::vector<std::string> locality_strings;
88 locality_strings.reserve(localities.size());
89 for (const auto& p : localities) {
90 locality_strings.emplace_back(p.second.ToString());
91 }
92 return absl::StrCat("[", absl::StrJoin(locality_strings, ", "), "]");
93 }
94
ShouldDrop(const std::string ** category_name)95 bool XdsEndpointResource::DropConfig::ShouldDrop(
96 const std::string** category_name) {
97 for (size_t i = 0; i < drop_category_list_.size(); ++i) {
98 const auto& drop_category = drop_category_list_[i];
99 // Generate a random number in [0, 1000000).
100 const uint32_t random = [&]() {
101 MutexLock lock(&mu_);
102 return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
103 }();
104 if (random < drop_category.parts_per_million) {
105 *category_name = &drop_category.name;
106 return true;
107 }
108 }
109 return false;
110 }
111
ToString() const112 std::string XdsEndpointResource::DropConfig::ToString() const {
113 std::vector<std::string> category_strings;
114 for (const DropCategory& category : drop_category_list_) {
115 category_strings.emplace_back(
116 absl::StrCat(category.name, "=", category.parts_per_million));
117 }
118 return absl::StrCat("{[", absl::StrJoin(category_strings, ", "),
119 "], drop_all=", drop_all_, "}");
120 }
121
ToString() const122 std::string XdsEndpointResource::ToString() const {
123 std::vector<std::string> priority_strings;
124 for (size_t i = 0; i < priorities.size(); ++i) {
125 const Priority& priority = priorities[i];
126 priority_strings.emplace_back(
127 absl::StrCat("priority ", i, ": ", priority.ToString()));
128 }
129 return absl::StrCat("priorities=[", absl::StrJoin(priority_strings, ", "),
130 "], drop_config=", drop_config->ToString());
131 }
132
133 //
134 // XdsEndpointResourceType
135 //
136
137 namespace {
138
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)139 void MaybeLogClusterLoadAssignment(
140 const XdsResourceType::DecodeContext& context,
141 const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
142 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
143 gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
144 const upb_MessageDef* msg_type =
145 envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
146 context.symtab);
147 char buf[10240];
148 upb_TextEncode(cla, msg_type, nullptr, 0, buf, sizeof(buf));
149 gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
150 context.client, buf);
151 }
152 }
153
ServerAddressParse(const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,ValidationErrors * errors)154 absl::optional<ServerAddress> ServerAddressParse(
155 const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
156 ValidationErrors* errors) {
157 // health_status
158 const int32_t health_status =
159 envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
160 if (!XdsOverrideHostEnabled() &&
161 health_status != envoy_config_core_v3_UNKNOWN &&
162 health_status != envoy_config_core_v3_HEALTHY) {
163 return absl::nullopt;
164 }
165 auto status = XdsHealthStatus::FromUpb(health_status);
166 if (!status.has_value()) return absl::nullopt;
167 // load_balancing_weight
168 uint32_t weight = 1;
169 {
170 ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
171 const google_protobuf_UInt32Value* load_balancing_weight =
172 envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
173 if (load_balancing_weight != nullptr) {
174 weight = google_protobuf_UInt32Value_value(load_balancing_weight);
175 if (weight == 0) {
176 errors->AddError("must be greater than 0");
177 }
178 }
179 }
180 // endpoint
181 grpc_resolved_address grpc_address;
182 {
183 ValidationErrors::ScopedField field(errors, ".endpoint");
184 const envoy_config_endpoint_v3_Endpoint* endpoint =
185 envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
186 if (endpoint == nullptr) {
187 errors->AddError("field not present");
188 return absl::nullopt;
189 }
190 ValidationErrors::ScopedField field2(errors, ".address");
191 const envoy_config_core_v3_Address* address =
192 envoy_config_endpoint_v3_Endpoint_address(endpoint);
193 if (address == nullptr) {
194 errors->AddError("field not present");
195 return absl::nullopt;
196 }
197 ValidationErrors::ScopedField field3(errors, ".socket_address");
198 const envoy_config_core_v3_SocketAddress* socket_address =
199 envoy_config_core_v3_Address_socket_address(address);
200 if (socket_address == nullptr) {
201 errors->AddError("field not present");
202 return absl::nullopt;
203 }
204 std::string address_str = UpbStringToStdString(
205 envoy_config_core_v3_SocketAddress_address(socket_address));
206 uint32_t port;
207 {
208 ValidationErrors::ScopedField field(errors, ".port_value");
209 port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
210 if (GPR_UNLIKELY(port >> 16) != 0) {
211 errors->AddError("invalid port");
212 return absl::nullopt;
213 }
214 }
215 auto addr = StringToSockaddr(address_str, port);
216 if (!addr.ok()) {
217 errors->AddError(addr.status().message());
218 } else {
219 grpc_address = *addr;
220 }
221 }
222 // Convert to ServerAddress.
223 std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
224 attributes;
225 attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
226 std::make_unique<ServerAddressWeightAttribute>(weight);
227 attributes[XdsEndpointHealthStatusAttribute::kKey] =
228 std::make_unique<XdsEndpointHealthStatusAttribute>(*status);
229 return ServerAddress(grpc_address, ChannelArgs(), std::move(attributes));
230 }
231
232 struct ParsedLocality {
233 size_t priority;
234 XdsEndpointResource::Priority::Locality locality;
235 };
236
237 struct ResolvedAddressLessThan {
operator ()grpc_core::__anon51bc21830211::ResolvedAddressLessThan238 bool operator()(const grpc_resolved_address& a1,
239 const grpc_resolved_address& a2) const {
240 if (a1.len != a2.len) return a1.len < a2.len;
241 return memcmp(a1.addr, a2.addr, a1.len) < 0;
242 }
243 };
244 using ResolvedAddressSet =
245 std::set<grpc_resolved_address, ResolvedAddressLessThan>;
246
LocalityParse(const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)247 absl::optional<ParsedLocality> LocalityParse(
248 const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
249 ResolvedAddressSet* address_set, ValidationErrors* errors) {
250 const size_t original_error_size = errors->size();
251 ParsedLocality parsed_locality;
252 // load_balancing_weight
253 // If LB weight is not specified or 0, it means this locality is assigned
254 // no load.
255 const google_protobuf_UInt32Value* lb_weight =
256 envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
257 locality_lb_endpoints);
258 parsed_locality.locality.lb_weight =
259 lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
260 if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
261 // locality
262 const envoy_config_core_v3_Locality* locality =
263 envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
264 locality_lb_endpoints);
265 if (locality == nullptr) {
266 ValidationErrors::ScopedField field(errors, ".locality");
267 errors->AddError("field not present");
268 return absl::nullopt;
269 }
270 // region
271 std::string region =
272 UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
273 // zone
274 std::string zone =
275 UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
276 // sub_zone
277 std::string sub_zone =
278 UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
279 parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
280 std::move(region), std::move(zone), std::move(sub_zone));
281 // lb_endpoints
282 size_t size;
283 const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
284 envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
285 locality_lb_endpoints, &size);
286 for (size_t i = 0; i < size; ++i) {
287 ValidationErrors::ScopedField field(errors,
288 absl::StrCat(".lb_endpoints[", i, "]"));
289 auto address = ServerAddressParse(lb_endpoints[i], errors);
290 if (address.has_value()) {
291 bool inserted = address_set->insert(address->address()).second;
292 if (!inserted) {
293 errors->AddError(absl::StrCat(
294 "duplicate endpoint address \"",
295 grpc_sockaddr_to_uri(&address->address()).value_or("<unknown>"),
296 "\""));
297 }
298 parsed_locality.locality.endpoints.push_back(std::move(*address));
299 }
300 }
301 // priority
302 parsed_locality.priority =
303 envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
304 locality_lb_endpoints);
305 // Return result.
306 if (original_error_size != errors->size()) return absl::nullopt;
307 return parsed_locality;
308 }
309
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)310 void DropParseAndAppend(
311 const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
312 drop_overload,
313 XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
314 // category
315 std::string category = UpbStringToStdString(
316 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
317 drop_overload));
318 if (category.empty()) {
319 ValidationErrors::ScopedField field(errors, ".category");
320 errors->AddError("empty drop category name");
321 }
322 // drop_percentage
323 uint32_t numerator;
324 {
325 ValidationErrors::ScopedField field(errors, ".drop_percentage");
326 const envoy_type_v3_FractionalPercent* drop_percentage =
327 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
328 drop_overload);
329 if (drop_percentage == nullptr) {
330 errors->AddError("field not present");
331 return;
332 }
333 numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
334 {
335 ValidationErrors::ScopedField field(errors, ".denominator");
336 const int denominator =
337 envoy_type_v3_FractionalPercent_denominator(drop_percentage);
338 // Normalize to million.
339 switch (denominator) {
340 case envoy_type_v3_FractionalPercent_HUNDRED:
341 numerator *= 10000;
342 break;
343 case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
344 numerator *= 100;
345 break;
346 case envoy_type_v3_FractionalPercent_MILLION:
347 break;
348 default:
349 errors->AddError("unknown denominator type");
350 }
351 }
352 // Cap numerator to 1000000.
353 numerator = std::min(numerator, 1000000u);
354 }
355 // Add category.
356 drop_config->AddCategory(std::move(category), numerator);
357 }
358
EdsResourceParse(const XdsResourceType::DecodeContext &,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)359 absl::StatusOr<XdsEndpointResource> EdsResourceParse(
360 const XdsResourceType::DecodeContext& /*context*/,
361 const envoy_config_endpoint_v3_ClusterLoadAssignment*
362 cluster_load_assignment) {
363 ValidationErrors errors;
364 XdsEndpointResource eds_resource;
365 // endpoints
366 {
367 ValidationErrors::ScopedField field(&errors, "endpoints");
368 ResolvedAddressSet address_set;
369 size_t locality_size;
370 const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
371 envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
372 cluster_load_assignment, &locality_size);
373 for (size_t i = 0; i < locality_size; ++i) {
374 ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
375 auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors);
376 if (parsed_locality.has_value()) {
377 GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
378 // Make sure prorities is big enough. Note that they might not
379 // arrive in priority order.
380 if (eds_resource.priorities.size() < parsed_locality->priority + 1) {
381 eds_resource.priorities.resize(parsed_locality->priority + 1);
382 }
383 auto& locality_map =
384 eds_resource.priorities[parsed_locality->priority].localities;
385 auto it = locality_map.find(parsed_locality->locality.name.get());
386 if (it != locality_map.end()) {
387 errors.AddError(absl::StrCat(
388 "duplicate locality ",
389 parsed_locality->locality.name->AsHumanReadableString(),
390 " found in priority ", parsed_locality->priority));
391 } else {
392 locality_map.emplace(parsed_locality->locality.name.get(),
393 std::move(parsed_locality->locality));
394 }
395 }
396 }
397 for (size_t i = 0; i < eds_resource.priorities.size(); ++i) {
398 const auto& priority = eds_resource.priorities[i];
399 if (priority.localities.empty()) {
400 errors.AddError(absl::StrCat("priority ", i, " empty"));
401 } else {
402 // Check that the sum of the locality weights in this priority
403 // does not exceed the max value for a uint32.
404 uint64_t total_weight = 0;
405 for (const auto& p : priority.localities) {
406 total_weight += p.second.lb_weight;
407 if (total_weight > std::numeric_limits<uint32_t>::max()) {
408 errors.AddError(
409 absl::StrCat("sum of locality weights for priority ", i,
410 " exceeds uint32 max"));
411 break;
412 }
413 }
414 }
415 }
416 }
417 // policy
418 eds_resource.drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
419 const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
420 cluster_load_assignment);
421 if (policy != nullptr) {
422 ValidationErrors::ScopedField field(&errors, "policy");
423 size_t drop_size;
424 const auto* const* drop_overload =
425 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
426 policy, &drop_size);
427 for (size_t i = 0; i < drop_size; ++i) {
428 ValidationErrors::ScopedField field(
429 &errors, absl::StrCat(".drop_overloads[", i, "]"));
430 DropParseAndAppend(drop_overload[i], eds_resource.drop_config.get(),
431 &errors);
432 }
433 }
434 // Return result.
435 if (!errors.ok()) {
436 return errors.status(absl::StatusCode::kInvalidArgument,
437 "errors parsing EDS resource");
438 }
439 return eds_resource;
440 }
441
442 } // namespace
443
Decode(const XdsResourceType::DecodeContext & context,absl::string_view serialized_resource) const444 XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
445 const XdsResourceType::DecodeContext& context,
446 absl::string_view serialized_resource) const {
447 DecodeResult result;
448 // Parse serialized proto.
449 auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
450 serialized_resource.data(), serialized_resource.size(), context.arena);
451 if (resource == nullptr) {
452 result.resource = absl::InvalidArgumentError(
453 "Can't parse ClusterLoadAssignment resource.");
454 return result;
455 }
456 MaybeLogClusterLoadAssignment(context, resource);
457 // Validate resource.
458 result.name = UpbStringToStdString(
459 envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(resource));
460 auto eds_resource = EdsResourceParse(context, resource);
461 if (!eds_resource.ok()) {
462 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
463 gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
464 context.client, result.name->c_str(),
465 eds_resource.status().ToString().c_str());
466 }
467 result.resource = eds_resource.status();
468 } else {
469 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
470 gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
471 context.client, result.name->c_str(),
472 eds_resource->ToString().c_str());
473 }
474 result.resource =
475 std::make_unique<XdsEndpointResource>(std::move(*eds_resource));
476 }
477 return result;
478 }
479
480 } // namespace grpc_core
481