1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
16
#include <algorithm>
#include <atomic>
#include <chrono>
#include <list>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
25
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_split.h"
29 #include "absl/strings/string_view.h"
30 #include "gtest/gtest.h"
31
32 #include <grpc/event_engine/event_engine.h>
33 #include <grpc/grpc.h>
34 #include <grpc/impl/channel_arg_names.h>
35
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/config_vars.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/poller.h"
40 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
41 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
42 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
44 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/gprpp/dual_ref_counted.h"
47 #include "src/core/lib/gprpp/notification.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/resource_quota/resource_quota.h"
50 #include "test/core/event_engine/event_engine_test_utils.h"
51 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
52 #include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
53 #include "test/core/util/port.h"
54
55 namespace grpc_event_engine {
56 namespace experimental {
57
58 namespace {
59
60 using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
61 using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
62 using namespace std::chrono_literals;
63
64 constexpr int kMinMessageSize = 1024;
65 constexpr int kNumConnections = 10;
66 constexpr int kNumExchangedMessages = 100;
67 std::atomic<int> g_num_active_connections{0};
68
69 struct Connection {
70 std::unique_ptr<EventEngine::Endpoint> client_endpoint;
71 std::unique_ptr<EventEngine::Endpoint> server_endpoint;
72 };
73
CreateConnectedEndpoints(PosixEventPoller & poller,bool is_zero_copy_enabled,int num_connections,std::shared_ptr<EventEngine> posix_ee,std::shared_ptr<EventEngine> oracle_ee)74 std::list<Connection> CreateConnectedEndpoints(
75 PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections,
76 std::shared_ptr<EventEngine> posix_ee,
77 std::shared_ptr<EventEngine> oracle_ee) {
78 std::list<Connection> connections;
79 auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
80 std::string target_addr = absl::StrCat(
81 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
82 auto resolved_addr = URIToResolvedAddress(target_addr);
83 GPR_ASSERT(resolved_addr.ok());
84 std::unique_ptr<EventEngine::Endpoint> server_endpoint;
85 grpc_core::Notification* server_signal = new grpc_core::Notification();
86
87 Listener::AcceptCallback accept_cb =
88 [&server_endpoint, &server_signal](
89 std::unique_ptr<Endpoint> ep,
90 grpc_core::MemoryAllocator /*memory_allocator*/) {
91 server_endpoint = std::move(ep);
92 server_signal->Notify();
93 };
94 grpc_core::ChannelArgs args;
95 auto quota = grpc_core::ResourceQuota::Default();
96 args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
97 if (is_zero_copy_enabled) {
98 args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
99 args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD,
100 kMinMessageSize);
101 }
102 ChannelArgsEndpointConfig config(args);
103 auto listener = oracle_ee->CreateListener(
104 std::move(accept_cb),
105 [](absl::Status status) { ASSERT_TRUE(status.ok()); }, config,
106 std::make_unique<grpc_core::MemoryQuota>("foo"));
107 GPR_ASSERT(listener.ok());
108
109 EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok());
110 EXPECT_TRUE((*listener)->Start().ok());
111
112 // Create client socket and connect to the target address.
113 for (int i = 0; i < num_connections; ++i) {
114 int client_fd = ConnectToServerOrDie(*resolved_addr);
115 EventHandle* handle =
116 poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
117 EXPECT_NE(handle, nullptr);
118 server_signal->WaitForNotification();
119 EXPECT_NE(server_endpoint, nullptr);
120 ++g_num_active_connections;
121 PosixTcpOptions options = TcpOptionsFromEndpointConfig(config);
122 connections.push_back(Connection{
123 CreatePosixEndpoint(
124 handle,
125 PosixEngineClosure::TestOnlyToClosure(
126 [&poller](absl::Status /*status*/) {
127 if (--g_num_active_connections == 0) {
128 poller.Kick();
129 }
130 }),
131 posix_ee,
132 options.resource_quota->memory_quota()->CreateMemoryAllocator(
133 "test"),
134 options),
135 std::move(server_endpoint)});
136 delete server_signal;
137 server_signal = new grpc_core::Notification();
138 }
139 delete server_signal;
140 return connections;
141 }
142
143 } // namespace
144
TestScenarioName(const::testing::TestParamInfo<bool> & info)145 std::string TestScenarioName(const ::testing::TestParamInfo<bool>& info) {
146 return absl::StrCat("is_zero_copy_enabled_", info.param);
147 }
148
149 // A helper class to drive the polling of Fds. It repeatedly calls the Work(..)
150 // method on the poller to get pet pending events, then schedules another
151 // parallel Work(..) instantiation and processes these pending events. This
152 // continues until all Fds have orphaned themselves.
153 class Worker : public grpc_core::DualRefCounted<Worker> {
154 public:
Worker(std::shared_ptr<EventEngine> engine,PosixEventPoller * poller)155 Worker(std::shared_ptr<EventEngine> engine, PosixEventPoller* poller)
156 : engine_(std::move(engine)), poller_(poller) {
157 WeakRef().release();
158 }
Orphaned()159 void Orphaned() override { signal.Notify(); }
Start()160 void Start() {
161 // Start executing Work(..).
162 engine_->Run([this]() { Work(); });
163 }
164
Wait()165 void Wait() {
166 signal.WaitForNotification();
167 WeakUnref();
168 }
169
170 private:
Work()171 void Work() {
172 auto result = poller_->Work(24h, [this]() {
173 // Schedule next work instantiation immediately and take a Ref for
174 // the next instantiation.
175 Ref().release();
176 engine_->Run([this]() { Work(); });
177 });
178 ASSERT_TRUE(result == Poller::WorkResult::kOk ||
179 result == Poller::WorkResult::kKicked);
180 // Corresponds to the Ref taken for the current instantiation. If the
181 // result was Poller::WorkResult::kKicked, then the next work instantiation
182 // would not have been scheduled and the poll_again callback would have
183 // been deleted.
184 Unref();
185 }
186 std::shared_ptr<EventEngine> engine_;
187 // The poller is not owned by the Worker. Rather it is owned by the test
188 // which creates the worker instance.
189 PosixEventPoller* poller_;
190 grpc_core::Notification signal;
191 };
192
193 class PosixEndpointTest : public ::testing::TestWithParam<bool> {
SetUp()194 void SetUp() override {
195 oracle_ee_ = std::make_shared<PosixOracleEventEngine>();
196 scheduler_ =
197 std::make_unique<grpc_event_engine::experimental::TestScheduler>(
198 posix_ee_.get());
199 EXPECT_NE(scheduler_, nullptr);
200 poller_ = MakeDefaultPoller(scheduler_.get());
201 posix_ee_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(poller_);
202 EXPECT_NE(posix_ee_, nullptr);
203 scheduler_->ChangeCurrentEventEngine(posix_ee_.get());
204 if (poller_ != nullptr) {
205 gpr_log(GPR_INFO, "Using poller: %s", poller_->Name().c_str());
206 }
207 }
208
TearDown()209 void TearDown() override {
210 if (poller_ != nullptr) {
211 poller_->Shutdown();
212 }
213 WaitForSingleOwner(std::move(posix_ee_));
214 WaitForSingleOwner(std::move(oracle_ee_));
215 }
216
217 public:
Scheduler()218 TestScheduler* Scheduler() { return scheduler_.get(); }
219
GetPosixEE()220 std::shared_ptr<EventEngine> GetPosixEE() { return posix_ee_; }
221
GetOracleEE()222 std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
223
PosixPoller()224 PosixEventPoller* PosixPoller() { return poller_.get(); }
225
226 private:
227 std::shared_ptr<PosixEventPoller> poller_;
228 std::unique_ptr<TestScheduler> scheduler_;
229 std::shared_ptr<EventEngine> posix_ee_;
230 std::shared_ptr<EventEngine> oracle_ee_;
231 };
232
// Establishes a single connection and alternates message exchanges between
// client -> server and server -> client, validating the payload received on
// the other side each time.
TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) {
  if (PosixPoller() == nullptr) {
    return;
  }
  Worker* worker = new Worker(GetPosixEE(), PosixPoller());
  worker->Start();
  {
    auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1,
                                                GetPosixEE(), GetOracleEE());
    auto it = connections.begin();
    auto client_endpoint = std::move((*it).client_endpoint);
    auto server_endpoint = std::move((*it).server_endpoint);
    EXPECT_NE(client_endpoint, nullptr);
    EXPECT_NE(server_endpoint, nullptr);
    connections.erase(it);

    // Alternate message exchanges between client -- server and server --
    // client. Failures are recorded with non-fatal assertions followed by a
    // `break`: a fatal assertion would return from the test body and skip
    // worker->Wait() below, leaving the worker polling during TearDown().
    for (int i = 0; i < kNumExchangedMessages; i++) {
      // Send from client to server and verify data read at the server.
      const bool client_to_server_ok =
          SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
                              server_endpoint.get())
              .ok();
      EXPECT_TRUE(client_to_server_ok);
      if (!client_to_server_ok) {
        break;
      }
      // Send from server to client and verify data read at the client.
      const bool server_to_client_ok =
          SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
                              client_endpoint.get())
              .ok();
      EXPECT_TRUE(server_to_client_ok);
      if (!server_to_client_ok) {
        break;
      }
    }
    // Both endpoints are destroyed when this scope ends, before waiting for
    // the worker.
  }
  worker->Wait();
}
266
267 // Create N connections and exchange and verify random number of messages over
268 // each connection in parallel.
TEST_P(PosixEndpointTest,MultipleIPv6ConnectionsToOneOracleListenerTest)269 TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
270 if (PosixPoller() == nullptr) {
271 return;
272 }
273 Worker* worker = new Worker(GetPosixEE(), PosixPoller());
274 worker->Start();
275 auto connections = CreateConnectedEndpoints(
276 *PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE());
277 std::vector<std::thread> threads;
278 // Create one thread for each connection. For each connection, create
279 // 2 more worker threads: to exchange and verify bi-directional data transfer.
280 threads.reserve(kNumConnections);
281 for (int i = 0; i < kNumConnections; i++) {
282 // For each connection, simulate a parallel bi-directional data transfer.
283 // All bi-directional transfers are run in parallel across all connections.
284 auto it = connections.begin();
285 auto client_endpoint = std::move((*it).client_endpoint);
286 auto server_endpoint = std::move((*it).server_endpoint);
287 EXPECT_NE(client_endpoint, nullptr);
288 EXPECT_NE(server_endpoint, nullptr);
289 connections.erase(it);
290 threads.emplace_back([client_endpoint = std::move(client_endpoint),
291 server_endpoint = std::move(server_endpoint)]() {
292 std::vector<std::thread> workers;
293 workers.reserve(2);
294 auto worker = [client_endpoint = client_endpoint.get(),
295 server_endpoint =
296 server_endpoint.get()](bool client_to_server) {
297 for (int i = 0; i < kNumExchangedMessages; i++) {
298 // If client_to_server is true, send from client to server and
299 // verify data read at the server. Otherwise send data from server
300 // to client and verify data read at client.
301 if (client_to_server) {
302 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
303 client_endpoint, server_endpoint)
304 .ok());
305 } else {
306 EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
307 server_endpoint, client_endpoint)
308 .ok());
309 }
310 }
311 };
312 // worker[0] simulates a flow from client to server endpoint
313 workers.emplace_back([&worker]() { worker(true); });
314 // worker[1] simulates a flow from server to client endpoint
315 workers.emplace_back([&worker]() { worker(false); });
316 workers[0].join();
317 workers[1].join();
318 });
319 }
320 for (auto& t : threads) {
321 t.join();
322 }
323 worker->Wait();
324 }
325
326 // Test with zero copy enabled and disabled.
327 INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest,
328 ::testing::ValuesIn({false, true}), &TestScenarioName);
329
330 } // namespace experimental
331 } // namespace grpc_event_engine
332
main(int argc,char ** argv)333 int main(int argc, char** argv) {
334 ::testing::InitGoogleTest(&argc, argv);
335 auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
336 auto strings = absl::StrSplit(poll_strategy, ',');
337 if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
338 // Skip the test entirely if poll strategy is none.
339 return 0;
340 }
341 // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
342 // until we clear out the iomgr shutdown code.
343 grpc_init();
344 int r = RUN_ALL_TESTS();
345 grpc_shutdown();
346 return r;
347 }
348