1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <cstdint>
20 #include <map>
21 #include <memory>
22 #include <string>
23 #include <utility>
24
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/types/optional.h"
30
31 #include <grpc/impl/connectivity_state.h>
32 #include <grpc/support/json.h>
33 #include <grpc/support/log.h>
34
35 #include "src/core/ext/xds/xds_client_stats.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/core_configuration.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/debug_location.h"
40 #include "src/core/lib/gprpp/orphanable.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/gprpp/ref_counted_string.h"
43 #include "src/core/lib/gprpp/validation_errors.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/json/json.h"
46 #include "src/core/lib/json/json_args.h"
47 #include "src/core/lib/json/json_object_loader.h"
48 #include "src/core/lib/json/json_writer.h"
49 #include "src/core/load_balancing/delegating_helper.h"
50 #include "src/core/load_balancing/lb_policy.h"
51 #include "src/core/load_balancing/lb_policy_factory.h"
52 #include "src/core/load_balancing/lb_policy_registry.h"
53 #include "src/core/load_balancing/xds/xds_channel_args.h"
54 #include "src/core/resolver/endpoint_addresses.h"
55
56 namespace grpc_core {
57
58 TraceFlag grpc_xds_wrr_locality_lb_trace(false, "xds_wrr_locality_lb");
59
60 namespace {
61
62 constexpr absl::string_view kXdsWrrLocality = "xds_wrr_locality_experimental";
63
64 // Config for xds_wrr_locality LB policy.
65 class XdsWrrLocalityLbConfig final : public LoadBalancingPolicy::Config {
66 public:
67 XdsWrrLocalityLbConfig() = default;
68
69 XdsWrrLocalityLbConfig(const XdsWrrLocalityLbConfig&) = delete;
70 XdsWrrLocalityLbConfig& operator=(const XdsWrrLocalityLbConfig&) = delete;
71
72 XdsWrrLocalityLbConfig(XdsWrrLocalityLbConfig&& other) = delete;
73 XdsWrrLocalityLbConfig& operator=(XdsWrrLocalityLbConfig&& other) = delete;
74
name() const75 absl::string_view name() const override { return kXdsWrrLocality; }
76
child_config() const77 const Json& child_config() const { return child_config_; }
78
JsonLoader(const JsonArgs &)79 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
80 // Note: The "childPolicy" field requires custom processing, so
81 // it's handled in JsonPostLoad() instead.
82 static const auto* loader =
83 JsonObjectLoader<XdsWrrLocalityLbConfig>().Finish();
84 return loader;
85 }
86
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)87 void JsonPostLoad(const Json& json, const JsonArgs&,
88 ValidationErrors* errors) {
89 ValidationErrors::ScopedField field(errors, ".childPolicy");
90 auto it = json.object().find("childPolicy");
91 if (it == json.object().end()) {
92 errors->AddError("field not present");
93 return;
94 }
95 auto lb_config =
96 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
97 it->second);
98 if (!lb_config.ok()) {
99 errors->AddError(lb_config.status().message());
100 return;
101 }
102 child_config_ = it->second;
103 }
104
105 private:
106 Json child_config_;
107 };
108
109 // xds_wrr_locality LB policy.
110 class XdsWrrLocalityLb final : public LoadBalancingPolicy {
111 public:
112 explicit XdsWrrLocalityLb(Args args);
113
name() const114 absl::string_view name() const override { return kXdsWrrLocality; }
115
116 absl::Status UpdateLocked(UpdateArgs args) override;
117 void ExitIdleLocked() override;
118 void ResetBackoffLocked() override;
119
120 private:
121 using Helper = ParentOwningDelegatingChannelControlHelper<XdsWrrLocalityLb>;
122
123 ~XdsWrrLocalityLb() override;
124
125 void ShutdownLocked() override;
126
127 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
128 const ChannelArgs& args);
129
130 OrphanablePtr<LoadBalancingPolicy> child_policy_;
131 };
132
133 //
134 // XdsWrrLocalityLb
135 //
136
XdsWrrLocalityLb(Args args)137 XdsWrrLocalityLb::XdsWrrLocalityLb(Args args)
138 : LoadBalancingPolicy(std::move(args)) {}
139
~XdsWrrLocalityLb()140 XdsWrrLocalityLb::~XdsWrrLocalityLb() {
141 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
142 gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] destroying", this);
143 }
144 }
145
ShutdownLocked()146 void XdsWrrLocalityLb::ShutdownLocked() {
147 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
148 gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] shutting down", this);
149 }
150 if (child_policy_ != nullptr) {
151 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
152 interested_parties());
153 child_policy_.reset();
154 }
155 }
156
ExitIdleLocked()157 void XdsWrrLocalityLb::ExitIdleLocked() {
158 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
159 }
160
ResetBackoffLocked()161 void XdsWrrLocalityLb::ResetBackoffLocked() {
162 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
163 }
164
UpdateLocked(UpdateArgs args)165 absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
166 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
167 gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] Received update", this);
168 }
169 auto config = args.config.TakeAsSubclass<XdsWrrLocalityLbConfig>();
170 // Scan the addresses to find the weight for each locality.
171 std::map<RefCountedStringValue, uint32_t> locality_weights;
172 if (args.addresses.ok()) {
173 (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
174 auto* locality_name = endpoint.args().GetObject<XdsLocalityName>();
175 uint32_t weight =
176 endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
177 if (locality_name != nullptr && weight > 0) {
178 auto p = locality_weights.emplace(
179 locality_name->human_readable_string(), weight);
180 if (!p.second && p.first->second != weight) {
181 gpr_log(GPR_ERROR,
182 "INTERNAL ERROR: xds_wrr_locality found different weights "
183 "for locality %s (%u vs %u); using first value",
184 p.first->first.c_str(), p.first->second, weight);
185 }
186 }
187 });
188 }
189 // Construct the config for the weighted_target policy.
190 Json::Object weighted_targets;
191 for (const auto& p : locality_weights) {
192 absl::string_view locality_name = p.first.as_string_view();
193 uint32_t weight = p.second;
194 // Add weighted target entry.
195 weighted_targets[std::string(locality_name)] = Json::FromObject({
196 {"weight", Json::FromNumber(weight)},
197 {"childPolicy", config->child_config()},
198 });
199 }
200 Json child_config_json = Json::FromArray({
201 Json::FromObject({
202 {"weighted_target_experimental",
203 Json::FromObject({
204 {"targets", Json::FromObject(std::move(weighted_targets))},
205 })},
206 }),
207 });
208 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
209 gpr_log(GPR_INFO,
210 "[xds_wrr_locality_lb %p] generated child policy config: %s", this,
211 JsonDump(child_config_json, /*indent=*/1).c_str());
212 }
213 // Parse config.
214 auto child_config =
215 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
216 child_config_json);
217 if (!child_config.ok()) {
218 // This should never happen, but if it does, we basically have no
219 // way to fix it, so we put the channel in TRANSIENT_FAILURE.
220 gpr_log(GPR_ERROR,
221 "[xds_wrr_locality %p] error parsing generated child policy "
222 "config -- putting channel in TRANSIENT_FAILURE: %s",
223 this, child_config.status().ToString().c_str());
224 absl::Status status = absl::InternalError(absl::StrCat(
225 "xds_wrr_locality LB policy: error parsing generated child policy "
226 "config: ",
227 child_config.status().ToString()));
228 channel_control_helper()->UpdateState(
229 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
230 MakeRefCounted<TransientFailurePicker>(status));
231 return status;
232 }
233 // Create child policy if needed (i.e., on first update).
234 if (child_policy_ == nullptr) {
235 child_policy_ = CreateChildPolicyLocked(args.args);
236 }
237 // Construct update args.
238 UpdateArgs update_args;
239 update_args.addresses = std::move(args.addresses);
240 update_args.config = std::move(*child_config);
241 update_args.resolution_note = std::move(args.resolution_note);
242 update_args.args = std::move(args.args);
243 // Update the policy.
244 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
245 gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] updating child policy %p", this,
246 child_policy_.get());
247 }
248 return child_policy_->UpdateLocked(std::move(update_args));
249 }
250
CreateChildPolicyLocked(const ChannelArgs & args)251 OrphanablePtr<LoadBalancingPolicy> XdsWrrLocalityLb::CreateChildPolicyLocked(
252 const ChannelArgs& args) {
253 LoadBalancingPolicy::Args lb_policy_args;
254 lb_policy_args.work_serializer = work_serializer();
255 lb_policy_args.args = args;
256 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
257 RefAsSubclass<XdsWrrLocalityLb>(DEBUG_LOCATION, "Helper"));
258 auto lb_policy =
259 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
260 "weighted_target_experimental", std::move(lb_policy_args));
261 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
262 gpr_log(GPR_INFO, "[xds_wrr_locality_lb %p] created new child policy %p",
263 this, lb_policy.get());
264 }
265 // Add our interested_parties pollset_set to that of the newly created
266 // child policy. This will make the child policy progress upon activity on
267 // this LB policy, which in turn is tied to the application's call.
268 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
269 interested_parties());
270 return lb_policy;
271 }
272
273 //
274 // factory
275 //
276
277 class XdsWrrLocalityLbFactory final : public LoadBalancingPolicyFactory {
278 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const279 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
280 LoadBalancingPolicy::Args args) const override {
281 return MakeOrphanable<XdsWrrLocalityLb>(std::move(args));
282 }
283
name() const284 absl::string_view name() const override { return kXdsWrrLocality; }
285
286 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const287 ParseLoadBalancingConfig(const Json& json) const override {
288 return LoadFromJson<RefCountedPtr<XdsWrrLocalityLbConfig>>(
289 json, JsonArgs(),
290 "errors validating xds_wrr_locality LB policy config");
291 }
292 };
293
294 } // namespace
295
RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder * builder)296 void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder) {
297 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
298 std::make_unique<XdsWrrLocalityLbFactory>());
299 }
300
301 } // namespace grpc_core
302