xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
16 
17 #include <inttypes.h>
18 #include <stdlib.h>
19 
20 #include <algorithm>
21 #include <chrono>
22 #include <limits>
23 #include <vector>
24 
25 #include "absl/memory/memory.h"
26 #include "absl/strings/str_cat.h"
27 
28 #include <grpc/event_engine/slice.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/event_engine/tcp_socket_utils.h"
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/gprpp/time.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
39 #include "test/core/util/port.h"
40 
41 #if defined(GRPC_POSIX_SOCKET_TCP)
42 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
43 #else
44 #include "src/core/lib/gprpp/crash.h"
45 #endif
46 // IWYU pragma: no_include <sys/socket.h>
47 
48 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
49 
50 static grpc_core::TraceFlag trace_writes(false, "fuzzing_ee_writes");
51 static grpc_core::TraceFlag trace_timers(false, "fuzzing_ee_timers");
52 
53 using namespace std::chrono_literals;
54 
55 namespace grpc_event_engine {
56 namespace experimental {
57 
58 namespace {
59 
60 constexpr EventEngine::Duration kOneYear = 8760h;
61 
62 // Inside the fuzzing event engine we consider everything is bound to a single
63 // loopback device. It cannot reach any other devices, and shares all ports
64 // between ipv4 and ipv6.
65 
PortToAddress(int port)66 EventEngine::ResolvedAddress PortToAddress(int port) {
67   return URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value();
68 }
69 
70 }  // namespace
71 
72 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::mu_;
73 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::now_mu_;
74 
75 namespace {
76 const intptr_t kTaskHandleSalt = 12345;
77 FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
78 gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type);
79 }  // namespace
80 
FuzzingEventEngine(Options options,const fuzzing_event_engine::Actions & actions)81 FuzzingEventEngine::FuzzingEventEngine(
82     Options options, const fuzzing_event_engine::Actions& actions)
83     : max_delay_{options.max_delay_write, options.max_delay_run_after} {
84   tasks_by_id_.clear();
85   tasks_by_time_.clear();
86   next_task_id_ = 1;
87   current_tick_ = 0;
88   // Start at 5 seconds after the epoch.
89   // This needs to be more than 1, and otherwise is kind of arbitrary.
90   // The grpc_core::Timer code special cases the zero second time period after
91   // epoch to allow for some fancy atomic stuff.
92   now_ = Time() + std::chrono::seconds(5);
93 
94   // Allow the fuzzer to assign ports.
95   // Once this list is exhausted, we fall back to a deterministic algorithm.
96   for (auto port : actions.assign_ports()) {
97     if (port == 0 || port > 65535) continue;
98     free_ports_.push(port);
99     fuzzer_mentioned_ports_.insert(port);
100   }
101 
102   // Fill the write sizes queue for future connections.
103   for (const auto& connection : actions.connections()) {
104     std::queue<size_t> write_sizes;
105     for (auto size : connection.write_size()) {
106       write_sizes.push(size);
107     }
108     write_sizes_for_future_connections_.emplace(std::move(write_sizes));
109   }
110 
111   // Whilst a fuzzing EventEngine is active we override grpc's now function.
112   g_orig_gpr_now_impl = gpr_now_impl;
113   gpr_now_impl = GlobalNowImpl;
114   GPR_ASSERT(g_fuzzing_event_engine == nullptr);
115   g_fuzzing_event_engine = this;
116   grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC));
117 
118   for (const auto& delay_ns : actions.run_delay()) {
119     Duration delay = std::chrono::nanoseconds(delay_ns);
120     task_delays_.push(delay);
121   }
122 
123   previous_pick_port_functions_ = grpc_set_pick_port_functions(
__anond85b8d310302() 124       grpc_pick_port_functions{+[]() -> int {
125                                  grpc_core::MutexLock lock(&*mu_);
126                                  return g_fuzzing_event_engine->AllocatePort();
127                                },
128                                +[](int) {}});
129 }
130 
FuzzingDone()131 void FuzzingEventEngine::FuzzingDone() {
132   grpc_core::MutexLock lock(&*mu_);
133   while (!task_delays_.empty()) task_delays_.pop();
134 }
135 
NowAsTimespec(gpr_clock_type clock_type)136 gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
137   // TODO(ctiller): add a facility to track realtime and monotonic clocks
138   // separately to simulate divergence.
139   GPR_ASSERT(clock_type != GPR_TIMESPAN);
140   const Duration d = now_.time_since_epoch();
141   auto secs = std::chrono::duration_cast<std::chrono::seconds>(d);
142   return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type};
143 }
144 
Tick(Duration max_time)145 void FuzzingEventEngine::Tick(Duration max_time) {
146   bool incremented_time = false;
147   while (true) {
148     std::vector<absl::AnyInvocable<void()>> to_run;
149     {
150       grpc_core::MutexLock lock(&*mu_);
151       grpc_core::MutexLock now_lock(&*now_mu_);
152       if (!incremented_time) {
153         Duration incr = max_time;
154         // TODO(ctiller): look at tasks_by_time_ and jump forward (once iomgr
155         // timers are gone)
156         if (!tasks_by_time_.empty()) {
157           incr = std::min(incr, tasks_by_time_.begin()->first - now_);
158         }
159         if (incr < exponential_gate_time_increment_) {
160           exponential_gate_time_increment_ = std::chrono::milliseconds(1);
161         } else {
162           incr = std::min(incr, exponential_gate_time_increment_);
163           exponential_gate_time_increment_ +=
164               exponential_gate_time_increment_ / 1000;
165         }
166         incr = std::max(incr, std::chrono::duration_cast<Duration>(
167                                   std::chrono::milliseconds(1)));
168         now_ += incr;
169         GPR_ASSERT(now_.time_since_epoch().count() >= 0);
170         ++current_tick_;
171         incremented_time = true;
172       }
173       // Find newly expired timers.
174       while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
175         auto& task = *tasks_by_time_.begin()->second;
176         tasks_by_id_.erase(task.id);
177         if (task.closure != nullptr) {
178           to_run.push_back(std::move(task.closure));
179         }
180         tasks_by_time_.erase(tasks_by_time_.begin());
181       }
182     }
183     if (to_run.empty()) return;
184     for (auto& closure : to_run) {
185       closure();
186     }
187   }
188 }
189 
TickUntilIdle()190 void FuzzingEventEngine::TickUntilIdle() {
191   while (true) {
192     {
193       grpc_core::MutexLock lock(&*mu_);
194       if (tasks_by_id_.empty()) return;
195     }
196     Tick();
197   }
198 }
199 
TickUntil(Time t)200 void FuzzingEventEngine::TickUntil(Time t) {
201   while (true) {
202     auto now = Now();
203     if (now >= t) break;
204     Tick(t - now);
205   }
206 }
207 
TickForDuration(Duration d)208 void FuzzingEventEngine::TickForDuration(Duration d) { TickUntil(Now() + d); }
209 
SetRunAfterDurationCallback(absl::AnyInvocable<void (Duration)> callback)210 void FuzzingEventEngine::SetRunAfterDurationCallback(
211     absl::AnyInvocable<void(Duration)> callback) {
212   grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
213   run_after_duration_callback_ = std::move(callback);
214 }
215 
Now()216 FuzzingEventEngine::Time FuzzingEventEngine::Now() {
217   grpc_core::MutexLock lock(&*now_mu_);
218   return now_;
219 }
220 
AllocatePort()221 int FuzzingEventEngine::AllocatePort() {
222   // If the fuzzer selected some port orderings, do that first.
223   if (!free_ports_.empty()) {
224     int p = free_ports_.front();
225     free_ports_.pop();
226     return p;
227   }
228   // Otherwise just scan through starting at one and skipping any ports
229   // that were in the fuzzers initial list.
230   while (true) {
231     int p = next_free_port_++;
232     if (fuzzer_mentioned_ports_.count(p) == 0) {
233       return p;
234     }
235   }
236 }
237 
238 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)239 FuzzingEventEngine::CreateListener(
240     Listener::AcceptCallback on_accept,
241     absl::AnyInvocable<void(absl::Status)> on_shutdown, const EndpointConfig&,
242     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
243   grpc_core::MutexLock lock(&*mu_);
244   // Create a listener and register it into the set of listener info in the
245   // event engine.
246   return absl::make_unique<FuzzingListener>(
247       *listeners_
248            .emplace(std::make_shared<ListenerInfo>(
249                std::move(on_accept), std::move(on_shutdown),
250                std::move(memory_allocator_factory)))
251            .first);
252 }
253 
~FuzzingListener()254 FuzzingEventEngine::FuzzingListener::~FuzzingListener() {
255   grpc_core::MutexLock lock(&*mu_);
256   g_fuzzing_event_engine->listeners_.erase(info_);
257 }
258 
IsPortUsed(int port)259 bool FuzzingEventEngine::IsPortUsed(int port) {
260   // Return true if a port is bound to a listener.
261   for (const auto& listener : listeners_) {
262     if (std::find(listener->ports.begin(), listener->ports.end(), port) !=
263         listener->ports.end()) {
264       return true;
265     }
266   }
267   return false;
268 }
269 
Bind(const ResolvedAddress & addr)270 absl::StatusOr<int> FuzzingEventEngine::FuzzingListener::Bind(
271     const ResolvedAddress& addr) {
272   // Extract the port from the address (or fail if non-localhost).
273   auto port = ResolvedAddressGetPort(addr);
274   grpc_core::MutexLock lock(&*mu_);
275   // Check that the listener hasn't already been started.
276   if (info_->started) return absl::InternalError("Already started");
277   if (port != 0) {
278     // If the port is non-zero, check that it's not already in use.
279     if (g_fuzzing_event_engine->IsPortUsed(port)) {
280       return absl::InternalError("Port in use");
281     }
282   } else {
283     // If the port is zero, allocate a new one.
284     do {
285       port = g_fuzzing_event_engine->AllocatePort();
286     } while (g_fuzzing_event_engine->IsPortUsed(port));
287   }
288   // Add the port to the listener.
289   info_->ports.push_back(port);
290   return port;
291 }
292 
Start()293 absl::Status FuzzingEventEngine::FuzzingListener::Start() {
294   // Start the listener or fail if it's already started.
295   grpc_core::MutexLock lock(&*mu_);
296   if (info_->started) return absl::InternalError("Already started");
297   info_->started = true;
298   return absl::OkStatus();
299 }
300 
Write(SliceBuffer * data,int index)301 bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
302   GPR_ASSERT(!closed[index]);
303   const int peer_index = 1 - index;
304   if (data->Length() == 0) return true;
305   size_t write_len = std::numeric_limits<size_t>::max();
306   // Check the write_sizes queue for fuzzer imposed restrictions on this write
307   // size. This allows the fuzzer to force small writes to be seen by the
308   // reader.
309   if (!write_sizes[index].empty()) {
310     write_len = write_sizes[index].front();
311     write_sizes[index].pop();
312   }
313   if (write_len > data->Length()) {
314     write_len = data->Length();
315   }
316   // If the write_len is zero, we still need to write something, so we write one
317   // byte.
318   if (write_len == 0) write_len = 1;
319   if (trace_writes.enabled()) {
320     gpr_log(GPR_INFO, "WRITE[%p:%d]: %" PRIdPTR " bytes", this, index,
321             write_len);
322   }
323   // Expand the pending buffer.
324   size_t prev_len = pending[index].size();
325   pending[index].resize(prev_len + write_len);
326   // Move bytes from the to-write data into the pending buffer.
327   data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len);
328   // If there was a pending read, then we can fulfill it.
329   if (pending_read[peer_index].has_value()) {
330     pending_read[peer_index]->buffer->Append(
331         Slice::FromCopiedBuffer(pending[index]));
332     pending[index].clear();
333     g_fuzzing_event_engine->RunLocked(
334         RunType::kWrite,
335         [cb = std::move(pending_read[peer_index]->on_read)]() mutable {
336           cb(absl::OkStatus());
337         });
338     pending_read[peer_index].reset();
339   }
340   return data->Length() == 0;
341 }
342 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)343 bool FuzzingEventEngine::FuzzingEndpoint::Write(
344     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
345     const WriteArgs*) {
346   grpc_core::global_stats().IncrementSyscallWrite();
347   grpc_core::MutexLock lock(&*mu_);
348   GPR_ASSERT(!middle_->closed[my_index()]);
349   GPR_ASSERT(!middle_->writing[my_index()]);
350   // If the write succeeds immediately, then we return true.
351   if (middle_->Write(data, my_index())) return true;
352   middle_->writing[my_index()] = true;
353   ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data);
354   return false;
355 }
356 
ScheduleDelayedWrite(std::shared_ptr<EndpointMiddle> middle,int index,absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data)357 void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
358     std::shared_ptr<EndpointMiddle> middle, int index,
359     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
360   g_fuzzing_event_engine->RunLocked(
361       RunType::kWrite, [middle = std::move(middle), index, data,
362                         on_writable = std::move(on_writable)]() mutable {
363         grpc_core::ReleasableMutexLock lock(&*mu_);
364         GPR_ASSERT(middle->writing[index]);
365         if (middle->closed[index]) {
366           g_fuzzing_event_engine->RunLocked(
367               RunType::kRunAfter,
368               [on_writable = std::move(on_writable)]() mutable {
369                 on_writable(absl::InternalError("Endpoint closed"));
370               });
371           return;
372         }
373         if (middle->Write(data, index)) {
374           middle->writing[index] = false;
375           lock.Release();
376           on_writable(absl::OkStatus());
377           return;
378         }
379         ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable),
380                              data);
381       });
382 }
383 
~FuzzingEndpoint()384 FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
385   grpc_core::MutexLock lock(&*mu_);
386   middle_->closed[my_index()] = true;
387   if (middle_->pending_read[my_index()].has_value()) {
388     g_fuzzing_event_engine->RunLocked(
389         RunType::kRunAfter,
390         [cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable {
391           cb(absl::InternalError("Endpoint closed"));
392         });
393     middle_->pending_read[my_index()].reset();
394   }
395   if (!middle_->writing[peer_index()] &&
396       middle_->pending_read[peer_index()].has_value()) {
397     g_fuzzing_event_engine->RunLocked(
398         RunType::kRunAfter,
399         [cb = std::move(
400              middle_->pending_read[peer_index()]->on_read)]() mutable {
401           cb(absl::InternalError("Endpoint closed"));
402         });
403     middle_->pending_read[peer_index()].reset();
404   }
405 }
406 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)407 bool FuzzingEventEngine::FuzzingEndpoint::Read(
408     absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
409     const ReadArgs*) {
410   buffer->Clear();
411   grpc_core::MutexLock lock(&*mu_);
412   GPR_ASSERT(!middle_->closed[my_index()]);
413   if (middle_->pending[peer_index()].empty()) {
414     // If the endpoint is closed, fail asynchronously.
415     if (middle_->closed[peer_index()]) {
416       g_fuzzing_event_engine->RunLocked(
417           RunType::kRunAfter, [on_read = std::move(on_read)]() mutable {
418             on_read(absl::InternalError("Endpoint closed"));
419           });
420       return false;
421     }
422     // If the endpoint has no pending data, then we need to wait for a write.
423     middle_->pending_read[my_index()] = PendingRead{std::move(on_read), buffer};
424     return false;
425   } else {
426     // If the endpoint has pending data, then we can fulfill the read
427     // immediately.
428     buffer->Append(Slice::FromCopiedBuffer(middle_->pending[peer_index()]));
429     middle_->pending[peer_index()].clear();
430     return true;
431   }
432 }
433 
WriteSizesForConnection()434 std::queue<size_t> FuzzingEventEngine::WriteSizesForConnection() {
435   if (write_sizes_for_future_connections_.empty()) return std::queue<size_t>();
436   auto ret = std::move(write_sizes_for_future_connections_.front());
437   write_sizes_for_future_connections_.pop();
438   return ret;
439 }
440 
EndpointMiddle(int listener_port,int client_port)441 FuzzingEventEngine::EndpointMiddle::EndpointMiddle(int listener_port,
442                                                    int client_port)
443     : addrs{PortToAddress(listener_port), PortToAddress(client_port)},
444       write_sizes{g_fuzzing_event_engine->WriteSizesForConnection(),
445                   g_fuzzing_event_engine->WriteSizesForConnection()} {}
446 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator,Duration)447 EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
448     OnConnectCallback on_connect, const ResolvedAddress& addr,
449     const EndpointConfig&, MemoryAllocator, Duration) {
450   // TODO(ctiller): do something with the timeout
451   // Schedule a timer to run (with some fuzzer selected delay) the on_connect
452   // callback.
453   grpc_core::MutexLock lock(&*mu_);
454   auto task_handle = RunAfterLocked(
455       RunType::kRunAfter, Duration(0),
456       [this, addr, on_connect = std::move(on_connect)]() mutable {
457         // Check for a legal address and extract the target port number.
458         auto port = ResolvedAddressGetPort(addr);
459         grpc_core::MutexLock lock(&*mu_);
460         // Find the listener that is listening on the target port.
461         for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
462           const auto& listener = *it;
463           // Listener must be started.
464           if (!listener->started) continue;
465           for (int listener_port : listener->ports) {
466             if (port == listener_port) {
467               // Port matches on a started listener: create an endpoint, call
468               // on_accept for the listener and on_connect for the client.
469               auto middle = std::make_shared<EndpointMiddle>(
470                   listener_port, g_fuzzing_event_engine->AllocatePort());
471               auto ep1 = std::make_unique<FuzzingEndpoint>(middle, 0);
472               auto ep2 = std::make_unique<FuzzingEndpoint>(middle, 1);
473               RunLocked(RunType::kRunAfter, [listener,
474                                              ep1 = std::move(ep1)]() mutable {
475                 listener->on_accept(
476                     std::move(ep1),
477                     listener->memory_allocator_factory->CreateMemoryAllocator(
478                         "fuzzing"));
479               });
480               RunLocked(RunType::kRunAfter, [on_connect = std::move(on_connect),
481                                              ep2 = std::move(ep2)]() mutable {
482                 on_connect(std::move(ep2));
483               });
484               return;
485             }
486           }
487         }
488         // Fail: no such listener.
489         RunLocked(RunType::kRunAfter,
490                   [on_connect = std::move(on_connect)]() mutable {
491                     on_connect(absl::InvalidArgumentError("No listener found"));
492                   });
493       });
494   return ConnectionHandle{{task_handle.keys[0], task_handle.keys[1]}};
495 }
496 
CancelConnect(ConnectionHandle connection_handle)497 bool FuzzingEventEngine::CancelConnect(ConnectionHandle connection_handle) {
498   return Cancel(
499       TaskHandle{{connection_handle.keys[0], connection_handle.keys[1]}});
500 }
501 
IsWorkerThread()502 bool FuzzingEventEngine::IsWorkerThread() { abort(); }
503 
504 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(const DNSResolver::ResolverOptions &)505 FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
506 #if defined(GRPC_POSIX_SOCKET_TCP)
507   return std::make_unique<NativePosixDNSResolver>(shared_from_this());
508 #else
509   grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
510 #endif
511 }
512 
Run(Closure * closure)513 void FuzzingEventEngine::Run(Closure* closure) {
514   grpc_core::MutexLock lock(&*mu_);
515   RunAfterLocked(RunType::kRunAfter, Duration::zero(),
516                  [closure]() { closure->Run(); });
517 }
518 
Run(absl::AnyInvocable<void ()> closure)519 void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
520   grpc_core::MutexLock lock(&*mu_);
521   RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
522 }
523 
RunAfter(Duration when,Closure * closure)524 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
525                                                      Closure* closure) {
526   return RunAfter(when, [closure]() { closure->Run(); });
527 }
528 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)529 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
530     Duration when, absl::AnyInvocable<void()> closure) {
531   {
532     grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
533     if (run_after_duration_callback_ != nullptr) {
534       run_after_duration_callback_(when);
535     }
536   }
537   grpc_core::MutexLock lock(&*mu_);
538   // (b/258949216): Cap it to one year to avoid integer overflow errors.
539   return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),
540                         std::move(closure));
541 }
542 
RunAfterExactly(Duration when,absl::AnyInvocable<void ()> closure)543 EventEngine::TaskHandle FuzzingEventEngine::RunAfterExactly(
544     Duration when, absl::AnyInvocable<void()> closure) {
545   grpc_core::MutexLock lock(&*mu_);
546   // (b/258949216): Cap it to one year to avoid integer overflow errors.
547   return RunAfterLocked(RunType::kExact, std::min(when, kOneYear),
548                         std::move(closure));
549 }
550 
RunAfterLocked(RunType run_type,Duration when,absl::AnyInvocable<void ()> closure)551 EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
552     RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
553   const intptr_t id = next_task_id_;
554   ++next_task_id_;
555   Duration delay_taken = Duration::zero();
556   if (run_type != RunType::kExact) {
557     if (!task_delays_.empty()) {
558       delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),
559                                      max_delay_[static_cast<int>(run_type)]);
560       task_delays_.pop();
561     } else if (run_type != RunType::kWrite && when == Duration::zero()) {
562       // For zero-duration events, if there is no more delay input from
563       // the test case, we default to a small non-zero value to avoid
564       // busy loops that prevent us from making forward progress.
565       delay_taken = std::chrono::microseconds(1);
566     }
567     when += delay_taken;
568   }
569   auto task = std::make_shared<Task>(id, std::move(closure));
570   tasks_by_id_.emplace(id, task);
571   Time final_time;
572   Time now;
573   {
574     grpc_core::MutexLock lock(&*now_mu_);
575     final_time = now_ + when;
576     now = now_;
577     tasks_by_time_.emplace(final_time, std::move(task));
578   }
579   if (trace_timers.enabled()) {
580     gpr_log(GPR_INFO,
581             "Schedule timer %" PRIx64 " @ %" PRIu64 " (now=%" PRIu64
582             "; delay=%" PRIu64 "; fuzzing_added=%" PRIu64 "; type=%d)",
583             id, static_cast<uint64_t>(final_time.time_since_epoch().count()),
584             now.time_since_epoch().count(), when.count(), delay_taken.count(),
585             static_cast<int>(run_type));
586   }
587   return TaskHandle{id, kTaskHandleSalt};
588 }
589 
Cancel(TaskHandle handle)590 bool FuzzingEventEngine::Cancel(TaskHandle handle) {
591   grpc_core::MutexLock lock(&*mu_);
592   GPR_ASSERT(handle.keys[1] == kTaskHandleSalt);
593   const intptr_t id = handle.keys[0];
594   auto it = tasks_by_id_.find(id);
595   if (it == tasks_by_id_.end()) {
596     return false;
597   }
598   if (it->second->closure == nullptr) {
599     return false;
600   }
601   if (trace_timers.enabled()) {
602     gpr_log(GPR_INFO, "Cancel timer %" PRIx64, id);
603   }
604   it->second->closure = nullptr;
605   return true;
606 }
607 
GlobalNowImpl(gpr_clock_type clock_type)608 gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
609   if (g_fuzzing_event_engine == nullptr) {
610     return gpr_inf_future(clock_type);
611   }
612   GPR_ASSERT(g_fuzzing_event_engine != nullptr);
613   grpc_core::MutexLock lock(&*now_mu_);
614   return g_fuzzing_event_engine->NowAsTimespec(clock_type);
615 }
616 
UnsetGlobalHooks()617 void FuzzingEventEngine::UnsetGlobalHooks() {
618   if (g_fuzzing_event_engine != this) return;
619   g_fuzzing_event_engine = nullptr;
620   gpr_now_impl = g_orig_gpr_now_impl;
621   g_orig_gpr_now_impl = nullptr;
622   grpc_set_pick_port_functions(previous_pick_port_functions_);
623 }
624 
~ListenerInfo()625 FuzzingEventEngine::ListenerInfo::~ListenerInfo() {
626   GPR_ASSERT(g_fuzzing_event_engine != nullptr);
627   g_fuzzing_event_engine->Run(
628       [on_shutdown = std::move(on_shutdown),
629        shutdown_status = std::move(shutdown_status)]() mutable {
630         on_shutdown(std::move(shutdown_status));
631       });
632 }
633 
634 }  // namespace experimental
635 }  // namespace grpc_event_engine
636