1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <memory>
16 #include <string>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20
21 #include <gtest/gtest.h>
22
23 #include "absl/status/statusor.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/time/time.h"
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/client_context.h>
30 #include <grpcpp/grpcpp.h>
31 #include <grpcpp/support/server_callback.h>
32
33 #include "src/core/lib/event_engine/tcp_socket_utils.h"
34 #include "src/core/lib/experiments/config.h"
35 #include "src/core/lib/gprpp/notification.h"
36 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
37 #include "src/cpp/server/secure_server_credentials.h"
38 #include "src/proto/grpc/testing/echo.grpc.pb.h"
39 #include "test/core/util/port.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/util/credentials.h"
42
43 // IWYU pragma: no_include <sys/socket.h>
44
45 // A stress test which spins up a server with a small configured resource quota
46 // value. It then creates many channels which exchange large payloads with the
47 // server. This would drive the server to reach resource quota limits and
48 // trigger reclamation.
49
50 namespace grpc {
51 namespace testing {
52 namespace {
53 constexpr int kResourceQuotaSizeBytes = 1024 * 1024;
54 constexpr int kPayloadSizeBytes = 1024 * 1024;
55 constexpr int kNumParallelChannels = 10;
56 } // namespace
57
58 class EchoClientUnaryReactor : public grpc::ClientUnaryReactor {
59 public:
EchoClientUnaryReactor(ClientContext * ctx,EchoTestService::Stub * stub,const std::string payload,Status * status)60 EchoClientUnaryReactor(ClientContext* ctx, EchoTestService::Stub* stub,
61 const std::string payload, Status* status)
62 : ctx_(ctx), payload_(payload), status_(status) {
63 request_.set_message(payload);
64 stub->async()->Echo(ctx_, &request_, &response_, this);
65 StartCall();
66 }
67
Await()68 void Await() { notification_.WaitForNotification(); }
69
70 protected:
OnReadInitialMetadataDone(bool)71 void OnReadInitialMetadataDone(bool /*ok*/) override {}
72
OnDone(const Status & s)73 void OnDone(const Status& s) override {
74 *status_ = s;
75 notification_.Notify();
76 }
77
78 private:
79 ClientContext* const ctx_;
80 EchoRequest request_;
81 EchoResponse response_;
82 const std::string payload_;
83 grpc_core::Notification notification_;
84 Status* const status_;
85 };
86
87 class EchoServerUnaryReactor : public ServerUnaryReactor {
88 public:
EchoServerUnaryReactor(CallbackServerContext *,const EchoRequest * request,EchoResponse * response)89 EchoServerUnaryReactor(CallbackServerContext* /*ctx*/,
90 const EchoRequest* request, EchoResponse* response) {
91 response->set_message(request->message());
92 Finish(grpc::Status::OK);
93 }
94
95 private:
OnDone()96 void OnDone() override { delete this; }
97 };
98
99 class GrpcCallbackServiceImpl : public EchoTestService::CallbackService {
100 public:
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)101 ServerUnaryReactor* Echo(CallbackServerContext* context,
102 const EchoRequest* request,
103 EchoResponse* response) override {
104 return new EchoServerUnaryReactor(context, request, response);
105 }
106 };
107
108 class End2EndResourceQuotaUnaryTest : public ::testing::Test {
109 protected:
End2EndResourceQuotaUnaryTest()110 End2EndResourceQuotaUnaryTest() {
111 int port = grpc_pick_unused_port_or_die();
112 server_address_ = absl::StrCat("localhost:", port);
113 payload_ = std::string(kPayloadSizeBytes, 'a');
114 ServerBuilder builder;
115 builder.AddListeningPort(server_address_, InsecureServerCredentials());
116 builder.SetResourceQuota(
117 grpc::ResourceQuota("TestService").Resize(kResourceQuotaSizeBytes));
118 builder.RegisterService(&grpc_service_);
119 server_ = builder.BuildAndStart();
120 }
121
~End2EndResourceQuotaUnaryTest()122 ~End2EndResourceQuotaUnaryTest() override { server_->Shutdown(); }
123
MakeGrpcCall()124 void MakeGrpcCall() {
125 ClientContext ctx;
126 Status status;
127 auto stub = EchoTestService::NewStub(
128 CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
129 ctx.set_wait_for_ready(true);
130 EchoClientUnaryReactor reactor(&ctx, stub.get(), payload_, &status);
131 reactor.Await();
132 }
133
MakeGrpcCalls()134 void MakeGrpcCalls() {
135 std::vector<std::thread> workers;
136 workers.reserve(kNumParallelChannels);
137 // Run MakeGrpcCall() many times concurrently.
138 for (int i = 0; i < kNumParallelChannels; ++i) {
139 workers.emplace_back([this]() { MakeGrpcCall(); });
140 }
141 for (int i = 0; i < kNumParallelChannels; ++i) {
142 workers[i].join();
143 }
144 }
145
146 int port_;
147 std::unique_ptr<Server> server_;
148 string server_address_;
149 GrpcCallbackServiceImpl grpc_service_;
150 std::string payload_;
151 };
152
TEST_F(End2EndResourceQuotaUnaryTest,MultipleUnaryRPCTest)153 TEST_F(End2EndResourceQuotaUnaryTest, MultipleUnaryRPCTest) { MakeGrpcCalls(); }
154
155 class End2EndConnectionQuotaTest : public ::testing::TestWithParam<int> {
156 protected:
End2EndConnectionQuotaTest()157 End2EndConnectionQuotaTest() {
158 port_ = grpc_pick_unused_port_or_die();
159 server_address_ = absl::StrCat("[::]:", port_);
160 connect_address_ = absl::StrCat("ipv6:[::1]:", port_);
161 payload_ = std::string(kPayloadSizeBytes, 'a');
162 ServerBuilder builder;
163 builder.AddListeningPort(
164 server_address_,
165 std::make_shared<SecureServerCredentials>(
166 grpc_fake_transport_security_server_credentials_create()));
167 builder.AddChannelArgument(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, 1000);
168 builder.AddChannelArgument(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS,
169 GetParam());
170 builder.AddChannelArgument(
171 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
172 builder.RegisterService(&grpc_service_);
173 server_ = builder.BuildAndStart();
174 }
175
~End2EndConnectionQuotaTest()176 ~End2EndConnectionQuotaTest() override { server_->Shutdown(); }
177
CreateGrpcChannelStub()178 std::unique_ptr<EchoTestService::Stub> CreateGrpcChannelStub() {
179 grpc::ChannelArguments args;
180 args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
181 args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
182 args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 20000);
183 args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
184 args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 15000);
185 args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
186
187 return EchoTestService::NewStub(CreateCustomChannel(
188 connect_address_,
189 std::make_shared<FakeTransportSecurityChannelCredentials>(), args));
190 }
191
TestExceedingConnectionQuota()192 void TestExceedingConnectionQuota() {
193 const int kNumConnections = 2 * GetParam();
194 #ifdef GPR_LINUX
195 // On linux systems create 2 * NumConnection tcp connections which don't
196 // do anything and verify that they get closed after
197 // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS seconds.
198 auto connect_address_resolved =
199 grpc_event_engine::experimental::URIToResolvedAddress(connect_address_);
200 std::vector<std::thread> workers;
201 workers.reserve(kNumConnections);
202 for (int i = 0; i < kNumConnections; ++i) {
203 workers.emplace_back([connect_address_resolved]() {
204 int client_fd;
205 int one = 1;
206 char buf[1024];
207 client_fd = socket(AF_INET6, SOCK_STREAM, 0);
208 setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
209 // Connection should succeed.
210 EXPECT_EQ(connect(client_fd,
211 const_cast<struct sockaddr*>(
212 connect_address_resolved->address()),
213 connect_address_resolved->size()),
214 0);
215 // recv should not block forever and it should return because
216 // GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is set and the server should
217 // close this connections after that timeout expires.
218 while (recv(client_fd, buf, 1024, 0) > 0) {
219 }
220 close(client_fd);
221 });
222 }
223 for (int i = 0; i < kNumConnections; ++i) {
224 workers[i].join();
225 }
226 #endif
227 // Subsequent kNumConnections / 2 RPCs should succeed because the previously
228 // spawned client connections have been closed.
229 std::vector<std::unique_ptr<EchoTestService::Stub>> stubs;
230 stubs.reserve(kNumConnections);
231 for (int i = 0; i < kNumConnections; i++) {
232 stubs.push_back(CreateGrpcChannelStub());
233 }
234 for (int i = 0; i < kNumConnections; ++i) {
235 ClientContext ctx;
236 Status status;
237 ctx.set_wait_for_ready(false);
238 EchoClientUnaryReactor reactor(&ctx, stubs[i].get(), payload_, &status);
239 reactor.Await();
240 // The first half RPCs should succeed.
241 if (i < kNumConnections / 2) {
242 EXPECT_TRUE(status.ok());
243
244 } else {
245 // The second half should fail because they would attempt to create a
246 // new connection and fail since it would exceed the connection quota
247 // limit set at the server.
248 EXPECT_FALSE(status.ok());
249 }
250 }
251 }
252
253 int port_;
254 std::unique_ptr<Server> server_;
255 string server_address_;
256 string connect_address_;
257 GrpcCallbackServiceImpl grpc_service_;
258 std::string payload_;
259 };
260
TEST_P(End2EndConnectionQuotaTest,ConnectionQuotaTest)261 TEST_P(End2EndConnectionQuotaTest, ConnectionQuotaTest) {
262 TestExceedingConnectionQuota();
263 }
264
265 INSTANTIATE_TEST_SUITE_P(ConnectionQuotaParamTest, End2EndConnectionQuotaTest,
266 ::testing::ValuesIn<int>({10, 100}));
267
268 } // namespace testing
269 } // namespace grpc
270
main(int argc,char ** argv)271 int main(int argc, char** argv) {
272 ::testing::InitGoogleTest(&argc, argv);
273 return RUN_ALL_TESTS();
274 }
275