1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
16
17 #include <inttypes.h>
18 #include <stdlib.h>
19
20 #include <algorithm>
21 #include <chrono>
22 #include <limits>
23 #include <vector>
24
25 #include "absl/memory/memory.h"
26 #include "absl/strings/str_cat.h"
27
28 #include <grpc/event_engine/slice.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/event_engine/tcp_socket_utils.h"
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/gprpp/time.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
39 #include "test/core/util/port.h"
40
41 #if defined(GRPC_POSIX_SOCKET_TCP)
42 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
43 #else
44 #include "src/core/lib/gprpp/crash.h"
45 #endif
46 // IWYU pragma: no_include <sys/socket.h>
47
// Time-source function pointer that is defined outside this file. The
// FuzzingEventEngine constructor points it at GlobalNowImpl and
// UnsetGlobalHooks puts the previously installed value back.
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);

// Gates the per-write log line emitted by EndpointMiddle::Write.
static grpc_core::TraceFlag trace_writes(false, "fuzzing_ee_writes");
// Gates the log lines emitted when a timer is scheduled or cancelled.
static grpc_core::TraceFlag trace_timers(false, "fuzzing_ee_timers");
52
53 using namespace std::chrono_literals;
54
55 namespace grpc_event_engine {
56 namespace experimental {
57
58 namespace {
59
60 constexpr EventEngine::Duration kOneYear = 8760h;
61
62 // Inside the fuzzing event engine we consider everything is bound to a single
63 // loopback device. It cannot reach any other devices, and shares all ports
64 // between ipv4 and ipv6.
65
PortToAddress(int port)66 EventEngine::ResolvedAddress PortToAddress(int port) {
67 return URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value();
68 }
69
70 } // namespace
71
72 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::mu_;
73 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::now_mu_;
74
75 namespace {
76 const intptr_t kTaskHandleSalt = 12345;
77 FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
78 gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type);
79 } // namespace
80
FuzzingEventEngine(Options options,const fuzzing_event_engine::Actions & actions)81 FuzzingEventEngine::FuzzingEventEngine(
82 Options options, const fuzzing_event_engine::Actions& actions)
83 : max_delay_{options.max_delay_write, options.max_delay_run_after} {
84 tasks_by_id_.clear();
85 tasks_by_time_.clear();
86 next_task_id_ = 1;
87 current_tick_ = 0;
88 // Start at 5 seconds after the epoch.
89 // This needs to be more than 1, and otherwise is kind of arbitrary.
90 // The grpc_core::Timer code special cases the zero second time period after
91 // epoch to allow for some fancy atomic stuff.
92 now_ = Time() + std::chrono::seconds(5);
93
94 // Allow the fuzzer to assign ports.
95 // Once this list is exhausted, we fall back to a deterministic algorithm.
96 for (auto port : actions.assign_ports()) {
97 if (port == 0 || port > 65535) continue;
98 free_ports_.push(port);
99 fuzzer_mentioned_ports_.insert(port);
100 }
101
102 // Fill the write sizes queue for future connections.
103 for (const auto& connection : actions.connections()) {
104 std::queue<size_t> write_sizes;
105 for (auto size : connection.write_size()) {
106 write_sizes.push(size);
107 }
108 write_sizes_for_future_connections_.emplace(std::move(write_sizes));
109 }
110
111 // Whilst a fuzzing EventEngine is active we override grpc's now function.
112 g_orig_gpr_now_impl = gpr_now_impl;
113 gpr_now_impl = GlobalNowImpl;
114 GPR_ASSERT(g_fuzzing_event_engine == nullptr);
115 g_fuzzing_event_engine = this;
116 grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC));
117
118 for (const auto& delay_ns : actions.run_delay()) {
119 Duration delay = std::chrono::nanoseconds(delay_ns);
120 task_delays_.push(delay);
121 }
122
123 previous_pick_port_functions_ = grpc_set_pick_port_functions(
__anond85b8d310302() 124 grpc_pick_port_functions{+[]() -> int {
125 grpc_core::MutexLock lock(&*mu_);
126 return g_fuzzing_event_engine->AllocatePort();
127 },
128 +[](int) {}});
129 }
130
FuzzingDone()131 void FuzzingEventEngine::FuzzingDone() {
132 grpc_core::MutexLock lock(&*mu_);
133 while (!task_delays_.empty()) task_delays_.pop();
134 }
135
NowAsTimespec(gpr_clock_type clock_type)136 gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
137 // TODO(ctiller): add a facility to track realtime and monotonic clocks
138 // separately to simulate divergence.
139 GPR_ASSERT(clock_type != GPR_TIMESPAN);
140 const Duration d = now_.time_since_epoch();
141 auto secs = std::chrono::duration_cast<std::chrono::seconds>(d);
142 return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type};
143 }
144
Tick(Duration max_time)145 void FuzzingEventEngine::Tick(Duration max_time) {
146 bool incremented_time = false;
147 while (true) {
148 std::vector<absl::AnyInvocable<void()>> to_run;
149 {
150 grpc_core::MutexLock lock(&*mu_);
151 grpc_core::MutexLock now_lock(&*now_mu_);
152 if (!incremented_time) {
153 Duration incr = max_time;
154 // TODO(ctiller): look at tasks_by_time_ and jump forward (once iomgr
155 // timers are gone)
156 if (!tasks_by_time_.empty()) {
157 incr = std::min(incr, tasks_by_time_.begin()->first - now_);
158 }
159 if (incr < exponential_gate_time_increment_) {
160 exponential_gate_time_increment_ = std::chrono::milliseconds(1);
161 } else {
162 incr = std::min(incr, exponential_gate_time_increment_);
163 exponential_gate_time_increment_ +=
164 exponential_gate_time_increment_ / 1000;
165 }
166 incr = std::max(incr, std::chrono::duration_cast<Duration>(
167 std::chrono::milliseconds(1)));
168 now_ += incr;
169 GPR_ASSERT(now_.time_since_epoch().count() >= 0);
170 ++current_tick_;
171 incremented_time = true;
172 }
173 // Find newly expired timers.
174 while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
175 auto& task = *tasks_by_time_.begin()->second;
176 tasks_by_id_.erase(task.id);
177 if (task.closure != nullptr) {
178 to_run.push_back(std::move(task.closure));
179 }
180 tasks_by_time_.erase(tasks_by_time_.begin());
181 }
182 }
183 if (to_run.empty()) return;
184 for (auto& closure : to_run) {
185 closure();
186 }
187 }
188 }
189
TickUntilIdle()190 void FuzzingEventEngine::TickUntilIdle() {
191 while (true) {
192 {
193 grpc_core::MutexLock lock(&*mu_);
194 if (tasks_by_id_.empty()) return;
195 }
196 Tick();
197 }
198 }
199
TickUntil(Time t)200 void FuzzingEventEngine::TickUntil(Time t) {
201 while (true) {
202 auto now = Now();
203 if (now >= t) break;
204 Tick(t - now);
205 }
206 }
207
TickForDuration(Duration d)208 void FuzzingEventEngine::TickForDuration(Duration d) { TickUntil(Now() + d); }
209
SetRunAfterDurationCallback(absl::AnyInvocable<void (Duration)> callback)210 void FuzzingEventEngine::SetRunAfterDurationCallback(
211 absl::AnyInvocable<void(Duration)> callback) {
212 grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
213 run_after_duration_callback_ = std::move(callback);
214 }
215
Now()216 FuzzingEventEngine::Time FuzzingEventEngine::Now() {
217 grpc_core::MutexLock lock(&*now_mu_);
218 return now_;
219 }
220
AllocatePort()221 int FuzzingEventEngine::AllocatePort() {
222 // If the fuzzer selected some port orderings, do that first.
223 if (!free_ports_.empty()) {
224 int p = free_ports_.front();
225 free_ports_.pop();
226 return p;
227 }
228 // Otherwise just scan through starting at one and skipping any ports
229 // that were in the fuzzers initial list.
230 while (true) {
231 int p = next_free_port_++;
232 if (fuzzer_mentioned_ports_.count(p) == 0) {
233 return p;
234 }
235 }
236 }
237
238 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)239 FuzzingEventEngine::CreateListener(
240 Listener::AcceptCallback on_accept,
241 absl::AnyInvocable<void(absl::Status)> on_shutdown, const EndpointConfig&,
242 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
243 grpc_core::MutexLock lock(&*mu_);
244 // Create a listener and register it into the set of listener info in the
245 // event engine.
246 return absl::make_unique<FuzzingListener>(
247 *listeners_
248 .emplace(std::make_shared<ListenerInfo>(
249 std::move(on_accept), std::move(on_shutdown),
250 std::move(memory_allocator_factory)))
251 .first);
252 }
253
~FuzzingListener()254 FuzzingEventEngine::FuzzingListener::~FuzzingListener() {
255 grpc_core::MutexLock lock(&*mu_);
256 g_fuzzing_event_engine->listeners_.erase(info_);
257 }
258
IsPortUsed(int port)259 bool FuzzingEventEngine::IsPortUsed(int port) {
260 // Return true if a port is bound to a listener.
261 for (const auto& listener : listeners_) {
262 if (std::find(listener->ports.begin(), listener->ports.end(), port) !=
263 listener->ports.end()) {
264 return true;
265 }
266 }
267 return false;
268 }
269
Bind(const ResolvedAddress & addr)270 absl::StatusOr<int> FuzzingEventEngine::FuzzingListener::Bind(
271 const ResolvedAddress& addr) {
272 // Extract the port from the address (or fail if non-localhost).
273 auto port = ResolvedAddressGetPort(addr);
274 grpc_core::MutexLock lock(&*mu_);
275 // Check that the listener hasn't already been started.
276 if (info_->started) return absl::InternalError("Already started");
277 if (port != 0) {
278 // If the port is non-zero, check that it's not already in use.
279 if (g_fuzzing_event_engine->IsPortUsed(port)) {
280 return absl::InternalError("Port in use");
281 }
282 } else {
283 // If the port is zero, allocate a new one.
284 do {
285 port = g_fuzzing_event_engine->AllocatePort();
286 } while (g_fuzzing_event_engine->IsPortUsed(port));
287 }
288 // Add the port to the listener.
289 info_->ports.push_back(port);
290 return port;
291 }
292
Start()293 absl::Status FuzzingEventEngine::FuzzingListener::Start() {
294 // Start the listener or fail if it's already started.
295 grpc_core::MutexLock lock(&*mu_);
296 if (info_->started) return absl::InternalError("Already started");
297 info_->started = true;
298 return absl::OkStatus();
299 }
300
Write(SliceBuffer * data,int index)301 bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
302 GPR_ASSERT(!closed[index]);
303 const int peer_index = 1 - index;
304 if (data->Length() == 0) return true;
305 size_t write_len = std::numeric_limits<size_t>::max();
306 // Check the write_sizes queue for fuzzer imposed restrictions on this write
307 // size. This allows the fuzzer to force small writes to be seen by the
308 // reader.
309 if (!write_sizes[index].empty()) {
310 write_len = write_sizes[index].front();
311 write_sizes[index].pop();
312 }
313 if (write_len > data->Length()) {
314 write_len = data->Length();
315 }
316 // If the write_len is zero, we still need to write something, so we write one
317 // byte.
318 if (write_len == 0) write_len = 1;
319 if (trace_writes.enabled()) {
320 gpr_log(GPR_INFO, "WRITE[%p:%d]: %" PRIdPTR " bytes", this, index,
321 write_len);
322 }
323 // Expand the pending buffer.
324 size_t prev_len = pending[index].size();
325 pending[index].resize(prev_len + write_len);
326 // Move bytes from the to-write data into the pending buffer.
327 data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len);
328 // If there was a pending read, then we can fulfill it.
329 if (pending_read[peer_index].has_value()) {
330 pending_read[peer_index]->buffer->Append(
331 Slice::FromCopiedBuffer(pending[index]));
332 pending[index].clear();
333 g_fuzzing_event_engine->RunLocked(
334 RunType::kWrite,
335 [cb = std::move(pending_read[peer_index]->on_read)]() mutable {
336 cb(absl::OkStatus());
337 });
338 pending_read[peer_index].reset();
339 }
340 return data->Length() == 0;
341 }
342
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)343 bool FuzzingEventEngine::FuzzingEndpoint::Write(
344 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
345 const WriteArgs*) {
346 grpc_core::global_stats().IncrementSyscallWrite();
347 grpc_core::MutexLock lock(&*mu_);
348 GPR_ASSERT(!middle_->closed[my_index()]);
349 GPR_ASSERT(!middle_->writing[my_index()]);
350 // If the write succeeds immediately, then we return true.
351 if (middle_->Write(data, my_index())) return true;
352 middle_->writing[my_index()] = true;
353 ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data);
354 return false;
355 }
356
ScheduleDelayedWrite(std::shared_ptr<EndpointMiddle> middle,int index,absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data)357 void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
358 std::shared_ptr<EndpointMiddle> middle, int index,
359 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
360 g_fuzzing_event_engine->RunLocked(
361 RunType::kWrite, [middle = std::move(middle), index, data,
362 on_writable = std::move(on_writable)]() mutable {
363 grpc_core::ReleasableMutexLock lock(&*mu_);
364 GPR_ASSERT(middle->writing[index]);
365 if (middle->closed[index]) {
366 g_fuzzing_event_engine->RunLocked(
367 RunType::kRunAfter,
368 [on_writable = std::move(on_writable)]() mutable {
369 on_writable(absl::InternalError("Endpoint closed"));
370 });
371 return;
372 }
373 if (middle->Write(data, index)) {
374 middle->writing[index] = false;
375 lock.Release();
376 on_writable(absl::OkStatus());
377 return;
378 }
379 ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable),
380 data);
381 });
382 }
383
~FuzzingEndpoint()384 FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
385 grpc_core::MutexLock lock(&*mu_);
386 middle_->closed[my_index()] = true;
387 if (middle_->pending_read[my_index()].has_value()) {
388 g_fuzzing_event_engine->RunLocked(
389 RunType::kRunAfter,
390 [cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable {
391 cb(absl::InternalError("Endpoint closed"));
392 });
393 middle_->pending_read[my_index()].reset();
394 }
395 if (!middle_->writing[peer_index()] &&
396 middle_->pending_read[peer_index()].has_value()) {
397 g_fuzzing_event_engine->RunLocked(
398 RunType::kRunAfter,
399 [cb = std::move(
400 middle_->pending_read[peer_index()]->on_read)]() mutable {
401 cb(absl::InternalError("Endpoint closed"));
402 });
403 middle_->pending_read[peer_index()].reset();
404 }
405 }
406
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)407 bool FuzzingEventEngine::FuzzingEndpoint::Read(
408 absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
409 const ReadArgs*) {
410 buffer->Clear();
411 grpc_core::MutexLock lock(&*mu_);
412 GPR_ASSERT(!middle_->closed[my_index()]);
413 if (middle_->pending[peer_index()].empty()) {
414 // If the endpoint is closed, fail asynchronously.
415 if (middle_->closed[peer_index()]) {
416 g_fuzzing_event_engine->RunLocked(
417 RunType::kRunAfter, [on_read = std::move(on_read)]() mutable {
418 on_read(absl::InternalError("Endpoint closed"));
419 });
420 return false;
421 }
422 // If the endpoint has no pending data, then we need to wait for a write.
423 middle_->pending_read[my_index()] = PendingRead{std::move(on_read), buffer};
424 return false;
425 } else {
426 // If the endpoint has pending data, then we can fulfill the read
427 // immediately.
428 buffer->Append(Slice::FromCopiedBuffer(middle_->pending[peer_index()]));
429 middle_->pending[peer_index()].clear();
430 return true;
431 }
432 }
433
WriteSizesForConnection()434 std::queue<size_t> FuzzingEventEngine::WriteSizesForConnection() {
435 if (write_sizes_for_future_connections_.empty()) return std::queue<size_t>();
436 auto ret = std::move(write_sizes_for_future_connections_.front());
437 write_sizes_for_future_connections_.pop();
438 return ret;
439 }
440
EndpointMiddle(int listener_port,int client_port)441 FuzzingEventEngine::EndpointMiddle::EndpointMiddle(int listener_port,
442 int client_port)
443 : addrs{PortToAddress(listener_port), PortToAddress(client_port)},
444 write_sizes{g_fuzzing_event_engine->WriteSizesForConnection(),
445 g_fuzzing_event_engine->WriteSizesForConnection()} {}
446
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator,Duration)447 EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
448 OnConnectCallback on_connect, const ResolvedAddress& addr,
449 const EndpointConfig&, MemoryAllocator, Duration) {
450 // TODO(ctiller): do something with the timeout
451 // Schedule a timer to run (with some fuzzer selected delay) the on_connect
452 // callback.
453 grpc_core::MutexLock lock(&*mu_);
454 auto task_handle = RunAfterLocked(
455 RunType::kRunAfter, Duration(0),
456 [this, addr, on_connect = std::move(on_connect)]() mutable {
457 // Check for a legal address and extract the target port number.
458 auto port = ResolvedAddressGetPort(addr);
459 grpc_core::MutexLock lock(&*mu_);
460 // Find the listener that is listening on the target port.
461 for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
462 const auto& listener = *it;
463 // Listener must be started.
464 if (!listener->started) continue;
465 for (int listener_port : listener->ports) {
466 if (port == listener_port) {
467 // Port matches on a started listener: create an endpoint, call
468 // on_accept for the listener and on_connect for the client.
469 auto middle = std::make_shared<EndpointMiddle>(
470 listener_port, g_fuzzing_event_engine->AllocatePort());
471 auto ep1 = std::make_unique<FuzzingEndpoint>(middle, 0);
472 auto ep2 = std::make_unique<FuzzingEndpoint>(middle, 1);
473 RunLocked(RunType::kRunAfter, [listener,
474 ep1 = std::move(ep1)]() mutable {
475 listener->on_accept(
476 std::move(ep1),
477 listener->memory_allocator_factory->CreateMemoryAllocator(
478 "fuzzing"));
479 });
480 RunLocked(RunType::kRunAfter, [on_connect = std::move(on_connect),
481 ep2 = std::move(ep2)]() mutable {
482 on_connect(std::move(ep2));
483 });
484 return;
485 }
486 }
487 }
488 // Fail: no such listener.
489 RunLocked(RunType::kRunAfter,
490 [on_connect = std::move(on_connect)]() mutable {
491 on_connect(absl::InvalidArgumentError("No listener found"));
492 });
493 });
494 return ConnectionHandle{{task_handle.keys[0], task_handle.keys[1]}};
495 }
496
CancelConnect(ConnectionHandle connection_handle)497 bool FuzzingEventEngine::CancelConnect(ConnectionHandle connection_handle) {
498 return Cancel(
499 TaskHandle{{connection_handle.keys[0], connection_handle.keys[1]}});
500 }
501
IsWorkerThread()502 bool FuzzingEventEngine::IsWorkerThread() { abort(); }
503
504 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(const DNSResolver::ResolverOptions &)505 FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
506 #if defined(GRPC_POSIX_SOCKET_TCP)
507 return std::make_unique<NativePosixDNSResolver>(shared_from_this());
508 #else
509 grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
510 #endif
511 }
512
Run(Closure * closure)513 void FuzzingEventEngine::Run(Closure* closure) {
514 grpc_core::MutexLock lock(&*mu_);
515 RunAfterLocked(RunType::kRunAfter, Duration::zero(),
516 [closure]() { closure->Run(); });
517 }
518
Run(absl::AnyInvocable<void ()> closure)519 void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
520 grpc_core::MutexLock lock(&*mu_);
521 RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
522 }
523
RunAfter(Duration when,Closure * closure)524 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
525 Closure* closure) {
526 return RunAfter(when, [closure]() { closure->Run(); });
527 }
528
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)529 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
530 Duration when, absl::AnyInvocable<void()> closure) {
531 {
532 grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
533 if (run_after_duration_callback_ != nullptr) {
534 run_after_duration_callback_(when);
535 }
536 }
537 grpc_core::MutexLock lock(&*mu_);
538 // (b/258949216): Cap it to one year to avoid integer overflow errors.
539 return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),
540 std::move(closure));
541 }
542
RunAfterExactly(Duration when,absl::AnyInvocable<void ()> closure)543 EventEngine::TaskHandle FuzzingEventEngine::RunAfterExactly(
544 Duration when, absl::AnyInvocable<void()> closure) {
545 grpc_core::MutexLock lock(&*mu_);
546 // (b/258949216): Cap it to one year to avoid integer overflow errors.
547 return RunAfterLocked(RunType::kExact, std::min(when, kOneYear),
548 std::move(closure));
549 }
550
RunAfterLocked(RunType run_type,Duration when,absl::AnyInvocable<void ()> closure)551 EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
552 RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
553 const intptr_t id = next_task_id_;
554 ++next_task_id_;
555 Duration delay_taken = Duration::zero();
556 if (run_type != RunType::kExact) {
557 if (!task_delays_.empty()) {
558 delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),
559 max_delay_[static_cast<int>(run_type)]);
560 task_delays_.pop();
561 } else if (run_type != RunType::kWrite && when == Duration::zero()) {
562 // For zero-duration events, if there is no more delay input from
563 // the test case, we default to a small non-zero value to avoid
564 // busy loops that prevent us from making forward progress.
565 delay_taken = std::chrono::microseconds(1);
566 }
567 when += delay_taken;
568 }
569 auto task = std::make_shared<Task>(id, std::move(closure));
570 tasks_by_id_.emplace(id, task);
571 Time final_time;
572 Time now;
573 {
574 grpc_core::MutexLock lock(&*now_mu_);
575 final_time = now_ + when;
576 now = now_;
577 tasks_by_time_.emplace(final_time, std::move(task));
578 }
579 if (trace_timers.enabled()) {
580 gpr_log(GPR_INFO,
581 "Schedule timer %" PRIx64 " @ %" PRIu64 " (now=%" PRIu64
582 "; delay=%" PRIu64 "; fuzzing_added=%" PRIu64 "; type=%d)",
583 id, static_cast<uint64_t>(final_time.time_since_epoch().count()),
584 now.time_since_epoch().count(), when.count(), delay_taken.count(),
585 static_cast<int>(run_type));
586 }
587 return TaskHandle{id, kTaskHandleSalt};
588 }
589
Cancel(TaskHandle handle)590 bool FuzzingEventEngine::Cancel(TaskHandle handle) {
591 grpc_core::MutexLock lock(&*mu_);
592 GPR_ASSERT(handle.keys[1] == kTaskHandleSalt);
593 const intptr_t id = handle.keys[0];
594 auto it = tasks_by_id_.find(id);
595 if (it == tasks_by_id_.end()) {
596 return false;
597 }
598 if (it->second->closure == nullptr) {
599 return false;
600 }
601 if (trace_timers.enabled()) {
602 gpr_log(GPR_INFO, "Cancel timer %" PRIx64, id);
603 }
604 it->second->closure = nullptr;
605 return true;
606 }
607
GlobalNowImpl(gpr_clock_type clock_type)608 gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
609 if (g_fuzzing_event_engine == nullptr) {
610 return gpr_inf_future(clock_type);
611 }
612 GPR_ASSERT(g_fuzzing_event_engine != nullptr);
613 grpc_core::MutexLock lock(&*now_mu_);
614 return g_fuzzing_event_engine->NowAsTimespec(clock_type);
615 }
616
UnsetGlobalHooks()617 void FuzzingEventEngine::UnsetGlobalHooks() {
618 if (g_fuzzing_event_engine != this) return;
619 g_fuzzing_event_engine = nullptr;
620 gpr_now_impl = g_orig_gpr_now_impl;
621 g_orig_gpr_now_impl = nullptr;
622 grpc_set_pick_port_functions(previous_pick_port_functions_);
623 }
624
~ListenerInfo()625 FuzzingEventEngine::ListenerInfo::~ListenerInfo() {
626 GPR_ASSERT(g_fuzzing_event_engine != nullptr);
627 g_fuzzing_event_engine->Run(
628 [on_shutdown = std::move(on_shutdown),
629 shutdown_status = std::move(shutdown_status)]() mutable {
630 on_shutdown(std::move(shutdown_status));
631 });
632 }
633
634 } // namespace experimental
635 } // namespace grpc_event_engine
636