xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/server_load_reporting_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <thread>
20 
21 #include <gmock/gmock.h>
22 #include <gtest/gtest.h>
23 
24 #include <grpc++/grpc++.h>
25 #include <grpc/grpc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/port_platform.h>
28 #include <grpc/support/string_util.h>
29 #include <grpcpp/ext/server_load_reporting.h>
30 #include <grpcpp/server_builder.h>
31 
32 #include "src/core/client_channel/backup_poller.h"
33 #include "src/core/lib/config/config_vars.h"
34 #include "src/core/lib/gprpp/crash.h"
35 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39 
40 namespace grpc {
41 namespace testing {
42 namespace {
43 
// Name and value of the custom metric that the echo service attaches to each
// successful call.
constexpr char kMetricName[] = "METRIC_PI";
constexpr double kMetricValue = 3.1415;

// Request payloads. Each one makes the echo service answer with a different
// status. They all have the same length so that request byte counts are easy
// to compute.
constexpr char kOkMessage[] = "hello";
constexpr char kServerErrorMessage[] = "sverr";
constexpr char kClientErrorMessage[] = "clerr";
52 
53 class EchoTestServiceImpl : public EchoTestService::Service {
54  public:
~EchoTestServiceImpl()55   ~EchoTestServiceImpl() override {}
56 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)57   Status Echo(ServerContext* context, const EchoRequest* request,
58               EchoResponse* response) override {
59     if (request->message() == kServerErrorMessage) {
60       return Status(StatusCode::UNKNOWN, "Server error requested");
61     }
62     if (request->message() == kClientErrorMessage) {
63       return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
64     }
65     response->set_message(request->message());
66     grpc::load_reporter::experimental::AddLoadReportingCost(
67         context, kMetricName, kMetricValue);
68     return Status::OK;
69   }
70 };
71 
72 class ServerLoadReportingEnd2endTest : public ::testing::Test {
73  protected:
SetUp()74   void SetUp() override {
75     server_address_ =
76         "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
77     server_ =
78         ServerBuilder()
79             .AddListeningPort(server_address_, InsecureServerCredentials())
80             .RegisterService(&echo_service_)
81             .SetOption(std::unique_ptr<grpc::ServerBuilderOption>(
82                 new grpc::load_reporter::experimental::
83                     LoadReportingServiceServerBuilderOption()))
84             .BuildAndStart();
85     server_thread_ =
86         std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
87   }
88 
RunServerLoop()89   void RunServerLoop() { server_->Wait(); }
90 
TearDown()91   void TearDown() override {
92     server_->Shutdown();
93     server_thread_.join();
94   }
95 
ClientMakeEchoCalls(const std::string & lb_id,const std::string & lb_tag,const std::string & message,size_t num_requests)96   void ClientMakeEchoCalls(const std::string& lb_id, const std::string& lb_tag,
97                            const std::string& message, size_t num_requests) {
98     auto stub = EchoTestService::NewStub(
99         grpc::CreateChannel(server_address_, InsecureChannelCredentials()));
100     std::string lb_token = lb_id + lb_tag;
101     for (size_t i = 0; i < num_requests; ++i) {
102       ClientContext ctx;
103       if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
104       EchoRequest request;
105       EchoResponse response;
106       request.set_message(message);
107       Status status = stub->Echo(&ctx, request, &response);
108       if (message == kOkMessage) {
109         ASSERT_EQ(status.error_code(), StatusCode::OK);
110         ASSERT_EQ(request.message(), response.message());
111       } else if (message == kServerErrorMessage) {
112         ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
113       } else if (message == kClientErrorMessage) {
114         ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
115       }
116     }
117   }
118 
119   std::string server_address_;
120   std::unique_ptr<Server> server_;
121   std::thread server_thread_;
122   EchoTestServiceImpl echo_service_;
123 };
124 
// Runs only the fixture's SetUp() and TearDown(): the server is started and
// shut down without the test body issuing any RPC.
TEST_F(ServerLoadReportingEnd2endTest, NoCall) {
}
126 
TEST_F(ServerLoadReportingEnd2endTest,BasicReport)127 TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
128   auto channel =
129       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
130   auto stub = grpc::lb::v1::LoadReporter::NewStub(channel);
131   ClientContext ctx;
132   auto stream = stub->ReportLoad(&ctx);
133   grpc::lb::v1::LoadReportRequest request;
134   request.mutable_initial_request()->set_load_balanced_hostname(
135       server_address_);
136   request.mutable_initial_request()->set_load_key("LOAD_KEY");
137   request.mutable_initial_request()
138       ->mutable_load_report_interval()
139       ->set_seconds(5);
140   stream->Write(request);
141   gpr_log(GPR_INFO, "Initial request sent.");
142   grpc::lb::v1::LoadReportResponse response;
143   stream->Read(&response);
144   const std::string& lb_id = response.initial_response().load_balancer_id();
145   gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
146   ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
147 
148   unsigned load_count = 0;
149   bool got_in_progress = false;
150   bool got_orphaned = false;
151   bool got_calls = false;
152   while (load_count < 3) {
153     stream->Read(&response);
154     for (const auto& load : response.load()) {
155       load_count++;
156       if (load.in_progress_report_case()) {
157         // The special load record that reports the number of in-progress
158         // calls.
159         EXPECT_EQ(load.num_calls_in_progress(), 1);
160         EXPECT_FALSE(got_in_progress);
161         got_in_progress = true;
162       } else if (load.orphaned_load_case()) {
163         // The call from the balancer doesn't have any valid LB token.
164         EXPECT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
165         EXPECT_EQ(load.num_calls_started(), 1);
166         EXPECT_EQ(load.num_calls_finished_without_error(), 0);
167         EXPECT_EQ(load.num_calls_finished_with_error(), 0);
168         EXPECT_FALSE(got_orphaned);
169         got_orphaned = true;
170       } else {
171         // This corresponds to the calls from the client.
172         EXPECT_EQ(load.num_calls_started(), 1);
173         EXPECT_EQ(load.num_calls_finished_without_error(), 1);
174         EXPECT_EQ(load.num_calls_finished_with_error(), 0);
175         EXPECT_GE(load.total_bytes_received(), sizeof(kOkMessage));
176         EXPECT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
177         EXPECT_EQ(load.metric_data().size(), 1);
178         EXPECT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
179         EXPECT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
180                   1);
181         EXPECT_EQ(load.metric_data().Get(0).total_metric_value(), kMetricValue);
182         EXPECT_FALSE(got_calls);
183         got_calls = true;
184       }
185     }
186   }
187   EXPECT_EQ(load_count, 3);
188   EXPECT_TRUE(got_in_progress);
189   EXPECT_TRUE(got_orphaned);
190   EXPECT_TRUE(got_calls);
191   stream->WritesDone();
192   EXPECT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
193 }
194 
195 // TODO(juanlishen): Add more tests.
196 
197 }  // namespace
198 }  // namespace testing
199 }  // namespace grpc
200 
main(int argc,char ** argv)201 int main(int argc, char** argv) {
202   grpc::testing::TestEnvironment env(&argc, argv);
203   ::testing::InitGoogleTest(&argc, argv);
204   // Make the backup poller poll very frequently in order to pick up
205   // updates from all the subchannels's FDs.
206   grpc_core::ConfigVars::Overrides config_overrides;
207   config_overrides.client_channel_backup_poll_interval_ms = 1;
208   grpc_core::ConfigVars::SetOverrides(config_overrides);
209   return RUN_ALL_TESTS();
210 }
211