xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/xds/xds_wrr_locality.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <cstdint>
20 #include <map>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/types/optional.h"
30 
31 #include <grpc/impl/connectivity_state.h>
32 #include <grpc/support/json.h>
33 #include <grpc/support/log.h>
34 
35 #include "src/core/ext/xds/xds_client_stats.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/core_configuration.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/debug_location.h"
40 #include "src/core/lib/gprpp/orphanable.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/gprpp/ref_counted_string.h"
43 #include "src/core/lib/gprpp/validation_errors.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/json/json.h"
46 #include "src/core/lib/json/json_args.h"
47 #include "src/core/lib/json/json_object_loader.h"
48 #include "src/core/lib/json/json_writer.h"
49 #include "src/core/load_balancing/delegating_helper.h"
50 #include "src/core/load_balancing/lb_policy.h"
51 #include "src/core/load_balancing/lb_policy_factory.h"
52 #include "src/core/load_balancing/lb_policy_registry.h"
53 #include "src/core/load_balancing/xds/xds_channel_args.h"
54 #include "src/core/resolver/endpoint_addresses.h"
55 
56 namespace grpc_core {
57 
58 TraceFlag grpc_xds_wrr_locality_lb_trace(false, "xds_wrr_locality_lb");
59 
60 namespace {
61 
62 constexpr absl::string_view kXdsWrrLocality = "xds_wrr_locality_experimental";
63 
64 // Config for xds_wrr_locality LB policy.
65 class XdsWrrLocalityLbConfig final : public LoadBalancingPolicy::Config {
66  public:
67   XdsWrrLocalityLbConfig() = default;
68 
69   XdsWrrLocalityLbConfig(const XdsWrrLocalityLbConfig&) = delete;
70   XdsWrrLocalityLbConfig& operator=(const XdsWrrLocalityLbConfig&) = delete;
71 
72   XdsWrrLocalityLbConfig(XdsWrrLocalityLbConfig&& other) = delete;
73   XdsWrrLocalityLbConfig& operator=(XdsWrrLocalityLbConfig&& other) = delete;
74 
name() const75   absl::string_view name() const override { return kXdsWrrLocality; }
76 
child_config() const77   const Json& child_config() const { return child_config_; }
78 
JsonLoader(const JsonArgs &)79   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
80     // Note: The "childPolicy" field requires custom processing, so
81     // it's handled in JsonPostLoad() instead.
82     static const auto* loader =
83         JsonObjectLoader<XdsWrrLocalityLbConfig>().Finish();
84     return loader;
85   }
86 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)87   void JsonPostLoad(const Json& json, const JsonArgs&,
88                     ValidationErrors* errors) {
89     ValidationErrors::ScopedField field(errors, ".childPolicy");
90     auto it = json.object().find("childPolicy");
91     if (it == json.object().end()) {
92       errors->AddError("field not present");
93       return;
94     }
95     auto lb_config =
96         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
97             it->second);
98     if (!lb_config.ok()) {
99       errors->AddError(lb_config.status().message());
100       return;
101     }
102     child_config_ = it->second;
103   }
104 
105  private:
106   Json child_config_;
107 };
108 
109 // xds_wrr_locality LB policy.
110 class XdsWrrLocalityLb final : public LoadBalancingPolicy {
111  public:
112   explicit XdsWrrLocalityLb(Args args);
113 
name() const114   absl::string_view name() const override { return kXdsWrrLocality; }
115 
116   absl::Status UpdateLocked(UpdateArgs args) override;
117   void ExitIdleLocked() override;
118   void ResetBackoffLocked() override;
119 
120  private:
121   using Helper = ParentOwningDelegatingChannelControlHelper<XdsWrrLocalityLb>;
122 
123   ~XdsWrrLocalityLb() override;
124 
125   void ShutdownLocked() override;
126 
127   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
128       const ChannelArgs& args);
129 
130   OrphanablePtr<LoadBalancingPolicy> child_policy_;
131 };
132 
133 //
134 // XdsWrrLocalityLb
135 //
136 
XdsWrrLocalityLb(Args args)137 XdsWrrLocalityLb::XdsWrrLocalityLb(Args args)
138     : LoadBalancingPolicy(std::move(args)) {}
139 
~XdsWrrLocalityLb()140 XdsWrrLocalityLb::~XdsWrrLocalityLb() {
141   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
142     gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] destroying", this);
143   }
144 }
145 
ShutdownLocked()146 void XdsWrrLocalityLb::ShutdownLocked() {
147   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
148     gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] shutting down", this);
149   }
150   if (child_policy_ != nullptr) {
151     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
152                                      interested_parties());
153     child_policy_.reset();
154   }
155 }
156 
ExitIdleLocked()157 void XdsWrrLocalityLb::ExitIdleLocked() {
158   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
159 }
160 
ResetBackoffLocked()161 void XdsWrrLocalityLb::ResetBackoffLocked() {
162   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
163 }
164 
UpdateLocked(UpdateArgs args)165 absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
166   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
167     gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] Received update", this);
168   }
169   auto config = args.config.TakeAsSubclass<XdsWrrLocalityLbConfig>();
170   // Scan the addresses to find the weight for each locality.
171   std::map<RefCountedStringValue, uint32_t> locality_weights;
172   if (args.addresses.ok()) {
173     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
174       auto* locality_name = endpoint.args().GetObject<XdsLocalityName>();
175       uint32_t weight =
176           endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
177       if (locality_name != nullptr && weight > 0) {
178         auto p = locality_weights.emplace(
179             locality_name->human_readable_string(), weight);
180         if (!p.second && p.first->second != weight) {
181           gpr_log(GPR_ERROR,
182                   "INTERNAL ERROR: xds_wrr_locality found different weights "
183                   "for locality %s (%u vs %u); using first value",
184                   p.first->first.c_str(), p.first->second, weight);
185         }
186       }
187     });
188   }
189   // Construct the config for the weighted_target policy.
190   Json::Object weighted_targets;
191   for (const auto& p : locality_weights) {
192     absl::string_view locality_name = p.first.as_string_view();
193     uint32_t weight = p.second;
194     // Add weighted target entry.
195     weighted_targets[std::string(locality_name)] = Json::FromObject({
196         {"weight", Json::FromNumber(weight)},
197         {"childPolicy", config->child_config()},
198     });
199   }
200   Json child_config_json = Json::FromArray({
201       Json::FromObject({
202           {"weighted_target_experimental",
203            Json::FromObject({
204                {"targets", Json::FromObject(std::move(weighted_targets))},
205            })},
206       }),
207   });
208   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
209     gpr_log(GPR_INFO,
210             "[xds_wrr_locality_lb %p] generated child policy config: %s", this,
211             JsonDump(child_config_json, /*indent=*/1).c_str());
212   }
213   // Parse config.
214   auto child_config =
215       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
216           child_config_json);
217   if (!child_config.ok()) {
218     // This should never happen, but if it does, we basically have no
219     // way to fix it, so we put the channel in TRANSIENT_FAILURE.
220     gpr_log(GPR_ERROR,
221             "[xds_wrr_locality %p] error parsing generated child policy "
222             "config -- putting channel in TRANSIENT_FAILURE: %s",
223             this, child_config.status().ToString().c_str());
224     absl::Status status = absl::InternalError(absl::StrCat(
225         "xds_wrr_locality LB policy: error parsing generated child policy "
226         "config: ",
227         child_config.status().ToString()));
228     channel_control_helper()->UpdateState(
229         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
230         MakeRefCounted<TransientFailurePicker>(status));
231     return status;
232   }
233   // Create child policy if needed (i.e., on first update).
234   if (child_policy_ == nullptr) {
235     child_policy_ = CreateChildPolicyLocked(args.args);
236   }
237   // Construct update args.
238   UpdateArgs update_args;
239   update_args.addresses = std::move(args.addresses);
240   update_args.config = std::move(*child_config);
241   update_args.resolution_note = std::move(args.resolution_note);
242   update_args.args = std::move(args.args);
243   // Update the policy.
244   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
245     gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] updating child policy %p", this,
246             child_policy_.get());
247   }
248   return child_policy_->UpdateLocked(std::move(update_args));
249 }
250 
CreateChildPolicyLocked(const ChannelArgs & args)251 OrphanablePtr<LoadBalancingPolicy> XdsWrrLocalityLb::CreateChildPolicyLocked(
252     const ChannelArgs& args) {
253   LoadBalancingPolicy::Args lb_policy_args;
254   lb_policy_args.work_serializer = work_serializer();
255   lb_policy_args.args = args;
256   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
257       RefAsSubclass<XdsWrrLocalityLb>(DEBUG_LOCATION, "Helper"));
258   auto lb_policy =
259       CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
260           "weighted_target_experimental", std::move(lb_policy_args));
261   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
262     gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] created new child policy %p",
263             this, lb_policy.get());
264   }
265   // Add our interested_parties pollset_set to that of the newly created
266   // child policy. This will make the child policy progress upon activity on
267   // this LB policy, which in turn is tied to the application's call.
268   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
269                                    interested_parties());
270   return lb_policy;
271 }
272 
273 //
274 // factory
275 //
276 
277 class XdsWrrLocalityLbFactory final : public LoadBalancingPolicyFactory {
278  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const279   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
280       LoadBalancingPolicy::Args args) const override {
281     return MakeOrphanable<XdsWrrLocalityLb>(std::move(args));
282   }
283 
name() const284   absl::string_view name() const override { return kXdsWrrLocality; }
285 
286   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const287   ParseLoadBalancingConfig(const Json& json) const override {
288     return LoadFromJson<RefCountedPtr<XdsWrrLocalityLbConfig>>(
289         json, JsonArgs(),
290         "errors validating xds_wrr_locality LB policy config");
291   }
292 };
293 
294 }  // namespace
295 
RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder * builder)296 void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder) {
297   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
298       std::make_unique<XdsWrrLocalityLbFactory>());
299 }
300 
301 }  // namespace grpc_core
302