1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_endpoint.h"
20 
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <limits>
26 #include <set>
27 #include <vector>
28 
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/types/optional.h"
34 #include "envoy/config/core/v3/address.upb.h"
35 #include "envoy/config/core/v3/base.upb.h"
36 #include "envoy/config/core/v3/health_check.upb.h"
37 #include "envoy/config/endpoint/v3/endpoint.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/type/v3/percent.upb.h"
41 #include "google/protobuf/wrappers.upb.h"
42 #include "upb/text/encode.h"
43 
44 #include <grpc/support/log.h>
45 
46 #include "src/core/ext/xds/upb_utils.h"
47 #include "src/core/ext/xds/xds_cluster.h"
48 #include "src/core/ext/xds/xds_health_status.h"
49 #include "src/core/ext/xds/xds_resource_type.h"
50 #include "src/core/lib/address_utils/parse_address.h"
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/channel/channel_args.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/gprpp/validation_errors.h"
55 #include "src/core/lib/iomgr/resolved_address.h"
56 
57 namespace grpc_core {
58 
59 //
60 // XdsEndpointResource
61 //
62 
ToString() const63 std::string XdsEndpointResource::Priority::Locality::ToString() const {
64   std::vector<std::string> endpoint_strings;
65   for (const ServerAddress& endpoint : endpoints) {
66     endpoint_strings.emplace_back(endpoint.ToString());
67   }
68   return absl::StrCat("{name=", name->AsHumanReadableString(),
69                       ", lb_weight=", lb_weight, ", endpoints=[",
70                       absl::StrJoin(endpoint_strings, ", "), "]}");
71 }
72 
operator ==(const Priority & other) const73 bool XdsEndpointResource::Priority::operator==(const Priority& other) const {
74   if (localities.size() != other.localities.size()) return false;
75   auto it1 = localities.begin();
76   auto it2 = other.localities.begin();
77   while (it1 != localities.end()) {
78     if (*it1->first != *it2->first) return false;
79     if (it1->second != it2->second) return false;
80     ++it1;
81     ++it2;
82   }
83   return true;
84 }
85 
ToString() const86 std::string XdsEndpointResource::Priority::ToString() const {
87   std::vector<std::string> locality_strings;
88   locality_strings.reserve(localities.size());
89   for (const auto& p : localities) {
90     locality_strings.emplace_back(p.second.ToString());
91   }
92   return absl::StrCat("[", absl::StrJoin(locality_strings, ", "), "]");
93 }
94 
ShouldDrop(const std::string ** category_name)95 bool XdsEndpointResource::DropConfig::ShouldDrop(
96     const std::string** category_name) {
97   for (size_t i = 0; i < drop_category_list_.size(); ++i) {
98     const auto& drop_category = drop_category_list_[i];
99     // Generate a random number in [0, 1000000).
100     const uint32_t random = [&]() {
101       MutexLock lock(&mu_);
102       return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
103     }();
104     if (random < drop_category.parts_per_million) {
105       *category_name = &drop_category.name;
106       return true;
107     }
108   }
109   return false;
110 }
111 
ToString() const112 std::string XdsEndpointResource::DropConfig::ToString() const {
113   std::vector<std::string> category_strings;
114   for (const DropCategory& category : drop_category_list_) {
115     category_strings.emplace_back(
116         absl::StrCat(category.name, "=", category.parts_per_million));
117   }
118   return absl::StrCat("{[", absl::StrJoin(category_strings, ", "),
119                       "], drop_all=", drop_all_, "}");
120 }
121 
ToString() const122 std::string XdsEndpointResource::ToString() const {
123   std::vector<std::string> priority_strings;
124   for (size_t i = 0; i < priorities.size(); ++i) {
125     const Priority& priority = priorities[i];
126     priority_strings.emplace_back(
127         absl::StrCat("priority ", i, ": ", priority.ToString()));
128   }
129   return absl::StrCat("priorities=[", absl::StrJoin(priority_strings, ", "),
130                       "], drop_config=", drop_config->ToString());
131 }
132 
133 //
134 // XdsEndpointResourceType
135 //
136 
137 namespace {
138 
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)139 void MaybeLogClusterLoadAssignment(
140     const XdsResourceType::DecodeContext& context,
141     const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
142   if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
143       gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
144     const upb_MessageDef* msg_type =
145         envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
146             context.symtab);
147     char buf[10240];
148     upb_TextEncode(cla, msg_type, nullptr, 0, buf, sizeof(buf));
149     gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
150             context.client, buf);
151   }
152 }
153 
ServerAddressParse(const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,ValidationErrors * errors)154 absl::optional<ServerAddress> ServerAddressParse(
155     const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
156     ValidationErrors* errors) {
157   // health_status
158   const int32_t health_status =
159       envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
160   if (!XdsOverrideHostEnabled() &&
161       health_status != envoy_config_core_v3_UNKNOWN &&
162       health_status != envoy_config_core_v3_HEALTHY) {
163     return absl::nullopt;
164   }
165   auto status = XdsHealthStatus::FromUpb(health_status);
166   if (!status.has_value()) return absl::nullopt;
167   // load_balancing_weight
168   uint32_t weight = 1;
169   {
170     ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
171     const google_protobuf_UInt32Value* load_balancing_weight =
172         envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
173     if (load_balancing_weight != nullptr) {
174       weight = google_protobuf_UInt32Value_value(load_balancing_weight);
175       if (weight == 0) {
176         errors->AddError("must be greater than 0");
177       }
178     }
179   }
180   // endpoint
181   grpc_resolved_address grpc_address;
182   {
183     ValidationErrors::ScopedField field(errors, ".endpoint");
184     const envoy_config_endpoint_v3_Endpoint* endpoint =
185         envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
186     if (endpoint == nullptr) {
187       errors->AddError("field not present");
188       return absl::nullopt;
189     }
190     ValidationErrors::ScopedField field2(errors, ".address");
191     const envoy_config_core_v3_Address* address =
192         envoy_config_endpoint_v3_Endpoint_address(endpoint);
193     if (address == nullptr) {
194       errors->AddError("field not present");
195       return absl::nullopt;
196     }
197     ValidationErrors::ScopedField field3(errors, ".socket_address");
198     const envoy_config_core_v3_SocketAddress* socket_address =
199         envoy_config_core_v3_Address_socket_address(address);
200     if (socket_address == nullptr) {
201       errors->AddError("field not present");
202       return absl::nullopt;
203     }
204     std::string address_str = UpbStringToStdString(
205         envoy_config_core_v3_SocketAddress_address(socket_address));
206     uint32_t port;
207     {
208       ValidationErrors::ScopedField field(errors, ".port_value");
209       port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
210       if (GPR_UNLIKELY(port >> 16) != 0) {
211         errors->AddError("invalid port");
212         return absl::nullopt;
213       }
214     }
215     auto addr = StringToSockaddr(address_str, port);
216     if (!addr.ok()) {
217       errors->AddError(addr.status().message());
218     } else {
219       grpc_address = *addr;
220     }
221   }
222   // Convert to ServerAddress.
223   std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
224       attributes;
225   attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
226       std::make_unique<ServerAddressWeightAttribute>(weight);
227   attributes[XdsEndpointHealthStatusAttribute::kKey] =
228       std::make_unique<XdsEndpointHealthStatusAttribute>(*status);
229   return ServerAddress(grpc_address, ChannelArgs(), std::move(attributes));
230 }
231 
232 struct ParsedLocality {
233   size_t priority;
234   XdsEndpointResource::Priority::Locality locality;
235 };
236 
237 struct ResolvedAddressLessThan {
operator ()grpc_core::__anon51bc21830211::ResolvedAddressLessThan238   bool operator()(const grpc_resolved_address& a1,
239                   const grpc_resolved_address& a2) const {
240     if (a1.len != a2.len) return a1.len < a2.len;
241     return memcmp(a1.addr, a2.addr, a1.len) < 0;
242   }
243 };
244 using ResolvedAddressSet =
245     std::set<grpc_resolved_address, ResolvedAddressLessThan>;
246 
LocalityParse(const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)247 absl::optional<ParsedLocality> LocalityParse(
248     const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
249     ResolvedAddressSet* address_set, ValidationErrors* errors) {
250   const size_t original_error_size = errors->size();
251   ParsedLocality parsed_locality;
252   // load_balancing_weight
253   // If LB weight is not specified or 0, it means this locality is assigned
254   // no load.
255   const google_protobuf_UInt32Value* lb_weight =
256       envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
257           locality_lb_endpoints);
258   parsed_locality.locality.lb_weight =
259       lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
260   if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
261   // locality
262   const envoy_config_core_v3_Locality* locality =
263       envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
264           locality_lb_endpoints);
265   if (locality == nullptr) {
266     ValidationErrors::ScopedField field(errors, ".locality");
267     errors->AddError("field not present");
268     return absl::nullopt;
269   }
270   // region
271   std::string region =
272       UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
273   // zone
274   std::string zone =
275       UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
276   // sub_zone
277   std::string sub_zone =
278       UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
279   parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
280       std::move(region), std::move(zone), std::move(sub_zone));
281   // lb_endpoints
282   size_t size;
283   const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
284       envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
285           locality_lb_endpoints, &size);
286   for (size_t i = 0; i < size; ++i) {
287     ValidationErrors::ScopedField field(errors,
288                                         absl::StrCat(".lb_endpoints[", i, "]"));
289     auto address = ServerAddressParse(lb_endpoints[i], errors);
290     if (address.has_value()) {
291       bool inserted = address_set->insert(address->address()).second;
292       if (!inserted) {
293         errors->AddError(absl::StrCat(
294             "duplicate endpoint address \"",
295             grpc_sockaddr_to_uri(&address->address()).value_or("<unknown>"),
296             "\""));
297       }
298       parsed_locality.locality.endpoints.push_back(std::move(*address));
299     }
300   }
301   // priority
302   parsed_locality.priority =
303       envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
304           locality_lb_endpoints);
305   // Return result.
306   if (original_error_size != errors->size()) return absl::nullopt;
307   return parsed_locality;
308 }
309 
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)310 void DropParseAndAppend(
311     const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
312         drop_overload,
313     XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
314   // category
315   std::string category = UpbStringToStdString(
316       envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
317           drop_overload));
318   if (category.empty()) {
319     ValidationErrors::ScopedField field(errors, ".category");
320     errors->AddError("empty drop category name");
321   }
322   // drop_percentage
323   uint32_t numerator;
324   {
325     ValidationErrors::ScopedField field(errors, ".drop_percentage");
326     const envoy_type_v3_FractionalPercent* drop_percentage =
327         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
328             drop_overload);
329     if (drop_percentage == nullptr) {
330       errors->AddError("field not present");
331       return;
332     }
333     numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
334     {
335       ValidationErrors::ScopedField field(errors, ".denominator");
336       const int denominator =
337           envoy_type_v3_FractionalPercent_denominator(drop_percentage);
338       // Normalize to million.
339       switch (denominator) {
340         case envoy_type_v3_FractionalPercent_HUNDRED:
341           numerator *= 10000;
342           break;
343         case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
344           numerator *= 100;
345           break;
346         case envoy_type_v3_FractionalPercent_MILLION:
347           break;
348         default:
349           errors->AddError("unknown denominator type");
350       }
351     }
352     // Cap numerator to 1000000.
353     numerator = std::min(numerator, 1000000u);
354   }
355   // Add category.
356   drop_config->AddCategory(std::move(category), numerator);
357 }
358 
EdsResourceParse(const XdsResourceType::DecodeContext &,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)359 absl::StatusOr<XdsEndpointResource> EdsResourceParse(
360     const XdsResourceType::DecodeContext& /*context*/,
361     const envoy_config_endpoint_v3_ClusterLoadAssignment*
362         cluster_load_assignment) {
363   ValidationErrors errors;
364   XdsEndpointResource eds_resource;
365   // endpoints
366   {
367     ValidationErrors::ScopedField field(&errors, "endpoints");
368     ResolvedAddressSet address_set;
369     size_t locality_size;
370     const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
371         envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
372             cluster_load_assignment, &locality_size);
373     for (size_t i = 0; i < locality_size; ++i) {
374       ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
375       auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors);
376       if (parsed_locality.has_value()) {
377         GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
378         // Make sure prorities is big enough. Note that they might not
379         // arrive in priority order.
380         if (eds_resource.priorities.size() < parsed_locality->priority + 1) {
381           eds_resource.priorities.resize(parsed_locality->priority + 1);
382         }
383         auto& locality_map =
384             eds_resource.priorities[parsed_locality->priority].localities;
385         auto it = locality_map.find(parsed_locality->locality.name.get());
386         if (it != locality_map.end()) {
387           errors.AddError(absl::StrCat(
388               "duplicate locality ",
389               parsed_locality->locality.name->AsHumanReadableString(),
390               " found in priority ", parsed_locality->priority));
391         } else {
392           locality_map.emplace(parsed_locality->locality.name.get(),
393                                std::move(parsed_locality->locality));
394         }
395       }
396     }
397     for (size_t i = 0; i < eds_resource.priorities.size(); ++i) {
398       const auto& priority = eds_resource.priorities[i];
399       if (priority.localities.empty()) {
400         errors.AddError(absl::StrCat("priority ", i, " empty"));
401       } else {
402         // Check that the sum of the locality weights in this priority
403         // does not exceed the max value for a uint32.
404         uint64_t total_weight = 0;
405         for (const auto& p : priority.localities) {
406           total_weight += p.second.lb_weight;
407           if (total_weight > std::numeric_limits<uint32_t>::max()) {
408             errors.AddError(
409                 absl::StrCat("sum of locality weights for priority ", i,
410                              " exceeds uint32 max"));
411             break;
412           }
413         }
414       }
415     }
416   }
417   // policy
418   eds_resource.drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
419   const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
420       cluster_load_assignment);
421   if (policy != nullptr) {
422     ValidationErrors::ScopedField field(&errors, "policy");
423     size_t drop_size;
424     const auto* const* drop_overload =
425         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
426             policy, &drop_size);
427     for (size_t i = 0; i < drop_size; ++i) {
428       ValidationErrors::ScopedField field(
429           &errors, absl::StrCat(".drop_overloads[", i, "]"));
430       DropParseAndAppend(drop_overload[i], eds_resource.drop_config.get(),
431                          &errors);
432     }
433   }
434   // Return result.
435   if (!errors.ok()) {
436     return errors.status(absl::StatusCode::kInvalidArgument,
437                          "errors parsing EDS resource");
438   }
439   return eds_resource;
440 }
441 
442 }  // namespace
443 
// Decodes a serialized ClusterLoadAssignment.
//
// If the bytes cannot be parsed as a proto, the result carries an
// InvalidArgument error and no name. Otherwise the result name is set to the
// proto's cluster_name and the resource is either the validated
// XdsEndpointResource or the validation error status. Outcomes are logged
// when the tracer in `context` is enabled.
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* cla = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (cla == nullptr) {
    result.resource = absl::InvalidArgumentError(
        "Can't parse ClusterLoadAssignment resource.");
    return result;
  }
  MaybeLogClusterLoadAssignment(context, cla);
  // Validate resource.
  result.name = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(cla));
  auto parsed = EdsResourceParse(context, cla);
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(*context.tracer);
  if (parsed.ok()) {
    if (tracing) {
      gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
              context.client, result.name->c_str(),
              parsed->ToString().c_str());
    }
    result.resource =
        std::make_unique<XdsEndpointResource>(std::move(*parsed));
    return result;
  }
  if (tracing) {
    gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
            context.client, result.name->c_str(),
            parsed.status().ToString().c_str());
  }
  result.resource = parsed.status();
  return result;
}
479 
480 }  // namespace grpc_core
481