xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/client_lb_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <chrono>
17 #include <deque>
18 #include <memory>
19 #include <mutex>
20 #include <random>
21 #include <set>
22 #include <string>
23 #include <thread>
24 
25 #include <gmock/gmock.h>
26 #include <gtest/gtest.h>
27 
28 #include "absl/memory/memory.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33 
34 #include <grpc/event_engine/endpoint_config.h>
35 #include <grpc/grpc.h>
36 #include <grpc/support/alloc.h>
37 #include <grpc/support/atm.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/time.h>
40 #include <grpcpp/channel.h>
41 #include <grpcpp/client_context.h>
42 #include <grpcpp/create_channel.h>
43 #include <grpcpp/ext/call_metric_recorder.h>
44 #include <grpcpp/ext/orca_service.h>
45 #include <grpcpp/ext/server_metric_recorder.h>
46 #include <grpcpp/health_check_service_interface.h>
47 #include <grpcpp/impl/sync.h>
48 #include <grpcpp/server.h>
49 #include <grpcpp/server_builder.h>
50 
51 #include "src/core/client_channel/backup_poller.h"
52 #include "src/core/client_channel/config_selector.h"
53 #include "src/core/client_channel/global_subchannel_pool.h"
54 #include "src/core/lib/address_utils/parse_address.h"
55 #include "src/core/lib/address_utils/sockaddr_utils.h"
56 #include "src/core/lib/backoff/backoff.h"
57 #include "src/core/lib/channel/channel_args.h"
58 #include "src/core/lib/config/config_vars.h"
59 #include "src/core/lib/gprpp/crash.h"
60 #include "src/core/lib/gprpp/debug_location.h"
61 #include "src/core/lib/gprpp/env.h"
62 #include "src/core/lib/gprpp/notification.h"
63 #include "src/core/lib/gprpp/ref_counted_ptr.h"
64 #include "src/core/lib/gprpp/time.h"
65 #include "src/core/lib/iomgr/tcp_client.h"
66 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
67 #include "src/core/lib/surface/server.h"
68 #include "src/core/lib/transport/connectivity_state.h"
69 #include "src/core/resolver/endpoint_addresses.h"
70 #include "src/core/resolver/fake/fake_resolver.h"
71 #include "src/core/service_config/service_config.h"
72 #include "src/core/service_config/service_config_impl.h"
73 #include "src/cpp/server/secure_server_credentials.h"
74 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
75 #include "src/proto/grpc/testing/echo.grpc.pb.h"
76 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
77 #include "test/core/util/port.h"
78 #include "test/core/util/resolve_localhost_ip46.h"
79 #include "test/core/util/test_config.h"
80 #include "test/core/util/test_lb_policies.h"
81 #include "test/cpp/end2end/connection_attempt_injector.h"
82 #include "test/cpp/end2end/test_service_impl.h"
83 #include "test/cpp/util/credentials.h"
84 
85 namespace grpc {
86 namespace testing {
87 namespace {
88 
89 using xds::data::orca::v3::OrcaLoadReport;
90 constexpr char kRequestMessage[] = "Live long and prosper.";
91 
92 // A noop health check service that just terminates the call and returns OK
93 // status in its methods. This is used to test the retry mechanism in
94 // SubchannelStreamClient.
95 class NoopHealthCheckServiceImpl : public health::v1::Health::Service {
96  public:
97   ~NoopHealthCheckServiceImpl() override = default;
Check(ServerContext *,const health::v1::HealthCheckRequest *,health::v1::HealthCheckResponse *)98   Status Check(ServerContext*, const health::v1::HealthCheckRequest*,
99                health::v1::HealthCheckResponse*) override {
100     return Status::OK;
101   }
Watch(ServerContext *,const health::v1::HealthCheckRequest *,ServerWriter<health::v1::HealthCheckResponse> *)102   Status Watch(ServerContext*, const health::v1::HealthCheckRequest*,
103                ServerWriter<health::v1::HealthCheckResponse>*) override {
104     grpc_core::MutexLock lock(&mu_);
105     request_count_++;
106     return Status::OK;
107   }
request_count()108   int request_count() {
109     grpc_core::MutexLock lock(&mu_);
110     return request_count_;
111   }
112 
113  private:
114   grpc_core::Mutex mu_;
115   int request_count_ ABSL_GUARDED_BY(&mu_) = 0;
116 };
117 
118 // Subclass of TestServiceImpl that increments a request counter for
119 // every call to the Echo RPC.
120 class MyTestServiceImpl : public TestServiceImpl {
121  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)122   Status Echo(ServerContext* context, const EchoRequest* request,
123               EchoResponse* response) override {
124     {
125       grpc_core::MutexLock lock(&mu_);
126       ++request_count_;
127     }
128     AddClient(context->peer());
129     if (request->has_param() && request->param().has_backend_metrics()) {
130       const auto& request_metrics = request->param().backend_metrics();
131       auto* recorder = context->ExperimentalGetCallMetricRecorder();
132       EXPECT_NE(recorder, nullptr);
133       // Do not record when zero since it indicates no test per-call report.
134       if (request_metrics.application_utilization() > 0) {
135         recorder->RecordApplicationUtilizationMetric(
136             request_metrics.application_utilization());
137       }
138       if (request_metrics.cpu_utilization() > 0) {
139         recorder->RecordCpuUtilizationMetric(request_metrics.cpu_utilization());
140       }
141       if (request_metrics.mem_utilization() > 0) {
142         recorder->RecordMemoryUtilizationMetric(
143             request_metrics.mem_utilization());
144       }
145       if (request_metrics.rps_fractional() > 0) {
146         recorder->RecordQpsMetric(request_metrics.rps_fractional());
147       }
148       if (request_metrics.eps() > 0) {
149         recorder->RecordEpsMetric(request_metrics.eps());
150       }
151       for (const auto& p : request_metrics.request_cost()) {
152         char* key = static_cast<char*>(
153             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
154         strncpy(key, p.first.data(), p.first.size());
155         key[p.first.size()] = '\0';
156         recorder->RecordRequestCostMetric(key, p.second);
157       }
158       for (const auto& p : request_metrics.utilization()) {
159         char* key = static_cast<char*>(
160             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
161         strncpy(key, p.first.data(), p.first.size());
162         key[p.first.size()] = '\0';
163         recorder->RecordUtilizationMetric(key, p.second);
164       }
165       for (const auto& p : request_metrics.named_metrics()) {
166         char* key = static_cast<char*>(
167             grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
168         strncpy(key, p.first.data(), p.first.size());
169         key[p.first.size()] = '\0';
170         recorder->RecordNamedMetric(key, p.second);
171       }
172     }
173     return TestServiceImpl::Echo(context, request, response);
174   }
175 
request_count()176   size_t request_count() {
177     grpc_core::MutexLock lock(&mu_);
178     return request_count_;
179   }
180 
ResetCounters()181   void ResetCounters() {
182     grpc_core::MutexLock lock(&mu_);
183     request_count_ = 0;
184   }
185 
clients()186   std::set<std::string> clients() {
187     grpc_core::MutexLock lock(&clients_mu_);
188     return clients_;
189   }
190 
191  private:
AddClient(const std::string & client)192   void AddClient(const std::string& client) {
193     grpc_core::MutexLock lock(&clients_mu_);
194     clients_.insert(client);
195   }
196 
197   grpc_core::Mutex mu_;
198   size_t request_count_ ABSL_GUARDED_BY(&mu_) = 0;
199 
200   grpc_core::Mutex clients_mu_;
201   std::set<std::string> clients_ ABSL_GUARDED_BY(&clients_mu_);
202 };
203 
204 class FakeResolverResponseGeneratorWrapper {
205  public:
FakeResolverResponseGeneratorWrapper()206   FakeResolverResponseGeneratorWrapper()
207       : response_generator_(grpc_core::MakeRefCounted<
208                             grpc_core::FakeResolverResponseGenerator>()) {}
209 
FakeResolverResponseGeneratorWrapper(FakeResolverResponseGeneratorWrapper && other)210   FakeResolverResponseGeneratorWrapper(
211       FakeResolverResponseGeneratorWrapper&& other) noexcept {
212     response_generator_ = std::move(other.response_generator_);
213   }
214 
SetResponse(grpc_core::Resolver::Result result)215   void SetResponse(grpc_core::Resolver::Result result) {
216     grpc_core::ExecCtx exec_ctx;
217     response_generator_->SetResponseSynchronously(std::move(result));
218   }
219 
SetNextResolution(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())220   void SetNextResolution(const std::vector<int>& ports,
221                          const char* service_config_json = nullptr,
222                          const grpc_core::ChannelArgs& per_address_args =
223                              grpc_core::ChannelArgs()) {
224     SetResponse(BuildFakeResults(ports, service_config_json, per_address_args));
225   }
226 
Get() const227   grpc_core::FakeResolverResponseGenerator* Get() const {
228     return response_generator_.get();
229   }
230 
231  private:
BuildFakeResults(const std::vector<int> & ports,const char * service_config_json=nullptr,const grpc_core::ChannelArgs & per_address_args=grpc_core::ChannelArgs ())232   static grpc_core::Resolver::Result BuildFakeResults(
233       const std::vector<int>& ports, const char* service_config_json = nullptr,
234       const grpc_core::ChannelArgs& per_address_args =
235           grpc_core::ChannelArgs()) {
236     grpc_core::Resolver::Result result;
237     result.addresses = grpc_core::EndpointAddressesList();
238     for (const int& port : ports) {
239       absl::StatusOr<grpc_core::URI> lb_uri =
240           grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
241       GPR_ASSERT(lb_uri.ok());
242       grpc_resolved_address address;
243       GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
244       result.addresses->emplace_back(address, per_address_args);
245     }
246     if (result.addresses->empty()) {
247       result.resolution_note = "fake resolver empty address list";
248     }
249     if (service_config_json != nullptr) {
250       result.service_config = grpc_core::ServiceConfigImpl::Create(
251           grpc_core::ChannelArgs(), service_config_json);
252       EXPECT_TRUE(result.service_config.ok()) << result.service_config.status();
253     }
254     return result;
255   }
256 
257   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
258       response_generator_;
259 };
260 
261 class ClientLbEnd2endTest : public ::testing::Test {
262  protected:
ClientLbEnd2endTest()263   ClientLbEnd2endTest()
264       : server_host_("localhost"),
265         creds_(std::make_shared<FakeTransportSecurityChannelCredentials>()) {}
266 
SetUp()267   void SetUp() override { grpc_init(); }
268 
TearDown()269   void TearDown() override {
270     for (size_t i = 0; i < servers_.size(); ++i) {
271       servers_[i]->Shutdown();
272     }
273     servers_.clear();
274     creds_.reset();
275     grpc_shutdown();
276   }
277 
CreateServers(size_t num_servers,std::vector<int> ports=std::vector<int> ())278   void CreateServers(size_t num_servers,
279                      std::vector<int> ports = std::vector<int>()) {
280     servers_.clear();
281     for (size_t i = 0; i < num_servers; ++i) {
282       int port = 0;
283       if (ports.size() == num_servers) port = ports[i];
284       servers_.emplace_back(new ServerData(port));
285     }
286   }
287 
StartServer(size_t index)288   void StartServer(size_t index) { servers_[index]->Start(server_host_); }
289 
StartServers(size_t num_servers,std::vector<int> ports=std::vector<int> ())290   void StartServers(size_t num_servers,
291                     std::vector<int> ports = std::vector<int>()) {
292     CreateServers(num_servers, std::move(ports));
293     for (size_t i = 0; i < num_servers; ++i) {
294       StartServer(i);
295     }
296   }
297 
GetServersPorts(size_t start_index=0,size_t stop_index=0)298   std::vector<int> GetServersPorts(size_t start_index = 0,
299                                    size_t stop_index = 0) {
300     if (stop_index == 0) stop_index = servers_.size();
301     std::vector<int> ports;
302     for (size_t i = start_index; i < stop_index; ++i) {
303       ports.push_back(servers_[i]->port_);
304     }
305     return ports;
306   }
307 
BuildStub(const std::shared_ptr<Channel> & channel)308   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
309       const std::shared_ptr<Channel>& channel) {
310     return grpc::testing::EchoTestService::NewStub(channel);
311   }
312 
BuildChannel(const std::string & lb_policy_name,const FakeResolverResponseGeneratorWrapper & response_generator,ChannelArguments args=ChannelArguments ())313   std::shared_ptr<Channel> BuildChannel(
314       const std::string& lb_policy_name,
315       const FakeResolverResponseGeneratorWrapper& response_generator,
316       ChannelArguments args = ChannelArguments()) {
317     if (!lb_policy_name.empty()) {
318       args.SetLoadBalancingPolicyName(lb_policy_name);
319     }  // else, default to pick first
320     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
321                     response_generator.Get());
322     return grpc::CreateCustomChannel("fake:default.example.com", creds_, args);
323   }
324 
SendRpc(const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,EchoResponse * response=nullptr,int timeout_ms=1000,bool wait_for_ready=false,EchoRequest * request=nullptr)325   Status SendRpc(
326       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
327       EchoResponse* response = nullptr, int timeout_ms = 1000,
328       bool wait_for_ready = false, EchoRequest* request = nullptr) {
329     EchoResponse local_response;
330     if (response == nullptr) response = &local_response;
331     EchoRequest local_request;
332     if (request == nullptr) request = &local_request;
333     request->set_message(kRequestMessage);
334     request->mutable_param()->set_echo_metadata(true);
335     ClientContext context;
336     context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
337     if (wait_for_ready) context.set_wait_for_ready(true);
338     context.AddMetadata("foo", "1");
339     context.AddMetadata("bar", "2");
340     context.AddMetadata("baz", "3");
341     return stub->Echo(&context, *request, response);
342   }
343 
CheckRpcSendOk(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,bool wait_for_ready=false,const OrcaLoadReport * load_report=nullptr,int timeout_ms=2000)344   void CheckRpcSendOk(
345       const grpc_core::DebugLocation& location,
346       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
347       bool wait_for_ready = false, const OrcaLoadReport* load_report = nullptr,
348       int timeout_ms = 2000) {
349     EchoResponse response;
350     EchoRequest request;
351     EchoRequest* request_ptr = nullptr;
352     if (load_report != nullptr) {
353       request_ptr = &request;
354       auto params = request.mutable_param();
355       auto backend_metrics = params->mutable_backend_metrics();
356       *backend_metrics = *load_report;
357     }
358     Status status =
359         SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr);
360     ASSERT_TRUE(status.ok())
361         << "From " << location.file() << ":" << location.line()
362         << "\nError: " << status.error_message() << " "
363         << status.error_details();
364     ASSERT_EQ(response.message(), kRequestMessage)
365         << "From " << location.file() << ":" << location.line();
366   }
367 
CheckRpcSendFailure(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,StatusCode expected_status,absl::string_view expected_message_regex)368   void CheckRpcSendFailure(
369       const grpc_core::DebugLocation& location,
370       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
371       StatusCode expected_status, absl::string_view expected_message_regex) {
372     Status status = SendRpc(stub);
373     EXPECT_FALSE(status.ok());
374     EXPECT_EQ(expected_status, status.error_code())
375         << location.file() << ":" << location.line();
376     EXPECT_THAT(status.error_message(),
377                 ::testing::MatchesRegex(expected_message_regex))
378         << location.file() << ":" << location.line();
379   }
380 
SendRpcsUntil(const grpc_core::DebugLocation & debug_location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,absl::AnyInvocable<bool (const Status &)> continue_predicate,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)381   void SendRpcsUntil(
382       const grpc_core::DebugLocation& debug_location,
383       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
384       absl::AnyInvocable<bool(const Status&)> continue_predicate,
385       EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
386     absl::Time deadline = absl::InfiniteFuture();
387     if (timeout_ms != 0) {
388       deadline = absl::Now() +
389                  (absl::Milliseconds(timeout_ms) * grpc_test_slowdown_factor());
390     }
391     while (true) {
392       Status status =
393           SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
394                   /*wait_for_ready=*/false, /*request=*/request_ptr);
395       if (!continue_predicate(status)) return;
396       EXPECT_LE(absl::Now(), deadline)
397           << debug_location.file() << ":" << debug_location.line();
398       if (absl::Now() >= deadline) break;
399     }
400   }
401 
402   struct ServerData {
403     const int port_;
404     std::unique_ptr<Server> server_;
405     MyTestServiceImpl service_;
406     std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_;
407     experimental::OrcaService orca_service_;
408     std::unique_ptr<std::thread> thread_;
409     bool enable_noop_health_check_service_ = false;
410     NoopHealthCheckServiceImpl noop_health_check_service_impl_;
411 
412     grpc_core::Mutex mu_;
413     grpc_core::CondVar cond_;
414     bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
415     bool started_ ABSL_GUARDED_BY(mu_) = false;
416 
ServerDatagrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData417     explicit ServerData(int port = 0)
418         : port_(port > 0 ? port : grpc_pick_unused_port_or_die()),
419           server_metric_recorder_(experimental::ServerMetricRecorder::Create()),
420           orca_service_(
421               server_metric_recorder_.get(),
422               experimental::OrcaService::Options().set_min_report_duration(
423                   absl::Seconds(0.1))) {}
424 
Startgrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData425     void Start(const std::string& server_host) {
426       gpr_log(GPR_INFO, "starting server on port %d", port_);
427       grpc_core::MutexLock lock(&mu_);
428       started_ = true;
429       thread_ = std::make_unique<std::thread>(
430           std::bind(&ServerData::Serve, this, server_host));
431       while (!server_ready_) {
432         cond_.Wait(&mu_);
433       }
434       server_ready_ = false;
435       gpr_log(GPR_INFO, "server startup complete");
436     }
437 
Servegrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData438     void Serve(const std::string& server_host) {
439       std::ostringstream server_address;
440       server_address << server_host << ":" << port_;
441       ServerBuilder builder;
442       std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
443           grpc_fake_transport_security_server_credentials_create()));
444       builder.AddListeningPort(server_address.str(), std::move(creds));
445       builder.RegisterService(&service_);
446       builder.RegisterService(&orca_service_);
447       if (enable_noop_health_check_service_) {
448         builder.RegisterService(&noop_health_check_service_impl_);
449       }
450       grpc::ServerBuilder::experimental_type(&builder)
451           .EnableCallMetricRecording(server_metric_recorder_.get());
452       server_ = builder.BuildAndStart();
453       grpc_core::MutexLock lock(&mu_);
454       server_ready_ = true;
455       cond_.Signal();
456     }
457 
Shutdowngrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData458     void Shutdown() {
459       grpc_core::MutexLock lock(&mu_);
460       if (!started_) return;
461       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
462       thread_->join();
463       started_ = false;
464     }
465 
StopListeningAndSendGoawaysgrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData466     void StopListeningAndSendGoaways() {
467       grpc_core::ExecCtx exec_ctx;
468       auto* server = grpc_core::Server::FromC(server_->c_server());
469       server->StopListening();
470       server->SendGoaways();
471     }
472 
SetServingStatusgrpc::testing::__anon18215c690111::ClientLbEnd2endTest::ServerData473     void SetServingStatus(const std::string& service, bool serving) {
474       server_->GetHealthCheckService()->SetServingStatus(service, serving);
475     }
476   };
477 
ResetCounters()478   void ResetCounters() {
479     for (const auto& server : servers_) server->service_.ResetCounters();
480   }
481 
SeenAllServers(size_t start_index=0,size_t stop_index=0)482   bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) {
483     if (stop_index == 0) stop_index = servers_.size();
484     for (size_t i = start_index; i < stop_index; ++i) {
485       if (servers_[i]->service_.request_count() == 0) return false;
486     }
487     return true;
488   }
489 
490   // If status_check is null, all RPCs must succeed.
491   // If status_check is non-null, it will be called for all non-OK RPCs.
WaitForServers(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t start_index=0,size_t stop_index=0,absl::AnyInvocable<void (const Status &)> status_check=nullptr,absl::Duration timeout=absl::Seconds (30))492   void WaitForServers(
493       const grpc_core::DebugLocation& location,
494       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
495       size_t start_index = 0, size_t stop_index = 0,
496       absl::AnyInvocable<void(const Status&)> status_check = nullptr,
497       absl::Duration timeout = absl::Seconds(30)) {
498     if (stop_index == 0) stop_index = servers_.size();
499     auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor());
500     gpr_log(GPR_INFO,
501             "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
502             ") ==========",
503             start_index, stop_index);
504     while (!SeenAllServers(start_index, stop_index)) {
505       Status status = SendRpc(stub);
506       if (status_check != nullptr) {
507         if (!status.ok()) status_check(status);
508       } else {
509         EXPECT_TRUE(status.ok())
510             << " code=" << status.error_code() << " message=\""
511             << status.error_message() << "\" at " << location.file() << ":"
512             << location.line();
513       }
514       EXPECT_LE(absl::Now(), deadline)
515           << " at " << location.file() << ":" << location.line();
516       if (absl::Now() >= deadline) break;
517     }
518     ResetCounters();
519   }
520 
WaitForServer(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,size_t server_index,absl::AnyInvocable<void (const Status &)> status_check=nullptr)521   void WaitForServer(
522       const grpc_core::DebugLocation& location,
523       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
524       size_t server_index,
525       absl::AnyInvocable<void(const Status&)> status_check = nullptr) {
526     WaitForServers(location, stub, server_index, server_index + 1,
527                    std::move(status_check));
528   }
529 
WaitForChannelState(Channel * channel,absl::AnyInvocable<bool (grpc_connectivity_state)> predicate,bool try_to_connect=false,int timeout_seconds=5)530   bool WaitForChannelState(
531       Channel* channel,
532       absl::AnyInvocable<bool(grpc_connectivity_state)> predicate,
533       bool try_to_connect = false, int timeout_seconds = 5) {
534     const gpr_timespec deadline =
535         grpc_timeout_seconds_to_deadline(timeout_seconds);
536     while (true) {
537       grpc_connectivity_state state = channel->GetState(try_to_connect);
538       if (predicate(state)) break;
539       if (!channel->WaitForStateChange(state, deadline)) return false;
540     }
541     return true;
542   }
543 
WaitForChannelNotReady(Channel * channel,int timeout_seconds=5)544   bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
545     auto predicate = [](grpc_connectivity_state state) {
546       return state != GRPC_CHANNEL_READY;
547     };
548     return WaitForChannelState(channel, predicate, false, timeout_seconds);
549   }
550 
WaitForChannelReady(Channel * channel,int timeout_seconds=5)551   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
552     auto predicate = [](grpc_connectivity_state state) {
553       return state == GRPC_CHANNEL_READY;
554     };
555     return WaitForChannelState(channel, predicate, true, timeout_seconds);
556   }
557 
558   // Updates \a connection_order by appending to it the index of the newly
559   // connected server. Must be called after every single RPC.
UpdateConnectionOrder(const std::vector<std::unique_ptr<ServerData>> & servers,std::vector<int> * connection_order)560   void UpdateConnectionOrder(
561       const std::vector<std::unique_ptr<ServerData>>& servers,
562       std::vector<int>* connection_order) {
563     for (size_t i = 0; i < servers.size(); ++i) {
564       if (servers[i]->service_.request_count() == 1) {
565         // Was the server index known? If not, update connection_order.
566         const auto it =
567             std::find(connection_order->begin(), connection_order->end(), i);
568         if (it == connection_order->end()) {
569           connection_order->push_back(i);
570           return;
571         }
572       }
573     }
574   }
575 
EnableNoopHealthCheckService()576   void EnableNoopHealthCheckService() {
577     for (auto& server : servers_) {
578       server->enable_noop_health_check_service_ = true;
579     }
580   }
581 
MakeConnectionFailureRegex(absl::string_view prefix)582   static std::string MakeConnectionFailureRegex(absl::string_view prefix) {
583     return absl::StrCat(prefix,
584                         "; last error: (UNKNOWN|UNAVAILABLE): "
585                         "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
586                         "(Failed to connect to remote host: )?"
587                         "(Connection refused|Connection reset by peer|"
588                         "recvmsg:Connection reset by peer|"
589                         "getsockopt\\(SO\\_ERROR\\): Connection reset by peer|"
590                         "Socket closed|FD shutdown)");
591   }
592 
593   const std::string server_host_;
594   std::vector<std::unique_ptr<ServerData>> servers_;
595   std::shared_ptr<ChannelCredentials> creds_;
596 };
597 
// Checks that a channel asked to connect sits in CONNECTING while it has no
// resolver result, and becomes READY once a result is supplied.
TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
  constexpr int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  // A channel that has not been asked to connect reports IDLE.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // Ask the channel to connect. This call still reports IDLE: it only
  // triggers the state change, which has not happened yet when it returns.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/true), GRPC_CHANNEL_IDLE);
  // With the connection attempt under way, the first state seen after IDLE
  // is expected to be CONNECTING.
  auto left_idle = [](grpc_connectivity_state state) {
    if (state != GRPC_CHANNEL_IDLE) {
      EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
      return true;
    }
    return false;
  };
  ASSERT_TRUE(WaitForChannelState(channel.get(), left_idle));
  // Supplying a resolver result lets the connection attempt go ahead.
  response_generator.SetNextResolution(GetServersPorts());
  // The channel should then reach READY.
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
}
623 
// Checks that a channel with a client idle timeout drops back to IDLE after
// a quiet period and reconnects on the next RPC.
TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
  // Bring up a single backend.
  constexpr int kNumServers = 1;
  StartServers(kNumServers);
  // Configure the idle timeout, then create the channel and stub.
  ChannelArguments args;
  args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS,
              1000 * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator, args);
  auto stub = BuildStub(channel);
  // Nothing has used the channel yet, so it starts out IDLE.
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // A successful RPC should leave the channel READY.
  gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***");
  response_generator.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_READY);
  // Leave the channel unused for longer than the idle timeout; it should
  // fall back to IDLE.
  gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***");
  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_IDLE);
  // Another RPC should wake the channel up again.
  gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
  response_generator.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(channel->GetState(/*try_to_connect=*/false), GRPC_CHANNEL_READY);
}
653 
// Checks that the default-authority channel arg determines the authority the
// server sees.
TEST_F(ClientLbEnd2endTest, AuthorityOverrideOnChannel) {
  StartServers(1);
  // Override the authority through a channel argument.
  ChannelArguments args;
  args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "foo.example.com");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Issue one RPC that asks the server to echo back the authority header.
  EchoRequest request;
  request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse response;
  const Status status = SendRpc(stub, &response, /*timeout_ms=*/1000,
                                /*wait_for_ready=*/false, &request);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  // The server must have seen the overridden authority.
  EXPECT_EQ("foo.example.com", response.param().host());
}
674 
// Checks that an authority delivered with the resolver result (as the
// GRPC_ARG_DEFAULT_AUTHORITY channel arg attached to the resolution) is the
// one echoed back by the server when no authority is set on the channel.
TEST_F(ClientLbEnd2endTest, AuthorityOverrideFromResolver) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("", resolver);
  auto echo_stub = BuildStub(lb_channel);
  // The resolver result carries the authority override; no service config is
  // supplied with it.
  resolver.SetNextResolution(
      GetServersPorts(), /*service_config_json=*/nullptr,
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   "foo.example.com"));
  // One RPC, asking for the authority header to be echoed in the response.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(echo_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The echoed host must be the authority injected by the resolver.
  EXPECT_EQ("foo.example.com", echo_response.param().host());
}
697 
// Checks precedence when both sources provide an authority: the channel
// argument says "foo.example.com", the resolver result says
// "bar.example.com", and the server is expected to echo the channel's value.
TEST_F(ClientLbEnd2endTest, AuthorityOverridePrecedence) {
  StartServers(1);
  // Authority supplied as a channel argument.
  FakeResolverResponseGeneratorWrapper resolver;
  ChannelArguments channel_args;
  channel_args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "foo.example.com");
  auto lb_channel = BuildChannel("", resolver, channel_args);
  auto echo_stub = BuildStub(lb_channel);
  // The resolver result carries a conflicting authority.
  resolver.SetNextResolution(
      GetServersPorts(), /*service_config_json=*/nullptr,
      grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
                                   "bar.example.com"));
  // One RPC, asking for the authority header to be echoed in the response.
  EchoRequest echo_request;
  echo_request.mutable_param()->set_echo_host_from_authority_header(true);
  EchoResponse echo_response;
  const Status rpc_status =
      SendRpc(echo_stub, &echo_response, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &echo_request);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  // The channel-level authority is the one expected in the echo.
  EXPECT_EQ("foo.example.com", echo_response.param().host());
}
723 
//
// pick_first tests
//

