// xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
17 
#include <stddef.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/time.h>

#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/port.h"
47 
48 namespace grpc_event_engine {
49 namespace experimental {
50 
51 // EventEngine implementation to be used by fuzzers.
52 // It's only allowed to have one FuzzingEventEngine instantiated at a time.
53 class FuzzingEventEngine : public EventEngine {
54  public:
55   struct Options {
56     Duration max_delay_run_after = std::chrono::seconds(30);
57     Duration max_delay_write = std::chrono::seconds(30);
58   };
59   explicit FuzzingEventEngine(Options options,
60                               const fuzzing_event_engine::Actions& actions);
~FuzzingEventEngine()61   ~FuzzingEventEngine() override { UnsetGlobalHooks(); }
62 
63   using Time = std::chrono::time_point<FuzzingEventEngine, Duration>;
64 
65   // Once the fuzzing work is completed, this method should be called to speed
66   // quiescence.
67   void FuzzingDone() ABSL_LOCKS_EXCLUDED(mu_);
68   // Increment time once and perform any scheduled work.
69   void Tick(Duration max_time = std::chrono::seconds(600))
70       ABSL_LOCKS_EXCLUDED(mu_);
71   // Repeatedly call Tick() until there is no more work to do.
72   void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_);
73   // Tick until some time
74   void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_);
75   // Tick for some duration
76   void TickForDuration(Duration d) ABSL_LOCKS_EXCLUDED(mu_);
77 
78   // Sets a callback to be invoked any time RunAfter() is called.
79   // Allows tests to verify the specified duration.
80   void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);
81 
82   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
83       Listener::AcceptCallback on_accept,
84       absl::AnyInvocable<void(absl::Status)> on_shutdown,
85       const EndpointConfig& config,
86       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
87       ABSL_LOCKS_EXCLUDED(mu_) override;
88 
89   ConnectionHandle Connect(OnConnectCallback on_connect,
90                            const ResolvedAddress& addr,
91                            const EndpointConfig& args,
92                            MemoryAllocator memory_allocator, Duration timeout)
93       ABSL_LOCKS_EXCLUDED(mu_) override;
94 
95   bool CancelConnect(ConnectionHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;
96 
97   bool IsWorkerThread() override;
98 
99   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
100       const DNSResolver::ResolverOptions& options) override;
101 
102   void Run(Closure* closure) ABSL_LOCKS_EXCLUDED(mu_) override;
103   void Run(absl::AnyInvocable<void()> closure)
104       ABSL_LOCKS_EXCLUDED(mu_) override;
105   TaskHandle RunAfter(Duration when, Closure* closure)
106       ABSL_LOCKS_EXCLUDED(mu_) override;
107   TaskHandle RunAfter(Duration when, absl::AnyInvocable<void()> closure)
108       ABSL_LOCKS_EXCLUDED(mu_) override;
109   bool Cancel(TaskHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;
110 
111   TaskHandle RunAfterExactly(Duration when, absl::AnyInvocable<void()> closure)
112       ABSL_LOCKS_EXCLUDED(mu_);
113 
114   Time Now() ABSL_LOCKS_EXCLUDED(mu_);
115 
116   // Clear any global hooks installed by this event engine. Call prior to
117   // destruction to ensure no overlap between tests if constructing/destructing
118   // each test.
119   void UnsetGlobalHooks() ABSL_LOCKS_EXCLUDED(mu_);
120 
max_delay_write()121   Duration max_delay_write() const {
122     return max_delay_[static_cast<int>(RunType::kWrite)];
123   }
124 
125  private:
126   enum class RunType {
127     kWrite,
128     kRunAfter,
129     kExact,
130   };
131 
132   // One pending task to be run.
133   struct Task {
TaskTask134     Task(intptr_t id, absl::AnyInvocable<void()> closure)
135         : id(id), closure(std::move(closure)) {}
136     intptr_t id;
137     absl::AnyInvocable<void()> closure;
138   };
139 
140   // Per listener information.
141   // We keep a shared_ptr to this, one reference held by the FuzzingListener
142   // Listener implementation, and one reference in the event engine state, so it
143   // may be iterated through and inspected - principally to discover the ports
144   // on which this listener is listening.
145   struct ListenerInfo {
ListenerInfoListenerInfo146     ListenerInfo(
147         Listener::AcceptCallback on_accept,
148         absl::AnyInvocable<void(absl::Status)> on_shutdown,
149         std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
150         : on_accept(std::move(on_accept)),
151           on_shutdown(std::move(on_shutdown)),
152           memory_allocator_factory(std::move(memory_allocator_factory)),
153           started(false) {}
154     ~ListenerInfo() ABSL_LOCKS_EXCLUDED(mu_);
155     // The callback to invoke when a new connection is accepted.
156     Listener::AcceptCallback on_accept;
157     // The callback to invoke when the listener is shut down.
158     absl::AnyInvocable<void(absl::Status)> on_shutdown;
159     // The memory allocator factory to use for this listener.
160     const std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory;
161     // The ports on which this listener is listening.
162     std::vector<int> ports ABSL_GUARDED_BY(mu_);
163     // Has start been called on the listener?
164     // Used to emulate the Bind/Start semantics demanded by the API.
165     bool started ABSL_GUARDED_BY(mu_);
166     // The status to return via on_shutdown.
167     absl::Status shutdown_status ABSL_GUARDED_BY(mu_);
168   };
169 
170   // Implementation of Listener.
171   class FuzzingListener final : public Listener {
172    public:
FuzzingListener(std::shared_ptr<ListenerInfo> info)173     explicit FuzzingListener(std::shared_ptr<ListenerInfo> info)
174         : info_(std::move(info)) {}
175     ~FuzzingListener() override;
176     absl::StatusOr<int> Bind(const ResolvedAddress& addr) override;
177     absl::Status Start() override;
178 
179    private:
180     std::shared_ptr<ListenerInfo> info_;
181   };
182 
183   // One read that's outstanding.
184   struct PendingRead {
185     // Callback to invoke when the read completes.
186     absl::AnyInvocable<void(absl::Status)> on_read;
187     // The buffer to read into.
188     SliceBuffer* buffer;
189   };
190 
191   // The join between two Endpoint instances.
192   struct EndpointMiddle {
193     EndpointMiddle(int listener_port, int client_port)
194         ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
195     // Address of each side of the endpoint.
196     const ResolvedAddress addrs[2];
197     // Is the endpoint closed?
198     bool closed[2] ABSL_GUARDED_BY(mu_) = {false, false};
199     // Is the endpoint writing?
200     bool writing[2] ABSL_GUARDED_BY(mu_) = {false, false};
201     // Bytes written into each endpoint and awaiting a read.
202     std::vector<uint8_t> pending[2] ABSL_GUARDED_BY(mu_);
203     // The sizes of each accepted write, as determined by the fuzzer actions.
204     std::queue<size_t> write_sizes[2] ABSL_GUARDED_BY(mu_);
205     // The next read that's pending (or nullopt).
206     absl::optional<PendingRead> pending_read[2] ABSL_GUARDED_BY(mu_);
207 
208     // Helper to take some bytes from data and queue them into pending[index].
209     // Returns true if all bytes were consumed, false if more writes are needed.
210     bool Write(SliceBuffer* data, int index) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
211   };
212 
213   // Implementation of Endpoint.
214   // When a connection is formed, we create two of these - one with index 0, the
215   // other index 1, both pointing to the same EndpointMiddle.
216   class FuzzingEndpoint final : public Endpoint {
217    public:
FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle,int index)218     FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle, int index)
219         : middle_(std::move(middle)), index_(index) {}
220     ~FuzzingEndpoint() override;
221 
222     bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
223               SliceBuffer* buffer, const ReadArgs* args) override;
224     bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
225                SliceBuffer* data, const WriteArgs* args) override;
GetPeerAddress()226     const ResolvedAddress& GetPeerAddress() const override {
227       return middle_->addrs[peer_index()];
228     }
GetLocalAddress()229     const ResolvedAddress& GetLocalAddress() const override {
230       return middle_->addrs[my_index()];
231     }
232 
233    private:
my_index()234     int my_index() const { return index_; }
peer_index()235     int peer_index() const { return 1 - index_; }
236     // Schedule additional writes to be performed later.
237     // Takes a ref to middle instead of holding this, so that should the
238     // endpoint be destroyed we don't have to worry about use-after-free.
239     // Instead that scheduled callback will see the middle is closed and finally
240     // report completion to the caller.
241     // Since there is no timeliness contract for the completion of writes after
242     // endpoint shutdown, it's believed this is a legal implementation.
243     static void ScheduleDelayedWrite(
244         std::shared_ptr<EndpointMiddle> middle, int index,
245         absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data)
246         ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
247     const std::shared_ptr<EndpointMiddle> middle_;
248     const int index_;
249   };
250 
RunLocked(RunType run_type,absl::AnyInvocable<void ()> closure)251   void RunLocked(RunType run_type, absl::AnyInvocable<void()> closure)
252       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
253     RunAfterLocked(run_type, Duration::zero(), std::move(closure));
254   }
255 
256   TaskHandle RunAfterLocked(RunType run_type, Duration when,
257                             absl::AnyInvocable<void()> closure)
258       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
259 
260   // Allocate a port. Considered fuzzer selected port orderings first, and then
261   // falls back to an exhaustive incremental search from port #1.
262   int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
263   // Is the given port in use by any listener?
264   bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
265   // For the next connection being built, query the list of fuzzer selected
266   // write size limits.
267   std::queue<size_t> WriteSizesForConnection()
268       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
269 
270   gpr_timespec NowAsTimespec(gpr_clock_type clock_type)
271       ABSL_EXCLUSIVE_LOCKS_REQUIRED(now_mu_);
272   static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type)
273       ABSL_LOCKS_EXCLUDED(mu_);
274 
275   static grpc_core::NoDestruct<grpc_core::Mutex> mu_;
276   static grpc_core::NoDestruct<grpc_core::Mutex> now_mu_
277       ABSL_ACQUIRED_AFTER(mu_);
278 
279   Duration exponential_gate_time_increment_ ABSL_GUARDED_BY(mu_) =
280       std::chrono::milliseconds(1);
281   const Duration max_delay_[2];
282   intptr_t next_task_id_ ABSL_GUARDED_BY(mu_);
283   intptr_t current_tick_ ABSL_GUARDED_BY(now_mu_);
284   Time now_ ABSL_GUARDED_BY(now_mu_);
285   std::queue<Duration> task_delays_ ABSL_GUARDED_BY(mu_);
286   std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_);
287   std::multimap<Time, std::shared_ptr<Task>> tasks_by_time_
288       ABSL_GUARDED_BY(mu_);
289   std::set<std::shared_ptr<ListenerInfo>> listeners_ ABSL_GUARDED_BY(mu_);
290   // Fuzzer selected port allocations.
291   std::queue<int> free_ports_ ABSL_GUARDED_BY(mu_);
292   // Next free port to allocate once fuzzer selections are exhausted.
293   int next_free_port_ ABSL_GUARDED_BY(mu_) = 1;
294   // Ports that were included in the fuzzer selected port orderings.
295   std::set<int> fuzzer_mentioned_ports_ ABSL_GUARDED_BY(mu_);
296   // Fuzzer selected write sizes for future connections - one picked off per
297   // WriteSizesForConnection() call.
298   std::queue<std::queue<size_t>> write_sizes_for_future_connections_
299       ABSL_GUARDED_BY(mu_);
300   grpc_pick_port_functions previous_pick_port_functions_;
301 
302   grpc_core::Mutex run_after_duration_callback_mu_;
303   absl::AnyInvocable<void(Duration)> run_after_duration_callback_
304       ABSL_GUARDED_BY(run_after_duration_callback_mu_);
305 };
306 
307 class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
308  public:
ThreadedFuzzingEventEngine()309   ThreadedFuzzingEventEngine()
310       : ThreadedFuzzingEventEngine(std::chrono::milliseconds(10)) {}
311 
ThreadedFuzzingEventEngine(Duration max_time)312   explicit ThreadedFuzzingEventEngine(Duration max_time)
313       : FuzzingEventEngine(FuzzingEventEngine::Options(),
314                            fuzzing_event_engine::Actions()),
315         main_([this, max_time]() {
316           while (!done_.load()) {
317             absl::SleepFor(absl::Milliseconds(
318                 grpc_event_engine::experimental::Milliseconds(max_time)));
319             Tick();
320           }
321         }) {}
322 
~ThreadedFuzzingEventEngine()323   ~ThreadedFuzzingEventEngine() override {
324     done_.store(true);
325     main_.join();
326   }
327 
328  private:
329   std::atomic<bool> done_{false};
330   std::thread main_;
331 };
332 
333 }  // namespace experimental
334 }  // namespace grpc_event_engine
335 
336 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
337