1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <algorithm>
15 #include <memory>
16 #include <thread>
17 #include <vector>
18
19 #include "absl/functional/any_invocable.h"
20 #include "absl/status/status.h"
21 #include "absl/status/statusor.h"
22 #include "absl/time/clock.h"
23 #include "absl/time/time.h"
24 #include "gtest/gtest.h"
25
26 #include <grpc/event_engine/endpoint_config.h>
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/event_engine/memory_allocator.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/port_platform.h>
31
32 #include "src/core/lib/event_engine/default_event_engine.h"
33 #include "test/core/util/test_config.h"
34
35 namespace {
36
37 using ::grpc_event_engine::experimental::EventEngine;
38 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
39
40 class DefaultEngineTest : public testing::Test {
41 protected:
42 // Does nothing, fills space that a nullptr could not
43 class FakeEventEngine : public EventEngine {
44 public:
45 FakeEventEngine() = default;
46 ~FakeEventEngine() override = default;
CreateListener(Listener::AcceptCallback,absl::AnyInvocable<void (absl::Status)>,const grpc_event_engine::experimental::EndpointConfig &,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>)47 absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
48 Listener::AcceptCallback /* on_accept */,
49 absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
50 const grpc_event_engine::experimental::EndpointConfig& /* config */,
51 std::unique_ptr<
52 grpc_event_engine::experimental::
53 MemoryAllocatorFactory> /* memory_allocator_factory */)
54 override {
55 return absl::UnimplementedError("test");
56 };
Connect(OnConnectCallback,const ResolvedAddress &,const grpc_event_engine::experimental::EndpointConfig &,grpc_event_engine::experimental::MemoryAllocator,Duration)57 ConnectionHandle Connect(
58 OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
59 const grpc_event_engine::experimental::EndpointConfig& /* args */,
60 grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */,
61 Duration /* timeout */) override {
62 return {-1, -1};
63 };
CancelConnect(ConnectionHandle)64 bool CancelConnect(ConnectionHandle /* handle */) override {
65 return false;
66 };
IsWorkerThread()67 bool IsWorkerThread() override { return false; };
GetDNSResolver(const DNSResolver::ResolverOptions &)68 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
69 const DNSResolver::ResolverOptions& /* options */) override {
70 return nullptr;
71 };
Run(Closure *)72 void Run(Closure* /* closure */) override{};
Run(absl::AnyInvocable<void ()>)73 void Run(absl::AnyInvocable<void()> /* closure */) override{};
RunAfter(Duration,Closure *)74 TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override {
75 return {-1, -1};
76 }
RunAfter(Duration,absl::AnyInvocable<void ()>)77 TaskHandle RunAfter(Duration /* when */,
78 absl::AnyInvocable<void()> /* closure */) override {
79 return {-1, -1};
80 }
Cancel(TaskHandle)81 bool Cancel(TaskHandle /* handle */) override { return false; };
82 };
83 };
84
// Checks the lifetime of the engine handed out by GetDefaultEventEngine():
// concurrent holders share one instance created by a single factory call, the
// instance survives as long as any holder remains, and a new instance is
// created (second factory call) after the last holder lets go.
TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) {
  // Number of times the custom factory below has been invoked.
  int create_count = 0;
  grpc_event_engine::experimental::SetEventEngineFactory([&create_count] {
    ++create_count;
    return std::make_unique<FakeEventEngine>();
  });
  // Calls EventEngineFactoryReset() when it goes out of scope. A failed
  // ASSERT_* returns from this function immediately, so a plain call at the
  // end of the test would be skipped on failure, leaving installed a factory
  // that captures the local `create_count` by reference.
  struct FactoryResetGuard {
    ~FactoryResetGuard() {
      grpc_event_engine::experimental::EventEngineFactoryReset();
    }
  };
  std::shared_ptr<EventEngine> ee2;
  // Declared after `ee2` so that the reset still happens while `ee2` is alive,
  // exactly as an explicit call on the last line of the test would.
  FactoryResetGuard reset_factory_on_exit;
  {
    std::shared_ptr<EventEngine> ee1 = GetDefaultEventEngine();
    ASSERT_EQ(1, create_count);
    // A second request must not invoke the factory again.
    ee2 = GetDefaultEventEngine();
    ASSERT_EQ(1, create_count);
    ASSERT_EQ(ee2.use_count(), 2);
  }
  // Ensure the first shared_ptr did not delete the global.
  // (use_count() is used instead of unique(), which is deprecated in C++17 and
  // removed in C++20.)
  ASSERT_EQ(ee2.use_count(), 1);
  ASSERT_FALSE(ee2->IsWorkerThread());  // useful for ASAN
  // Destroy the global engine via the last shared_ptr, and create a new one.
  ee2.reset();
  ee2 = GetDefaultEventEngine();
  ASSERT_EQ(2, create_count);
  ASSERT_EQ(ee2.use_count(), 1);
}
109
// Stress test: several threads simultaneously fetch the default EventEngine
// and release it again right away, over and over, for a fixed wall-clock
// period. The test has no explicit assertions; it passes if every thread runs
// to completion.
TEST_F(DefaultEngineTest, StressTestSharedPtr) {
  constexpr int kWorkerCount = 13;
  constexpr absl::Duration kRunFor = absl::Seconds(3);
  std::vector<std::thread> workers;
  workers.reserve(kWorkerCount);
  for (int idx = 0; idx < kWorkerCount; ++idx) {
    workers.emplace_back([&kRunFor] {
      // Each thread measures its own deadline from the moment it starts.
      const absl::Time deadline = absl::Now() + kRunFor;
      // Acquire-and-release at least once, then repeat until the deadline.
      for (;;) {
        GetDefaultEventEngine().reset();
        if (absl::Now() >= deadline) {
          break;
        }
      }
    });
  }
  // Wait for every worker before the test (and the captured local) goes away.
  for (std::thread& worker : workers) {
    worker.join();
  }
}
127 } // namespace
128
main(int argc,char ** argv)129 int main(int argc, char** argv) {
130 testing::InitGoogleTest(&argc, argv);
131 grpc::testing::TestEnvironment env(&argc, argv);
132 grpc_init();
133 auto result = RUN_ALL_TESTS();
134 grpc_shutdown();
135 return result;
136 }
137