// The pick_first tests run on the shared ClientLbEnd2endTest fixture; the
// alias only gives them a test-suite name of their own.
using PickFirstTest = ClientLbEnd2endTest;
729 
// With an empty LB policy name the channel is expected to default to
// pick_first: all RPCs must end up on a single backend and the channel must
// report "pick_first" as its policy name.
TEST_F(PickFirstTest, Basic) {
  // Bring up the backends; as many RPCs as there are backends are sent below.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  // Passing "" as the policy name exercises the default policy.
  auto lb_channel = BuildChannel("", resolver);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  for (size_t sent = 0; sent < servers_.size(); ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, echo_stub);
  }
  // Some backend must have received every request; any backend that did not
  // must have received none.
  bool one_backend_took_all = false;
  for (const auto& server : servers_) {
    const int seen = server->service_.request_count();
    if (seen != kNumServers) {
      EXPECT_EQ(0, seen);
      continue;
    }
    one_backend_took_all = true;
  }
  EXPECT_TRUE(one_backend_took_all);
  // The channel must identify its LB policy as pick_first.
  EXPECT_EQ("pick_first", lb_channel->GetLoadBalancingPolicyName());
}
756 
// Connects one channel to a single backend, then checks that a second,
// freshly built channel resolving to the same backend can complete an RPC.
TEST_F(PickFirstTest, ProcessPending) {
  StartServers(1);  // One backend only.
  FakeResolverResponseGeneratorWrapper first_resolver;
  // Passing "" as the policy name exercises the default policy (pick_first).
  auto first_channel = BuildChannel("", first_resolver);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution({servers_[0]->port_});
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // Build another channel with its own PF policy. Because subchannels are
  // reused globally, it will pick up the subchannel that is already READY
  // from the RPC above, even though that RPC went over a different channel.
  // Progress must be made without any transition away from that READY state.
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("", second_resolver);
  auto second_stub = BuildStub(second_channel);
  second_resolver.SetNextResolution({servers_[0]->port_});
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
776 
// Two addresses are resolved but only the second has a running server. Once a
// first channel is connected to it, a second channel with the same addresses
// must report READY within one second, i.e. without sitting out the 5 s
// initial reconnect backoff configured on both channels.
TEST_F(PickFirstTest, SelectsReadyAtStartup) {
  ChannelArguments channel_args;
  constexpr int kInitialBackOffMs = 5000;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  // Create both servers, but start only the one at index 1.
  const int first_port = grpc_pick_unused_port_or_die();
  const int second_port = grpc_pick_unused_port_or_die();
  std::vector<int> ports = {first_port, second_port};
  CreateServers(2, ports);
  StartServer(1);
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("pick_first", first_resolver, channel_args);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution(ports);
  // Block until RPCs reach the second server.
  WaitForServer(DEBUG_LOCATION, first_stub, 1);
  // A second channel gets the same address list. Its PF instance should go
  // straight to the second subchannel, which is already READY.
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel =
      BuildChannel("pick_first", second_resolver, channel_args);
  second_resolver.SetNextResolution(ports);
  // READY must be reported well before the initial backoff could elapse.
  EXPECT_TRUE(
      WaitForChannelReady(second_channel.get(), 1 /* timeout_seconds */));
}
803 
// Measures the time from the first connection attempt (against a port with
// no server yet) until the channel is READY after the server comes up, and
// checks that it is at least the configured initial reconnect backoff but no
// more than 1.3 times that value.
TEST_F(PickFirstTest, BackOffInitialReconnect) {
  ChannelArguments channel_args;
  constexpr int kInitialBackOffMs = 100;
  const auto scaled_backoff_ms =
      kInitialBackOffMs * grpc_test_slowdown_factor();
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      scaled_backoff_ms);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver, channel_args);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(ports);
  // Predicate for the wait below: done once TRANSIENT_FAILURE is seen; until
  // then only IDLE and CONNECTING are acceptable states.
  auto reached_transient_failure = [&](grpc_connectivity_state state) {
    if (state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
      EXPECT_THAT(state, ::testing::AnyOf(GRPC_CHANNEL_IDLE,
                                          GRPC_CHANNEL_CONNECTING));
      return false;
    }
    return true;
  };
  // Kick off connecting. The server is unreachable, so the channel is
  // expected to end up in TRANSIENT_FAILURE.
  const grpc_core::Timestamp start = grpc_core::Timestamp::Now();
  ASSERT_TRUE(WaitForChannelState(lb_channel.get(), reached_transient_failure,
                                  /*try_to_connect=*/true));
  // Now start a server on the port the channel is trying to reach.
  StartServers(1, ports);
  // The channel should connect from here.
  ASSERT_TRUE(WaitForChannelReady(lb_channel.get()));
  // Total elapsed time since the first attempt.
  const grpc_core::Duration waited = grpc_core::Timestamp::Now() - start;
  gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
  // Lower bound: the initial backoff, minus one millisecond of slack for
  // timer precision and drift.
  EXPECT_GE(waited.millis(), scaled_backoff_ms - 1);
  // Upper bound: not much beyond the backoff.
  EXPECT_LE(waited.millis(), scaled_backoff_ms * 1.3);
}
841 
// Delays connection attempts by 110% of the configured minimum reconnect
// backoff and checks that waiting for the channel to connect takes at least
// that minimum backoff.
TEST_F(PickFirstTest, BackOffMinReconnect) {
  ChannelArguments channel_args;
  constexpr int kMinReconnectBackOffMs = 1000;
  const auto scaled_min_backoff_ms =
      kMinReconnectBackOffMs * grpc_test_slowdown_factor();
  channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
                      scaled_min_backoff_ms);
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver, channel_args);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(ports);
  // Stretch each connection attempt to 10% beyond the minimum backoff, so
  // that the code path waiting for the min reconnect backoff is the one
  // being exercised.
  ConnectionAttemptInjector injector;
  injector.SetDelay(
      grpc_core::Duration::Milliseconds(scaled_min_backoff_ms * 1.10));
  const gpr_timespec wait_begin = gpr_now(GPR_CLOCK_MONOTONIC);
  lb_channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
  const gpr_timespec wait_end = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(wait_end, wait_begin));
  gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
  // Lower bound: the minimum backoff, minus one millisecond of slack for
  // timer precision and drift.
  EXPECT_GE(waited.millis(), scaled_min_backoff_ms - 1);
}
869 
// After a failed first attempt the channel is in backoff. The test starts the
// server, confirms the channel is still held back, resets the connection
// backoff, and checks that the channel then connects in less than the
// configured initial backoff.
TEST_F(PickFirstTest, ResetConnectionBackoff) {
  ChannelArguments channel_args;
  constexpr int kInitialBackOffMs = 1000;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver, channel_args);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(ports);
  // No server is listening yet, so connecting cannot succeed.
  EXPECT_FALSE(
      lb_channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Start a server on the port the channel is trying to reach.
  StartServers(1, ports);
  const gpr_timespec server_up = gpr_now(GPR_CLOCK_MONOTONIC);
  // A short wait must still fail: the channel is being throttled by the
  // initial backoff.
  EXPECT_FALSE(
      lb_channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
  // Clear the backoff.
  experimental::ChannelResetConnectionBackoff(lb_channel.get());
  // Connecting should now succeed as soon as the client reaches the new
  // server, i.e. before the initial backoff timeout would have elapsed.
  EXPECT_TRUE(lb_channel->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs)));
  const gpr_timespec connected = gpr_now(GPR_CLOCK_MONOTONIC);
  const grpc_core::Duration waited =
      grpc_core::Duration::FromTimespec(gpr_time_sub(connected, server_up));
  gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
  // The whole thing must have taken less than the initial backoff.
  EXPECT_LT(waited.millis(), kInitialBackOffMs * grpc_test_slowdown_factor());
}
904 
// Holds the first connection attempt, resets the backoff while it is still
// in flight, then lets it proceed and measures how long it takes for the
// second attempt to begin. That time must be under one second (scaled), far
// below the 5 s initial backoff configured on the channel.
TEST_F(ClientLbEnd2endTest,
       ResetConnectionBackoffNextAttemptStartsImmediately) {
  // The injector lets the test hold individual connection attempts.
  ConnectionAttemptInjector injector;
  // Build the client; no server is started in this test.
  const int port = grpc_pick_unused_port_or_die();
  ChannelArguments channel_args;
  const int kInitialBackOffMs = 5000;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      kInitialBackOffMs * grpc_test_slowdown_factor());
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver, channel_args);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution({port});
  // Arrange to catch the very first connection attempt.
  auto first_hold = injector.AddHold(port);
  gpr_log(GPR_INFO, "=== TRIGGERING INITIAL CONNECTION ATTEMPT");
  EXPECT_EQ(GRPC_CHANNEL_IDLE,
            lb_channel->GetState(/*try_to_connect=*/true));
  first_hold->Wait();
  EXPECT_EQ(GRPC_CHANNEL_CONNECTING,
            lb_channel->GetState(/*try_to_connect=*/false));
  // Clear the backoff while the first attempt is still held.
  gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
  experimental::ChannelResetConnectionBackoff(lb_channel.get());
  // Register the hold for the following attempt before releasing the first
  // one, in case the client gets ahead of this thread.
  auto second_hold = injector.AddHold(port);
  // Release the first attempt, then wait for the next one to start.
  gpr_log(GPR_INFO, "=== RESUMING INITIAL ATTEMPT");
  const gpr_timespec first_resumed = gpr_now(GPR_CLOCK_MONOTONIC);
  first_hold->Resume();
  gpr_log(GPR_INFO, "=== WAITING FOR SECOND ATTEMPT");
  // The only purpose of this WaitForStateChange() is to keep polling going.
  EXPECT_TRUE(lb_channel->WaitForStateChange(
      GRPC_CHANNEL_CONNECTING, grpc_timeout_seconds_to_deadline(1)));
  second_hold->Wait();
  const gpr_timespec second_started = gpr_now(GPR_CLOCK_MONOTONIC);
  gpr_log(GPR_INFO, "=== RESUMING SECOND ATTEMPT");
  second_hold->Resume();
  // The gap between the two attempts should be tiny compared with
  // kInitialBackOffMs.
  const grpc_core::Duration waited = grpc_core::Duration::FromTimespec(
      gpr_time_sub(second_started, first_resumed));
  gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
  EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
