// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
#define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H

#include <stddef.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/time.h>

#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/port.h"

namespace grpc_event_engine {
namespace experimental {

// EventEngine implementation to be used by fuzzers.
// It's only allowed to have one FuzzingEventEngine instantiated at a time.
53 class FuzzingEventEngine : public EventEngine { 54 public: 55 struct Options { 56 Duration max_delay_run_after = std::chrono::seconds(30); 57 Duration max_delay_write = std::chrono::seconds(30); 58 }; 59 explicit FuzzingEventEngine(Options options, 60 const fuzzing_event_engine::Actions& actions); ~FuzzingEventEngine()61 ~FuzzingEventEngine() override { UnsetGlobalHooks(); } 62 63 using Time = std::chrono::time_point<FuzzingEventEngine, Duration>; 64 65 // Once the fuzzing work is completed, this method should be called to speed 66 // quiescence. 67 void FuzzingDone() ABSL_LOCKS_EXCLUDED(mu_); 68 // Increment time once and perform any scheduled work. 69 void Tick(Duration max_time = std::chrono::seconds(600)) 70 ABSL_LOCKS_EXCLUDED(mu_); 71 // Repeatedly call Tick() until there is no more work to do. 72 void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_); 73 // Tick until some time 74 void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_); 75 // Tick for some duration 76 void TickForDuration(Duration d) ABSL_LOCKS_EXCLUDED(mu_); 77 78 // Sets a callback to be invoked any time RunAfter() is called. 79 // Allows tests to verify the specified duration. 
80 void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback); 81 82 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 83 Listener::AcceptCallback on_accept, 84 absl::AnyInvocable<void(absl::Status)> on_shutdown, 85 const EndpointConfig& config, 86 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 87 ABSL_LOCKS_EXCLUDED(mu_) override; 88 89 ConnectionHandle Connect(OnConnectCallback on_connect, 90 const ResolvedAddress& addr, 91 const EndpointConfig& args, 92 MemoryAllocator memory_allocator, Duration timeout) 93 ABSL_LOCKS_EXCLUDED(mu_) override; 94 95 bool CancelConnect(ConnectionHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override; 96 97 bool IsWorkerThread() override; 98 99 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 100 const DNSResolver::ResolverOptions& options) override; 101 102 void Run(Closure* closure) ABSL_LOCKS_EXCLUDED(mu_) override; 103 void Run(absl::AnyInvocable<void()> closure) 104 ABSL_LOCKS_EXCLUDED(mu_) override; 105 TaskHandle RunAfter(Duration when, Closure* closure) 106 ABSL_LOCKS_EXCLUDED(mu_) override; 107 TaskHandle RunAfter(Duration when, absl::AnyInvocable<void()> closure) 108 ABSL_LOCKS_EXCLUDED(mu_) override; 109 bool Cancel(TaskHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override; 110 111 TaskHandle RunAfterExactly(Duration when, absl::AnyInvocable<void()> closure) 112 ABSL_LOCKS_EXCLUDED(mu_); 113 114 Time Now() ABSL_LOCKS_EXCLUDED(mu_); 115 116 // Clear any global hooks installed by this event engine. Call prior to 117 // destruction to ensure no overlap between tests if constructing/destructing 118 // each test. 119 void UnsetGlobalHooks() ABSL_LOCKS_EXCLUDED(mu_); 120 max_delay_write()121 Duration max_delay_write() const { 122 return max_delay_[static_cast<int>(RunType::kWrite)]; 123 } 124 125 private: 126 enum class RunType { 127 kWrite, 128 kRunAfter, 129 kExact, 130 }; 131 132 // One pending task to be run. 
133 struct Task { TaskTask134 Task(intptr_t id, absl::AnyInvocable<void()> closure) 135 : id(id), closure(std::move(closure)) {} 136 intptr_t id; 137 absl::AnyInvocable<void()> closure; 138 }; 139 140 // Per listener information. 141 // We keep a shared_ptr to this, one reference held by the FuzzingListener 142 // Listener implementation, and one reference in the event engine state, so it 143 // may be iterated through and inspected - principally to discover the ports 144 // on which this listener is listening. 145 struct ListenerInfo { ListenerInfoListenerInfo146 ListenerInfo( 147 Listener::AcceptCallback on_accept, 148 absl::AnyInvocable<void(absl::Status)> on_shutdown, 149 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 150 : on_accept(std::move(on_accept)), 151 on_shutdown(std::move(on_shutdown)), 152 memory_allocator_factory(std::move(memory_allocator_factory)), 153 started(false) {} 154 ~ListenerInfo() ABSL_LOCKS_EXCLUDED(mu_); 155 // The callback to invoke when a new connection is accepted. 156 Listener::AcceptCallback on_accept; 157 // The callback to invoke when the listener is shut down. 158 absl::AnyInvocable<void(absl::Status)> on_shutdown; 159 // The memory allocator factory to use for this listener. 160 const std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory; 161 // The ports on which this listener is listening. 162 std::vector<int> ports ABSL_GUARDED_BY(mu_); 163 // Has start been called on the listener? 164 // Used to emulate the Bind/Start semantics demanded by the API. 165 bool started ABSL_GUARDED_BY(mu_); 166 // The status to return via on_shutdown. 167 absl::Status shutdown_status ABSL_GUARDED_BY(mu_); 168 }; 169 170 // Implementation of Listener. 
171 class FuzzingListener final : public Listener { 172 public: FuzzingListener(std::shared_ptr<ListenerInfo> info)173 explicit FuzzingListener(std::shared_ptr<ListenerInfo> info) 174 : info_(std::move(info)) {} 175 ~FuzzingListener() override; 176 absl::StatusOr<int> Bind(const ResolvedAddress& addr) override; 177 absl::Status Start() override; 178 179 private: 180 std::shared_ptr<ListenerInfo> info_; 181 }; 182 183 // One read that's outstanding. 184 struct PendingRead { 185 // Callback to invoke when the read completes. 186 absl::AnyInvocable<void(absl::Status)> on_read; 187 // The buffer to read into. 188 SliceBuffer* buffer; 189 }; 190 191 // The join between two Endpoint instances. 192 struct EndpointMiddle { 193 EndpointMiddle(int listener_port, int client_port) 194 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 195 // Address of each side of the endpoint. 196 const ResolvedAddress addrs[2]; 197 // Is the endpoint closed? 198 bool closed[2] ABSL_GUARDED_BY(mu_) = {false, false}; 199 // Is the endpoint writing? 200 bool writing[2] ABSL_GUARDED_BY(mu_) = {false, false}; 201 // Bytes written into each endpoint and awaiting a read. 202 std::vector<uint8_t> pending[2] ABSL_GUARDED_BY(mu_); 203 // The sizes of each accepted write, as determined by the fuzzer actions. 204 std::queue<size_t> write_sizes[2] ABSL_GUARDED_BY(mu_); 205 // The next read that's pending (or nullopt). 206 absl::optional<PendingRead> pending_read[2] ABSL_GUARDED_BY(mu_); 207 208 // Helper to take some bytes from data and queue them into pending[index]. 209 // Returns true if all bytes were consumed, false if more writes are needed. 210 bool Write(SliceBuffer* data, int index) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 211 }; 212 213 // Implementation of Endpoint. 214 // When a connection is formed, we create two of these - one with index 0, the 215 // other index 1, both pointing to the same EndpointMiddle. 
216 class FuzzingEndpoint final : public Endpoint { 217 public: FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle,int index)218 FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle, int index) 219 : middle_(std::move(middle)), index_(index) {} 220 ~FuzzingEndpoint() override; 221 222 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, 223 SliceBuffer* buffer, const ReadArgs* args) override; 224 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 225 SliceBuffer* data, const WriteArgs* args) override; GetPeerAddress()226 const ResolvedAddress& GetPeerAddress() const override { 227 return middle_->addrs[peer_index()]; 228 } GetLocalAddress()229 const ResolvedAddress& GetLocalAddress() const override { 230 return middle_->addrs[my_index()]; 231 } 232 233 private: my_index()234 int my_index() const { return index_; } peer_index()235 int peer_index() const { return 1 - index_; } 236 // Schedule additional writes to be performed later. 237 // Takes a ref to middle instead of holding this, so that should the 238 // endpoint be destroyed we don't have to worry about use-after-free. 239 // Instead that scheduled callback will see the middle is closed and finally 240 // report completion to the caller. 241 // Since there is no timeliness contract for the completion of writes after 242 // endpoint shutdown, it's believed this is a legal implementation. 
243 static void ScheduleDelayedWrite( 244 std::shared_ptr<EndpointMiddle> middle, int index, 245 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) 246 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 247 const std::shared_ptr<EndpointMiddle> middle_; 248 const int index_; 249 }; 250 RunLocked(RunType run_type,absl::AnyInvocable<void ()> closure)251 void RunLocked(RunType run_type, absl::AnyInvocable<void()> closure) 252 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 253 RunAfterLocked(run_type, Duration::zero(), std::move(closure)); 254 } 255 256 TaskHandle RunAfterLocked(RunType run_type, Duration when, 257 absl::AnyInvocable<void()> closure) 258 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 259 260 // Allocate a port. Considered fuzzer selected port orderings first, and then 261 // falls back to an exhaustive incremental search from port #1. 262 int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 263 // Is the given port in use by any listener? 264 bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 265 // For the next connection being built, query the list of fuzzer selected 266 // write size limits. 
267 std::queue<size_t> WriteSizesForConnection() 268 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 269 270 gpr_timespec NowAsTimespec(gpr_clock_type clock_type) 271 ABSL_EXCLUSIVE_LOCKS_REQUIRED(now_mu_); 272 static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type) 273 ABSL_LOCKS_EXCLUDED(mu_); 274 275 static grpc_core::NoDestruct<grpc_core::Mutex> mu_; 276 static grpc_core::NoDestruct<grpc_core::Mutex> now_mu_ 277 ABSL_ACQUIRED_AFTER(mu_); 278 279 Duration exponential_gate_time_increment_ ABSL_GUARDED_BY(mu_) = 280 std::chrono::milliseconds(1); 281 const Duration max_delay_[2]; 282 intptr_t next_task_id_ ABSL_GUARDED_BY(mu_); 283 intptr_t current_tick_ ABSL_GUARDED_BY(now_mu_); 284 Time now_ ABSL_GUARDED_BY(now_mu_); 285 std::queue<Duration> task_delays_ ABSL_GUARDED_BY(mu_); 286 std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_); 287 std::multimap<Time, std::shared_ptr<Task>> tasks_by_time_ 288 ABSL_GUARDED_BY(mu_); 289 std::set<std::shared_ptr<ListenerInfo>> listeners_ ABSL_GUARDED_BY(mu_); 290 // Fuzzer selected port allocations. 291 std::queue<int> free_ports_ ABSL_GUARDED_BY(mu_); 292 // Next free port to allocate once fuzzer selections are exhausted. 293 int next_free_port_ ABSL_GUARDED_BY(mu_) = 1; 294 // Ports that were included in the fuzzer selected port orderings. 295 std::set<int> fuzzer_mentioned_ports_ ABSL_GUARDED_BY(mu_); 296 // Fuzzer selected write sizes for future connections - one picked off per 297 // WriteSizesForConnection() call. 
298 std::queue<std::queue<size_t>> write_sizes_for_future_connections_ 299 ABSL_GUARDED_BY(mu_); 300 grpc_pick_port_functions previous_pick_port_functions_; 301 302 grpc_core::Mutex run_after_duration_callback_mu_; 303 absl::AnyInvocable<void(Duration)> run_after_duration_callback_ 304 ABSL_GUARDED_BY(run_after_duration_callback_mu_); 305 }; 306 307 class ThreadedFuzzingEventEngine : public FuzzingEventEngine { 308 public: ThreadedFuzzingEventEngine()309 ThreadedFuzzingEventEngine() 310 : ThreadedFuzzingEventEngine(std::chrono::milliseconds(10)) {} 311 ThreadedFuzzingEventEngine(Duration max_time)312 explicit ThreadedFuzzingEventEngine(Duration max_time) 313 : FuzzingEventEngine(FuzzingEventEngine::Options(), 314 fuzzing_event_engine::Actions()), 315 main_([this, max_time]() { 316 while (!done_.load()) { 317 absl::SleepFor(absl::Milliseconds( 318 grpc_event_engine::experimental::Milliseconds(max_time))); 319 Tick(); 320 } 321 }) {} 322 ~ThreadedFuzzingEventEngine()323 ~ThreadedFuzzingEventEngine() override { 324 done_.store(true); 325 main_.join(); 326 } 327 328 private: 329 std::atomic<bool> done_{false}; 330 std::thread main_; 331 }; 332 333 } // namespace experimental 334 } // namespace grpc_event_engine 335 336 #endif // GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H 337