1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/backend_metrics_lb_policy.h"
20
21 #include "absl/strings/str_format.h"
22
23 #include <grpc/support/port_platform.h>
24
25 #include "src/core/lib/iomgr/pollset_set.h"
26 #include "src/core/load_balancing/delegating_helper.h"
27 #include "src/core/load_balancing/oob_backend_metric.h"
28
29 namespace grpc {
30 namespace testing {
31
32 namespace {
33
34 using grpc_core::CoreConfiguration;
35 using grpc_core::LoadBalancingPolicy;
36 using grpc_core::MakeRefCounted;
37 using grpc_core::OrphanablePtr;
38 using grpc_core::RefCountedPtr;
39
40 constexpr absl::string_view kBackendMetricsLbPolicyName =
41 "test_backend_metrics_load_balancer";
42 constexpr absl::string_view kMetricsTrackerArgument = "orca_metrics_tracker";
43
BackendMetricDataToOrcaLoadReport(const grpc_core::BackendMetricData * backend_metric_data)44 LoadReportTracker::LoadReportEntry BackendMetricDataToOrcaLoadReport(
45 const grpc_core::BackendMetricData* backend_metric_data) {
46 if (backend_metric_data == nullptr) {
47 return absl::nullopt;
48 }
49 TestOrcaReport load_report;
50 load_report.set_cpu_utilization(backend_metric_data->cpu_utilization);
51 load_report.set_memory_utilization(backend_metric_data->mem_utilization);
52 for (const auto& p : backend_metric_data->request_cost) {
53 std::string name(p.first);
54 (*load_report.mutable_request_cost())[name] = p.second;
55 }
56 for (const auto& p : backend_metric_data->utilization) {
57 std::string name(p.first);
58 (*load_report.mutable_utilization())[name] = p.second;
59 }
60 return load_report;
61 }
62
63 class BackendMetricsLbPolicy : public LoadBalancingPolicy {
64 public:
BackendMetricsLbPolicy(Args args)65 explicit BackendMetricsLbPolicy(Args args)
66 : LoadBalancingPolicy(std::move(args), /*initial_refcount=*/2) {
67 load_report_tracker_ =
68 channel_args().GetPointer<LoadReportTracker>(kMetricsTrackerArgument);
69 GPR_ASSERT(load_report_tracker_ != nullptr);
70 Args delegate_args;
71 delegate_args.work_serializer = work_serializer();
72 delegate_args.args = channel_args();
73 delegate_args.channel_control_helper =
74 std::make_unique<Helper>(RefCountedPtr<BackendMetricsLbPolicy>(this));
75 delegate_ =
76 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
77 "pick_first", std::move(delegate_args));
78 grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
79 interested_parties());
80 }
81
82 ~BackendMetricsLbPolicy() override = default;
83
name() const84 absl::string_view name() const override {
85 return kBackendMetricsLbPolicyName;
86 }
87
UpdateLocked(UpdateArgs args)88 absl::Status UpdateLocked(UpdateArgs args) override {
89 auto config =
90 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
91 grpc_core::Json::FromArray({grpc_core::Json::FromObject(
92 {{"pick_first", grpc_core::Json::FromObject({})}})}));
93 args.config = std::move(config.value());
94 return delegate_->UpdateLocked(std::move(args));
95 }
96
ExitIdleLocked()97 void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
98
ResetBackoffLocked()99 void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
100
101 private:
102 class Picker : public SubchannelPicker {
103 public:
Picker(RefCountedPtr<SubchannelPicker> delegate_picker,LoadReportTracker * load_report_tracker)104 Picker(RefCountedPtr<SubchannelPicker> delegate_picker,
105 LoadReportTracker* load_report_tracker)
106 : delegate_picker_(std::move(delegate_picker)),
107 load_report_tracker_(load_report_tracker) {}
108
Pick(PickArgs args)109 PickResult Pick(PickArgs args) override {
110 // Do pick.
111 PickResult result = delegate_picker_->Pick(args);
112 // Intercept trailing metadata.
113 auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
114 if (complete_pick != nullptr) {
115 complete_pick->subchannel_call_tracker =
116 std::make_unique<SubchannelCallTracker>(load_report_tracker_);
117 }
118 return result;
119 }
120
121 private:
122 RefCountedPtr<SubchannelPicker> delegate_picker_;
123 LoadReportTracker* load_report_tracker_;
124 };
125
126 class OobMetricWatcher : public grpc_core::OobBackendMetricWatcher {
127 public:
OobMetricWatcher(LoadReportTracker * load_report_tracker)128 explicit OobMetricWatcher(LoadReportTracker* load_report_tracker)
129 : load_report_tracker_(load_report_tracker) {}
130
131 private:
OnBackendMetricReport(const grpc_core::BackendMetricData & backend_metric_data)132 void OnBackendMetricReport(
133 const grpc_core::BackendMetricData& backend_metric_data) override {
134 load_report_tracker_->RecordOobLoadReport(backend_metric_data);
135 }
136
137 LoadReportTracker* load_report_tracker_;
138 };
139
140 class Helper : public ParentOwningDelegatingChannelControlHelper<
141 BackendMetricsLbPolicy> {
142 public:
Helper(RefCountedPtr<BackendMetricsLbPolicy> parent)143 explicit Helper(RefCountedPtr<BackendMetricsLbPolicy> parent)
144 : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
145
CreateSubchannel(const grpc_resolved_address & address,const grpc_core::ChannelArgs & per_address_args,const grpc_core::ChannelArgs & args)146 RefCountedPtr<grpc_core::SubchannelInterface> CreateSubchannel(
147 const grpc_resolved_address& address,
148 const grpc_core::ChannelArgs& per_address_args,
149 const grpc_core::ChannelArgs& args) override {
150 auto subchannel =
151 parent_helper()->CreateSubchannel(address, per_address_args, args);
152 subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
153 grpc_core::Duration::Seconds(1),
154 std::make_unique<OobMetricWatcher>(parent()->load_report_tracker_)));
155 return subchannel;
156 }
157
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)158 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
159 RefCountedPtr<SubchannelPicker> picker) override {
160 parent_helper()->UpdateState(
161 state, status,
162 MakeRefCounted<Picker>(std::move(picker),
163 parent()->load_report_tracker_));
164 }
165 };
166
167 class SubchannelCallTracker : public SubchannelCallTrackerInterface {
168 public:
SubchannelCallTracker(LoadReportTracker * load_report_tracker)169 explicit SubchannelCallTracker(LoadReportTracker* load_report_tracker)
170 : load_report_tracker_(load_report_tracker) {}
171
Start()172 void Start() override {}
173
Finish(FinishArgs args)174 void Finish(FinishArgs args) override {
175 load_report_tracker_->RecordPerRpcLoadReport(
176 args.backend_metric_accessor->GetBackendMetricData());
177 }
178
179 private:
180 LoadReportTracker* load_report_tracker_;
181 };
182
ShutdownLocked()183 void ShutdownLocked() override {
184 grpc_pollset_set_del_pollset_set(delegate_->interested_parties(),
185 interested_parties());
186 delegate_.reset();
187 }
188
189 OrphanablePtr<LoadBalancingPolicy> delegate_;
190 LoadReportTracker* load_report_tracker_;
191 };
192
193 class BackendMetricsLbPolicyFactory
194 : public grpc_core::LoadBalancingPolicyFactory {
195 private:
196 class BackendMetricsLbPolicyFactoryConfig
197 : public LoadBalancingPolicy::Config {
198 private:
name() const199 absl::string_view name() const override {
200 return kBackendMetricsLbPolicyName;
201 }
202 };
203
name() const204 absl::string_view name() const override {
205 return kBackendMetricsLbPolicyName;
206 }
207
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const208 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
209 LoadBalancingPolicy::Args args) const override {
210 return grpc_core::MakeOrphanable<BackendMetricsLbPolicy>(std::move(args));
211 }
212
213 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const grpc_core::Json &) const214 ParseLoadBalancingConfig(const grpc_core::Json& /*json*/) const override {
215 return MakeRefCounted<BackendMetricsLbPolicyFactoryConfig>();
216 }
217 };
218 } // namespace
219
RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder * builder)220 void RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder* builder) {
221 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
222 std::make_unique<BackendMetricsLbPolicyFactory>());
223 }
224
RecordPerRpcLoadReport(const grpc_core::BackendMetricData * backend_metric_data)225 void LoadReportTracker::RecordPerRpcLoadReport(
226 const grpc_core::BackendMetricData* backend_metric_data) {
227 grpc_core::MutexLock lock(&load_reports_mu_);
228 per_rpc_load_reports_.emplace_back(
229 BackendMetricDataToOrcaLoadReport(backend_metric_data));
230 }
231
RecordOobLoadReport(const grpc_core::BackendMetricData & oob_metric_data)232 void LoadReportTracker::RecordOobLoadReport(
233 const grpc_core::BackendMetricData& oob_metric_data) {
234 grpc_core::MutexLock lock(&load_reports_mu_);
235 oob_load_reports_.emplace_back(
236 *BackendMetricDataToOrcaLoadReport(&oob_metric_data));
237 load_reports_cv_.Signal();
238 }
239
240 absl::optional<LoadReportTracker::LoadReportEntry>
GetNextLoadReport()241 LoadReportTracker::GetNextLoadReport() {
242 grpc_core::MutexLock lock(&load_reports_mu_);
243 if (per_rpc_load_reports_.empty()) {
244 return absl::nullopt;
245 }
246 auto report = std::move(per_rpc_load_reports_.front());
247 per_rpc_load_reports_.pop_front();
248 return report;
249 }
250
WaitForOobLoadReport(const std::function<bool (const TestOrcaReport &)> & predicate,absl::Duration poll_timeout,size_t max_attempts)251 LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
252 const std::function<bool(const TestOrcaReport&)>& predicate,
253 absl::Duration poll_timeout, size_t max_attempts) {
254 grpc_core::MutexLock lock(&load_reports_mu_);
255 // This condition will be called under lock
256 for (size_t i = 0; i < max_attempts; i++) {
257 if (oob_load_reports_.empty()) {
258 load_reports_cv_.WaitWithTimeout(&load_reports_mu_, poll_timeout);
259 if (oob_load_reports_.empty()) {
260 return absl::nullopt;
261 }
262 }
263 auto report = std::move(oob_load_reports_.front());
264 oob_load_reports_.pop_front();
265 if (predicate(report)) {
266 gpr_log(GPR_DEBUG, "Report #%" PRIuPTR " matched", i + 1);
267 return report;
268 }
269 }
270 return absl::nullopt;
271 }
272
ResetCollectedLoadReports()273 void LoadReportTracker::ResetCollectedLoadReports() {
274 grpc_core::MutexLock lock(&load_reports_mu_);
275 per_rpc_load_reports_.clear();
276 oob_load_reports_.clear();
277 }
278
GetChannelArguments()279 ChannelArguments LoadReportTracker::GetChannelArguments() {
280 ChannelArguments arguments;
281 arguments.SetPointer(std::string(kMetricsTrackerArgument), this);
282 return arguments;
283 }
284
285 } // namespace testing
286 } // namespace grpc
287