// xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/context_allocator_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18 
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include <grpc/support/log.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/message_allocator.h>

#include "src/core/lib/iomgr/iomgr.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"
46 
47 namespace grpc {
48 namespace testing {
49 namespace {
50 
// Transport used to connect the test client to the test server.
enum class Protocol {
  INPROC,  // Channel obtained from the server via InProcessChannel().
  TCP,     // Channel created against the server's "localhost:<port>" address.
};
52 
53 class TestScenario {
54  public:
TestScenario(Protocol protocol,const std::string & creds_type)55   TestScenario(Protocol protocol, const std::string& creds_type)
56       : protocol(protocol), credentials_type(creds_type) {}
57   void Log() const;
58   Protocol protocol;
59   const std::string credentials_type;
60 };
61 
operator <<(std::ostream & out,const TestScenario & scenario)62 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
63   return out << "TestScenario{protocol="
64              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
65              << "," << scenario.credentials_type << "}";
66 }
67 
Log() const68 void TestScenario::Log() const {
69   std::ostringstream out;
70   out << *this;
71   gpr_log(GPR_INFO, "%s", out.str().c_str());
72 }
73 
74 class ContextAllocatorEnd2endTestBase
75     : public ::testing::TestWithParam<TestScenario> {
76  protected:
SetUpTestSuite()77   static void SetUpTestSuite() { grpc_init(); }
TearDownTestSuite()78   static void TearDownTestSuite() { grpc_shutdown(); }
ContextAllocatorEnd2endTestBase()79   ContextAllocatorEnd2endTestBase() {}
80 
81   ~ContextAllocatorEnd2endTestBase() override = default;
82 
SetUp()83   void SetUp() override { GetParam().Log(); }
84 
CreateServer(std::unique_ptr<grpc::ContextAllocator> context_allocator)85   void CreateServer(std::unique_ptr<grpc::ContextAllocator> context_allocator) {
86     ServerBuilder builder;
87 
88     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
89         GetParam().credentials_type);
90     if (GetParam().protocol == Protocol::TCP) {
91       picked_port_ = grpc_pick_unused_port_or_die();
92       server_address_ << "localhost:" << picked_port_;
93       builder.AddListeningPort(server_address_.str(), server_creds);
94     }
95     builder.SetContextAllocator(std::move(context_allocator));
96     builder.RegisterService(&callback_service_);
97 
98     server_ = builder.BuildAndStart();
99   }
100 
DestroyServer()101   void DestroyServer() {
102     if (server_) {
103       server_->Shutdown();
104       server_.reset();
105     }
106   }
107 
ResetStub()108   void ResetStub() {
109     ChannelArguments args;
110     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
111         GetParam().credentials_type, &args);
112     switch (GetParam().protocol) {
113       case Protocol::TCP:
114         channel_ = grpc::CreateCustomChannel(server_address_.str(),
115                                              channel_creds, args);
116         break;
117       case Protocol::INPROC:
118         channel_ = server_->InProcessChannel(args);
119         break;
120       default:
121         assert(false);
122     }
123     stub_ = EchoTestService::NewStub(channel_);
124   }
125 
TearDown()126   void TearDown() override {
127     DestroyServer();
128     if (picked_port_ > 0) {
129       grpc_recycle_unused_port(picked_port_);
130     }
131   }
132 
SendRpcs(int num_rpcs)133   void SendRpcs(int num_rpcs) {
134     std::string test_string;
135     for (int i = 0; i < num_rpcs; i++) {
136       EchoRequest request;
137       EchoResponse response;
138       ClientContext cli_ctx;
139 
140       test_string += std::string(1024, 'x');
141       request.set_message(test_string);
142       std::string val;
143       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
144 
145       std::mutex mu;
146       std::condition_variable cv;
147       bool done = false;
148       stub_->async()->Echo(
149           &cli_ctx, &request, &response,
150           [&request, &response, &done, &mu, &cv, val](Status s) {
151             GPR_ASSERT(s.ok());
152 
153             EXPECT_EQ(request.message(), response.message());
154             std::lock_guard<std::mutex> l(mu);
155             done = true;
156             cv.notify_one();
157           });
158       std::unique_lock<std::mutex> l(mu);
159       while (!done) {
160         cv.wait(l);
161       }
162     }
163   }
164 
165   int picked_port_{0};
166   std::shared_ptr<Channel> channel_;
167   std::unique_ptr<EchoTestService::Stub> stub_;
168   CallbackTestServiceImpl callback_service_;
169   std::unique_ptr<Server> server_;
170   std::ostringstream server_address_;
171 };
172 
173 class DefaultContextAllocatorTest : public ContextAllocatorEnd2endTestBase {};
174 
// Unary RPCs must succeed when no context allocator is supplied (nullptr is
// handed to the server builder).
TEST_P(DefaultContextAllocatorTest, SimpleRpc) {
  const int kRpcCount = 10;
  CreateServer(nullptr);
  ResetStub();
  SendRpcs(kRpcCount);
}
181 
182 class NullContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
183  public:
184   class NullAllocator : public grpc::ContextAllocator {
185    public:
NullAllocator(std::atomic<int> * allocation_count,std::atomic<int> * deallocation_count)186     NullAllocator(std::atomic<int>* allocation_count,
187                   std::atomic<int>* deallocation_count)
188         : allocation_count_(allocation_count),
189           deallocation_count_(deallocation_count) {}
NewCallbackServerContext()190     grpc::CallbackServerContext* NewCallbackServerContext() override {
191       allocation_count_->fetch_add(1, std::memory_order_relaxed);
192       return nullptr;
193     }
194 
NewGenericCallbackServerContext()195     GenericCallbackServerContext* NewGenericCallbackServerContext() override {
196       allocation_count_->fetch_add(1, std::memory_order_relaxed);
197       return nullptr;
198     }
199 
Release(grpc::CallbackServerContext *)200     void Release(
201         grpc::CallbackServerContext* /*callback_server_context*/) override {
202       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
203     }
204 
Release(GenericCallbackServerContext *)205     void Release(
206         GenericCallbackServerContext* /*generic_callback_server_context*/)
207         override {
208       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
209     }
210 
211     std::atomic<int>* allocation_count_;
212     std::atomic<int>* deallocation_count_;
213   };
214 };
215 
// Runs unary RPCs against a server using NullAllocator and checks that every
// RPC produced exactly one allocation request and one release.
TEST_P(NullContextAllocatorTest, UnaryRpc) {
  const int kRpcCount = 10;
  std::atomic<int> allocation_count{0};
  std::atomic<int> deallocation_count{0};
  std::unique_ptr<NullAllocator> allocator(
      new NullAllocator(&allocation_count, &deallocation_count));
  CreateServer(std::move(allocator));
  ResetStub();
  SendRpcs(kRpcCount);
  // deallocation_count is updated in Release after the server-side OnDone, so
  // the server is destroyed before the counters are read.
  DestroyServer();
  // Compare plain ints so a failure prints the values, not atomic objects.
  EXPECT_EQ(kRpcCount, allocation_count.load());
  EXPECT_EQ(kRpcCount, deallocation_count.load());
}
231 
232 class SimpleContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
233  public:
234   class SimpleAllocator : public grpc::ContextAllocator {
235    public:
SimpleAllocator(std::atomic<int> * allocation_count,std::atomic<int> * deallocation_count)236     SimpleAllocator(std::atomic<int>* allocation_count,
237                     std::atomic<int>* deallocation_count)
238         : allocation_count_(allocation_count),
239           deallocation_count_(deallocation_count) {}
NewCallbackServerContext()240     grpc::CallbackServerContext* NewCallbackServerContext() override {
241       allocation_count_->fetch_add(1, std::memory_order_relaxed);
242       return new grpc::CallbackServerContext();
243     }
NewGenericCallbackServerContext()244     GenericCallbackServerContext* NewGenericCallbackServerContext() override {
245       allocation_count_->fetch_add(1, std::memory_order_relaxed);
246       return new GenericCallbackServerContext();
247     }
248 
Release(grpc::CallbackServerContext * callback_server_context)249     void Release(
250         grpc::CallbackServerContext* callback_server_context) override {
251       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
252       delete callback_server_context;
253     }
254 
Release(GenericCallbackServerContext * generic_callback_server_context)255     void Release(GenericCallbackServerContext* generic_callback_server_context)
256         override {
257       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
258       delete generic_callback_server_context;
259     }
260 
261     std::atomic<int>* allocation_count_;
262     std::atomic<int>* deallocation_count_;
263   };
264 };
265 
// Runs unary RPCs against a server using SimpleAllocator and checks that
// every RPC produced exactly one allocation and one matching release.
TEST_P(SimpleContextAllocatorTest, UnaryRpc) {
  const int kRpcCount = 10;
  std::atomic<int> allocation_count{0};
  std::atomic<int> deallocation_count{0};
  std::unique_ptr<SimpleAllocator> allocator(
      new SimpleAllocator(&allocation_count, &deallocation_count));
  CreateServer(std::move(allocator));
  ResetStub();
  SendRpcs(kRpcCount);
  // deallocation_count is updated in Release after the server-side OnDone, so
  // the server is destroyed before the counters are read.
  DestroyServer();
  // Compare plain ints so a failure prints the values, not atomic objects.
  EXPECT_EQ(kRpcCount, allocation_count.load());
  EXPECT_EQ(kRpcCount, deallocation_count.load());
}
281 
CreateTestScenarios(bool test_insecure)282 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
283   std::vector<TestScenario> scenarios;
284   std::vector<std::string> credentials_types{
285       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
286   auto insec_ok = [] {
287     // Only allow insecure credentials type when it is registered with the
288     // provider. User may create providers that do not have insecure.
289     return GetCredentialsProvider()->GetChannelCredentials(
290                kInsecureCredentialsType, nullptr) != nullptr;
291   };
292   if (test_insecure && insec_ok()) {
293     credentials_types.push_back(kInsecureCredentialsType);
294   }
295   GPR_ASSERT(!credentials_types.empty());
296 
297   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
298   for (Protocol p : parr) {
299     for (const auto& cred : credentials_types) {
300       if (p == Protocol::INPROC &&
301           (cred != kInsecureCredentialsType || !insec_ok())) {
302         continue;
303       }
304       scenarios.emplace_back(p, cred);
305     }
306   }
307   return scenarios;
308 }
309 
// TODO(ddyihai): add client-streaming, server-streaming, and bidi-streaming
// tests.
312 
313 INSTANTIATE_TEST_SUITE_P(DefaultContextAllocatorTest,
314                          DefaultContextAllocatorTest,
315                          ::testing::ValuesIn(CreateTestScenarios(true)));
316 INSTANTIATE_TEST_SUITE_P(NullContextAllocatorTest, NullContextAllocatorTest,
317                          ::testing::ValuesIn(CreateTestScenarios(true)));
318 INSTANTIATE_TEST_SUITE_P(SimpleContextAllocatorTest, SimpleContextAllocatorTest,
319                          ::testing::ValuesIn(CreateTestScenarios(true)));
320 
321 }  // namespace
322 }  // namespace testing
323 }  // namespace grpc
324 
main(int argc,char ** argv)325 int main(int argc, char** argv) {
326   grpc::testing::TestEnvironment env(&argc, argv);
327   ::testing::InitGoogleTest(&argc, argv);
328   int ret = RUN_ALL_TESTS();
329   return ret;
330 }
331