xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/observability_client.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 #include <unordered_map>
21 
22 #include "absl/flags/flag.h"
23 #include "opentelemetry/exporters/prometheus/exporter_factory.h"
24 #include "opentelemetry/exporters/prometheus/exporter_options.h"
25 #include "opentelemetry/sdk/metrics/meter_provider.h"
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpcpp/channel.h>
31 #include <grpcpp/client_context.h>
32 #include <grpcpp/ext/gcp_observability.h>
33 #include <grpcpp/ext/otel_plugin.h>
34 
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/crash.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/interop/client_helper.h"
39 #include "test/cpp/interop/interop_client.h"
40 #include "test/cpp/util/test_config.h"
41 
42 ABSL_FLAG(bool, use_alts, false,
43           "Whether to use alts. Enable alts will disable tls.");
44 ABSL_FLAG(bool, use_tls, false, "Whether to use tls.");
45 ABSL_FLAG(std::string, custom_credentials_type, "",
46           "User provided credentials type.");
47 ABSL_FLAG(bool, use_test_ca, false, "False to use SSL roots for google");
48 ABSL_FLAG(int32_t, server_port, 0, "Server port.");
49 ABSL_FLAG(std::string, server_host, "localhost", "Server host to connect to");
50 ABSL_FLAG(std::string, server_host_override, "",
51           "Override the server host which is sent in HTTP header");
52 ABSL_FLAG(
53     std::string, test_case, "large_unary",
54     "Configure different test cases. Valid options are:\n\n"
55     "all : all test cases;\n"
56 
57     // TODO(veblush): Replace the help message with the following full message
58     // once Abseil fixes the flag-help compiler error on Windows. (b/171659833)
59     //
60     //"cancel_after_begin : cancel stream after starting it;\n"
61     //"cancel_after_first_response: cancel on first response;\n"
62     //"channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each
63     // time;\n" "client_compressed_streaming : compressed request streaming with
64     //" "client_compressed_unary : single compressed request;\n"
65     //"client_streaming : request streaming with single response;\n"
66     //"compute_engine_creds: large_unary with compute engine auth;\n"
67     //"custom_metadata: server will echo custom metadata;\n"
68     //"empty_stream : bi-di stream with no request/response;\n"
69     //"empty_unary : empty (zero bytes) request and response;\n"
70     //"google_default_credentials: large unary using GDC;\n"
71     //"half_duplex : half-duplex streaming;\n"
72     //"jwt_token_creds: large_unary with JWT token auth;\n"
73     //"large_unary : single request and (large) response;\n"
74     //"long_lived_channel: sends large_unary rpcs over a long-lived channel;\n"
75     //"oauth2_auth_token: raw oauth2 access token auth;\n"
76     //"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
77     //"ping_pong : full-duplex streaming;\n"
78     //"response streaming;\n"
79     //"rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
80     //"server_compressed_streaming : single request with compressed "
81     //"server_compressed_unary : single compressed response;\n"
82     //"server_streaming : single request with response streaming;\n"
83     //"slow_consumer : single request with response streaming with "
84     //"slow client consumer;\n"
85     //"special_status_message: verify Unicode and whitespace in status
86     // message;\n" "status_code_and_message: verify status code & message;\n"
87     //"timeout_on_sleeping_server: deadline exceeds on stream;\n"
88     //"unimplemented_method: client calls an unimplemented method;\n"
89     //"unimplemented_service: client calls an unimplemented service;\n"
90     //
91 );
92 ABSL_FLAG(int32_t, num_times, 1, "Number of times to run the test case");
93 ABSL_FLAG(std::string, default_service_account, "",
94           "Email of GCE default service account");
95 ABSL_FLAG(std::string, service_account_key_file, "",
96           "Path to service account json key file.");
97 ABSL_FLAG(std::string, oauth_scope, "", "Scope for OAuth tokens.");
98 ABSL_FLAG(bool, do_not_abort_on_transient_failures, false,
99           "If set to 'true', abort() is not called in case of transient "
100           "failures (i.e failures that are temporary and will likely go away "
101           "on retrying; like a temporary connection failure) and an error "
102           "message is printed instead. Note that this flag just controls "
103           "whether abort() is called or not. It does not control whether the "
104           "test is retried in case of transient failures (and currently the "
105           "interop tests are not retried even if this flag is set to true)");
106 ABSL_FLAG(int32_t, soak_iterations, 1000,
107           "The number of iterations to use for the two soak tests; rpc_soak "
108           "and channel_soak.");
109 ABSL_FLAG(int32_t, soak_max_failures, 0,
110           "The number of iterations in soak tests that are allowed to fail "
111           "(either due to non-OK status code or exceeding the "
112           "per-iteration max acceptable latency).");
113 ABSL_FLAG(int32_t, soak_per_iteration_max_acceptable_latency_ms, 0,
114           "The number of milliseconds a single iteration in the two soak "
115           "tests (rpc_soak and channel_soak) should take.");
116 ABSL_FLAG(int32_t, soak_overall_timeout_seconds, 0,
117           "The overall number of seconds after which a soak test should "
118           "stop and fail, if the desired number of iterations have not yet "
119           "completed.");
120 ABSL_FLAG(int32_t, soak_min_time_ms_between_rpcs, 0,
121           "The minimum time in milliseconds between consecutive RPCs in a "
122           "soak test (rpc_soak or channel_soak), useful for limiting QPS");
123 ABSL_FLAG(
124     int32_t, soak_request_size, 271828,
125     "The request size in a soak RPC. "
126     "The default value is set based on the interop large unary test case.");
127 ABSL_FLAG(
128     int32_t, soak_response_size, 314159,
129     "The response size in a soak RPC. "
130     "The default value is set based on the interop large unary test case.");
131 ABSL_FLAG(int32_t, iteration_interval, 10,
132           "The interval in seconds between rpcs. This is used by "
133           "long_connection test");
134 ABSL_FLAG(std::string, additional_metadata, "",
135           "Additional metadata to send in each request, as a "
136           "semicolon-separated list of key:value pairs.");
137 ABSL_FLAG(
138     bool, log_metadata_and_status, false,
139     "If set to 'true', will print received initial and trailing metadata, "
140     "grpc-status and error message to the console, in a stable format.");
141 ABSL_FLAG(bool, enable_observability, false,
142           "Whether to enable GCP Observability");
143 ABSL_FLAG(bool, enable_otel_plugin, false,
144           "Whether to enable OpenTelemetry Plugin");
145 
146 using grpc::testing::CreateChannelForTestCase;
147 using grpc::testing::GetServiceAccountJsonKey;
148 using grpc::testing::UpdateActions;
149 
150 namespace {
151 
152 // Parse the contents of FLAGS_additional_metadata into a map. Allow
153 // alphanumeric characters and dashes in keys, and any character but semicolons
154 // in values. Convert keys to lowercase. On failure, log an error and return
155 // false.
ParseAdditionalMetadataFlag(const std::string & flag,std::multimap<std::string,std::string> * additional_metadata)156 bool ParseAdditionalMetadataFlag(
157     const std::string& flag,
158     std::multimap<std::string, std::string>* additional_metadata) {
159   size_t start_pos = 0;
160   while (start_pos < flag.length()) {
161     size_t colon_pos = flag.find(':', start_pos);
162     if (colon_pos == std::string::npos) {
163       gpr_log(GPR_ERROR,
164               "Couldn't parse metadata flag: extra characters at end of flag");
165       return false;
166     }
167     size_t semicolon_pos = flag.find(';', colon_pos);
168 
169     std::string key = flag.substr(start_pos, colon_pos - start_pos);
170     std::string value =
171         flag.substr(colon_pos + 1, semicolon_pos - colon_pos - 1);
172 
173     constexpr char alphanum_and_hyphen[] =
174         "-0123456789"
175         "abcdefghijklmnopqrstuvwxyz"
176         "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
177     if (key.find_first_not_of(alphanum_and_hyphen) != std::string::npos) {
178       gpr_log(GPR_ERROR,
179               "Couldn't parse metadata flag: key contains characters other "
180               "than alphanumeric and hyphens: %s",
181               key.c_str());
182       return false;
183     }
184 
185     // Convert to lowercase.
186     for (char& c : key) {
187       if (c >= 'A' && c <= 'Z') {
188         c += ('a' - 'A');
189       }
190     }
191 
192     gpr_log(GPR_INFO, "Adding additional metadata with key %s and value %s",
193             key.c_str(), value.c_str());
194     additional_metadata->insert({key, value});
195 
196     if (semicolon_pos == std::string::npos) {
197       break;
198     } else {
199       start_pos = semicolon_pos + 1;
200     }
201   }
202 
203   return true;
204 }
205 
206 }  // namespace
207 
main(int argc,char ** argv)208 int main(int argc, char** argv) {
209   grpc::testing::TestEnvironment env(&argc, argv);
210   grpc::testing::InitTest(&argc, &argv, true);
211   gpr_log(GPR_INFO, "Testing these cases: %s",
212           absl::GetFlag(FLAGS_test_case).c_str());
213   int ret = 0;
214 
215   if (absl::GetFlag(FLAGS_enable_observability)) {
216     // TODO(someone): remove deprecated usage
217     // NOLINTNEXTLINE(clang-diagnostic-deprecated-declarations)
218     auto status = grpc::experimental::GcpObservabilityInit();
219     gpr_log(GPR_DEBUG, "GcpObservabilityInit() status_code: %d", status.code());
220     if (!status.ok()) {
221       return 1;
222     }
223   }
224 
225   // TODO(stanleycheung): switch to CsmObservabilityBuilder once xds setup is
226   // ready
227   if (absl::GetFlag(FLAGS_enable_otel_plugin)) {
228     gpr_log(GPR_DEBUG, "Registering Prometheus exporter");
229     opentelemetry::exporter::metrics::PrometheusExporterOptions opts;
230     // default was "localhost:9464" which causes connection issue across GKE
231     // pods
232     opts.url = "0.0.0.0:9464";
233     auto prometheus_exporter =
234         opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(
235             opts);
236     auto meter_provider =
237         std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
238     meter_provider->AddMetricReader(std::move(prometheus_exporter));
239     grpc::OpenTelemetryPluginBuilder otel_builder;
240     otel_builder.SetMeterProvider(std::move(meter_provider));
241     assert(otel_builder.BuildAndRegisterGlobal().ok());
242   }
243 
244   grpc::testing::ChannelCreationFunc channel_creation_func;
245   std::string test_case = absl::GetFlag(FLAGS_test_case);
246   if (absl::GetFlag(FLAGS_additional_metadata).empty()) {
247     channel_creation_func = [test_case](auto arguments) {
248       std::vector<std::unique_ptr<
249           grpc::experimental::ClientInterceptorFactoryInterface>>
250           factories;
251       if (absl::GetFlag(FLAGS_log_metadata_and_status)) {
252         factories.emplace_back(
253             new grpc::testing::MetadataAndStatusLoggerInterceptorFactory());
254       }
255       return CreateChannelForTestCase(test_case, std::move(factories),
256                                       arguments);
257     };
258   } else {
259     std::multimap<std::string, std::string> additional_metadata;
260     if (!ParseAdditionalMetadataFlag(absl::GetFlag(FLAGS_additional_metadata),
261                                      &additional_metadata)) {
262       return 1;
263     }
264 
265     channel_creation_func = [test_case, additional_metadata](auto arguments) {
266       std::vector<std::unique_ptr<
267           grpc::experimental::ClientInterceptorFactoryInterface>>
268           factories;
269       factories.emplace_back(
270           new grpc::testing::AdditionalMetadataInterceptorFactory(
271               additional_metadata));
272       if (absl::GetFlag(FLAGS_log_metadata_and_status)) {
273         factories.emplace_back(
274             new grpc::testing::MetadataAndStatusLoggerInterceptorFactory());
275       }
276       return CreateChannelForTestCase(test_case, std::move(factories),
277                                       arguments);
278     };
279   }
280 
281   grpc::testing::InteropClient client(
282       channel_creation_func, true,
283       absl::GetFlag(FLAGS_do_not_abort_on_transient_failures));
284 
285   std::unordered_map<std::string, std::function<bool()>> actions;
286   actions["empty_unary"] =
287       std::bind(&grpc::testing::InteropClient::DoEmpty, &client);
288   actions["large_unary"] =
289       std::bind(&grpc::testing::InteropClient::DoLargeUnary, &client);
290   actions["server_compressed_unary"] = std::bind(
291       &grpc::testing::InteropClient::DoServerCompressedUnary, &client);
292   actions["client_compressed_unary"] = std::bind(
293       &grpc::testing::InteropClient::DoClientCompressedUnary, &client);
294   actions["client_streaming"] =
295       std::bind(&grpc::testing::InteropClient::DoRequestStreaming, &client);
296   actions["server_streaming"] =
297       std::bind(&grpc::testing::InteropClient::DoResponseStreaming, &client);
298   actions["server_compressed_streaming"] = std::bind(
299       &grpc::testing::InteropClient::DoServerCompressedStreaming, &client);
300   actions["client_compressed_streaming"] = std::bind(
301       &grpc::testing::InteropClient::DoClientCompressedStreaming, &client);
302   actions["slow_consumer"] = std::bind(
303       &grpc::testing::InteropClient::DoResponseStreamingWithSlowConsumer,
304       &client);
305   actions["half_duplex"] =
306       std::bind(&grpc::testing::InteropClient::DoHalfDuplex, &client);
307   actions["ping_pong"] =
308       std::bind(&grpc::testing::InteropClient::DoPingPong, &client);
309   actions["cancel_after_begin"] =
310       std::bind(&grpc::testing::InteropClient::DoCancelAfterBegin, &client);
311   actions["cancel_after_first_response"] = std::bind(
312       &grpc::testing::InteropClient::DoCancelAfterFirstResponse, &client);
313   actions["timeout_on_sleeping_server"] = std::bind(
314       &grpc::testing::InteropClient::DoTimeoutOnSleepingServer, &client);
315   actions["empty_stream"] =
316       std::bind(&grpc::testing::InteropClient::DoEmptyStream, &client);
317   actions["pick_first_unary"] =
318       std::bind(&grpc::testing::InteropClient::DoPickFirstUnary, &client);
319   if (absl::GetFlag(FLAGS_use_tls)) {
320     actions["compute_engine_creds"] =
321         std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client,
322                   absl::GetFlag(FLAGS_default_service_account),
323                   absl::GetFlag(FLAGS_oauth_scope));
324     actions["jwt_token_creds"] =
325         std::bind(&grpc::testing::InteropClient::DoJwtTokenCreds, &client,
326                   GetServiceAccountJsonKey());
327     actions["oauth2_auth_token"] =
328         std::bind(&grpc::testing::InteropClient::DoOauth2AuthToken, &client,
329                   absl::GetFlag(FLAGS_default_service_account),
330                   absl::GetFlag(FLAGS_oauth_scope));
331     actions["per_rpc_creds"] =
332         std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client,
333                   GetServiceAccountJsonKey());
334   }
335   if (absl::GetFlag(FLAGS_custom_credentials_type) ==
336       "google_default_credentials") {
337     actions["google_default_credentials"] =
338         std::bind(&grpc::testing::InteropClient::DoGoogleDefaultCredentials,
339                   &client, absl::GetFlag(FLAGS_default_service_account));
340   }
341   actions["status_code_and_message"] =
342       std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client);
343   actions["special_status_message"] =
344       std::bind(&grpc::testing::InteropClient::DoSpecialStatusMessage, &client);
345   actions["custom_metadata"] =
346       std::bind(&grpc::testing::InteropClient::DoCustomMetadata, &client);
347   actions["unimplemented_method"] =
348       std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client);
349   actions["unimplemented_service"] =
350       std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
351   actions["channel_soak"] = std::bind(
352       &grpc::testing::InteropClient::DoChannelSoakTest, &client,
353       absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations),
354       absl::GetFlag(FLAGS_soak_max_failures),
355       absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms),
356       absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs),
357       absl::GetFlag(FLAGS_soak_overall_timeout_seconds),
358       absl::GetFlag(FLAGS_soak_request_size),
359       absl::GetFlag(FLAGS_soak_response_size));
360   actions["rpc_soak"] = std::bind(
361       &grpc::testing::InteropClient::DoRpcSoakTest, &client,
362       absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations),
363       absl::GetFlag(FLAGS_soak_max_failures),
364       absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms),
365       absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs),
366       absl::GetFlag(FLAGS_soak_overall_timeout_seconds),
367       absl::GetFlag(FLAGS_soak_request_size),
368       absl::GetFlag(FLAGS_soak_response_size));
369   actions["long_lived_channel"] =
370       std::bind(&grpc::testing::InteropClient::DoLongLivedChannelTest, &client,
371                 absl::GetFlag(FLAGS_soak_iterations),
372                 absl::GetFlag(FLAGS_iteration_interval));
373 
374   UpdateActions(&actions);
375 
376   if (absl::GetFlag(FLAGS_test_case) == "all") {
377     for (const auto& action : actions) {
378       for (int i = 0; i < absl::GetFlag(FLAGS_num_times); i++) {
379         action.second();
380       }
381     }
382   } else if (actions.find(absl::GetFlag(FLAGS_test_case)) != actions.end()) {
383     for (int i = 0; i < absl::GetFlag(FLAGS_num_times); i++) {
384       actions.find(absl::GetFlag(FLAGS_test_case))->second();
385     }
386   } else {
387     std::string test_cases;
388     for (const auto& action : actions) {
389       if (!test_cases.empty()) test_cases += "\n";
390       test_cases += action.first;
391     }
392     gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s",
393             absl::GetFlag(FLAGS_test_case).c_str(), test_cases.c_str());
394     ret = 1;
395   }
396 
397   if (absl::GetFlag(FLAGS_enable_observability)) {
398     // TODO(someone): remove deprecated usage
399     // NOLINTNEXTLINE(clang-diagnostic-deprecated-declarations)
400     grpc::experimental::GcpObservabilityClose();
401   }
402 
403   return ret;
404 }
405