xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/backend_metrics_lb_policy.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/backend_metrics_lb_policy.h"
20 
21 #include "absl/strings/str_format.h"
22 
23 #include <grpc/support/port_platform.h>
24 
25 #include "src/core/lib/iomgr/pollset_set.h"
26 #include "src/core/load_balancing/delegating_helper.h"
27 #include "src/core/load_balancing/oob_backend_metric.h"
28 
29 namespace grpc {
30 namespace testing {
31 
32 namespace {
33 
34 using grpc_core::CoreConfiguration;
35 using grpc_core::LoadBalancingPolicy;
36 using grpc_core::MakeRefCounted;
37 using grpc_core::OrphanablePtr;
38 using grpc_core::RefCountedPtr;
39 
40 constexpr absl::string_view kBackendMetricsLbPolicyName =
41     "test_backend_metrics_load_balancer";
42 constexpr absl::string_view kMetricsTrackerArgument = "orca_metrics_tracker";
43 
BackendMetricDataToOrcaLoadReport(const grpc_core::BackendMetricData * backend_metric_data)44 LoadReportTracker::LoadReportEntry BackendMetricDataToOrcaLoadReport(
45     const grpc_core::BackendMetricData* backend_metric_data) {
46   if (backend_metric_data == nullptr) {
47     return absl::nullopt;
48   }
49   TestOrcaReport load_report;
50   load_report.set_cpu_utilization(backend_metric_data->cpu_utilization);
51   load_report.set_memory_utilization(backend_metric_data->mem_utilization);
52   for (const auto& p : backend_metric_data->request_cost) {
53     std::string name(p.first);
54     (*load_report.mutable_request_cost())[name] = p.second;
55   }
56   for (const auto& p : backend_metric_data->utilization) {
57     std::string name(p.first);
58     (*load_report.mutable_utilization())[name] = p.second;
59   }
60   return load_report;
61 }
62 
63 class BackendMetricsLbPolicy : public LoadBalancingPolicy {
64  public:
BackendMetricsLbPolicy(Args args)65   explicit BackendMetricsLbPolicy(Args args)
66       : LoadBalancingPolicy(std::move(args), /*initial_refcount=*/2) {
67     load_report_tracker_ =
68         channel_args().GetPointer<LoadReportTracker>(kMetricsTrackerArgument);
69     GPR_ASSERT(load_report_tracker_ != nullptr);
70     Args delegate_args;
71     delegate_args.work_serializer = work_serializer();
72     delegate_args.args = channel_args();
73     delegate_args.channel_control_helper =
74         std::make_unique<Helper>(RefCountedPtr<BackendMetricsLbPolicy>(this));
75     delegate_ =
76         CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
77             "pick_first", std::move(delegate_args));
78     grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
79                                      interested_parties());
80   }
81 
82   ~BackendMetricsLbPolicy() override = default;
83 
name() const84   absl::string_view name() const override {
85     return kBackendMetricsLbPolicyName;
86   }
87 
UpdateLocked(UpdateArgs args)88   absl::Status UpdateLocked(UpdateArgs args) override {
89     auto config =
90         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
91             grpc_core::Json::FromArray({grpc_core::Json::FromObject(
92                 {{"pick_first", grpc_core::Json::FromObject({})}})}));
93     args.config = std::move(config.value());
94     return delegate_->UpdateLocked(std::move(args));
95   }
96 
ExitIdleLocked()97   void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
98 
ResetBackoffLocked()99   void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
100 
101  private:
102   class Picker : public SubchannelPicker {
103    public:
Picker(RefCountedPtr<SubchannelPicker> delegate_picker,LoadReportTracker * load_report_tracker)104     Picker(RefCountedPtr<SubchannelPicker> delegate_picker,
105            LoadReportTracker* load_report_tracker)
106         : delegate_picker_(std::move(delegate_picker)),
107           load_report_tracker_(load_report_tracker) {}
108 
Pick(PickArgs args)109     PickResult Pick(PickArgs args) override {
110       // Do pick.
111       PickResult result = delegate_picker_->Pick(args);
112       // Intercept trailing metadata.
113       auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
114       if (complete_pick != nullptr) {
115         complete_pick->subchannel_call_tracker =
116             std::make_unique<SubchannelCallTracker>(load_report_tracker_);
117       }
118       return result;
119     }
120 
121    private:
122     RefCountedPtr<SubchannelPicker> delegate_picker_;
123     LoadReportTracker* load_report_tracker_;
124   };
125 
126   class OobMetricWatcher : public grpc_core::OobBackendMetricWatcher {
127    public:
OobMetricWatcher(LoadReportTracker * load_report_tracker)128     explicit OobMetricWatcher(LoadReportTracker* load_report_tracker)
129         : load_report_tracker_(load_report_tracker) {}
130 
131    private:
OnBackendMetricReport(const grpc_core::BackendMetricData & backend_metric_data)132     void OnBackendMetricReport(
133         const grpc_core::BackendMetricData& backend_metric_data) override {
134       load_report_tracker_->RecordOobLoadReport(backend_metric_data);
135     }
136 
137     LoadReportTracker* load_report_tracker_;
138   };
139 
140   class Helper : public ParentOwningDelegatingChannelControlHelper<
141                      BackendMetricsLbPolicy> {
142    public:
Helper(RefCountedPtr<BackendMetricsLbPolicy> parent)143     explicit Helper(RefCountedPtr<BackendMetricsLbPolicy> parent)
144         : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
145 
CreateSubchannel(const grpc_resolved_address & address,const grpc_core::ChannelArgs & per_address_args,const grpc_core::ChannelArgs & args)146     RefCountedPtr<grpc_core::SubchannelInterface> CreateSubchannel(
147         const grpc_resolved_address& address,
148         const grpc_core::ChannelArgs& per_address_args,
149         const grpc_core::ChannelArgs& args) override {
150       auto subchannel =
151           parent_helper()->CreateSubchannel(address, per_address_args, args);
152       subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
153           grpc_core::Duration::Seconds(1),
154           std::make_unique<OobMetricWatcher>(parent()->load_report_tracker_)));
155       return subchannel;
156     }
157 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)158     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
159                      RefCountedPtr<SubchannelPicker> picker) override {
160       parent_helper()->UpdateState(
161           state, status,
162           MakeRefCounted<Picker>(std::move(picker),
163                                  parent()->load_report_tracker_));
164     }
165   };
166 
167   class SubchannelCallTracker : public SubchannelCallTrackerInterface {
168    public:
SubchannelCallTracker(LoadReportTracker * load_report_tracker)169     explicit SubchannelCallTracker(LoadReportTracker* load_report_tracker)
170         : load_report_tracker_(load_report_tracker) {}
171 
Start()172     void Start() override {}
173 
Finish(FinishArgs args)174     void Finish(FinishArgs args) override {
175       load_report_tracker_->RecordPerRpcLoadReport(
176           args.backend_metric_accessor->GetBackendMetricData());
177     }
178 
179    private:
180     LoadReportTracker* load_report_tracker_;
181   };
182 
ShutdownLocked()183   void ShutdownLocked() override {
184     grpc_pollset_set_del_pollset_set(delegate_->interested_parties(),
185                                      interested_parties());
186     delegate_.reset();
187   }
188 
189   OrphanablePtr<LoadBalancingPolicy> delegate_;
190   LoadReportTracker* load_report_tracker_;
191 };
192 
193 class BackendMetricsLbPolicyFactory
194     : public grpc_core::LoadBalancingPolicyFactory {
195  private:
196   class BackendMetricsLbPolicyFactoryConfig
197       : public LoadBalancingPolicy::Config {
198    private:
name() const199     absl::string_view name() const override {
200       return kBackendMetricsLbPolicyName;
201     }
202   };
203 
name() const204   absl::string_view name() const override {
205     return kBackendMetricsLbPolicyName;
206   }
207 
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const208   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
209       LoadBalancingPolicy::Args args) const override {
210     return grpc_core::MakeOrphanable<BackendMetricsLbPolicy>(std::move(args));
211   }
212 
213   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const grpc_core::Json &) const214   ParseLoadBalancingConfig(const grpc_core::Json& /*json*/) const override {
215     return MakeRefCounted<BackendMetricsLbPolicyFactoryConfig>();
216   }
217 };
218 }  // namespace
219 
RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder * builder)220 void RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder* builder) {
221   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
222       std::make_unique<BackendMetricsLbPolicyFactory>());
223 }
224 
RecordPerRpcLoadReport(const grpc_core::BackendMetricData * backend_metric_data)225 void LoadReportTracker::RecordPerRpcLoadReport(
226     const grpc_core::BackendMetricData* backend_metric_data) {
227   grpc_core::MutexLock lock(&load_reports_mu_);
228   per_rpc_load_reports_.emplace_back(
229       BackendMetricDataToOrcaLoadReport(backend_metric_data));
230 }
231 
RecordOobLoadReport(const grpc_core::BackendMetricData & oob_metric_data)232 void LoadReportTracker::RecordOobLoadReport(
233     const grpc_core::BackendMetricData& oob_metric_data) {
234   grpc_core::MutexLock lock(&load_reports_mu_);
235   oob_load_reports_.emplace_back(
236       *BackendMetricDataToOrcaLoadReport(&oob_metric_data));
237   load_reports_cv_.Signal();
238 }
239 
240 absl::optional<LoadReportTracker::LoadReportEntry>
GetNextLoadReport()241 LoadReportTracker::GetNextLoadReport() {
242   grpc_core::MutexLock lock(&load_reports_mu_);
243   if (per_rpc_load_reports_.empty()) {
244     return absl::nullopt;
245   }
246   auto report = std::move(per_rpc_load_reports_.front());
247   per_rpc_load_reports_.pop_front();
248   return report;
249 }
250 
WaitForOobLoadReport(const std::function<bool (const TestOrcaReport &)> & predicate,absl::Duration poll_timeout,size_t max_attempts)251 LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
252     const std::function<bool(const TestOrcaReport&)>& predicate,
253     absl::Duration poll_timeout, size_t max_attempts) {
254   grpc_core::MutexLock lock(&load_reports_mu_);
255   // This condition will be called under lock
256   for (size_t i = 0; i < max_attempts; i++) {
257     if (oob_load_reports_.empty()) {
258       load_reports_cv_.WaitWithTimeout(&load_reports_mu_, poll_timeout);
259       if (oob_load_reports_.empty()) {
260         return absl::nullopt;
261       }
262     }
263     auto report = std::move(oob_load_reports_.front());
264     oob_load_reports_.pop_front();
265     if (predicate(report)) {
266       gpr_log(GPR_DEBUG, "Report #%" PRIuPTR " matched", i + 1);
267       return report;
268     }
269   }
270   return absl::nullopt;
271 }
272 
ResetCollectedLoadReports()273 void LoadReportTracker::ResetCollectedLoadReports() {
274   grpc_core::MutexLock lock(&load_reports_mu_);
275   per_rpc_load_reports_.clear();
276   oob_load_reports_.clear();
277 }
278 
GetChannelArguments()279 ChannelArguments LoadReportTracker::GetChannelArguments() {
280   ChannelArguments arguments;
281   arguments.SetPointer(std::string(kMetricsTrackerArgument), this);
282   return arguments;
283 }
284 
285 }  // namespace testing
286 }  // namespace grpc
287