950 
// Walks a pick_first channel through a sequence of resolver updates:
//   1. only servers_[0]  -> one RPC must reach servers_[0];
//   2. an empty list     -> the channel must leave READY;
//   3. only servers_[1]  -> the channel must become READY again and RPCs
//                           must reach servers_[1];
//   4. only servers_[2]  -> RPCs must reach servers_[2].
// Finally the channel must still report "pick_first" as its policy name.
TEST_F(PickFirstTest, Updates) {
  // Three backends; each resolver update below exposes at most one of them.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Perform one RPC against the first server.
  response_generator.SetNextResolution(GetServersPorts(0, 1));
  gpr_log(GPR_INFO, "****** SET [0] *******");
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  // An empty update will result in the channel going into TRANSIENT_FAILURE.
  response_generator.SetNextResolution({});
  gpr_log(GPR_INFO, "****** SET none *******");
  // Check the wait result so that a timeout is reported instead of being
  // silently ignored (same pattern as the other tests in this file).
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Next update introduces servers_[1], making the channel recover.
  response_generator.SetNextResolution(GetServersPorts(1, 2));
  gpr_log(GPR_INFO, "****** SET [1] *******");
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  WaitForServer(DEBUG_LOCATION, stub, 1);
  // And again for servers_[2]
  response_generator.SetNextResolution(GetServersPorts(2, 3));
  gpr_log(GPR_INFO, "****** SET [2] *******");
  WaitForServer(DEBUG_LOCATION, stub, 2);
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
979 
// Connects to servers_[0], then sends a resolver update listing servers_[1]
// ahead of servers_[0]. The channel is expected to stay on the backend it is
// already connected to: servers_[1] must receive no requests.
TEST_F(PickFirstTest, UpdateSuperset) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);

  // First resolution: servers_[0] only; one RPC must reach it.
  std::vector<int> ports = {servers_[0]->port_};
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** SET [0] *******");
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  servers_[0]->service_.ResetCounters();

  // Second resolution: a superset, with servers_[1] listed first.
  ports = {servers_[1]->port_, servers_[0]->port_};
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** SET superset *******");
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub);
  // Traffic must keep going to the backend that was already connected.
  WaitForServer(DEBUG_LOCATION, echo_stub, 0);
  EXPECT_EQ(0, servers_[1]->service_.request_count());

  // The channel must identify its LB policy as pick_first.
  EXPECT_EQ("pick_first", lb_channel->GetLoadBalancingPolicyName());
}
1012 
// Connects to a running backend, then replaces the address list with one in
// which nothing is reachable yet. The channel must leave READY, and must
// become READY again once one of the new addresses gets a running server.
TEST_F(PickFirstTest, UpdateToUnconnected) {
  const int kNumServers = 2;
  CreateServers(kNumServers);
  StartServer(0);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);

  // First resolution: the one backend that is up; an RPC must succeed.
  std::vector<int> ports = {servers_[0]->port_};
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** SET [0] *******");
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub);

  // Second resolution: an unused port plus servers_[1], which has not been
  // started. Eventually the working subchannel list is replaced by this new,
  // currently unresponsive one.
  ports = {grpc_pick_unused_port_or_die(), servers_[1]->port_};
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** SET [unavailable] *******");
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));

  // Prove the second resolution really took effect: the channel must become
  // READY once one of its endpoints becomes available.
  gpr_log(GPR_INFO, "****** StartServer(1) *******");
  StartServer(1);
  EXPECT_TRUE(WaitForChannelReady(lb_channel.get()));
}
1045 
// Two channels built with default arguments send one RPC each to the same
// backend. The backend must count two requests but only one distinct client,
// which is what sharing a subchannel through the global pool looks like.
TEST_F(PickFirstTest, GlobalSubchannelPool) {
  // A single backend is enough.
  const int kNumServers = 1;
  StartServers(kNumServers);
  std::vector<int> ports = GetServersPorts();
  // Both channels use the global subchannel pool, which is the default.
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("pick_first", first_resolver);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution(ports);
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("pick_first", second_resolver);
  auto second_stub = BuildStub(second_channel);
  second_resolver.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, first_stub);
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  // Two requests must have arrived at the backend.
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // Both came from the same client port, since the channels share
  // subchannels via the global subchannel pool.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
}
1070 
// Two channels built with GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL send one RPC
// each to the same backend. The backend must count two requests from two
// distinct clients, i.e. the channels did not share a subchannel.
TEST_F(PickFirstTest, LocalSubchannelPool) {
  // A single backend is enough.
  const int kNumServers = 1;
  StartServers(kNumServers);
  std::vector<int> ports = GetServersPorts();
  // Both channels are told to use a local subchannel pool.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("pick_first", first_resolver, channel_args);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution(ports);
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel =
      BuildChannel("pick_first", second_resolver, channel_args);
  auto second_stub = BuildStub(second_channel);
  second_resolver.SetNextResolution(ports);
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // One RPC per channel.
  CheckRpcSendOk(DEBUG_LOCATION, first_stub);
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  // Two requests must have arrived at the backend.
  EXPECT_EQ(2, servers_[0]->service_.request_count());
  // They came from two client ports, since the channels did not share
  // subchannels with each other.
  EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
}
1097 
// Floods a pick_first channel with 1000 resolver updates, each carrying the
// same three backend ports in a freshly shuffled order, and sends an RPC
// after every tenth update. Every such RPC must succeed and the channel must
// still report "pick_first" as its policy name at the end.
TEST_F(PickFirstTest, ManyUpdates) {
  const int kNumUpdates = 1000;
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed a single generator up front rather than constructing a new
  // random_device and mt19937 on every iteration.
  std::mt19937 rng(std::random_device{}());
  // The index is an int so that it is compared against the int constant
  // without a signed/unsigned mix.
  for (int i = 0; i < kNumUpdates; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    // We should re-enter core at the end of the loop to give the resolution
    // setting closure a chance to run.
    if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
1117 
// Starts with a resolution containing only unreachable ports, so RPCs fail
// with UNAVAILABLE and pick_first is expected to ask for re-resolution. A
// follow-up resolution with a reachable port must let RPCs succeed again.
TEST_F(PickFirstTest, ReresolutionNoSelected) {
  // One running backend; the remaining "servers" are just unused ports.
  const int kNumServers = 3;
  const int kNumAliveServers = 1;
  StartServers(kNumAliveServers);
  std::vector<int> alive_ports, dead_ports;
  for (size_t i = 0; i < kNumAliveServers; ++i) {
    alive_ports.emplace_back(servers_[i]->port_);
  }
  for (size_t i = kNumAliveServers; i < kNumServers; ++i) {
    dead_ports.emplace_back(grpc_pick_unused_port_or_die());
  }
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);
  // Only dead ports at first, so no subchannel can be selected. Any
  // re-resolution would return the same result.
  resolver.SetNextResolution(dead_ports);
  gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
  for (size_t attempt = 0; attempt < 10; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, echo_stub, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("failed to connect to all addresses"));
  }
  // pick_first is expected to request re-resolution.
  gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION *******");
  EXPECT_TRUE(resolver.Get()->WaitForReresolutionRequest(
      absl::Seconds(5 * grpc_test_slowdown_factor())));
  gpr_log(GPR_INFO, "****** RE-RESOLUTION SEEN *******");
  // Answer with reachable ports so that pick_first can recover quickly.
  resolver.SetNextResolution(alive_ports);
  gpr_log(GPR_INFO, "****** RE-RESOLUTION SENT *******");
  // Failures seen while waiting must still be the connection-failure kind.
  WaitForServer(DEBUG_LOCATION, echo_stub, 0, [](const Status& status) {
    EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
    EXPECT_THAT(status.error_message(),
                ::testing::ContainsRegex(MakeConnectionFailureRegex(
                    "failed to connect to all addresses")));
  });
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub);
  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
  // The channel must identify its LB policy as pick_first.
  EXPECT_EQ("pick_first", lb_channel->GetLoadBalancingPolicyName());
}
1163 
// Connects to a backend, shuts it down, restarts it on the same port, and
// checks that RPCs reach it again even though the test never pushes another
// resolver result.
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
  std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, ports);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);
  // The only resolver result this test ever sends.
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
  WaitForServer(DEBUG_LOCATION, echo_stub, 0);
  gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
  StartServers(1, ports);
  WaitForServer(DEBUG_LOCATION, echo_stub, 0);
}
1180 
// With two addresses resolved and only the second backend running, the
// channel connects to the second one. After that backend is shut down and
// both are started, RPCs must reach the first backend, without the test
// sending another resolver result.
TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
  const int first_port = grpc_pick_unused_port_or_die();
  const int second_port = grpc_pick_unused_port_or_die();
  std::vector<int> ports = {first_port, second_port};
  CreateServers(2, ports);
  // Only the second backend is up for the initial connection.
  StartServer(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
  WaitForServer(DEBUG_LOCATION, echo_stub, 1);
  gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
  servers_[1]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
  StartServers(2, ports);
  // Traffic is now expected on the first entry of the address list.
  WaitForServer(DEBUG_LOCATION, echo_stub, 0);
}
1199 
// Sends a resolver result whose address list is present but empty. The
// result health callback must be invoked with kUnavailable and the message
// "address list must not be empty", the channel must reach
// TRANSIENT_FAILURE, and a later valid resolution must let a wait_for_ready
// RPC succeed.
TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto lb_channel = BuildChannel("pick_first", resolver);
  auto echo_stub = BuildStub(lb_channel);
  gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
  // Build a result with an empty address list and a health callback that
  // signals the notification below when it runs.
  grpc_core::Notification callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.result_health_callback = [&](absl::Status status) {
    gpr_log(GPR_INFO, "****** RESULT HEALTH CALLBACK *******");
    EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
    EXPECT_EQ("address list must not be empty", status.message()) << status;
    callback_ran.Notify();
  };
  resolver.SetResponse(std::move(empty_result));
  // The channel is expected to report TRANSIENT_FAILURE.
  gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******");
  EXPECT_TRUE(WaitForChannelState(
      lb_channel.get(),
      [](grpc_connectivity_state state) {
        return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
      },
      /*try_to_connect=*/true));
  // Block until the health callback has run.
  callback_ran.WaitForNotification();
  // Follow up with a usable address.
  gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
  StartServers(1);
  resolver.SetNextResolution(GetServersPorts());
  gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******");
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub, /*wait_for_ready=*/true);
}
1233 
// Two channels target the same backend, which is shut down and restarted
// twice: once after the first channel connects and once after the second
// does. After the second restart an RPC on the second channel must succeed,
// and both channels must report "pick_first" as their policy name.
TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
  std::vector<int> ports = {grpc_pick_unused_port_or_die()};
  StartServers(1, ports);
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("pick_first", first_resolver);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(first_channel.get()));
  // Channel 1 will get a re-resolution with the same server; it will create
  // a new subchannel and keep a ref to it.
  StartServers(1, ports);
  gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("pick_first", second_resolver);
  auto second_stub = BuildStub(second_channel);
  second_resolver.SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
  WaitForServer(DEBUG_LOCATION, second_stub, 0);
  gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
  servers_[0]->Shutdown();
  // Do not go on until the disconnection has produced its connectivity
  // notification; otherwise the subchannel could be picked for the next call
  // and fail shortly after.
  EXPECT_TRUE(WaitForChannelNotReady(second_channel.get()));
  // Channel 2 will also get a re-resolution with the same server, so both
  // channels will ref the same subchannel that failed.
  StartServers(1, ports);
  gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
  gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
  // The first call after this restart is expected to succeed.
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
  // Both channels must identify their LB policy as pick_first.
  EXPECT_EQ("pick_first", first_channel->GetLoadBalancingPolicyName());
  EXPECT_EQ("pick_first", second_channel->GetLoadBalancingPolicyName());
}
1274 
// Once the only backend of a READY channel is shut down, the channel must
// leave READY and report IDLE.
TEST_F(PickFirstTest, IdleOnDisconnect) {
  // Bring up one backend and get the channel to READY with an RPC.
  const int kNumServers = 1;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper resolver;
  // Passing "" as the policy name yields the default policy (pick_first).
  auto lb_channel = BuildChannel("", resolver);
  auto echo_stub = BuildStub(lb_channel);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, echo_stub);
  EXPECT_EQ(lb_channel->GetState(false), GRPC_CHANNEL_READY);
  // Shut the backend down; the channel is expected to fall back to IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(lb_channel.get()));
  EXPECT_EQ(lb_channel->GetState(false), GRPC_CHANNEL_IDLE);
  servers_.clear();
}
1292 
// Exercises the case where the selected subchannel fails while a newer
// resolver update is still pending:
//   Phase 1: connect to servers_[0].
//   Phase 2: send an update pointing at servers_[1] whose connection attempt
//            is held by the injector, so the update stays pending and RPCs
//            keep going to servers_[0].
//   Phase 3: shut down servers_[0]; the channel must leave READY and report
//            CONNECTING.
//   Phase 4: release the held attempt; the channel must become READY and
//            RPCs must reach servers_[1].
TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel =
      BuildChannel("", response_generator);  // pick_first is the default.
  auto stub = BuildStub(channel);
  StartServers(2);
  // Initially resolve to first server and make sure it connects.
  gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
  response_generator.SetNextResolution({servers_[0]->port_});
  CheckRpcSendOk(DEBUG_LOCATION, stub, true /* wait_for_ready */);
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
  ConnectionAttemptInjector injector;
  auto hold = injector.AddHold(servers_[1]->port_);
  // Send a resolution update with the remaining server. Its connection
  // attempt is held by the injector, so the update will stay pending.
  gpr_log(GPR_INFO,
          "Phase 2: Resolver update pointing to remaining "
          "(not started) servers.");
  response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
  // Add hold before connection attempt to ensure RPCs will be sent to first
  // server. Otherwise, pending subchannel list might already have gone into
  // TRANSIENT_FAILURE due to hitting the end of the server list by the time
  // we check the state.
  hold->Wait();
  // RPCs will continue to be sent to the first server.
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // Now stop the first server, so that the current subchannel list
  // fails.  This should cause us to immediately swap over to the
  // pending list, even though it's not yet connected.  The state should
  // be set to CONNECTING, since that's what the pending subchannel list
  // was doing when we swapped over.
  gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
  servers_[0]->Shutdown();
  // Check the wait result so that a timeout is reported instead of being
  // silently ignored (same pattern as the other tests in this file).
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_CONNECTING);
  // Resume connection attempt to second server now that first server is down.
  // The channel should go to READY state and RPCs should go to the second
  // server.
  gpr_log(GPR_INFO, "Phase 4: Resuming connection attempt to second server.");
  hold->Resume();
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  WaitForServer(DEBUG_LOCATION, stub, 1);
}
1336 
TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
  // Single backend; the empty policy name selects the default (pick_first).
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto pf_channel = BuildChannel("", resolver);
  auto pf_stub = BuildStub(pf_channel);
  resolver.SetNextResolution(GetServersPorts());
  // A successful RPC means the channel got to READY.
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  EXPECT_EQ(pf_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_READY);
  // Losing the backend should move the channel to IDLE.
  servers_[0]->Shutdown();
  EXPECT_TRUE(WaitForChannelNotReady(pf_channel.get()));
  EXPECT_EQ(pf_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_IDLE);
  // A resolver update without any addresses must not move the channel out
  // of IDLE, so waiting for a state change is expected to time out.
  resolver.SetNextResolution({});
  const auto no_change_deadline = grpc_timeout_seconds_to_deadline(3);
  EXPECT_FALSE(
      pf_channel->WaitForStateChange(GRPC_CHANNEL_IDLE, no_change_deadline));
  // Restore the backend, publish its address again and send an RPC; the
  // channel should end up READY once more.
  StartServer(0);
  resolver.SetNextResolution(GetServersPorts());
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
  EXPECT_EQ(pf_channel->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_READY);
}
1364 
TEST_F(PickFirstTest,
       StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
  // Reserve three ports; nothing is listening on any of them yet.
  std::vector<int> unused_ports = {grpc_pick_unused_port_or_die(),
                                   grpc_pick_unused_port_or_die(),
                                   grpc_pick_unused_port_or_die()};
  // Build the channel with an initial reconnect backoff of one second
  // (scaled by the test slowdown factor).
  const auto initial_backoff_ms = 1000 * grpc_test_slowdown_factor();
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                      initial_backoff_ms);
  FakeResolverResponseGeneratorWrapper resolver;
  auto pf_channel = BuildChannel("", resolver, channel_args);
  auto pf_stub = BuildStub(pf_channel);
  resolver.SetNextResolution(unused_ports);
  EXPECT_EQ(GRPC_CHANNEL_IDLE,
            pf_channel->GetState(/*try_to_connect=*/false));
  // With no backend reachable, an RPC is expected to fail as UNAVAILABLE...
  CheckRpcSendFailure(
      DEBUG_LOCATION, pf_stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("failed to connect to all addresses"));
  // ...and the channel should now report TRANSIENT_FAILURE.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            pf_channel->GetState(/*try_to_connect=*/false));
  // Bring up a server on the last of the reserved ports.
  StartServers(1, {unused_ports.back()});
  // The first state the channel reports after TRANSIENT_FAILURE must be
  // READY (no intermediate state in between).
  const auto ready_deadline = grpc_timeout_seconds_to_deadline(4);
  EXPECT_TRUE(pf_channel->WaitForStateChange(GRPC_CHANNEL_TRANSIENT_FAILURE,
                                             ready_deadline));
  EXPECT_EQ(GRPC_CHANNEL_READY,
            pf_channel->GetState(/*try_to_connect=*/false));
  CheckRpcSendOk(DEBUG_LOCATION, pf_stub);
}
1394 
1395 //
1396 // round_robin tests
1397 //
1398 
1399 using RoundRobinTest = ClientLbEnd2endTest;
1400 
TEST_F(RoundRobinTest, Basic) {
  // Three backends behind a single round_robin channel.
  constexpr int kBackendCount = 3;
  StartServers(kBackendCount);
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution(GetServersPorts());
  // Keep sending RPCs until every backend has been seen at least once.
  while (true) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub);
    if (SeenAllServers()) break;
  }
  ResetCounters();
  // Align with the end of the address list so that the next round of picks
  // starts at the first server (index 0).
  WaitForServer(DEBUG_LOCATION, rr_stub, servers_.size() - 1);
  // Send exactly one RPC per backend and record which backend served it.
  std::vector<int> observed_order;
  for (size_t sent = 0; sent != servers_.size(); ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub);
    UpdateConnectionOrder(servers_, &observed_order);
  }
  // The backends must be visited in the order the addresses were given.
  const std::vector<int> address_order = {0, 1, 2};
  EXPECT_EQ(address_order, observed_order);
  // The channel must report the LB policy that was requested.
  EXPECT_EQ("round_robin", rr_channel->GetLoadBalancingPolicyName());
}
1429 
TEST_F(RoundRobinTest, ProcessPending) {
  // A single backend is shared by both channels created below.
  StartServers(1);
  const auto backend_port = servers_[0]->port_;
  FakeResolverResponseGeneratorWrapper first_resolver;
  auto first_channel = BuildChannel("round_robin", first_resolver);
  auto first_stub = BuildStub(first_channel);
  first_resolver.SetNextResolution({backend_port});
  WaitForServer(DEBUG_LOCATION, first_stub, 0);
  // Build a second channel (and with it a second RR policy) for the same
  // target.  Subchannels are reused globally, so this policy picks up the
  // subchannel that is already READY thanks to the first channel's RPC.
  // The RPC below has to make progress even though that subchannel never
  // reports a transition out of READY.
  FakeResolverResponseGeneratorWrapper second_resolver;
  auto second_channel = BuildChannel("round_robin", second_resolver);
  auto second_stub = BuildStub(second_channel);
  second_resolver.SetNextResolution({backend_port});
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
}
1448 
TEST_F(RoundRobinTest,Updates)1449 TEST_F(RoundRobinTest, Updates) {
1450   // Start servers.
1451   const int kNumServers = 3;
1452   StartServers(kNumServers);
1453   FakeResolverResponseGeneratorWrapper response_generator;
1454   auto channel = BuildChannel("round_robin", response_generator);
1455   auto stub = BuildStub(channel);
1456   // Start with a single server.
1457   gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1458   std::vector<int> ports = {servers_[0]->port_};
1459   response_generator.SetNextResolution(ports);
1460   WaitForServer(DEBUG_LOCATION, stub, 0);
1461   // Send RPCs. They should all go servers_[0]
1462   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1463   EXPECT_EQ(10, servers_[0]->service_.request_count());
1464   EXPECT_EQ(0, servers_[1]->service_.request_count());
1465   EXPECT_EQ(0, servers_[2]->service_.request_count());
1466   ResetCounters();
1467   // And now for the second server.
1468   gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1469   ports.clear();
1470   ports.emplace_back(servers_[1]->port_);
1471   response_generator.SetNextResolution(ports);
1472   // Wait until update has been processed, as signaled by the second backend
1473   // receiving a request.
1474   EXPECT_EQ(0, servers_[1]->service_.request_count());
1475   WaitForServer(DEBUG_LOCATION, stub, 1);
1476   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1477   EXPECT_EQ(0, servers_[0]->service_.request_count());
1478   EXPECT_EQ(10, servers_[1]->service_.request_count());
1479   EXPECT_EQ(0, servers_[2]->service_.request_count());
1480   ResetCounters();
1481   // ... and for the last server.
1482   gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1483   ports.clear();
1484   ports.emplace_back(servers_[2]->port_);
1485   response_generator.SetNextResolution(ports);
1486   WaitForServer(DEBUG_LOCATION, stub, 2);
1487   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1488   EXPECT_EQ(0, servers_[0]->service_.request_count());
1489   EXPECT_EQ(0, servers_[1]->service_.request_count());
1490   EXPECT_EQ(10, servers_[2]->service_.request_count());
1491   ResetCounters();
1492   // Back to all servers.
1493   gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1494   ports.clear();
1495   ports.emplace_back(servers_[0]->port_);
1496   ports.emplace_back(servers_[1]->port_);
1497   ports.emplace_back(servers_[2]->port_);
1498   response_generator.SetNextResolution(ports);
1499   WaitForServers(DEBUG_LOCATION, stub);
1500   // Send three RPCs, one per server.
1501   for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1502   EXPECT_EQ(1, servers_[0]->service_.request_count());
1503   EXPECT_EQ(1, servers_[1]->service_.request_count());
1504   EXPECT_EQ(1, servers_[2]->service_.request_count());
1505   ResetCounters();
1506   // An empty update will result in the channel going into TRANSIENT_FAILURE.
1507   gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1508   ports.clear();
1509   response_generator.SetNextResolution(ports);
1510   WaitForChannelNotReady(channel.get());
1511   CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
1512                       "empty address list: fake resolver empty address list");
1513   servers_[0]->service_.ResetCounters();
1514   // Next update introduces servers_[1], making the channel recover.
1515   gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1516   ports.clear();
1517   ports.emplace_back(servers_[1]->port_);
1518   response_generator.SetNextResolution(ports);
1519   WaitForChannelReady(channel.get());
1520   WaitForServer(DEBUG_LOCATION, stub, 1);
1521   EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
1522   // Check LB policy name for the channel.
1523   EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1524 }
1525 
TEST_F(RoundRobinTest, UpdateInError) {
  constexpr int kRpcsPerPhase = 10;
  StartServers(2);
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  // Phase 1: only the first server is advertised, so it must receive every
  // RPC.
  resolver.SetNextResolution(GetServersPorts(0, 1));
  for (int sent = 0; sent < kRpcsPerPhase; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub, /*wait_for_ready=*/false,
                   /*load_report=*/nullptr, /*timeout_ms=*/4000);
  }
  EXPECT_EQ(10, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  servers_[0]->service_.ResetCounters();
  // Phase 2: advertise server 0, an address nobody listens on, and server 1.
  std::vector<int> mixed_ports = {servers_[0]->port_,
                                  grpc_pick_unused_port_or_die(),
                                  servers_[1]->port_};
  resolver.SetNextResolution(mixed_ports);
  WaitForServers(DEBUG_LOCATION, rr_stub, 0, 2, /*status_check=*/nullptr,
                 /*timeout=*/absl::Seconds(60));
  // All of the following RPCs must succeed and be shared roughly evenly by
  // the two reachable servers.
  // Note: a perfectly even split is not guaranteed.  If the subchannels for
  // servers 0 and 1 report READY before the subchannel for the unreachable
  // address moves from CONNECTING to TRANSIENT_FAILURE, an additional picker
  // update happens, which can shift the distribution slightly.
  for (int sent = 0; sent < kRpcsPerPhase; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub, /*wait_for_ready=*/false,
                   /*load_report=*/nullptr, /*timeout_ms=*/4000);
  }
  const auto roughly_half =
      ::testing::AllOf(::testing::Ge(4), ::testing::Le(6));
  EXPECT_THAT(servers_[0]->service_.request_count(), roughly_half);
  EXPECT_THAT(servers_[1]->service_.request_count(), roughly_half);
  EXPECT_EQ(10, servers_[0]->service_.request_count() +
                    servers_[1]->service_.request_count());
}
1564 
// Stress test: pushes 1000 resolver updates, each with the same backends in
// a freshly shuffled order, and checks that RPCs keep succeeding throughout.
TEST_F(RoundRobinTest, ManyUpdates) {
  // Start servers.
  const int kNumServers = 3;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  std::vector<int> ports = GetServersPorts();
  // Seed one generator up front rather than constructing a random_device and
  // a Mersenne Twister on every iteration.
  std::mt19937 rng(std::random_device{}());
  for (size_t i = 0; i < 1000; ++i) {
    std::shuffle(ports.begin(), ports.end(), rng);
    response_generator.SetNextResolution(ports);
    // Only every tenth update is followed by an RPC.
    if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
1582 
TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
  // Three backends are running, but only two are advertised at first.
  StartServers(3);
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  std::vector<int> advertised_ports = {servers_[0]->port_,
                                       servers_[1]->port_};
  resolver.SetNextResolution(advertised_ports);
  // Make sure the client has reached both advertised backends.
  WaitForServers(DEBUG_LOCATION, rr_stub, 0, 2);
  // A GOAWAY from server 0 is expected to make the LB policy ask for
  // re-resolution.
  gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
  {
    grpc_core::ExecCtx exec_ctx;
    auto* core_server =
        grpc_core::Server::FromC(servers_[0]->server_->c_server());
    core_server->SendGoaways();
  }
  gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION REQUEST *******");
  const auto reresolution_timeout =
      absl::Seconds(5 * grpc_test_slowdown_factor());
  EXPECT_TRUE(
      resolver.Get()->WaitForReresolutionRequest(reresolution_timeout));
  gpr_log(GPR_INFO, "****** RE-RESOLUTION REQUEST SEEN *******");
  // Now that re-resolution has been requested, publish an update that also
  // contains the third server.
  advertised_ports.push_back(servers_[2]->port_);
  resolver.SetNextResolution(advertised_ports);
  // The client must eventually reach server 2.
  WaitForServer(DEBUG_LOCATION, rr_stub, 2);
}
1612 
TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) {
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
  // The first resolver result carries an empty address list plus a health
  // callback that checks the reported status and then signals the
  // notification below.
  grpc_core::Notification health_callback_ran;
  grpc_core::Resolver::Result empty_result;
  empty_result.addresses.emplace();
  empty_result.resolution_note = "injected error";
  empty_result.result_health_callback =
      [&health_callback_ran](absl::Status status) {
        EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
        EXPECT_EQ("empty address list: injected error", status.message())
            << status;
        health_callback_ran.Notify();
      };
  resolver.SetResponse(std::move(empty_result));
  // Ask the channel to connect and wait until it reports TRANSIENT_FAILURE.
  gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******");
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(rr_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Block until the health callback has been invoked.
  health_callback_ran.WaitForNotification();
  // Follow up with a usable address and check that the channel recovers.
  gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
  StartServers(1);
  resolver.SetNextResolution(GetServersPorts());
  gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******");
  CheckRpcSendOk(DEBUG_LOCATION, rr_stub, /*wait_for_ready=*/true);
}
1646 
TEST_F(RoundRobinTest, TransientFailure) {
  // With three live backends the channel is expected to become READY.
  constexpr int kBackendCount = 3;
  StartServers(kBackendCount);
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution(GetServersPorts());
  EXPECT_TRUE(WaitForChannelReady(rr_channel.get()));
  // Shut every backend down; the channel should then report
  // TRANSIENT_FAILURE and RPCs should fail as UNAVAILABLE.
  for (const auto& server : servers_) {
    server->Shutdown();
  }
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(rr_channel.get(), is_transient_failure));
  CheckRpcSendFailure(
      DEBUG_LOCATION, rr_stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1668 
TEST_F(RoundRobinTest, TransientFailureAtStartup) {
  // Point the channel at three ports that have no server behind them; it
  // should move to TRANSIENT_FAILURE quickly.
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution({
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
      grpc_pick_unused_port_or_die(),
  });
  // NOTE(review): this test never starts a server, so servers_ is presumably
  // empty here and this loop does nothing -- confirm before removing.
  for (const auto& server : servers_) {
    server->Shutdown();
  }
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(rr_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  CheckRpcSendFailure(
      DEBUG_LOCATION, rr_stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
}
1691 
TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
  // The injector lets the test intercept connection attempts later on.
  ConnectionAttemptInjector injector;
  // A port with no server behind it.
  const int unused_port = grpc_pick_unused_port_or_die();
  // Channel whose only address is that port.
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution({unused_port});
  // The first connection attempt is left alone and fails on its own; wait
  // for the channel to report TRANSIENT_FAILURE as a result.
  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO REPORT TF ===");
  auto is_transient_failure = [](grpc_connectivity_state state) {
    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
  };
  EXPECT_TRUE(WaitForChannelState(rr_channel.get(), is_transient_failure,
                                  /*try_to_connect=*/true));
  // Intercept the following attempt and wait for it to begin.
  auto hold = injector.AddHold(unused_port);
  hold->Wait();
  // The subchannel should be back in CONNECTING at this point, yet the
  // channel as a whole must keep reporting TRANSIENT_FAILURE with the same
  // status as before.
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            rr_channel->GetState(/*try_to_connect=*/false));
  // A handful of RPCs gives the channel the opportunity to (incorrectly)
  // install a new picker; all of them must still fail.
  gpr_log(GPR_INFO, "=== EXPECTING RPCs TO FAIL ===");
  for (int attempt = 0; attempt < 5; ++attempt) {
    CheckRpcSendFailure(
        DEBUG_LOCATION, rr_stub, StatusCode::UNAVAILABLE,
        MakeConnectionFailureRegex("connections to all backends failing"));
  }
  // Release the held connection attempt before the test ends.
  hold->Resume();
}
1728 
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
  // The injector lets the test intercept connection attempts later on.
  ConnectionAttemptInjector injector;
  // Two ports with no server behind them.
  const std::vector<int> unused_ports = {grpc_pick_unused_port_or_die(),
                                         grpc_pick_unused_port_or_die()};
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution(unused_ports);
  // The initial attempts fail on their own; the RPC must carry the generic
  // "all backends failing" status.
  CheckRpcSendFailure(
      DEBUG_LOCATION, rr_stub, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex("connections to all backends failing"));
  // Hold the next connection attempt on each port.
  auto first_hold = injector.AddHold(unused_ports[0]);
  auto second_hold = injector.AddHold(unused_ports[1]);
  first_hold->Wait();
  second_hold->Wait();
  // Fail the first held attempt with a recognizable message.
  first_hold->Fail(GRPC_ERROR_CREATE("Survey says... Bzzzzt!"));
  // Keep sending RPCs until one of them reports the injected message, or
  // until the deadline passes.
  const std::string injected_failure_regex =
      "connections to all backends failing; last error: "
      "UNKNOWN: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
      "Survey says... Bzzzzt!";
  const absl::Time give_up_at =
      absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
  for (;;) {
    Status status = SendRpc(rr_stub);
    EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
    const bool saw_injected_failure = ::testing::Matches(
        ::testing::MatchesRegex(injected_failure_regex))(
        status.error_message());
    if (saw_injected_failure) {
      break;
    }
    // Not there yet: the message must still be the generic failure.
    gpr_log(GPR_INFO, "STATUS MESSAGE: %s", status.error_message().c_str());
    EXPECT_THAT(status.error_message(),
                ::testing::MatchesRegex(MakeConnectionFailureRegex(
                    "connections to all backends failing")));
    EXPECT_LT(absl::Now(), give_up_at);
    if (absl::Now() >= give_up_at) {
      break;
    }
  }
  // Release the remaining held attempt before the test ends.
  second_hold->Resume();
}
1774 
TEST_F(RoundRobinTest,DoesNotFailRpcsUponDisconnection)1775 TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
1776   // Start connection injector.
1777   ConnectionAttemptInjector injector;
1778   // Start server.
1779   StartServers(1);
1780   // Create channel.
1781   FakeResolverResponseGeneratorWrapper response_generator;
1782   auto channel = BuildChannel("round_robin", response_generator);
1783   auto stub = BuildStub(channel);
1784   response_generator.SetNextResolution(GetServersPorts());
1785   // Start a thread constantly sending RPCs in a loop.
1786   gpr_log(GPR_INFO, "=== STARTING CLIENT THREAD ===");
1787   std::atomic<bool> shutdown{false};
1788   gpr_event ev;
1789   gpr_event_init(&ev);
1790   std::thread thd([&]() {
1791     gpr_log(GPR_INFO, "sending first RPC");
1792     CheckRpcSendOk(DEBUG_LOCATION, stub);
1793     gpr_event_set(&ev, reinterpret_cast<void*>(1));
1794     while (!shutdown.load()) {
1795       gpr_log(GPR_INFO, "sending RPC");
1796       CheckRpcSendOk(DEBUG_LOCATION, stub);
1797     }
1798   });
1799   // Wait for first RPC to complete.
1800   gpr_log(GPR_INFO, "=== WAITING FOR FIRST RPC TO COMPLETE ===");
1801   ASSERT_EQ(reinterpret_cast<void*>(1),
1802             gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(1)));
1803   // Channel should now be READY.
1804   ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
1805   // Tell injector to intercept the next connection attempt.
1806   auto hold1 =
1807       injector.AddHold(servers_[0]->port_, /*intercept_completion=*/true);
1808   // Now kill the server.  The subchannel should report IDLE and be
1809   // immediately reconnected to, but this should not cause any test
1810   // failures.
1811   gpr_log(GPR_INFO, "=== SHUTTING DOWN SERVER ===");
1812   {
1813     grpc_core::ExecCtx exec_ctx;
1814     grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
1815   }
1816   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
1817   servers_[0]->Shutdown();
1818   // Wait for next attempt to start.
1819   gpr_log(GPR_INFO, "=== WAITING FOR RECONNECTION ATTEMPT ===");
1820   hold1->Wait();
1821   // Start server and allow attempt to continue.
1822   gpr_log(GPR_INFO, "=== RESTARTING SERVER ===");
1823   StartServer(0);
1824   hold1->Resume();
1825   // Wait for next attempt to complete.
1826   gpr_log(GPR_INFO, "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
1827   hold1->WaitForCompletion();
1828   // Now shut down the thread.
1829   gpr_log(GPR_INFO, "=== SHUTTING DOWN CLIENT THREAD ===");
1830   shutdown.store(true);
1831   thd.join();
1832 }
1833 
TEST_F(RoundRobinTest, SingleReconnect) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  const auto backend_ports = GetServersPorts();
  FakeResolverResponseGeneratorWrapper resolver;
  auto rr_channel = BuildChannel("round_robin", resolver);
  auto rr_stub = BuildStub(rr_channel);
  resolver.SetNextResolution(backend_ports);
  WaitForServers(DEBUG_LOCATION, rr_stub);
  // Align with the end of the address list.
  WaitForServer(DEBUG_LOCATION, rr_stub, servers_.size() - 1);
  // The i-th RPC of the next round is expected to land on backend i.
  for (size_t backend = 0; backend < servers_.size(); ++backend) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub);
    EXPECT_EQ(1, servers_[backend]->service_.request_count())
        << "for backend #" << backend;
  }
  // After the full round every backend has served exactly one request.
  for (const auto& server : servers_) {
    EXPECT_EQ(1, server->service_.request_count());
  }
  // Take the first backend out of service.
  servers_[0]->StopListeningAndSendGoaways();
  // The client has noticed the loss once kNumServers RPCs in a row were
  // served by a backend other than backend 0.
  ResetCounters();
  SendRpcsUntil(
      DEBUG_LOCATION, rr_stub,
      [&, consecutive_misses = 0](const Status& status) mutable {
        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                                 << " message=" << status.error_message();
        const bool served_by_backend_0 =
            servers_[0]->service_.request_count() == 1;
        consecutive_misses = served_by_backend_0 ? 0 : consecutive_misses + 1;
        ResetCounters();
        return consecutive_misses < kNumServers;
      });
  // A larger batch of RPCs must now bypass backend 0 entirely.
  for (int sent = 0; sent < 10 * kNumServers; ++sent) {
    CheckRpcSendOk(DEBUG_LOCATION, rr_stub);
  }
  EXPECT_EQ(0UL, servers_[0]->service_.request_count());
  // Restart the first backend.
  servers_[0]->Shutdown();
  StartServer(0);
  // Traffic should reach backend 0 again: immediately if the server came up
  // before the RR policy retried the subchannel, or after the subchannel
  // retry delay if the retry happened before the server was fully back up.
  WaitForServer(DEBUG_LOCATION, rr_stub, 0);
}
1886 
1887 // If health checking is required by client but health checking service
1888 // is not running on the server, the channel should be treated as healthy.
TEST_F(RoundRobinTest,ServersHealthCheckingUnimplementedTreatedAsHealthy)1889 TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
1890   StartServers(1);  // Single server
1891   ChannelArguments args;
1892   args.SetServiceConfigJSON(
1893       "{\"healthCheckConfig\": "
1894       "{\"serviceName\": \"health_check_service_name\"}}");
1895   FakeResolverResponseGeneratorWrapper response_generator;
1896   auto channel = BuildChannel("round_robin", response_generator, args);
1897   auto stub = BuildStub(channel);
1898   response_generator.SetNextResolution({servers_[0]->port_});
1899   EXPECT_TRUE(WaitForChannelReady(channel.get()));
1900   CheckRpcSendOk(DEBUG_LOCATION, stub);
1901 }
1902 
// Exercises round_robin with client-side health checking against three
// backends.  The serving status of "health_check_service_name" is flipped on
// individual servers, and after each change the test checks channel
// readiness and how RPCs are distributed across the servers.
TEST_F(RoundRobinTest, HealthChecking) {
  EnableDefaultHealthCheckService(true);
  // Start servers.
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Service config asks the client to health-check
  // "health_check_service_name" on every backend.
  ChannelArguments args;
  args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Channel should not become READY, because health checks should be failing.
  gpr_log(GPR_INFO,
          "*** initial state: unknown health check service name for "
          "all servers");
  EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
  // Now set one of the servers to be healthy.
  // The channel should become healthy and all requests should go to
  // the healthy server.
  gpr_log(GPR_INFO, "*** server 0 healthy");
  servers_[0]->SetServingStatus("health_check_service_name", true);
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  // New channel state may be reported before the picker is updated, so
  // wait for the server before proceeding.
  WaitForServer(DEBUG_LOCATION, stub, 0);
  for (int i = 0; i < 10; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // NOTE(review): the exact counts below (and in the next two phases) assume
  // WaitForServer() leaves the per-server request counters at zero -- confirm
  // against its definition.
  EXPECT_EQ(10, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  EXPECT_EQ(0, servers_[2]->service_.request_count());
  // Now set a second server to be healthy.
  gpr_log(GPR_INFO, "*** server 2 healthy");
  servers_[2]->SetServingStatus("health_check_service_name", true);
  WaitForServer(DEBUG_LOCATION, stub, 2);
  for (int i = 0; i < 10; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Ten RPCs are expected to split evenly over the two healthy servers.
  EXPECT_EQ(5, servers_[0]->service_.request_count());
  EXPECT_EQ(0, servers_[1]->service_.request_count());
  EXPECT_EQ(5, servers_[2]->service_.request_count());
  // Now set the remaining server to be healthy.
  gpr_log(GPR_INFO, "*** server 1 healthy");
  servers_[1]->SetServingStatus("health_check_service_name", true);
  WaitForServer(DEBUG_LOCATION, stub, 1);
  // Nine RPCs so that the expected split over three servers is exact.
  for (int i = 0; i < 9; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  EXPECT_EQ(3, servers_[0]->service_.request_count());
  EXPECT_EQ(3, servers_[1]->service_.request_count());
  EXPECT_EQ(3, servers_[2]->service_.request_count());
  // Now set one server to be unhealthy again.  Then wait until the
  // unhealthiness has hit the client.  We know that the client will see
  // this when we send kNumServers requests and one of the remaining servers
  // sees two of the requests.
  gpr_log(GPR_INFO, "*** server 0 unhealthy");
  servers_[0]->SetServingStatus("health_check_service_name", false);
  // Each round resets the counters and sends kNumServers RPCs; the loop ends
  // as soon as server 1 or server 2 has handled exactly two of them.
  do {
    ResetCounters();
    for (int i = 0; i < kNumServers; ++i) {
      CheckRpcSendOk(DEBUG_LOCATION, stub);
    }
  } while (servers_[1]->service_.request_count() != 2 &&
           servers_[2]->service_.request_count() != 2);
  // Now set the remaining two servers to be unhealthy.  Make sure the
  // channel leaves READY state and that RPCs fail.
  gpr_log(GPR_INFO, "*** all servers unhealthy");
  servers_[1]->SetServingStatus("health_check_service_name", false);
  servers_[2]->SetServingStatus("health_check_service_name", false);
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // New channel state may be reported before the picker is updated, so
  // one or two more RPCs may succeed before we see a failure.
  // The callback returns true to keep sending while RPCs still succeed and
  // false once a failure has been observed and checked.
  SendRpcsUntil(DEBUG_LOCATION, stub, [&](const Status& status) {
    if (status.ok()) return true;
    EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(
        status.error_message(),
        ::testing::MatchesRegex(
            "connections to all backends failing; last error: "
            "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy"));
    return false;
  });
  // Clean up.
  EnableDefaultHealthCheckService(false);
}
1990 
// Verifies that round_robin with health checking keeps serving RPCs when one
// backend stops listening and the resolver re-sends the same address list.
TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
  EnableDefaultHealthCheckService(true);
  // Start servers.
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Report every backend as healthy for the service name the channel checks.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  servers_[1]->SetServingStatus("health_check_service_name", true);
  servers_[2]->SetServingStatus("health_check_service_name", true);
  ChannelArguments args;
  args.SetServiceConfigJSON(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("round_robin", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  WaitForServer(DEBUG_LOCATION, stub, 0);
  // Stop server 0 and send a new resolver result to ensure that RR
  // checks each subchannel's state.
  servers_[0]->StopListeningAndSendGoaways();
  response_generator.SetNextResolution(GetServersPorts());
  // Send a bunch more RPCs.
  for (size_t i = 0; i < 100; i++) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Clean up.  Restore the default-health-check-service setting, as the
  // other health-checking tests in this file do, so that it does not leak
  // into whichever test runs next in this process.
  EnableDefaultHealthCheckService(false);
}
2017 
// Two channels target the same backend with the same health-check config, but
// the second one sets GRPC_ARG_INHIBIT_HEALTH_CHECKING.  Only the first channel
// is expected to be held back while the backend reports unhealthy.
TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
  EnableDefaultHealthCheckService(true);
  // A single backend is shared by both channels.
  constexpr int kBackendCount = 1;
  StartServers(kBackendCount);
  // Channel 1: health checking enabled.
  ChannelArguments channel_args;
  channel_args.SetServiceConfigJSON(
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name"}})");
  FakeResolverResponseGeneratorWrapper generator_checked;
  auto checked_channel =
      BuildChannel("round_robin", generator_checked, channel_args);
  auto checked_stub = BuildStub(checked_channel);
  std::vector<int> backend_ports = GetServersPorts();
  generator_checked.SetNextResolution(backend_ports);
  // Channel 2: same config, but health checking is inhibited.
  channel_args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
  FakeResolverResponseGeneratorWrapper generator_inhibited;
  auto inhibited_channel =
      BuildChannel("round_robin", generator_inhibited, channel_args);
  auto inhibited_stub = BuildStub(inhibited_channel);
  generator_inhibited.SetNextResolution(backend_ports);
  // The health-checked channel must stay out of READY and fail RPCs while
  // the backend has not been marked as serving.
  EXPECT_FALSE(WaitForChannelReady(checked_channel.get(), 1));
  CheckRpcSendFailure(
      DEBUG_LOCATION, checked_stub, StatusCode::UNAVAILABLE,
      "connections to all backends failing; last error: "
      "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
  // The inhibited channel is expected to become READY regardless.
  EXPECT_TRUE(WaitForChannelReady(inhibited_channel.get(), 1));
  CheckRpcSendOk(DEBUG_LOCATION, inhibited_stub);
  // Mark the backend as serving; channel 1 should now succeed too.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  CheckRpcSendOk(DEBUG_LOCATION, checked_stub, /*wait_for_ready=*/true);
  // Both channels should have ended up on a single subchannel.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
  // Restore the global health-check-service setting.
  EnableDefaultHealthCheckService(false);
}
2057 
// Two channels target the same backend but health-check different service
// names.  Each channel's readiness is expected to follow only the serving
// status of its own service name.
TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
  EnableDefaultHealthCheckService(true);
  // A single backend is shared by both channels.
  constexpr int kBackendCount = 1;
  StartServers(kBackendCount);
  // Channel 1 checks "health_check_service_name".
  ChannelArguments first_args;
  first_args.SetServiceConfigJSON(
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name"}})");
  FakeResolverResponseGeneratorWrapper first_generator;
  auto first_channel = BuildChannel("round_robin", first_generator, first_args);
  auto first_stub = BuildStub(first_channel);
  std::vector<int> backend_ports = GetServersPorts();
  first_generator.SetNextResolution(backend_ports);
  // Channel 2 checks "health_check_service_name2".
  ChannelArguments second_args;
  second_args.SetServiceConfigJSON(
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name2"}})");
  FakeResolverResponseGeneratorWrapper second_generator;
  auto second_channel =
      BuildChannel("round_robin", second_generator, second_args);
  auto second_stub = BuildStub(second_channel);
  second_generator.SetNextResolution(backend_ports);
  // Only the service name watched by channel 2 is reported as serving.
  servers_[0]->SetServingStatus("health_check_service_name2", true);
  // Channel 1 must stay out of READY and fail RPCs, since its service name
  // has not been marked as serving.
  EXPECT_FALSE(WaitForChannelReady(first_channel.get(), 1));
  CheckRpcSendFailure(
      DEBUG_LOCATION, first_stub, StatusCode::UNAVAILABLE,
      "connections to all backends failing; last error: "
      "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: backend unhealthy");
  // Channel 2 is expected to be READY and usable.
  EXPECT_TRUE(WaitForChannelReady(second_channel.get(), 1));
  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
  // Mark channel 1's service name as serving and wait for an RPC to succeed.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  CheckRpcSendOk(DEBUG_LOCATION, first_stub, /*wait_for_ready=*/true);
  // Both channels should have ended up on a single subchannel.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
  // Restore the global health-check-service setting.
  EnableDefaultHealthCheckService(false);
}
2103 
// Delivers the health-check service name through the resolver's service
// config, then switches it to a name that is not reported as serving.  The
// already-READY channel is expected to leave READY.
TEST_F(RoundRobinTest,
       HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
  EnableDefaultHealthCheckService(true);
  // Bring up the single backend.
  constexpr int kBackendCount = 1;
  StartServers(kBackendCount);
  // First resolver result: health-check a name that the backend serves.
  const char* const kHealthyNameConfig =
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name"}})";
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver);
  auto stub = BuildStub(channel);
  std::vector<int> backend_ports = GetServersPorts();
  resolver.SetNextResolution(backend_ports, kHealthyNameConfig);
  servers_[0]->SetServingStatus("health_check_service_name", true);
  EXPECT_TRUE(WaitForChannelReady(channel.get(), /*timeout_seconds=*/1));
  // Second resolver result: same addresses, but a service name that has not
  // been marked as serving on the backend.
  const char* const kUnknownNameConfig =
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name2"}})";
  resolver.SetNextResolution(backend_ports, kUnknownNameConfig);
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
  // Restore the global health-check-service setting.
  EnableDefaultHealthCheckService(false);
}
2131 
// Runs two backends with the no-op health check service enabled and checks
// that the channel never becomes READY while each backend receives more than
// one health-check request.
TEST_F(RoundRobinTest, HealthCheckingRetryOnStreamEnd) {
  // Create the backends first so that the no-op health service can be
  // enabled before any of them starts.
  constexpr int kBackendCount = 2;
  CreateServers(kBackendCount);
  EnableNoopHealthCheckService();
  StartServer(0);
  StartServer(1);
  // Build a round_robin channel that health-checks the backends.
  ChannelArguments channel_args;
  channel_args.SetServiceConfigJSON(
      R"({"healthCheckConfig": {"serviceName": "health_check_service_name"}})");
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("round_robin", resolver, channel_args);
  resolver.SetNextResolution(GetServersPorts());
  // The channel must not reach READY, and each backend must have been asked
  // for its health more than once.
  EXPECT_FALSE(WaitForChannelReady(channel.get()));
  EXPECT_GT(servers_[0]->noop_health_check_service_impl_.request_count(), 1);
  EXPECT_GT(servers_[1]->noop_health_check_service_impl_.request_count(), 1);
}
2151 
2152 //
2153 // LB policy pick args
2154 //
2155 
2156 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
2157  protected:
SetUp()2158   void SetUp() override {
2159     ClientLbEnd2endTest::SetUp();
2160     current_test_instance_ = this;
2161   }
2162 
SetUpTestSuite()2163   static void SetUpTestSuite() {
2164     grpc_core::CoreConfiguration::Reset();
2165     grpc_core::CoreConfiguration::RegisterBuilder(
2166         [](grpc_core::CoreConfiguration::Builder* builder) {
2167           grpc_core::RegisterTestPickArgsLoadBalancingPolicy(builder,
2168                                                              SavePickArgs);
2169         });
2170     grpc_init();
2171   }
2172 
TearDownTestSuite()2173   static void TearDownTestSuite() {
2174     grpc_shutdown();
2175     grpc_core::CoreConfiguration::Reset();
2176   }
2177 
args_seen_list()2178   std::vector<grpc_core::PickArgsSeen> args_seen_list() {
2179     grpc_core::MutexLock lock(&mu_);
2180     return args_seen_list_;
2181   }
2182 
ArgsSeenListString(const std::vector<grpc_core::PickArgsSeen> & args_seen_list)2183   static std::string ArgsSeenListString(
2184       const std::vector<grpc_core::PickArgsSeen>& args_seen_list) {
2185     std::vector<std::string> entries;
2186     for (const auto& args_seen : args_seen_list) {
2187       std::vector<std::string> metadata;
2188       for (const auto& p : args_seen.metadata) {
2189         metadata.push_back(absl::StrCat(p.first, "=", p.second));
2190       }
2191       entries.push_back(absl::StrFormat("{path=\"%s\", metadata=[%s]}",
2192                                         args_seen.path,
2193                                         absl::StrJoin(metadata, ", ")));
2194     }
2195     return absl::StrCat("[", absl::StrJoin(entries, ", "), "]");
2196   }
2197 
2198  private:
SavePickArgs(const grpc_core::PickArgsSeen & args_seen)2199   static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
2200     ClientLbPickArgsTest* self = current_test_instance_;
2201     grpc_core::MutexLock lock(&self->mu_);
2202     self->args_seen_list_.emplace_back(args_seen);
2203   }
2204 
2205   static ClientLbPickArgsTest* current_test_instance_;
2206   grpc_core::Mutex mu_;
2207   std::vector<grpc_core::PickArgsSeen> args_seen_list_;
2208 };
2209 
2210 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
2211 
// Sends one RPC through the "test_pick_args_lb" policy and checks that
// exactly one pick was recorded, with the expected path and metadata.
TEST_F(ClientLbPickArgsTest, Basic) {
  using ::testing::AllOf;
  using ::testing::ElementsAre;
  using ::testing::Field;
  using ::testing::Pair;
  using ::testing::UnorderedElementsAre;

  constexpr int kBackendCount = 1;
  StartServers(kBackendCount);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("test_pick_args_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // Connect up front so the LB policy is already connected when the pick
  // arrives.  Without this the pick could be observed either once (READY
  // only) or twice (CONNECTING, then READY), which would make the test flaky.
  ASSERT_TRUE(channel->WaitForConnected(gpr_inf_future(GPR_CLOCK_MONOTONIC)));
  // The channel must report the test policy as its LB policy.
  EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
  // Send a single RPC and compare what the picker recorded.
  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
  const auto recorded_picks = args_seen_list();
  EXPECT_THAT(
      recorded_picks,
      ElementsAre(AllOf(
          Field(&grpc_core::PickArgsSeen::path,
                "/grpc.testing.EchoTestService/Echo"),
          Field(&grpc_core::PickArgsSeen::metadata,
                UnorderedElementsAre(Pair("foo", "1"), Pair("bar", "2"),
                                     Pair("baz", "3"))))))
      << ArgsSeenListString(recorded_picks);
}
2240 
2241 class OrcaLoadReportBuilder {
2242  public:
2243   OrcaLoadReportBuilder() = default;
OrcaLoadReportBuilder(const OrcaLoadReport & report)2244   explicit OrcaLoadReportBuilder(const OrcaLoadReport& report)
2245       : report_(report) {}
SetApplicationUtilization(double v)2246   OrcaLoadReportBuilder& SetApplicationUtilization(double v) {
2247     report_.set_application_utilization(v);
2248     return *this;
2249   }
SetCpuUtilization(double v)2250   OrcaLoadReportBuilder& SetCpuUtilization(double v) {
2251     report_.set_cpu_utilization(v);
2252     return *this;
2253   }
SetMemUtilization(double v)2254   OrcaLoadReportBuilder& SetMemUtilization(double v) {
2255     report_.set_mem_utilization(v);
2256     return *this;
2257   }
SetQps(double v)2258   OrcaLoadReportBuilder& SetQps(double v) {
2259     report_.set_rps_fractional(v);
2260     return *this;
2261   }
SetEps(double v)2262   OrcaLoadReportBuilder& SetEps(double v) {
2263     report_.set_eps(v);
2264     return *this;
2265   }
SetRequestCost(absl::string_view n,double v)2266   OrcaLoadReportBuilder& SetRequestCost(absl::string_view n, double v) {
2267     (*report_.mutable_request_cost())[n] = v;
2268     return *this;
2269   }
SetUtilization(absl::string_view n,double v)2270   OrcaLoadReportBuilder& SetUtilization(absl::string_view n, double v) {
2271     (*report_.mutable_utilization())[n] = v;
2272     return *this;
2273   }
SetNamedMetrics(absl::string_view n,double v)2274   OrcaLoadReportBuilder& SetNamedMetrics(absl::string_view n, double v) {
2275     (*report_.mutable_named_metrics())[n] = v;
2276     return *this;
2277   }
Build()2278   OrcaLoadReport Build() { return std::move(report_); }
2279 
2280  private:
2281   OrcaLoadReport report_;
2282 };
2283 
2284 //
2285 // tests that LB policies can get the call's trailing metadata
2286 //
2287 
BackendMetricDataToOrcaLoadReport(const grpc_core::BackendMetricData & backend_metric_data)2288 OrcaLoadReport BackendMetricDataToOrcaLoadReport(
2289     const grpc_core::BackendMetricData& backend_metric_data) {
2290   auto builder = OrcaLoadReportBuilder()
2291                      .SetApplicationUtilization(
2292                          backend_metric_data.application_utilization)
2293                      .SetCpuUtilization(backend_metric_data.cpu_utilization)
2294                      .SetMemUtilization(backend_metric_data.mem_utilization)
2295                      .SetQps(backend_metric_data.qps)
2296                      .SetEps(backend_metric_data.eps);
2297   for (const auto& p : backend_metric_data.request_cost) {
2298     builder.SetRequestCost(std::string(p.first), p.second);
2299   }
2300   for (const auto& p : backend_metric_data.utilization) {
2301     builder.SetUtilization(std::string(p.first), p.second);
2302   }
2303   for (const auto& p : backend_metric_data.named_metrics) {
2304     builder.SetNamedMetrics(std::string(p.first), p.second);
2305   }
2306   return builder.Build();
2307 }
2308 
2309 // TODO(roth): Change this to use EqualsProto() once that becomes available in
2310 // OSS.
CheckLoadReportAsExpected(const OrcaLoadReport & actual,const OrcaLoadReport & expected)2311 void CheckLoadReportAsExpected(const OrcaLoadReport& actual,
2312                                const OrcaLoadReport& expected) {
2313   EXPECT_EQ(actual.application_utilization(),
2314             expected.application_utilization());
2315   EXPECT_EQ(actual.cpu_utilization(), expected.cpu_utilization());
2316   EXPECT_EQ(actual.mem_utilization(), expected.mem_utilization());
2317   EXPECT_EQ(actual.rps_fractional(), expected.rps_fractional());
2318   EXPECT_EQ(actual.eps(), expected.eps());
2319   EXPECT_EQ(actual.request_cost().size(), expected.request_cost().size());
2320   for (const auto& p : actual.request_cost()) {
2321     auto it = expected.request_cost().find(p.first);
2322     ASSERT_NE(it, expected.request_cost().end());
2323     EXPECT_EQ(it->second, p.second);
2324   }
2325   EXPECT_EQ(actual.utilization().size(), expected.utilization().size());
2326   for (const auto& p : actual.utilization()) {
2327     auto it = expected.utilization().find(p.first);
2328     ASSERT_NE(it, expected.utilization().end());
2329     EXPECT_EQ(it->second, p.second);
2330   }
2331   EXPECT_EQ(actual.named_metrics().size(), expected.named_metrics().size());
2332   for (const auto& p : actual.named_metrics()) {
2333     auto it = expected.named_metrics().find(p.first);
2334     ASSERT_NE(it, expected.named_metrics().end());
2335     EXPECT_EQ(it->second, p.second);
2336   }
2337 }
2338 
2339 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
2340  protected:
SetUp()2341   void SetUp() override {
2342     ClientLbEnd2endTest::SetUp();
2343     current_test_instance_ = this;
2344   }
2345 
SetUpTestSuite()2346   static void SetUpTestSuite() {
2347     grpc_core::CoreConfiguration::Reset();
2348     grpc_core::CoreConfiguration::RegisterBuilder(
2349         [](grpc_core::CoreConfiguration::Builder* builder) {
2350           grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
2351               builder, ReportTrailerIntercepted);
2352         });
2353     grpc_init();
2354   }
2355 
TearDownTestSuite()2356   static void TearDownTestSuite() {
2357     grpc_shutdown();
2358     grpc_core::CoreConfiguration::Reset();
2359   }
2360 
num_trailers_intercepted()2361   int num_trailers_intercepted() {
2362     grpc_core::MutexLock lock(&mu_);
2363     return num_trailers_intercepted_;
2364   }
2365 
last_status()2366   absl::Status last_status() {
2367     grpc_core::MutexLock lock(&mu_);
2368     return last_status_;
2369   }
2370 
trailing_metadata()2371   grpc_core::MetadataVector trailing_metadata() {
2372     grpc_core::MutexLock lock(&mu_);
2373     return std::move(trailing_metadata_);
2374   }
2375 
backend_load_report()2376   absl::optional<OrcaLoadReport> backend_load_report() {
2377     grpc_core::MutexLock lock(&mu_);
2378     return std::move(load_report_);
2379   }
2380 
2381   // Returns true if received callback within deadline.
WaitForLbCallback()2382   bool WaitForLbCallback() {
2383     grpc_core::MutexLock lock(&mu_);
2384     while (!trailer_intercepted_) {
2385       if (cond_.WaitWithTimeout(&mu_, absl::Seconds(3))) return false;
2386     }
2387     trailer_intercepted_ = false;
2388     return true;
2389   }
2390 
RunPerRpcMetricReportingTest(const OrcaLoadReport & reported,const OrcaLoadReport & expected)2391   void RunPerRpcMetricReportingTest(const OrcaLoadReport& reported,
2392                                     const OrcaLoadReport& expected) {
2393     const int kNumServers = 1;
2394     const int kNumRpcs = 10;
2395     StartServers(kNumServers);
2396     FakeResolverResponseGeneratorWrapper response_generator;
2397     auto channel =
2398         BuildChannel("intercept_trailing_metadata_lb", response_generator);
2399     auto stub = BuildStub(channel);
2400     response_generator.SetNextResolution(GetServersPorts());
2401     for (size_t i = 0; i < kNumRpcs; ++i) {
2402       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &reported);
2403       auto actual = backend_load_report();
2404       ASSERT_TRUE(actual.has_value());
2405       CheckLoadReportAsExpected(*actual, expected);
2406     }
2407     // Check LB policy name for the channel.
2408     EXPECT_EQ("intercept_trailing_metadata_lb",
2409               channel->GetLoadBalancingPolicyName());
2410     EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
2411   }
2412 
2413  private:
ReportTrailerIntercepted(const grpc_core::TrailingMetadataArgsSeen & args_seen)2414   static void ReportTrailerIntercepted(
2415       const grpc_core::TrailingMetadataArgsSeen& args_seen) {
2416     const auto* backend_metric_data = args_seen.backend_metric_data;
2417     ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
2418     grpc_core::MutexLock lock(&self->mu_);
2419     self->last_status_ = args_seen.status;
2420     self->num_trailers_intercepted_++;
2421     self->trailer_intercepted_ = true;
2422     self->trailing_metadata_ = args_seen.metadata;
2423     if (backend_metric_data != nullptr) {
2424       self->load_report_ =
2425           BackendMetricDataToOrcaLoadReport(*backend_metric_data);
2426     }
2427     self->cond_.Signal();
2428   }
2429 
2430   static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
2431   int num_trailers_intercepted_ = 0;
2432   bool trailer_intercepted_ = false;
2433   grpc_core::Mutex mu_;
2434   grpc_core::CondVar cond_;
2435   absl::Status last_status_;
2436   grpc_core::MetadataVector trailing_metadata_;
2437   absl::optional<OrcaLoadReport> load_report_;
2438 };
2439 
2440 ClientLbInterceptTrailingMetadataTest*
2441     ClientLbInterceptTrailingMetadataTest::current_test_instance_ = nullptr;
2442 
// A single successful RPC must produce exactly one intercepted trailer whose
// status, as seen by the LB policy, is OK.
TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // One RPC that is expected to succeed.
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // The channel must be using the intercepting test policy.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  // Exactly one callback, carrying an OK status.
  EXPECT_EQ(1, num_trailers_intercepted());
  EXPECT_EQ(absl::OkStatus(), last_status());
}
2458 
// Asks the server to fail the RPC with PERMISSION_DENIED and checks that both
// the application and the LB policy observe that code and message.
TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  auto stub = BuildStub(channel);
  resolver.SetNextResolution(GetServersPorts());
  // Build a request that carries the error the RPC is expected to end with.
  EchoRequest failing_request;
  auto* error_spec = failing_request.mutable_param()->mutable_expected_error();
  error_spec->set_code(GRPC_STATUS_PERMISSION_DENIED);
  error_spec->set_error_message("bummer, man");
  // Status as returned to the application.
  const Status rpc_status =
      SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
              /*wait_for_ready=*/false, &failing_request);
  EXPECT_EQ(rpc_status.error_code(), StatusCode::PERMISSION_DENIED);
  EXPECT_EQ(rpc_status.error_message(), "bummer, man");
  // Status as recorded by the LB policy callback.
  const absl::Status lb_status = last_status();
  EXPECT_EQ(lb_status.code(), absl::StatusCode::kPermissionDenied);
  EXPECT_EQ(lb_status.message(), "bummer, man");
}
2478 
// Cancels a streaming call on which Finish() is never called and checks that
// the LB policy callback still fires once, with a CANCELLED status.
TEST_F(ClientLbInterceptTrailingMetadataTest,
       StatusCancelledWithoutStartingRecvTrailingMetadata) {
  StartServers(1);
  FakeResolverResponseGeneratorWrapper resolver;
  auto channel = BuildChannel("intercept_trailing_metadata_lb", resolver);
  resolver.SetNextResolution(GetServersPorts());
  auto stub = BuildStub(channel);
  {
    // Open a bidi stream (which sends initial metadata), then cancel it.
    // Finish() is deliberately never called; the context and stream are
    // destroyed when this scope ends.
    ClientContext context;
    auto bidi_stream = stub->BidiStream(&context);
    context.TryCancel();
  }
  // Block until the LB policy callback reports the cancelled call.
  ASSERT_TRUE(WaitForLbCallback());
  // The callback must have fired once, with the cancellation status.
  EXPECT_EQ(1, num_trailers_intercepted());
  const absl::Status lb_status = last_status();
  EXPECT_EQ(lb_status.code(), absl::StatusCode::kCancelled);
  EXPECT_EQ(lb_status.message(), "call cancelled");
}
2502 
// With retries disabled on the channel, every RPC must be reported to the LB
// policy exactly once, with the expected trailing metadata and without any
// backend load report.
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
  const int kNumServers = 1;
  const int kNumRpcs = 10;
  StartServers(kNumServers);
  FakeResolverResponseGeneratorWrapper response_generator;
  // Turn retries off via channel arg.
  ChannelArguments channel_args;
  channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
  auto channel = BuildChannel("intercept_trailing_metadata_lb",
                              response_generator, channel_args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // The index is an int so that it matches kNumRpcs and the comparison does
  // not mix signed and unsigned operands.
  for (int i = 0; i < kNumRpcs; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  // One callback per RPC.
  EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
  // Metadata recorded by the most recent callback.
  EXPECT_THAT(trailing_metadata(),
              ::testing::UnorderedElementsAre(
                  // TODO(roth): Should grpc-status be visible here?
                  ::testing::Pair("grpc-status", "0"),
                  ::testing::Pair("user-agent", ::testing::_),
                  ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
                  ::testing::Pair("baz", "3")));
  // No RPC in this test supplies a load report, so none must be recorded.
  EXPECT_FALSE(backend_load_report().has_value());
}
2530 
// Same checks as the retries-disabled case, but with a retry policy
// configured for the echo service.  Every RPC succeeds, so each one must
// still be reported to the LB policy exactly once.
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
  const int kNumServers = 1;
  const int kNumRpcs = 10;
  StartServers(kNumServers);
  // Service config with a retry policy (up to 3 attempts, retry on ABORTED)
  // for grpc.testing.EchoTestService.
  ChannelArguments args;
  args.SetServiceConfigJSON(
      "{\n"
      "  \"methodConfig\": [ {\n"
      "    \"name\": [\n"
      "      { \"service\": \"grpc.testing.EchoTestService\" }\n"
      "    ],\n"
      "    \"retryPolicy\": {\n"
      "      \"maxAttempts\": 3,\n"
      "      \"initialBackoff\": \"1s\",\n"
      "      \"maxBackoff\": \"120s\",\n"
      "      \"backoffMultiplier\": 1.6,\n"
      "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
      "    }\n"
      "  } ]\n"
      "}");
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel =
      BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // The index is an int so that it matches kNumRpcs and the comparison does
  // not mix signed and unsigned operands.
  for (int i = 0; i < kNumRpcs; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, stub);
  }
  // Check LB policy name for the channel.
  EXPECT_EQ("intercept_trailing_metadata_lb",
            channel->GetLoadBalancingPolicyName());
  // One callback per RPC.
  EXPECT_EQ(kNumRpcs, num_trailers_intercepted());
  // Metadata recorded by the most recent callback.
  EXPECT_THAT(trailing_metadata(),
              ::testing::UnorderedElementsAre(
                  // TODO(roth): Should grpc-status be visible here?
                  ::testing::Pair("grpc-status", "0"),
                  ::testing::Pair("user-agent", ::testing::_),
                  ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
                  ::testing::Pair("baz", "3")));
  // No RPC in this test supplies a load report, so none must be recorded.
  EXPECT_FALSE(backend_load_report().has_value());
}
2572 
// A load report with these values is expected to reach the LB policy
// unchanged, so the same report serves as both the reported and the expected
// value.
TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
  const OrcaLoadReport report = OrcaLoadReportBuilder()
                                    .SetApplicationUtilization(0.25)
                                    .SetCpuUtilization(0.5)
                                    .SetMemUtilization(0.75)
                                    .SetQps(0.25)
                                    .SetEps(0.1)
                                    .SetRequestCost("foo", -0.8)
                                    .SetRequestCost("bar", 1.4)
                                    .SetUtilization("baz", 1.0)
                                    .SetUtilization("quux", 0.9)
                                    .SetNamedMetrics("metric0", 3.0)
                                    .SetNamedMetrics("metric1", -1.0)
                                    .Build();
  RunPerRpcMetricReportingTest(/*reported=*/report, /*expected=*/report);
}
2601 
// Reports negative values in every field.  The report seen by the LB policy
// is expected to keep only the negative request_cost and named_metrics
// entries, with every other field left unset.
TEST_F(ClientLbInterceptTrailingMetadataTest, NegativeValues) {
  // What is sent with each RPC.
  const OrcaLoadReport reported = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(-0.3)
                                      .SetCpuUtilization(-0.1)
                                      .SetMemUtilization(-0.2)
                                      .SetQps(-3)
                                      .SetEps(-4)
                                      .SetRequestCost("foo", -5)
                                      .SetUtilization("bar", -0.6)
                                      .SetNamedMetrics("baz", -0.7)
                                      .Build();
  // What the LB policy is expected to see.
  const OrcaLoadReport expected = OrcaLoadReportBuilder()
                                      .SetRequestCost("foo", -5)
                                      .SetNamedMetrics("baz", -0.7)
                                      .Build();
  RunPerRpcMetricReportingTest(reported, expected);
}
2618 
// Reports values greater than 1.  The expected report keeps application
// utilization, CPU utilization, QPS and EPS, but omits the memory
// utilization and the named utilization entry.
TEST_F(ClientLbInterceptTrailingMetadataTest, AboveOneUtilization) {
  const OrcaLoadReport reported = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(1.9)
                                      .SetCpuUtilization(1.1)
                                      .SetMemUtilization(2)
                                      .SetQps(3)
                                      .SetEps(4)
                                      .SetUtilization("foo", 5)
                                      .Build();
  const OrcaLoadReport expected = OrcaLoadReportBuilder()
                                      .SetApplicationUtilization(1.9)
                                      .SetCpuUtilization(1.1)
                                      .SetQps(3)
                                      .SetEps(4)
                                      .Build();
  RunPerRpcMetricReportingTest(reported, expected);
}
2635 
TEST_F(ClientLbInterceptTrailingMetadataTest,BackendMetricDataMerge)2636 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
2637   const int kNumServers = 1;
2638   const int kNumRpcs = 10;
2639   StartServers(kNumServers);
2640   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.99);
2641   servers_[0]->server_metric_recorder_->SetCpuUtilization(0.99);
2642   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.99);
2643   servers_[0]->server_metric_recorder_->SetQps(0.99);
2644   servers_[0]->server_metric_recorder_->SetEps(0.99);
2645   servers_[0]->server_metric_recorder_->SetNamedUtilization("foo", 0.99);
2646   servers_[0]->server_metric_recorder_->SetNamedUtilization("bar", 0.1);
2647   OrcaLoadReport per_server_load = OrcaLoadReportBuilder()
2648                                        .SetApplicationUtilization(0.99)
2649                                        .SetCpuUtilization(0.99)
2650                                        .SetMemUtilization(0.99)
2651                                        .SetQps(0.99)
2652                                        .SetEps(0.99)
2653                                        .SetUtilization("foo", 0.99)
2654                                        .SetUtilization("bar", 0.1)
2655                                        .Build();
2656   FakeResolverResponseGeneratorWrapper response_generator;
2657   auto channel =
2658       BuildChannel("intercept_trailing_metadata_lb", response_generator);
2659   auto stub = BuildStub(channel);
2660   response_generator.SetNextResolution(GetServersPorts());
2661   size_t total_num_rpcs = 0;
2662   {
2663     OrcaLoadReport load_report =
2664         OrcaLoadReportBuilder().SetApplicationUtilization(0.5).Build();
2665     OrcaLoadReport expected = OrcaLoadReportBuilder(per_server_load)
2666                                   .SetApplicationUtilization(0.5)
2667                                   .Build();
2668     for (size_t i = 0; i < kNumRpcs; ++i) {
2669       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2670       auto actual = backend_load_report();
2671       ASSERT_TRUE(actual.has_value());
2672       CheckLoadReportAsExpected(*actual, expected);
2673       ++total_num_rpcs;
2674     }
2675   }
2676   {
2677     OrcaLoadReport load_report =
2678         OrcaLoadReportBuilder().SetMemUtilization(0.5).Build();
2679     OrcaLoadReport expected =
2680         OrcaLoadReportBuilder(per_server_load).SetMemUtilization(0.5).Build();
2681     for (size_t i = 0; i < kNumRpcs; ++i) {
2682       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2683       auto actual = backend_load_report();
2684       ASSERT_TRUE(actual.has_value());
2685       CheckLoadReportAsExpected(*actual, expected);
2686       ++total_num_rpcs;
2687     }
2688   }
2689   {
2690     OrcaLoadReport load_report = OrcaLoadReportBuilder().SetQps(0.5).Build();
2691     OrcaLoadReport expected =
2692         OrcaLoadReportBuilder(per_server_load).SetQps(0.5).Build();
2693     for (size_t i = 0; i < kNumRpcs; ++i) {
2694       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2695       auto actual = backend_load_report();
2696       ASSERT_TRUE(actual.has_value());
2697       CheckLoadReportAsExpected(*actual, expected);
2698       ++total_num_rpcs;
2699     }
2700   }
2701   {
2702     OrcaLoadReport load_report = OrcaLoadReportBuilder().SetEps(0.5).Build();
2703     OrcaLoadReport expected =
2704         OrcaLoadReportBuilder(per_server_load).SetEps(0.5).Build();
2705     for (size_t i = 0; i < kNumRpcs; ++i) {
2706       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2707       auto actual = backend_load_report();
2708       ASSERT_TRUE(actual.has_value());
2709       CheckLoadReportAsExpected(*actual, expected);
2710       ++total_num_rpcs;
2711     }
2712   }
2713   {
2714     OrcaLoadReport load_report =
2715         OrcaLoadReportBuilder()
2716             .SetUtilization("foo", 0.5)
2717             .SetUtilization("bar", 1.1)  // Out of range.
2718             .SetUtilization("baz", 1.0)
2719             .Build();
2720     auto expected = OrcaLoadReportBuilder(per_server_load)
2721                         .SetUtilization("foo", 0.5)
2722                         .SetUtilization("baz", 1.0)
2723                         .Build();
2724     for (size_t i = 0; i < kNumRpcs; ++i) {
2725       CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2726       auto actual = backend_load_report();
2727       ASSERT_TRUE(actual.has_value());
2728       CheckLoadReportAsExpected(*actual, expected);
2729       ++total_num_rpcs;
2730     }
2731   }
2732   // Check LB policy name for the channel.
2733   EXPECT_EQ("intercept_trailing_metadata_lb",
2734             channel->GetLoadBalancingPolicyName());
2735   EXPECT_EQ(total_num_rpcs, num_trailers_intercepted());
2736 }
2737 
2738 //
2739 // tests that address args from the resolver are visible to the LB policy
2740 //
2741 
2742 class ClientLbAddressTest : public ClientLbEnd2endTest {
2743  protected:
SetUp()2744   void SetUp() override {
2745     ClientLbEnd2endTest::SetUp();
2746     current_test_instance_ = this;
2747   }
2748 
SetUpTestSuite()2749   static void SetUpTestSuite() {
2750     grpc_core::CoreConfiguration::Reset();
2751     grpc_core::CoreConfiguration::RegisterBuilder(
2752         [](grpc_core::CoreConfiguration::Builder* builder) {
2753           grpc_core::RegisterAddressTestLoadBalancingPolicy(builder,
2754                                                             SaveAddress);
2755         });
2756     grpc_init();
2757   }
2758 
TearDownTestSuite()2759   static void TearDownTestSuite() {
2760     grpc_shutdown();
2761     grpc_core::CoreConfiguration::Reset();
2762   }
2763 
addresses_seen()2764   const std::vector<std::string>& addresses_seen() {
2765     grpc_core::MutexLock lock(&mu_);
2766     return addresses_seen_;
2767   }
2768 
2769  private:
SaveAddress(const grpc_core::EndpointAddresses & address)2770   static void SaveAddress(const grpc_core::EndpointAddresses& address) {
2771     ClientLbAddressTest* self = current_test_instance_;
2772     grpc_core::MutexLock lock(&self->mu_);
2773     self->addresses_seen_.emplace_back(address.ToString());
2774   }
2775 
2776   static ClientLbAddressTest* current_test_instance_;
2777   grpc_core::Mutex mu_;
2778   std::vector<std::string> addresses_seen_;
2779 };
2780 
2781 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
2782 
// Sends one RPC through the "address_test_lb" policy and verifies that the
// addresses recorded by the fixture carry the channel args that the resolver
// attached to them.
TEST_F(ClientLbAddressTest, Basic) {
  const int kNumServers = 1;
  StartServers(kNumServers);
  const auto ports = GetServersPorts();
  // What the fixture should record for each backend: the local address plus
  // the per-address args set below.
  std::vector<std::string> expected_addresses;
  for (const int port : ports) {
    expected_addresses.emplace_back(
        absl::StrCat("addrs=[", grpc_core::LocalIp(), ":", port,
                     "] args={test_key=test_value}"));
  }
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("address_test_lb", response_generator);
  auto stub = BuildStub(channel);
  // Addresses returned by the resolver will have attached args.
  response_generator.SetNextResolution(
      ports, nullptr, grpc_core::ChannelArgs().Set("test_key", "test_value"));
  CheckRpcSendOk(DEBUG_LOCATION, stub);
  // Check LB policy name for the channel.
  EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
  // Make sure that the attributes wind up on the subchannels.
  EXPECT_EQ(addresses_seen(), expected_addresses);
}
2804 
2805 //
2806 // tests OOB backend metric API
2807 //
2808 
2809 class OobBackendMetricTest : public ClientLbEnd2endTest {
2810  protected:
2811   using BackendMetricReport = std::pair<int /*port*/, OrcaLoadReport>;
2812 
SetUp()2813   void SetUp() override {
2814     ClientLbEnd2endTest::SetUp();
2815     current_test_instance_ = this;
2816   }
2817 
SetUpTestSuite()2818   static void SetUpTestSuite() {
2819     grpc_core::CoreConfiguration::Reset();
2820     grpc_core::CoreConfiguration::RegisterBuilder(
2821         [](grpc_core::CoreConfiguration::Builder* builder) {
2822           grpc_core::RegisterOobBackendMetricTestLoadBalancingPolicy(
2823               builder, BackendMetricCallback);
2824         });
2825     grpc_init();
2826   }
2827 
TearDownTestSuite()2828   static void TearDownTestSuite() {
2829     grpc_shutdown();
2830     grpc_core::CoreConfiguration::Reset();
2831   }
2832 
GetBackendMetricReport()2833   absl::optional<BackendMetricReport> GetBackendMetricReport() {
2834     grpc_core::MutexLock lock(&mu_);
2835     if (backend_metric_reports_.empty()) return absl::nullopt;
2836     auto result = std::move(backend_metric_reports_.front());
2837     backend_metric_reports_.pop_front();
2838     return result;
2839   }
2840 
2841  private:
BackendMetricCallback(const grpc_core::EndpointAddresses & address,const grpc_core::BackendMetricData & backend_metric_data)2842   static void BackendMetricCallback(
2843       const grpc_core::EndpointAddresses& address,
2844       const grpc_core::BackendMetricData& backend_metric_data) {
2845     auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
2846     int port = grpc_sockaddr_get_port(&address.address());
2847     grpc_core::MutexLock lock(&current_test_instance_->mu_);
2848     current_test_instance_->backend_metric_reports_.push_back(
2849         {port, std::move(load_report)});
2850   }
2851 
2852   static OobBackendMetricTest* current_test_instance_;
2853   grpc_core::Mutex mu_;
2854   std::deque<BackendMetricReport> backend_metric_reports_ ABSL_GUARDED_BY(&mu_);
2855 };
2856 
2857 OobBackendMetricTest* OobBackendMetricTest::current_test_instance_ = nullptr;
2858 
TEST_F(OobBackendMetricTest,Basic)2859 TEST_F(OobBackendMetricTest, Basic) {
2860   StartServers(1);
2861   // Set initial backend metric data on server.
2862   constexpr char kMetricName[] = "foo";
2863   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.5);
2864   servers_[0]->server_metric_recorder_->SetCpuUtilization(0.1);
2865   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.2);
2866   servers_[0]->server_metric_recorder_->SetEps(0.3);
2867   servers_[0]->server_metric_recorder_->SetQps(0.4);
2868   servers_[0]->server_metric_recorder_->SetNamedUtilization(kMetricName, 0.4);
2869   // Start client.
2870   FakeResolverResponseGeneratorWrapper response_generator;
2871   auto channel = BuildChannel("oob_backend_metric_test_lb", response_generator);
2872   auto stub = BuildStub(channel);
2873   response_generator.SetNextResolution(GetServersPorts());
2874   // Send an OK RPC.
2875   CheckRpcSendOk(DEBUG_LOCATION, stub);
2876   // Check LB policy name for the channel.
2877   EXPECT_EQ("oob_backend_metric_test_lb",
2878             channel->GetLoadBalancingPolicyName());
2879   // Check report seen by client.
2880   bool report_seen = false;
2881   for (size_t i = 0; i < 5; ++i) {
2882     auto report = GetBackendMetricReport();
2883     if (report.has_value()) {
2884       EXPECT_EQ(report->first, servers_[0]->port_);
2885       EXPECT_EQ(report->second.application_utilization(), 0.5);
2886       EXPECT_EQ(report->second.cpu_utilization(), 0.1);
2887       EXPECT_EQ(report->second.mem_utilization(), 0.2);
2888       EXPECT_EQ(report->second.eps(), 0.3);
2889       EXPECT_EQ(report->second.rps_fractional(), 0.4);
2890       EXPECT_THAT(
2891           report->second.utilization(),
2892           ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.4)));
2893       report_seen = true;
2894       break;
2895     }
2896     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
2897   }
2898   ASSERT_TRUE(report_seen);
2899   // Now update the utilization data on the server.
2900   // Note that the server may send a new report while we're updating these,
2901   // so we set them in reverse order, so that we know we'll get all new
2902   // data once we see a report with the new app utilization value.
2903   servers_[0]->server_metric_recorder_->SetNamedUtilization(kMetricName, 0.7);
2904   servers_[0]->server_metric_recorder_->SetQps(0.8);
2905   servers_[0]->server_metric_recorder_->SetEps(0.6);
2906   servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.5);
2907   servers_[0]->server_metric_recorder_->SetCpuUtilization(2.4);
2908   servers_[0]->server_metric_recorder_->SetApplicationUtilization(1.2);
2909   // Wait for client to see new report.
2910   report_seen = false;
2911   for (size_t i = 0; i < 5; ++i) {
2912     auto report = GetBackendMetricReport();
2913     if (report.has_value()) {
2914       EXPECT_EQ(report->first, servers_[0]->port_);
2915       if (report->second.application_utilization() != 0.5) {
2916         EXPECT_EQ(report->second.application_utilization(), 1.2);
2917         EXPECT_EQ(report->second.cpu_utilization(), 2.4);
2918         EXPECT_EQ(report->second.mem_utilization(), 0.5);
2919         EXPECT_EQ(report->second.eps(), 0.6);
2920         EXPECT_EQ(report->second.rps_fractional(), 0.8);
2921         EXPECT_THAT(
2922             report->second.utilization(),
2923             ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.7)));
2924         report_seen = true;
2925         break;
2926       }
2927     }
2928     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
2929   }
2930   ASSERT_TRUE(report_seen);
2931 }
2932 
2933 //
2934 // tests rewriting of control plane status codes
2935 //
2936 
2937 class ControlPlaneStatusRewritingTest : public ClientLbEnd2endTest {
2938  protected:
SetUpTestSuite()2939   static void SetUpTestSuite() {
2940     grpc_core::CoreConfiguration::Reset();
2941     grpc_core::CoreConfiguration::RegisterBuilder(
2942         [](grpc_core::CoreConfiguration::Builder* builder) {
2943           grpc_core::RegisterFailLoadBalancingPolicy(
2944               builder, absl::AbortedError("nope"));
2945         });
2946     grpc_init();
2947   }
2948 
TearDownTestSuite()2949   static void TearDownTestSuite() {
2950     grpc_shutdown();
2951     grpc_core::CoreConfiguration::Reset();
2952   }
2953 };
2954 
// An RPC sent through the "fail_lb" policy must fail with INTERNAL and a
// message that quotes the original ABORTED status.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromLb) {
  const char kExpectedMessage[] =
      "Illegal status code from LB pick; original status: ABORTED: nope";
  // Start client.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("fail_lb", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Send an RPC, verify that status was rewritten.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::INTERNAL,
                      kExpectedMessage);
}
2966 
// A resolver result whose service config is an ABORTED error must make the
// RPC fail with INTERNAL and a message that quotes the original status.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromResolver) {
  const char kExpectedMessage[] =
      "Illegal status code from resolver; original status: ABORTED: nope";
  // Start client.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Resolver result with an (empty) address list and a failed service config.
  grpc_core::Resolver::Result resolver_result;
  resolver_result.addresses.emplace();
  resolver_result.service_config = absl::AbortedError("nope");
  response_generator.SetResponse(std::move(resolver_result));
  // Send an RPC, verify that status was rewritten.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::INTERNAL,
                      kExpectedMessage);
}
2981 
// A ConfigSelector that returns ABORTED from GetCallConfig() must make the
// RPC fail with INTERNAL and a message that quotes the original status.
TEST_F(ControlPlaneStatusRewritingTest, RewritesFromConfigSelector) {
  // ConfigSelector whose GetCallConfig() always returns the status it was
  // constructed with.
  class FailConfigSelector : public grpc_core::ConfigSelector {
   public:
    explicit FailConfigSelector(absl::Status status)
        : status_(std::move(status)) {}

    const char* name() const override { return "FailConfigSelector"; }

    bool Equals(const ConfigSelector* other) const override {
      const auto* other_selector =
          static_cast<const FailConfigSelector*>(other);
      return status_ == other_selector->status_;
    }

    absl::Status GetCallConfig(GetCallConfigArgs /*args*/) override {
      return status_;
    }

   private:
    absl::Status status_;
  };
  const char kExpectedMessage[] =
      "Illegal status code from ConfigSelector; original status: "
      "ABORTED: nope";
  // Start client.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("pick_first", response_generator);
  auto stub = BuildStub(channel);
  // Resolver result with a valid (empty) service config whose channel args
  // carry the failing ConfigSelector.
  grpc_core::Resolver::Result resolver_result;
  resolver_result.addresses.emplace();
  resolver_result.service_config =
      grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}");
  ASSERT_TRUE(resolver_result.service_config.ok())
      << resolver_result.service_config.status();
  auto failing_selector =
      grpc_core::MakeRefCounted<FailConfigSelector>(absl::AbortedError("nope"));
  resolver_result.args = grpc_core::ChannelArgs().SetObject(failing_selector);
  response_generator.SetResponse(std::move(resolver_result));
  // Send an RPC, verify that status was rewritten.
  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::INTERNAL,
                      kExpectedMessage);
}
3017 
3018 //
3019 // WeightedRoundRobinTest
3020 //
3021 
// Service config selecting weighted_round_robin with no blackout period and
// a 0.1s weight update period.
const char kServiceConfigPerCall[] = R"({
  "loadBalancingConfig": [
    {"weighted_round_robin": {
      "blackoutPeriod": "0s",
      "weightUpdatePeriod": "0.1s"
    }}
  ]
})";
3031 
// Same weighted_round_robin settings as the per-call config, with
// "enableOobLoadReport" additionally set to true.
const char kServiceConfigOob[] = R"({
  "loadBalancingConfig": [
    {"weighted_round_robin": {
      "blackoutPeriod": "0s",
      "weightUpdatePeriod": "0.1s",
      "enableOobLoadReport": true
    }}
  ]
})";
3042 
// Format string for a service config that wraps weighted_round_robin in
// outlier_detection_experimental.  The single %d is the WRR blackout period
// in seconds.
const char kServiceConfigWithOutlierDetection[] = R"({
  "loadBalancingConfig": [
    {"outlier_detection_experimental": {
      "childPolicy": [
        {"weighted_round_robin": {
          "blackoutPeriod": "%ds",
          "weightUpdatePeriod": "0.1s"
        }}
      ]
    }}
  ]
})";
3056 
3057 class WeightedRoundRobinTest : public ClientLbEnd2endTest {
3058  protected:
ExpectWeightedRoundRobinPicks(const grpc_core::DebugLocation & location,const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,const std::vector<size_t> & expected_weights,size_t total_passes=3,EchoRequest * request_ptr=nullptr,int timeout_ms=15000)3059   void ExpectWeightedRoundRobinPicks(
3060       const grpc_core::DebugLocation& location,
3061       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
3062       const std::vector<size_t>& expected_weights, size_t total_passes = 3,
3063       EchoRequest* request_ptr = nullptr, int timeout_ms = 15000) {
3064     GPR_ASSERT(expected_weights.size() == servers_.size());
3065     size_t total_picks_per_pass = 0;
3066     for (size_t picks : expected_weights) {
3067       total_picks_per_pass += picks;
3068     }
3069     size_t num_picks = 0;
3070     size_t num_passes = 0;
3071     SendRpcsUntil(
3072         location, stub,
3073         [&](const Status&) {
3074           if (++num_picks == total_picks_per_pass) {
3075             bool match = true;
3076             for (size_t i = 0; i < expected_weights.size(); ++i) {
3077               if (servers_[i]->service_.request_count() !=
3078                   expected_weights[i]) {
3079                 match = false;
3080                 break;
3081               }
3082             }
3083             if (match) {
3084               if (++num_passes == total_passes) return false;
3085             } else {
3086               num_passes = 0;
3087             }
3088             num_picks = 0;
3089             ResetCounters();
3090           }
3091           return true;
3092         },
3093         request_ptr, timeout_ms);
3094   }
3095 };
3096 
// Checks that per-call metrics override the server-wide EPS/QPS when WRR
// weights are computed, and that the picks return to the server-wide ratio
// once the per-call metrics stop being sent.
TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
  const int kNumServers = 3;
  StartServers(kNumServers);
  // Report server metrics that should give 6:4:3 WRR picks.
  // weights = qps / (util + (eps/qps)) =
  //   1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
  // where util is app_util if set, or cpu_util.
  struct ServerMetrics {
    double app_utilization;
    double eps;
    double qps;
  };
  const ServerMetrics kServerMetrics[kNumServers] = {
      {0.2, 20, 100},
      {0.3, 30, 100},
      {1.5, 20, 200},
  };
  for (int i = 0; i < kNumServers; ++i) {
    auto& recorder = *servers_[i]->server_metric_recorder_;
    recorder.SetApplicationUtilization(kServerMetrics[i].app_utilization);
    recorder.SetEps(kServerMetrics[i].eps);
    recorder.SetQps(kServerMetrics[i].qps);
  }
  // Create channel.
  FakeResolverResponseGeneratorWrapper response_generator;
  auto channel = BuildChannel("", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts(),
                                       kServiceConfigPerCall);
  // Send requests with per-call reported EPS/QPS set to 0/100.
  // This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
  EchoRequest request;
  auto* per_call_metrics = request.mutable_param()->mutable_backend_metrics();
  // We cannot override with 0 with proto3, so setting it to almost 0.
  per_call_metrics->set_eps(std::numeric_limits<double>::min());
  per_call_metrics->set_rps_fractional(100);
  const std::vector<size_t> per_call_weights = {15, 10, 2};
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub, per_call_weights,
                                /*total_passes=*/3, &request);
  // Now send requests without per-call reported QPS.
  // This should change WRR picks back to 6:4:3.
  const std::vector<size_t> server_weights = {6, 4, 3};
  ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub, server_weights);
  // Check LB policy name for the channel.
  EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
}
3136 
3137 // This tests a bug seen in production where the outlier_detection
3138 // policy would incorrectly generate a duplicate READY notification on
3139 // all of its subchannels every time it saw an update, thus causing the
3140 // WRR policy to re-enter the blackout period for that address.
TEST_F(WeightedRoundRobinTest,WithOutlierDetection)3141 TEST_F(WeightedRoundRobinTest, WithOutlierDetection) {
3142   const int kBlackoutPeriodSeconds = 5;
3143   const int kNumServers = 3;
3144   StartServers(kNumServers);
3145   // Report server metrics that should give 6:4:3 WRR picks.
3146   // weights = qps / (util + (eps/qps)) =
3147   //   1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
3148   // where util is app_util if set, or cpu_util.
3149   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.2);
3150   servers_[0]->server_metric_recorder_->SetEps(20);
3151   servers_[0]->server_metric_recorder_->SetQps(100);
3152   servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.3);
3153   servers_[1]->server_metric_recorder_->SetEps(30);
3154   servers_[1]->server_metric_recorder_->SetQps(100);
3155   servers_[2]->server_metric_recorder_->SetApplicationUtilization(1.5);
3156   servers_[2]->server_metric_recorder_->SetEps(20);
3157   servers_[2]->server_metric_recorder_->SetQps(200);
3158   // Create channel.
3159   // Initial blackout period is 0, so that we start seeing traffic in
3160   // the right proportions right away.
3161   FakeResolverResponseGeneratorWrapper response_generator;
3162   auto channel = BuildChannel("", response_generator);
3163   auto stub = BuildStub(channel);
3164   response_generator.SetNextResolution(
3165       GetServersPorts(),
3166       absl::StrFormat(kServiceConfigWithOutlierDetection, 0).c_str());
3167   // Send requests with per-call reported EPS/QPS set to 0/100.
3168   // This should give 1/2:1/3:1/15 = 15:10:2 WRR picks.
3169   // Keep sending RPCs long enough to go past the new blackout period
3170   // that we're going to add later.
3171   absl::Time deadline =
3172       absl::Now() +
3173       absl::Seconds(kBlackoutPeriodSeconds * grpc_test_slowdown_factor());
3174   EchoRequest request;
3175   // We cannot override with 0 with proto3, so setting it to almost 0.
3176   request.mutable_param()->mutable_backend_metrics()->set_eps(
3177       std::numeric_limits<double>::min());
3178   request.mutable_param()->mutable_backend_metrics()->set_rps_fractional(100);
3179   do {
3180     ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
3181                                   /*expected_weights=*/{15, 10, 2},
3182                                   /*total_passes=*/3, &request);
3183   } while (absl::Now() < deadline);
3184   // Send a new resolver response that increases blackout period.
3185   response_generator.SetNextResolution(
3186       GetServersPorts(),
3187       absl::StrFormat(kServiceConfigWithOutlierDetection,
3188                       kBlackoutPeriodSeconds * grpc_test_slowdown_factor())
3189           .c_str());
3190   // Weights should be the same before the blackout period expires.
3191   ExpectWeightedRoundRobinPicks(
3192       DEBUG_LOCATION, stub, /*expected_weights=*/{15, 10, 2},
3193       /*total_passes=*/3, &request,
3194       /*timeout_ms=*/(kBlackoutPeriodSeconds - 1) * 1000);
3195 }
3196 
3197 class WeightedRoundRobinParamTest
3198     : public WeightedRoundRobinTest,
3199       public ::testing::WithParamInterface<const char*> {};
3200 
3201 INSTANTIATE_TEST_SUITE_P(WeightedRoundRobin, WeightedRoundRobinParamTest,
3202                          ::testing::Values(kServiceConfigPerCall,
3203                                            kServiceConfigOob));
3204 
TEST_P(WeightedRoundRobinParamTest,Basic)3205 TEST_P(WeightedRoundRobinParamTest, Basic) {
3206   const int kNumServers = 3;
3207   StartServers(kNumServers);
3208   // Report server metrics that should give 1:2:4 WRR picks.
3209   // weights = qps / (util + (eps/qps)) =
3210   //   1/(0.4+0.4) : 1/(0.2+0.2) : 2/(0.3+0.1) = 1:2:4
3211   // where util is app_util if set, or cpu_util.
3212   servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.4);
3213   servers_[0]->server_metric_recorder_->SetEps(40);
3214   servers_[0]->server_metric_recorder_->SetQps(100);
3215   servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.2);
3216   servers_[1]->server_metric_recorder_->SetEps(20);
3217   servers_[1]->server_metric_recorder_->SetQps(100);
3218   servers_[2]->server_metric_recorder_->SetApplicationUtilization(0.3);
3219   servers_[2]->server_metric_recorder_->SetEps(5);
3220   servers_[2]->server_metric_recorder_->SetQps(200);
3221   // Create channel.
3222   FakeResolverResponseGeneratorWrapper response_generator;
3223   auto channel = BuildChannel("", response_generator);
3224   auto stub = BuildStub(channel);
3225   response_generator.SetNextResolution(GetServersPorts(), GetParam());
3226   // Wait for the right set of WRR picks.
3227   ExpectWeightedRoundRobinPicks(DEBUG_LOCATION, stub,
3228                                 /*expected_weights=*/{1, 2, 4});
3229   // Check LB policy name for the channel.
3230   EXPECT_EQ("weighted_round_robin", channel->GetLoadBalancingPolicyName());
3231 }
3232 
3233 }  // namespace
3234 }  // namespace testing
3235 }  // namespace grpc
3236 
main(int argc,char ** argv)3237 int main(int argc, char** argv) {
3238   ::testing::InitGoogleTest(&argc, argv);
3239   grpc::testing::TestEnvironment env(&argc, argv);
3240   // Make the backup poller poll very frequently in order to pick up
3241   // updates from all the subchannels's FDs.
3242   grpc_core::ConfigVars::Overrides overrides;
3243   overrides.client_channel_backup_poll_interval_ms = 1;
3244   grpc_core::ConfigVars::SetOverrides(overrides);
3245 #if TARGET_OS_IPHONE
3246   // Workaround Apple CFStream bug
3247   grpc_core::SetEnv("grpc_cfstream", "0");
3248 #endif
3249   grpc_init();
3250   grpc::testing::ConnectionAttemptInjector::Init();
3251   const auto result = RUN_ALL_TESTS();
3252   grpc_shutdown();
3253   return result;
3254 }
3255