// Source: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/test_suite/tests/server_test.cc
// (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #include <algorithm>
16 #include <chrono>
17 #include <memory>
18 #include <string>
19 #include <thread>
20 #include <tuple>
21 #include <type_traits>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 #include "gtest/gtest.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/event_engine/memory_allocator.h>
34 #include <grpc/impl/channel_arg_names.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 #include "src/core/lib/resource_quota/resource_quota.h"
44 #include "test/core/event_engine/event_engine_test_utils.h"
45 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
46 #include "test/core/util/port.h"
47 
namespace grpc_event_engine {
namespace experimental {

// Intentionally empty hook with external linkage.
// NOTE(review): presumably called from the test suite's entry point so that
// this translation unit (and the TEST_F registrations in it) is linked into
// the test binary -- confirm against the caller.
void InitServerTests() {}

}  // namespace experimental
}  // namespace grpc_event_engine
55 
56 class EventEngineServerTest : public EventEngineTest {};
57 
// Brings the duration literals into scope; the tests below pass `24h` as the
// connect timeout.
using namespace std::chrono_literals;
59 
60 namespace {
61 
62 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
63 using ::grpc_event_engine::experimental::EventEngine;
64 using ::grpc_event_engine::experimental::URIToResolvedAddress;
65 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
66 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
67 using ::grpc_event_engine::experimental::GetNextSendMessage;
68 
69 constexpr int kNumExchangedMessages = 100;
70 
71 }  // namespace
72 
// Verifies that a listener which has been started refuses further Bind()
// calls: one address is bound, the listener is started, and a second Bind()
// on a fresh port is expected to return a non-OK result.
TEST_F(EventEngineServerTest, CannotBindAfterStarted) {
  std::shared_ptr<EventEngine> engine(this->NewEventEngine());
  ChannelArgsEndpointConfig config;
  auto listener = engine->CreateListener(
      [](std::unique_ptr<Endpoint>, grpc_core::MemoryAllocator) {},
      [](absl::Status) {}, config,
      std::make_unique<grpc_core::MemoryQuota>("foo"));
  // Fail the test with a readable status rather than dereferencing an errored
  // StatusOr below.
  ASSERT_TRUE(listener.ok()) << listener.status();
  // Bind an initial port to ensure normal listener startup
  auto resolved_addr = URIToResolvedAddress(absl::StrCat(
      "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
  ASSERT_TRUE(resolved_addr.ok()) << resolved_addr.status();
  auto bind_result = (*listener)->Bind(*resolved_addr);
  ASSERT_TRUE(bind_result.ok()) << bind_result.status();
  auto listen_result = (*listener)->Start();
  ASSERT_TRUE(listen_result.ok()) << listen_result;
  // A subsequent bind, which should fail
  auto resolved_addr2 = URIToResolvedAddress(absl::StrCat(
      "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
  ASSERT_TRUE(resolved_addr2.ok()) << resolved_addr2.status();
  ASSERT_FALSE((*listener)->Bind(*resolved_addr2).ok());
}
94 
95 // Create a connection using the oracle EventEngine to a listener created
96 // by the Test EventEngine and exchange bi-di data over the connection.
97 // For each data transfer, verify that data written at one end of the stream
98 // equals data read at the other end of the stream.
TEST_F(EventEngineServerTest,ServerConnectExchangeBidiDataTransferTest)99 TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
100   grpc_core::ExecCtx ctx;
101   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
102   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
103   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
104   std::string target_addr = absl::StrCat(
105       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
106   auto resolved_addr = URIToResolvedAddress(target_addr);
107   GPR_ASSERT(resolved_addr.ok());
108   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
109   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
110   grpc_core::Notification client_signal;
111   grpc_core::Notification server_signal;
112 
113   Listener::AcceptCallback accept_cb =
114       [&server_endpoint, &server_signal](
115           std::unique_ptr<Endpoint> ep,
116           grpc_core::MemoryAllocator /*memory_allocator*/) {
117         server_endpoint = std::move(ep);
118         server_signal.Notify();
119       };
120 
121   grpc_core::ChannelArgs args;
122   auto quota = grpc_core::ResourceQuota::Default();
123   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
124   ChannelArgsEndpointConfig config(args);
125   auto listener = *test_ee->CreateListener(
126       std::move(accept_cb),
127       [](absl::Status status) {
128         ASSERT_TRUE(status.ok()) << status.ToString();
129       },
130       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
131 
132   ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
133   ASSERT_TRUE(listener->Start().ok());
134 
135   oracle_ee->Connect(
136       [&client_endpoint,
137        &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
138         ASSERT_TRUE(endpoint.ok()) << endpoint.status();
139         client_endpoint = std::move(*endpoint);
140         client_signal.Notify();
141       },
142       *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
143       24h);
144 
145   client_signal.WaitForNotification();
146   server_signal.WaitForNotification();
147   ASSERT_NE(client_endpoint.get(), nullptr);
148   ASSERT_NE(server_endpoint.get(), nullptr);
149 
150   // Alternate message exchanges between client -- server and server --
151   // client.
152   for (int i = 0; i < kNumExchangedMessages; i++) {
153     // Send from client to server and verify data read at the server.
154     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
155                                     server_endpoint.get())
156                     .ok());
157 
158     // Send from server to client and verify data read at the client.
159     ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
160                                     client_endpoint.get())
161                     .ok());
162   }
163   client_endpoint.reset();
164   server_endpoint.reset();
165   listener.reset();
166 }
167 
168 // Create 1 listener bound to N IPv6 addresses and M connections where M > N and
169 // exchange and verify random number of messages over each connection.
TEST_F(EventEngineServerTest,ServerMultipleIPv6ConnectionsToOneOracleListenerTest)170 TEST_F(EventEngineServerTest,
171        ServerMultipleIPv6ConnectionsToOneOracleListenerTest) {
172   grpc_core::ExecCtx ctx;
173   static constexpr int kNumListenerAddresses = 10;  // N
174   static constexpr int kNumConnections = 10;        // M
175   std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
176   std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
177   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
178   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
179   // Notifications can only be fired once, so they are newed every loop
180   grpc_core::Notification* server_signal = new grpc_core::Notification();
181   std::vector<std::string> target_addrs;
182   std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
183       connections;
184 
185   Listener::AcceptCallback accept_cb =
186       [&server_endpoint, &server_signal](
187           std::unique_ptr<Endpoint> ep,
188           grpc_core::MemoryAllocator /*memory_allocator*/) {
189         server_endpoint = std::move(ep);
190         server_signal->Notify();
191       };
192   grpc_core::ChannelArgs args;
193   auto quota = grpc_core::ResourceQuota::Default();
194   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
195   ChannelArgsEndpointConfig config(args);
196   auto listener = *test_ee->CreateListener(
197       std::move(accept_cb),
198       [](absl::Status status) {
199         ASSERT_TRUE(status.ok()) << status.ToString();
200       },
201       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
202 
203   target_addrs.reserve(kNumListenerAddresses);
204   for (int i = 0; i < kNumListenerAddresses; i++) {
205     std::string target_addr = absl::StrCat(
206         "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
207     ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
208     target_addrs.push_back(target_addr);
209   }
210   ASSERT_TRUE(listener->Start().ok());
211   absl::SleepFor(absl::Milliseconds(500));
212   for (int i = 0; i < kNumConnections; i++) {
213     std::unique_ptr<EventEngine::Endpoint> client_endpoint;
214     grpc_core::Notification client_signal;
215     // Create an oracle EventEngine client and connect to a one of the
216     // addresses bound to the test EventEngine listener. Verify that the
217     // connection succeeds.
218     grpc_core::ChannelArgs client_args;
219     auto client_quota = grpc_core::ResourceQuota::Default();
220     client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
221     ChannelArgsEndpointConfig client_config(client_args);
222     oracle_ee->Connect(
223         [&client_endpoint,
224          &client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
225           ASSERT_TRUE(endpoint.ok());
226           client_endpoint = std::move(*endpoint);
227           client_signal.Notify();
228         },
229         *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
230         client_config,
231         memory_quota->CreateMemoryAllocator(
232             absl::StrCat("conn-", std::to_string(i))),
233         24h);
234 
235     client_signal.WaitForNotification();
236     server_signal->WaitForNotification();
237     ASSERT_NE(client_endpoint.get(), nullptr);
238     ASSERT_NE(server_endpoint.get(), nullptr);
239     connections.push_back(std::make_tuple(std::move(client_endpoint),
240                                           std::move(server_endpoint)));
241     delete server_signal;
242     server_signal = new grpc_core::Notification();
243   }
244   delete server_signal;
245 
246   std::vector<std::thread> threads;
247   // Create one thread for each connection. For each connection, create
248   // 2 more worker threads: to exchange and verify bi-directional data
249   // transfer.
250   threads.reserve(kNumConnections);
251   for (int i = 0; i < kNumConnections; i++) {
252     // For each connection, simulate a parallel bi-directional data transfer.
253     // All bi-directional transfers are run in parallel across all
254     // connections. Each bi-directional data transfer uses a random number of
255     // messages.
256     threads.emplace_back([client_endpoint =
257                               std::move(std::get<0>(connections[i])),
258                           server_endpoint =
259                               std::move(std::get<1>(connections[i]))]() {
260       std::vector<std::thread> workers;
261       workers.reserve(2);
262       auto worker = [client_endpoint = client_endpoint.get(),
263                      server_endpoint =
264                          server_endpoint.get()](bool client_to_server) {
265         grpc_core::ExecCtx ctx;
266         for (int i = 0; i < kNumExchangedMessages; i++) {
267           // If client_to_server is true, send from client to server and
268           // verify data read at the server. Otherwise send data from server
269           // to client and verify data read at client.
270           if (client_to_server) {
271             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
272                                             client_endpoint, server_endpoint)
273                             .ok());
274           } else {
275             ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
276                                             server_endpoint, client_endpoint)
277                             .ok());
278           }
279         }
280       };
281       // worker[0] simulates a flow from client to server endpoint
282       workers.emplace_back([&worker]() { worker(true); });
283       // worker[1] simulates a flow from server to client endpoint
284       workers.emplace_back([&worker]() { worker(false); });
285       workers[0].join();
286       workers[1].join();
287     });
288   }
289   for (auto& t : threads) {
290     t.join();
291   }
292   server_endpoint.reset();
293   listener.reset();
294 }
295 
// TODO(vigneshbabu): Add more tests which create listeners bound to a mix of
// IPv6 and other types of addresses (e.g. UDS) in the same test.
298