xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/posix/posix_endpoint_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
16 
17 #include <algorithm>
18 #include <chrono>
19 #include <list>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <type_traits>
24 #include <vector>
25 
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_split.h"
29 #include "absl/strings/string_view.h"
30 #include "gtest/gtest.h"
31 
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/grpc.h>
34 #include <grpc/impl/channel_arg_names.h>
35 
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/config_vars.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/poller.h"
40 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
41 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
42 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
44 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/gprpp/dual_ref_counted.h"
47 #include "src/core/lib/gprpp/notification.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/resource_quota/resource_quota.h"
50 #include "test/core/event_engine/event_engine_test_utils.h"
51 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
52 #include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
53 #include "test/core/util/port.h"
54 
55 namespace grpc_event_engine {
56 namespace experimental {
57 
58 namespace {
59 
60 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
61 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
62 using namespace std::chrono_literals;
63 
// Send-bytes threshold passed to the endpoint config when the zero-copy
// variant of the tests is running.
constexpr int kMinMessageSize{1024};
// Number of client/server endpoint pairs used by the multi-connection test.
constexpr int kNumConnections{10};
// Number of messages sent per direction over each connection.
constexpr int kNumExchangedMessages{100};
// Number of client endpoints whose on-done closure has not run yet. The
// closure that brings this down to zero kicks the poller.
std::atomic<int> g_num_active_connections{0};
68 
69 struct Connection {
70   std::unique_ptr<EventEngine::Endpoint> client_endpoint;
71   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
72 };
73 
CreateConnectedEndpoints(PosixEventPoller & poller,bool is_zero_copy_enabled,int num_connections,std::shared_ptr<EventEngine> posix_ee,std::shared_ptr<EventEngine> oracle_ee)74 std::list<Connection> CreateConnectedEndpoints(
75     PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections,
76     std::shared_ptr<EventEngine> posix_ee,
77     std::shared_ptr<EventEngine> oracle_ee) {
78   std::list<Connection> connections;
79   auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
80   std::string target_addr = absl::StrCat(
81       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
82   auto resolved_addr = URIToResolvedAddress(target_addr);
83   GPR_ASSERT(resolved_addr.ok());
84   std::unique_ptr<EventEngine::Endpoint> server_endpoint;
85   grpc_core::Notification* server_signal = new grpc_core::Notification();
86 
87   Listener::AcceptCallback accept_cb =
88       [&server_endpoint, &server_signal](
89           std::unique_ptr<Endpoint> ep,
90           grpc_core::MemoryAllocator /*memory_allocator*/) {
91         server_endpoint = std::move(ep);
92         server_signal->Notify();
93       };
94   grpc_core::ChannelArgs args;
95   auto quota = grpc_core::ResourceQuota::Default();
96   args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
97   if (is_zero_copy_enabled) {
98     args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
99     args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD,
100                     kMinMessageSize);
101   }
102   ChannelArgsEndpointConfig config(args);
103   auto listener = oracle_ee->CreateListener(
104       std::move(accept_cb),
105       [](absl::Status status) { ASSERT_TRUE(status.ok()); }, config,
106       std::make_unique<grpc_core::MemoryQuota>("foo"));
107   GPR_ASSERT(listener.ok());
108 
109   EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok());
110   EXPECT_TRUE((*listener)->Start().ok());
111 
112   // Create client socket and connect to the target address.
113   for (int i = 0; i < num_connections; ++i) {
114     int client_fd = ConnectToServerOrDie(*resolved_addr);
115     EventHandle* handle =
116         poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
117     EXPECT_NE(handle, nullptr);
118     server_signal->WaitForNotification();
119     EXPECT_NE(server_endpoint, nullptr);
120     ++g_num_active_connections;
121     PosixTcpOptions options = TcpOptionsFromEndpointConfig(config);
122     connections.push_back(Connection{
123         CreatePosixEndpoint(
124             handle,
125             PosixEngineClosure::TestOnlyToClosure(
126                 [&poller](absl::Status /*status*/) {
127                   if (--g_num_active_connections == 0) {
128                     poller.Kick();
129                   }
130                 }),
131             posix_ee,
132             options.resource_quota->memory_quota()->CreateMemoryAllocator(
133                 "test"),
134             options),
135         std::move(server_endpoint)});
136     delete server_signal;
137     server_signal = new grpc_core::Notification();
138   }
139   delete server_signal;
140   return connections;
141 }
142 
143 }  // namespace
144 
TestScenarioName(const::testing::TestParamInfo<bool> & info)145 std::string TestScenarioName(const ::testing::TestParamInfo<bool>& info) {
146   return absl::StrCat("is_zero_copy_enabled_", info.param);
147 }
148 
149 // A helper class to drive the polling of Fds. It repeatedly calls the Work(..)
150 // method on the poller to get pet pending events, then schedules another
151 // parallel Work(..) instantiation and processes these pending events. This
152 // continues until all Fds have orphaned themselves.
153 class Worker : public grpc_core::DualRefCounted<Worker> {
154  public:
Worker(std::shared_ptr<EventEngine> engine,PosixEventPoller * poller)155   Worker(std::shared_ptr<EventEngine> engine, PosixEventPoller* poller)
156       : engine_(std::move(engine)), poller_(poller) {
157     WeakRef().release();
158   }
Orphaned()159   void Orphaned() override { signal.Notify(); }
Start()160   void Start() {
161     // Start executing Work(..).
162     engine_->Run([this]() { Work(); });
163   }
164 
Wait()165   void Wait() {
166     signal.WaitForNotification();
167     WeakUnref();
168   }
169 
170  private:
Work()171   void Work() {
172     auto result = poller_->Work(24h, [this]() {
173       // Schedule next work instantiation immediately and take a Ref for
174       // the next instantiation.
175       Ref().release();
176       engine_->Run([this]() { Work(); });
177     });
178     ASSERT_TRUE(result == Poller::WorkResult::kOk ||
179                 result == Poller::WorkResult::kKicked);
180     // Corresponds to the Ref taken for the current instantiation. If the
181     // result was Poller::WorkResult::kKicked, then the next work instantiation
182     // would not have been scheduled and the poll_again callback would have
183     // been deleted.
184     Unref();
185   }
186   std::shared_ptr<EventEngine> engine_;
187   // The poller is not owned by the Worker. Rather it is owned by the test
188   // which creates the worker instance.
189   PosixEventPoller* poller_;
190   grpc_core::Notification signal;
191 };
192 
193 class PosixEndpointTest : public ::testing::TestWithParam<bool> {
SetUp()194   void SetUp() override {
195     oracle_ee_ = std::make_shared<PosixOracleEventEngine>();
196     scheduler_ =
197         std::make_unique<grpc_event_engine::experimental::TestScheduler>(
198             posix_ee_.get());
199     EXPECT_NE(scheduler_, nullptr);
200     poller_ = MakeDefaultPoller(scheduler_.get());
201     posix_ee_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(poller_);
202     EXPECT_NE(posix_ee_, nullptr);
203     scheduler_->ChangeCurrentEventEngine(posix_ee_.get());
204     if (poller_ != nullptr) {
205       gpr_log(GPR_INFO, "Using poller: %s", poller_->Name().c_str());
206     }
207   }
208 
TearDown()209   void TearDown() override {
210     if (poller_ != nullptr) {
211       poller_->Shutdown();
212     }
213     WaitForSingleOwner(std::move(posix_ee_));
214     WaitForSingleOwner(std::move(oracle_ee_));
215   }
216 
217  public:
Scheduler()218   TestScheduler* Scheduler() { return scheduler_.get(); }
219 
GetPosixEE()220   std::shared_ptr<EventEngine> GetPosixEE() { return posix_ee_; }
221 
GetOracleEE()222   std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
223 
PosixPoller()224   PosixEventPoller* PosixPoller() { return poller_.get(); }
225 
226  private:
227   std::shared_ptr<PosixEventPoller> poller_;
228   std::unique_ptr<TestScheduler> scheduler_;
229   std::shared_ptr<EventEngine> posix_ee_;
230   std::shared_ptr<EventEngine> oracle_ee_;
231 };
232 
// Creates a single connection and alternates kNumExchangedMessages validated
// message exchanges in each direction over it. Does nothing when no poller is
// available.
TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) {
  if (PosixPoller() == nullptr) {
    return;
  }
  Worker* worker = new Worker(GetPosixEE(), PosixPoller());
  worker->Start();
  // The exchange runs inside a lambda so that a failing ASSERT_* only leaves
  // the lambda. The endpoints are still destroyed when it exits, and
  // worker->Wait() below is always reached, so the worker is drained and its
  // weak ref released before TearDown() shuts the poller down.
  [this]() -> void {
    auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1,
                                                GetPosixEE(), GetOracleEE());
    auto it = connections.begin();
    auto client_endpoint = std::move((*it).client_endpoint);
    auto server_endpoint = std::move((*it).server_endpoint);
    EXPECT_NE(client_endpoint, nullptr);
    EXPECT_NE(server_endpoint, nullptr);
    connections.erase(it);

    // Alternate message exchanges between client -- server and server --
    // client.
    for (int i = 0; i < kNumExchangedMessages; i++) {
      // Send from client to server and verify data read at the server.
      ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
                                      client_endpoint.get(),
                                      server_endpoint.get())
                      .ok());
      // Send from server to client and verify data read at the client.
      ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
                                      server_endpoint.get(),
                                      client_endpoint.get())
                      .ok());
    }
  }();
  worker->Wait();
}
266 
267 // Create  N connections and exchange and verify random number of messages over
268 // each connection in parallel.
TEST_P(PosixEndpointTest,MultipleIPv6ConnectionsToOneOracleListenerTest)269 TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
270   if (PosixPoller() == nullptr) {
271     return;
272   }
273   Worker* worker = new Worker(GetPosixEE(), PosixPoller());
274   worker->Start();
275   auto connections = CreateConnectedEndpoints(
276       *PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE());
277   std::vector<std::thread> threads;
278   // Create one thread for each connection. For each connection, create
279   // 2 more worker threads: to exchange and verify bi-directional data transfer.
280   threads.reserve(kNumConnections);
281   for (int i = 0; i < kNumConnections; i++) {
282     // For each connection, simulate a parallel bi-directional data transfer.
283     // All bi-directional transfers are run in parallel across all connections.
284     auto it = connections.begin();
285     auto client_endpoint = std::move((*it).client_endpoint);
286     auto server_endpoint = std::move((*it).server_endpoint);
287     EXPECT_NE(client_endpoint, nullptr);
288     EXPECT_NE(server_endpoint, nullptr);
289     connections.erase(it);
290     threads.emplace_back([client_endpoint = std::move(client_endpoint),
291                           server_endpoint = std::move(server_endpoint)]() {
292       std::vector<std::thread> workers;
293       workers.reserve(2);
294       auto worker = [client_endpoint = client_endpoint.get(),
295                      server_endpoint =
296                          server_endpoint.get()](bool client_to_server) {
297         for (int i = 0; i < kNumExchangedMessages; i++) {
298           // If client_to_server is true, send from client to server and
299           // verify data read at the server. Otherwise send data from server
300           // to client and verify data read at client.
301           if (client_to_server) {
302             EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
303                                             client_endpoint, server_endpoint)
304                             .ok());
305           } else {
306             EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
307                                             server_endpoint, client_endpoint)
308                             .ok());
309           }
310         }
311       };
312       // worker[0] simulates a flow from client to server endpoint
313       workers.emplace_back([&worker]() { worker(true); });
314       // worker[1] simulates a flow from server to client endpoint
315       workers.emplace_back([&worker]() { worker(false); });
316       workers[0].join();
317       workers[1].join();
318     });
319   }
320   for (auto& t : threads) {
321     t.join();
322   }
323   worker->Wait();
324 }
325 
326 // Test with zero copy enabled and disabled.
327 INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest,
328                          ::testing::ValuesIn({false, true}), &TestScenarioName);
329 
330 }  // namespace experimental
331 }  // namespace grpc_event_engine
332 
main(int argc,char ** argv)333 int main(int argc, char** argv) {
334   ::testing::InitGoogleTest(&argc, argv);
335   auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
336   auto strings = absl::StrSplit(poll_strategy, ',');
337   if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
338     // Skip the test entirely if poll strategy is none.
339     return 0;
340   }
341   // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
342   // until we clear out the iomgr shutdown code.
343   grpc_init();
344   int r = RUN_ALL_TESTS();
345   grpc_shutdown();
346   return r;
347 }
348