xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/resource_quota_end2end_stress_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <memory>
16 #include <string>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20 
21 #include <gtest/gtest.h>
22 
23 #include "absl/status/statusor.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/time/time.h"
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/client_context.h>
30 #include <grpcpp/grpcpp.h>
31 #include <grpcpp/support/server_callback.h>
32 
33 #include "src/core/lib/event_engine/tcp_socket_utils.h"
34 #include "src/core/lib/experiments/config.h"
35 #include "src/core/lib/gprpp/notification.h"
36 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
37 #include "src/cpp/server/secure_server_credentials.h"
38 #include "src/proto/grpc/testing/echo.grpc.pb.h"
39 #include "test/core/util/port.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/util/credentials.h"
42 
43 // IWYU pragma: no_include <sys/socket.h>
44 
45 // A stress test which spins up a server with a small configured resource quota
46 // value. It then creates many channels which exchange large payloads with the
47 // server. This would drive the server to reach resource quota limits and
48 // trigger reclamation.
49 
50 namespace grpc {
51 namespace testing {
52 namespace {
53 constexpr int kResourceQuotaSizeBytes = 1024 * 1024;
54 constexpr int kPayloadSizeBytes = 1024 * 1024;
55 constexpr int kNumParallelChannels = 10;
56 }  // namespace
57 
58 class EchoClientUnaryReactor : public grpc::ClientUnaryReactor {
59  public:
EchoClientUnaryReactor(ClientContext * ctx,EchoTestService::Stub * stub,const std::string payload,Status * status)60   EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub,
61                          const std::string payload, Status* status)
62       : ctx_(ctx), payload_(payload), status_(status) {
63     request_.set_message(payload);
64     stub->async()->Echo(ctx_, &request_, &response_, this);
65     StartCall();
66   }
67 
Await()68   void Await() { notification_.WaitForNotification(); }
69 
70  protected:
OnReadInitialMetadataDone(bool)71   void OnReadInitialMetadataDone(bool /*ok*/) override {}
72 
OnDone(const Status & s)73   void OnDone(const Status& s) override {
74     *status_ = s;
75     notification_.Notify();
76   }
77 
78  private:
79   ClientContext* const ctx_;
80   EchoRequest request_;
81   EchoResponse response_;
82   const std::string payload_;
83   grpc_core::Notification notification_;
84   Status* const status_;
85 };
86 
87 class EchoServerUnaryReactor : public ServerUnaryReactor {
88  public:
EchoServerUnaryReactor(CallbackServerContext *,const EchoRequest * request,EchoResponse * response)89   EchoServerUnaryReactor(CallbackServerContext* /*ctx*/,
90                          const EchoRequest* request, EchoResponse* response) {
91     response->set_message(request->message());
92     Finish(grpc::Status::OK);
93   }
94 
95  private:
OnDone()96   void OnDone() override { delete this; }
97 };
98 
99 class GrpcCallbackServiceImpl : public EchoTestService::CallbackService {
100  public:
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)101   ServerUnaryReactor* Echo(CallbackServerContext* context,
102                            const EchoRequest* request,
103                            EchoResponse* response) override {
104     return new EchoServerUnaryReactor(context, request, response);
105   }
106 };
107 
108 class End2EndResourceQuotaUnaryTest : public ::testing::Test {
109  protected:
End2EndResourceQuotaUnaryTest()110   End2EndResourceQuotaUnaryTest() {
111     int port = grpc_pick_unused_port_or_die();
112     server_address_ = absl::StrCat("localhost:", port);
113     payload_ = std::string(kPayloadSizeBytes, 'a');
114     ServerBuilder builder;
115     builder.AddListeningPort(server_address_, InsecureServerCredentials());
116     builder.SetResourceQuota(
117         grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes));
118     builder.RegisterService(&grpc_service_);
119     server_ = builder.BuildAndStart();
120   }
121 
~End2EndResourceQuotaUnaryTest()122   ~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); }
123 
MakeGrpcCall()124   void MakeGrpcCall() {
125     ClientContext ctx;
126     Status status;
127     auto stub = EchoTestService::NewStub(
128         CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
129     ctx.set_wait_for_ready(true);
130     EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status);
131     reactor.Await();
132   }
133 
MakeGrpcCalls()134   void MakeGrpcCalls() {
135     std::vector<std::thread> workers;
136     workers.reserve(kNumParallelChannels);
137     // Run MakeGrpcCall() many times concurrently.
138     for (int i = 0; i < kNumParallelChannels; ++i) {
139       workers.emplace_back([this]() { MakeGrpcCall(); });
140     }
141     for (int i = 0; i < kNumParallelChannels; ++i) {
142       workers[i].join();
143     }
144   }
145 
146   int port_;
147   std::unique_ptr<Server> server_;
148   string server_address_;
149   GrpcCallbackServiceImpl grpc_service_;
150   std::string payload_;
151 };
152 
TEST_F(End2EndResourceQuotaUnaryTest,MultipleUnaryRPCTest)153 TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) { MakeGrpcCalls(); }
154 
155 class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
156  protected:
End2EndConnectionQuotaTest()157   End2EndConnectionQuotaTest() {
158     port_ = grpc_pick_unused_port_or_die();
159     server_address_ = absl::StrCat("[::]:", port_);
160     connect_address_ = absl::StrCat("ipv6:[::1]:", port_);
161     payload_ = std::string(kPayloadSizeBytes, 'a');
162     ServerBuilder builder;
163     builder.AddListeningPort(
164         server_address_,
165         std::make_shared<SecureServerCredentials>(
166             grpc_fake_transport_security_server_credentials_create()));
167     builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
168     builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS,
169                                GetParam());
170     builder.AddChannelArgument(
171         GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
172     builder.RegisterService(&grpc_service_);
173     server_ = builder.BuildAndStart();
174   }
175 
~End2EndConnectionQuotaTest()176   ~End2EndConnectionQuotaTest() override { server_->Shutdown(); }
177 
CreateGrpcChannelStub()178   std::unique_ptr<EchoTestService::Stub> CreateGrpcChannelStub() {
179     grpc::ChannelArguments args;
180     args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
181     args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
182     args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 20000);
183     args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
184     args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000);
185     args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
186 
187     return EchoTestService::NewStub(CreateCustomChannel(
188         connect_address_,
189         std::make_shared<FakeTransportSecurityChannelCredentials>(), args));
190   }
191 
TestExceedingConnectionQuota()192   void TestExceedingConnectionQuota() {
193     const int kNumConnections = 2 * GetParam();
194 #ifdef GPR_LINUX
195     // On linux systems create 2 * NumConnection tcp connections which don't
196     // do anything and verify that they get closed after
197     // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS seconds.
198     auto connect_address_resolved =
199         grpc_event_engine::experimental::URIToResolvedAddress(connect_address_);
200     std::vector<std::thread> workers;
201     workers.reserve(kNumConnections);
202     for (int i = 0; i < kNumConnections; ++i) {
203       workers.emplace_back([connect_address_resolved]() {
204         int client_fd;
205         int one = 1;
206         char buf[1024];
207         client_fd = socket(AF_INET6, SOCK_STREAM, 0);
208         setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
209         // Connection should succeed.
210         EXPECT_EQ(connect(client_fd,
211                           const_cast<struct sockaddr*>(
212                               connect_address_resolved->address()),
213                           connect_address_resolved->size()),
214                   0);
215         // recv should not block forever and it should return because
216         // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is set and the server should
217         // close this connections after that timeout expires.
218         while (recv(client_fd, buf, 1024, 0) > 0) {
219         }
220         close(client_fd);
221       });
222     }
223     for (int i = 0; i < kNumConnections; ++i) {
224       workers[i].join();
225     }
226 #endif
227     // Subsequent kNumConnections / 2 RPCs should succeed because the previously
228     // spawned client connections have been closed.
229     std::vector<std::unique_ptr<EchoTestService::Stub>> stubs;
230     stubs.reserve(kNumConnections);
231     for (int i = 0; i < kNumConnections; i++) {
232       stubs.push_back(CreateGrpcChannelStub());
233     }
234     for (int i = 0; i < kNumConnections; ++i) {
235       ClientContext ctx;
236       Status status;
237       ctx.set_wait_for_ready(false);
238       EchoClientUnaryReactor reactor(&ctx, stubs[i].get(), payload_, &status);
239       reactor.Await();
240       // The first half RPCs should succeed.
241       if (i < kNumConnections / 2) {
242         EXPECT_TRUE(status.ok());
243 
244       } else {
245         // The second half should fail because they would attempt to create a
246         // new connection and fail since it would exceed the connection quota
247         // limit set at the server.
248         EXPECT_FALSE(status.ok());
249       }
250     }
251   }
252 
253   int port_;
254   std::unique_ptr<Server> server_;
255   string server_address_;
256   string connect_address_;
257   GrpcCallbackServiceImpl grpc_service_;
258   std::string payload_;
259 };
260 
TEST_P(End2EndConnectionQuotaTest,ConnectionQuotaTest)261 TEST_P(End2EndConnectionQuotaTest, ConnectionQuotaTest) {
262   TestExceedingConnectionQuota();
263 }
264 
265 INSTANTIATE_TEST_SUITE_P(ConnectionQuotaParamTest, End2EndConnectionQuotaTest,
266                          ::testing::ValuesIn<int>({10, 100}));
267 
268 }  // namespace testing
269 }  // namespace grpc
270 
main(int argc,char ** argv)271 int main(int argc, char** argv) {
272   ::testing::InitGoogleTest(&argc, argv);
273   return RUN_ALL_TESTS();
274 }
275