1 //
2 //
3 // Copyright 2024 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/backend_metrics_lb_policy.h"
20
21 #include <memory>
22 #include <thread>
23
24 #include <gmock/gmock.h>
25 #include <gtest/gtest.h>
26
27 #include <grpc/grpc.h>
28 #include <grpcpp/ext/call_metric_recorder.h>
29 #include <grpcpp/ext/orca_service.h>
30 #include <grpcpp/grpcpp.h>
31 #include <grpcpp/support/status.h>
32
33 #include "src/core/lib/config/config_vars.h"
34 #include "src/core/lib/gprpp/sync.h"
35 #include "src/proto/grpc/testing/messages.pb.h"
36 #include "src/proto/grpc/testing/test.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39
40 namespace grpc {
41 namespace testing {
42 namespace {
43
44 class EchoServiceImpl : public grpc::testing::TestService::CallbackService {
45 public:
UnaryCall(grpc::CallbackServerContext * context,const grpc::testing::SimpleRequest *,grpc::testing::SimpleResponse *)46 grpc::ServerUnaryReactor* UnaryCall(
47 grpc::CallbackServerContext* context,
48 const grpc::testing::SimpleRequest* /* request */,
49 grpc::testing::SimpleResponse* /* response */) override {
50 auto reactor = context->DefaultReactor();
51 reactor->Finish(grpc::Status::OK);
52 return reactor;
53 }
54 };
55
56 class Server {
57 public:
Server()58 Server() : port_(grpc_pick_unused_port_or_die()) {
59 server_thread_ = std::thread(ServerLoop, this);
60 grpc_core::MutexLock lock(&mu_);
61 cond_.WaitWithTimeout(&mu_, absl::Seconds(1));
62 }
63
~Server()64 ~Server() {
65 server_->Shutdown();
66 server_thread_.join();
67 }
68
address() const69 std::string address() const { return absl::StrCat("localhost:", port_); }
70
71 private:
ServerLoop(Server * server)72 static void ServerLoop(Server* server) { server->Run(); }
73
Run()74 void Run() {
75 ServerBuilder builder;
76 EchoServiceImpl service;
77 auto server_metric_recorder =
78 grpc::experimental::ServerMetricRecorder::Create();
79 server_metric_recorder->SetCpuUtilization(.5f);
80 grpc::experimental::OrcaService orca_service(
81 server_metric_recorder.get(),
82 grpc::experimental::OrcaService::Options().set_min_report_duration(
83 absl::Seconds(1)));
84 builder.RegisterService(&orca_service);
85 builder.RegisterService(&service);
86 builder.AddListeningPort(address(), InsecureServerCredentials());
87 auto grpc_server = builder.BuildAndStart();
88 server_ = grpc_server.get();
89 {
90 grpc_core::MutexLock lock(&mu_);
91 cond_.SignalAll();
92 }
93 grpc_server->Wait();
94 }
95
96 int port_;
97 grpc_core::Mutex mu_;
98 grpc_core::CondVar cond_;
99 std::thread server_thread_;
100 grpc::Server* server_;
101 };
102
// Starts the test server, issues one unary call over a channel that uses the
// "test_backend_metrics_load_balancer" policy, and checks that the tracker
// receives out-of-band load reports carrying the server's CPU utilization
// (0.5): one initial report followed by three more.
TEST(BackendMetricsLbPolicyTest, TestOobMetricsReceipt) {
  LoadReportTracker tracker;
  grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
  Server server;
  ChannelArguments args = tracker.GetChannelArguments();
  args.SetLoadBalancingPolicyName("test_backend_metrics_load_balancer");
  auto channel = grpc::CreateCustomChannel(server.address(),
                                           InsecureChannelCredentials(), args);
  auto stub = grpc::testing::TestService::Stub(channel);
  ClientContext ctx;
  SimpleRequest req;
  SimpleResponse res;
  grpc_core::Mutex mu;
  grpc_core::CondVar cond;
  absl::optional<Status> status;

  // The completion callback stores the call status and wakes the waiter at
  // the end of the test.
  stub.async()->UnaryCall(&ctx, &req, &res, [&](auto s) {
    grpc_core::MutexLock lock(&mu);
    status = s;
    cond.SignalAll();
  });
  // Take the report by reference to avoid copying it on every check.
  const auto has_expected_cpu = [](const auto& load_report) {
    return load_report.cpu_utilization() == 0.5;
  };
  // This report is sent on start, available immediately
  auto report = tracker.WaitForOobLoadReport(
      has_expected_cpu, absl::Seconds(5) * grpc_test_slowdown_factor(), 3);
  ASSERT_TRUE(report.has_value());
  EXPECT_EQ(report->cpu_utilization(), 0.5);
  for (size_t i = 0; i < 3; i++) {
    // Wait for slightly more than 1 second, the min_report_duration that the
    // test server's ORCA service is configured with.
    report = tracker.WaitForOobLoadReport(has_expected_cpu,
                                          absl::Milliseconds(1500), 3);
    ASSERT_TRUE(report.has_value());
    EXPECT_EQ(report->cpu_utilization(), 0.5);
  }
  {
    grpc_core::MutexLock lock(&mu);
    // Loop so that a spurious wakeup cannot end the wait before the callback
    // has stored the status.
    while (!status.has_value()) {
      cond.Wait(&mu);
    }
    ASSERT_TRUE(status.has_value());
    EXPECT_EQ(status->error_code(), grpc::StatusCode::OK);
  }
}
147
148 } // namespace
149 } // namespace testing
150 } // namespace grpc
151
main(int argc,char ** argv)152 int main(int argc, char** argv) {
153 ::testing::InitGoogleTest(&argc, argv);
154 grpc::testing::TestEnvironment env(&argc, argv);
155 grpc_init();
156 auto result = RUN_ALL_TESTS();
157 grpc_shutdown();
158 return result;
159 }
160