xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/fuzzers/fuzzing_common.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/core/end2end/fuzzers/fuzzing_common.h"
20 
21 #include <string.h>
22 
23 #include <memory>
24 #include <new>
25 
26 #include "absl/strings/str_cat.h"
27 #include "absl/types/optional.h"
28 
29 #include <grpc/byte_buffer.h>
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/grpc.h>
32 #include <grpc/slice.h>
33 #include <grpc/status.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/event_engine/default_event_engine.h"
38 #include "src/core/lib/experiments/config.h"
39 #include "src/core/lib/gprpp/crash.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/iomgr/executor.h"
42 #include "src/core/lib/iomgr/timer_manager.h"
43 #include "src/core/lib/resource_quota/memory_quota.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
47 
48 namespace grpc_core {
49 
50 namespace {
__anoneb274ae20202() 51 int force_experiments = []() {
52   ForceEnableExperiment("event_engine_client", true);
53   ForceEnableExperiment("event_engine_listener", true);
54   return 1;
55 }();
56 }  // namespace
57 
58 namespace testing {
59 
free_non_null(void * p)60 static void free_non_null(void* p) {
61   GPR_ASSERT(p != nullptr);
62   gpr_free(p);
63 }
64 
65 enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
66 
67 class Call : public std::enable_shared_from_this<Call> {
68  public:
Call(CallType type)69   explicit Call(CallType type) : type_(type) {
70     grpc_metadata_array_init(&recv_initial_metadata_);
71     grpc_metadata_array_init(&recv_trailing_metadata_);
72     grpc_call_details_init(&call_details_);
73   }
74 
75   ~Call();
76 
type() const77   CallType type() const { return type_; }
78 
done() const79   bool done() const {
80     if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) {
81       return true;
82     }
83     if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true;
84     return false;
85   }
86 
Shutdown()87   void Shutdown() {
88     if (call_ != nullptr) {
89       grpc_call_cancel(call_, nullptr);
90       type_ = CallType::TOMBSTONED;
91     }
92   }
93 
SetCall(grpc_call * call)94   void SetCall(grpc_call* call) {
95     GPR_ASSERT(call_ == nullptr);
96     call_ = call;
97   }
98 
call() const99   grpc_call* call() const { return call_; }
100 
RequestCall(grpc_server * server,grpc_completion_queue * cq)101   void RequestCall(grpc_server* server, grpc_completion_queue* cq) {
102     auto* v = FinishedRequestCall();
103     grpc_call_error error = grpc_server_request_call(
104         server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v);
105     if (error != GRPC_CALL_OK) {
106       v->Run(false);
107     }
108   }
109 
Allocate(size_t size)110   void* Allocate(size_t size) {
111     void* p = gpr_malloc(size);
112     free_pointers_.push_back(p);
113     return p;
114   }
115 
116   template <typename T>
AllocArray(size_t elems)117   T* AllocArray(size_t elems) {
118     return static_cast<T*>(Allocate(sizeof(T) * elems));
119   }
120 
121   template <typename T>
NewCopy(T value)122   T* NewCopy(T value) {
123     T* p = AllocArray<T>(1);
124     new (p) T(value);
125     return p;
126   }
127 
128   template <typename T>
ReadSlice(const T & s)129   grpc_slice ReadSlice(const T& s) {
130     grpc_slice slice = grpc_slice_from_cpp_string(s.value());
131     unref_slices_.push_back(slice);
132     return slice;
133   }
134 
135   template <typename M>
ReadMetadata(const M & metadata)136   grpc_metadata_array ReadMetadata(const M& metadata) {
137     grpc_metadata* m = AllocArray<grpc_metadata>(metadata.size());
138     for (int i = 0; i < metadata.size(); ++i) {
139       m[i].key = ReadSlice(metadata[i].key());
140       m[i].value = ReadSlice(metadata[i].value());
141     }
142     return grpc_metadata_array{static_cast<size_t>(metadata.size()),
143                                static_cast<size_t>(metadata.size()), m};
144   }
145 
ReadOp(const api_fuzzer::BatchOp & batch_op,bool * batch_is_ok,uint8_t * batch_ops,std::vector<std::function<void ()>> * unwinders)146   absl::optional<grpc_op> ReadOp(
147       const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok,
148       uint8_t* batch_ops, std::vector<std::function<void()>>* unwinders) {
149     grpc_op op;
150     memset(&op, 0, sizeof(op));
151     switch (batch_op.op_case()) {
152       case api_fuzzer::BatchOp::OP_NOT_SET:
153         // invalid value
154         return {};
155       case api_fuzzer::BatchOp::kSendInitialMetadata:
156         if (sent_initial_metadata_) {
157           *batch_is_ok = false;
158         } else {
159           sent_initial_metadata_ = true;
160           op.op = GRPC_OP_SEND_INITIAL_METADATA;
161           *batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA;
162           auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata());
163           op.data.send_initial_metadata.count = ary.count;
164           op.data.send_initial_metadata.metadata = ary.metadata;
165         }
166         break;
167       case api_fuzzer::BatchOp::kSendMessage:
168         op.op = GRPC_OP_SEND_MESSAGE;
169         if (send_message_ != nullptr) {
170           *batch_is_ok = false;
171         } else {
172           *batch_ops |= 1 << GRPC_OP_SEND_MESSAGE;
173           std::vector<grpc_slice> slices;
174           for (const auto& m : batch_op.send_message().message()) {
175             slices.push_back(ReadSlice(m));
176           }
177           send_message_ = op.data.send_message.send_message =
178               grpc_raw_byte_buffer_create(slices.data(), slices.size());
179           unwinders->push_back([this]() {
180             grpc_byte_buffer_destroy(send_message_);
181             send_message_ = nullptr;
182           });
183         }
184         break;
185       case api_fuzzer::BatchOp::kSendCloseFromClient:
186         op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
187         *batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT;
188         break;
189       case api_fuzzer::BatchOp::kSendStatusFromServer: {
190         op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
191         *batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER;
192         auto ary = ReadMetadata(batch_op.send_status_from_server().metadata());
193         op.data.send_status_from_server.trailing_metadata_count = ary.count;
194         op.data.send_status_from_server.trailing_metadata = ary.metadata;
195         op.data.send_status_from_server.status = static_cast<grpc_status_code>(
196             batch_op.send_status_from_server().status_code());
197         op.data.send_status_from_server.status_details =
198             batch_op.send_status_from_server().has_status_details()
199                 ? NewCopy(ReadSlice(
200                       batch_op.send_status_from_server().status_details()))
201                 : nullptr;
202       } break;
203       case api_fuzzer::BatchOp::kReceiveInitialMetadata:
204         if (enqueued_recv_initial_metadata_) {
205           *batch_is_ok = false;
206         } else {
207           enqueued_recv_initial_metadata_ = true;
208           op.op = GRPC_OP_RECV_INITIAL_METADATA;
209           *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
210           op.data.recv_initial_metadata.recv_initial_metadata =
211               &recv_initial_metadata_;
212         }
213         break;
214       case api_fuzzer::BatchOp::kReceiveMessage:
215         // Allow only one active pending_recv_message_op to exist. Otherwise if
216         // the previous enqueued recv_message_op is not complete by the time
217         // we get here, then under certain conditions, enqueuing this op will
218         // overwrite the internal call->receiving_buffer maintained by grpc
219         // leading to a memory leak.
220         if (call_closed_ || pending_recv_message_op_) {
221           *batch_is_ok = false;
222         } else {
223           op.op = GRPC_OP_RECV_MESSAGE;
224           *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
225           pending_recv_message_op_ = true;
226           op.data.recv_message.recv_message = &recv_message_;
227           unwinders->push_back([this]() { pending_recv_message_op_ = false; });
228         }
229         break;
230       case api_fuzzer::BatchOp::kReceiveStatusOnClient:
231         op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
232         op.data.recv_status_on_client.status = &status_;
233         op.data.recv_status_on_client.trailing_metadata =
234             &recv_trailing_metadata_;
235         op.data.recv_status_on_client.status_details = &recv_status_details_;
236         *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
237         break;
238       case api_fuzzer::BatchOp::kReceiveCloseOnServer:
239         op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
240         *batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER;
241         op.data.recv_close_on_server.cancelled = &cancelled_;
242         break;
243     }
244     op.reserved = nullptr;
245     op.flags = batch_op.flags();
246     return op;
247   }
248 
FinishedBatchValidator(uint8_t has_ops)249   Validator* FinishedBatchValidator(uint8_t has_ops) {
250     ++pending_ops_;
251     auto self = shared_from_this();
252     return MakeValidator([self, has_ops](bool /*success*/) {
253       --self->pending_ops_;
254       if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
255         self->pending_recv_message_op_ = false;
256         if (self->recv_message_ != nullptr) {
257           grpc_byte_buffer_destroy(self->recv_message_);
258           self->recv_message_ = nullptr;
259         }
260       }
261       if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
262         grpc_byte_buffer_destroy(self->send_message_);
263         self->send_message_ = nullptr;
264       }
265       if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) ||
266           (has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) {
267         self->call_closed_ = true;
268       }
269     });
270   }
271 
FinishedRequestCall()272   Validator* FinishedRequestCall() {
273     ++pending_ops_;
274     auto self = shared_from_this();
275     return MakeValidator([self](bool success) {
276       GPR_ASSERT(self->pending_ops_ > 0);
277       --self->pending_ops_;
278       if (success) {
279         GPR_ASSERT(self->call_ != nullptr);
280         self->type_ = CallType::SERVER;
281       } else {
282         self->type_ = CallType::TOMBSTONED;
283       }
284     });
285   }
286 
287  private:
288   CallType type_;
289   grpc_call* call_ = nullptr;
290   grpc_byte_buffer* recv_message_ = nullptr;
291   grpc_status_code status_;
292   grpc_metadata_array recv_initial_metadata_{0, 0, nullptr};
293   grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr};
294   grpc_slice recv_status_details_ = grpc_empty_slice();
295   // set by receive close on server, unset here to trigger
296   // msan if misused
297   int cancelled_;
298   int pending_ops_ = 0;
299   bool sent_initial_metadata_ = false;
300   bool enqueued_recv_initial_metadata_ = false;
301   grpc_call_details call_details_{};
302   grpc_byte_buffer* send_message_ = nullptr;
303   bool call_closed_ = false;
304   bool pending_recv_message_op_ = false;
305 
306   std::vector<void*> free_pointers_;
307   std::vector<grpc_slice> unref_slices_;
308 };
309 
~Call()310 Call::~Call() {
311   if (call_ != nullptr) {
312     grpc_call_unref(call_);
313   }
314   grpc_slice_unref(recv_status_details_);
315   grpc_call_details_destroy(&call_details_);
316 
317   for (auto* p : free_pointers_) {
318     gpr_free(p);
319   }
320   for (auto s : unref_slices_) {
321     grpc_slice_unref(s);
322   }
323 
324   if (recv_message_ != nullptr) {
325     grpc_byte_buffer_destroy(recv_message_);
326     recv_message_ = nullptr;
327   }
328 
329   grpc_metadata_array_destroy(&recv_initial_metadata_);
330   grpc_metadata_array_destroy(&recv_trailing_metadata_);
331 }
332 
333 namespace {
ValidateConnectivityWatch(gpr_timespec deadline,int * counter)334 Validator* ValidateConnectivityWatch(gpr_timespec deadline, int* counter) {
335   return MakeValidator([deadline, counter](bool success) {
336     if (!success) {
337       auto now = gpr_now(deadline.clock_type);
338       GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0);
339     }
340     --*counter;
341   });
342 }
343 }  // namespace
344 
345 using ::grpc_event_engine::experimental::FuzzingEventEngine;
346 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
347 using ::grpc_event_engine::experimental::SetEventEngineFactory;
348 
BasicFuzzer(const fuzzing_event_engine::Actions & actions)349 BasicFuzzer::BasicFuzzer(const fuzzing_event_engine::Actions& actions)
350     : engine_([actions]() {
351         SetEventEngineFactory(
352             [actions]() -> std::unique_ptr<
353                             grpc_event_engine::experimental::EventEngine> {
354               return std::make_unique<FuzzingEventEngine>(
355                   FuzzingEventEngine::Options(), actions);
356             });
357         return std::dynamic_pointer_cast<FuzzingEventEngine>(
358             GetDefaultEventEngine());
359       }()) {
360   grpc_timer_manager_set_start_threaded(false);
361   grpc_init();
362   {
363     ExecCtx exec_ctx;
364     Executor::SetThreadingAll(false);
365   }
366   resource_quota_ = MakeResourceQuota("fuzzer");
367   cq_ = grpc_completion_queue_create_for_next(nullptr);
368 }
369 
~BasicFuzzer()370 BasicFuzzer::~BasicFuzzer() {
371   GPR_ASSERT(ActiveCall() == nullptr);
372   GPR_ASSERT(calls_.empty());
373 
374   engine_->TickUntilIdle();
375 
376   grpc_completion_queue_shutdown(cq_);
377   GPR_ASSERT(PollCq() == Result::kComplete);
378   grpc_completion_queue_destroy(cq_);
379 
380   grpc_shutdown_blocking();
381   engine_->UnsetGlobalHooks();
382 }
383 
Tick()384 void BasicFuzzer::Tick() {
385   engine_->Tick();
386   grpc_timer_manager_tick();
387 }
388 
PollCq()389 BasicFuzzer::Result BasicFuzzer::PollCq() {
390   grpc_event ev = grpc_completion_queue_next(
391       cq_, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
392   switch (ev.type) {
393     case GRPC_OP_COMPLETE: {
394       static_cast<Validator*>(ev.tag)->Run(ev.success);
395       break;
396     }
397     case GRPC_QUEUE_TIMEOUT:
398       break;
399     case GRPC_QUEUE_SHUTDOWN:
400       return Result::kComplete;
401   }
402   return Result::kPending;
403 }
404 
CheckConnectivity(bool try_to_connect)405 BasicFuzzer::Result BasicFuzzer::CheckConnectivity(bool try_to_connect) {
406   if (channel() != nullptr) {
407     grpc_channel_check_connectivity_state(channel(), try_to_connect);
408   } else {
409     return Result::kFailed;
410   }
411   return Result::kComplete;
412 }
413 
WatchConnectivity(uint32_t duration_us)414 BasicFuzzer::Result BasicFuzzer::WatchConnectivity(uint32_t duration_us) {
415   if (channel() != nullptr) {
416     grpc_connectivity_state st =
417         grpc_channel_check_connectivity_state(channel(), 0);
418     if (st != GRPC_CHANNEL_SHUTDOWN) {
419       gpr_timespec deadline =
420           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
421                        gpr_time_from_micros(duration_us, GPR_TIMESPAN));
422       grpc_channel_watch_connectivity_state(
423           channel(), st, deadline, cq_,
424           ValidateConnectivityWatch(deadline, &pending_channel_watches_));
425       pending_channel_watches_++;
426     }
427   } else {
428     return Result::kFailed;
429   }
430   return Result::kComplete;
431 }
432 
CancelAllCallsIfShutdown()433 BasicFuzzer::Result BasicFuzzer::CancelAllCallsIfShutdown() {
434   if (server() != nullptr && server_shutdown_) {
435     grpc_server_cancel_all_calls(server());
436   } else {
437     return Result::kFailed;
438   }
439   return Result::kComplete;
440 }
441 
ShutdownServer()442 BasicFuzzer::Result BasicFuzzer::ShutdownServer() {
443   if (server() != nullptr) {
444     grpc_server_shutdown_and_notify(
445         server(), cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_));
446     pending_server_shutdowns_++;
447     server_shutdown_ = true;
448   } else {
449     return Result::kFailed;
450   }
451   return Result::kComplete;
452 }
453 
DestroyServerIfReady()454 BasicFuzzer::Result BasicFuzzer::DestroyServerIfReady() {
455   if (server() != nullptr && server_shutdown_ &&
456       pending_server_shutdowns_ == 0) {
457     DestroyServer();
458   } else {
459     return Result::kFailed;
460   }
461   return Result::kComplete;
462 }
463 
CreateCall(const api_fuzzer::CreateCall & create_call)464 BasicFuzzer::Result BasicFuzzer::CreateCall(
465     const api_fuzzer::CreateCall& create_call) {
466   bool ok = true;
467   if (channel() == nullptr) ok = false;
468   // If the active call is a server call, then use it as the parent call
469   // to exercise the propagation logic.
470   Call* parent_call = ActiveCall();
471   if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
472     parent_call = nullptr;
473   }
474   calls_.emplace_back(new Call(CallType::CLIENT));
475   grpc_slice method = calls_.back()->ReadSlice(create_call.method());
476   if (GRPC_SLICE_LENGTH(method) == 0) {
477     ok = false;
478   }
479   grpc_slice host = calls_.back()->ReadSlice(create_call.host());
480   gpr_timespec deadline =
481       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
482                    gpr_time_from_micros(create_call.timeout(), GPR_TIMESPAN));
483 
484   if (ok) {
485     calls_.back()->SetCall(grpc_channel_create_call(
486         channel(), parent_call == nullptr ? nullptr : parent_call->call(),
487         create_call.propagation_mask(), cq_, method, &host, deadline, nullptr));
488   } else {
489     calls_.pop_back();
490     return Result::kFailed;
491   }
492   return Result::kComplete;
493 }
494 
ChangeActiveCall()495 BasicFuzzer::Result BasicFuzzer::ChangeActiveCall() {
496   active_call_++;
497   ActiveCall();
498   return Result::kComplete;
499 }
500 
QueueBatchForActiveCall(const api_fuzzer::Batch & queue_batch)501 BasicFuzzer::Result BasicFuzzer::QueueBatchForActiveCall(
502     const api_fuzzer::Batch& queue_batch) {
503   auto* active_call = ActiveCall();
504   if (active_call == nullptr ||
505       active_call->type() == CallType::PENDING_SERVER ||
506       active_call->call() == nullptr) {
507     return Result::kFailed;
508   }
509   const auto& batch = queue_batch.operations();
510   if (batch.size() > 6) {
511     return Result::kFailed;
512   }
513   std::vector<grpc_op> ops;
514   bool ok = true;
515   uint8_t has_ops = 0;
516   std::vector<std::function<void()>> unwinders;
517   for (const auto& batch_op : batch) {
518     auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
519     if (!op.has_value()) continue;
520     ops.push_back(*op);
521   }
522 
523   if (channel() == nullptr) ok = false;
524   if (ok) {
525     auto* v = active_call->FinishedBatchValidator(has_ops);
526     grpc_call_error error = grpc_call_start_batch(
527         active_call->call(), ops.data(), ops.size(), v, nullptr);
528     if (error != GRPC_CALL_OK) {
529       v->Run(false);
530     }
531   } else {
532     for (auto& unwind : unwinders) {
533       unwind();
534     }
535     return Result::kFailed;
536   }
537   return Result::kComplete;
538 }
539 
CancelActiveCall()540 BasicFuzzer::Result BasicFuzzer::CancelActiveCall() {
541   auto* active_call = ActiveCall();
542   if (active_call != nullptr && active_call->call() != nullptr) {
543     grpc_call_cancel(active_call->call(), nullptr);
544   } else {
545     return Result::kFailed;
546   }
547   return Result::kComplete;
548 }
549 
Pause(Duration duration)550 BasicFuzzer::Result BasicFuzzer::Pause(Duration duration) {
551   ++paused_;
552   engine()->RunAfterExactly(duration, [this]() { --paused_; });
553   return Result::kComplete;
554 }
555 
ServerRequestCall()556 BasicFuzzer::Result BasicFuzzer::ServerRequestCall() {
557   if (server() == nullptr) {
558     return Result::kFailed;
559   }
560   calls_.emplace_back(new Call(CallType::PENDING_SERVER));
561   calls_.back()->RequestCall(server(), cq_);
562   return Result::kComplete;
563 }
564 
DestroyActiveCall()565 BasicFuzzer::Result BasicFuzzer::DestroyActiveCall() {
566   auto* active_call = ActiveCall();
567   if (active_call != nullptr &&
568       active_call->type() != CallType::PENDING_SERVER &&
569       active_call->call() != nullptr) {
570     calls_[active_call_]->Shutdown();
571   } else {
572     return Result::kFailed;
573   }
574   return Result::kComplete;
575 }
576 
ValidatePeerForActiveCall()577 BasicFuzzer::Result BasicFuzzer::ValidatePeerForActiveCall() {
578   auto* active_call = ActiveCall();
579   if (active_call != nullptr && active_call->call() != nullptr) {
580     free_non_null(grpc_call_get_peer(active_call->call()));
581   } else {
582     return Result::kFailed;
583   }
584   return Result::kComplete;
585 }
586 
ValidateChannelTarget()587 BasicFuzzer::Result BasicFuzzer::ValidateChannelTarget() {
588   if (channel() != nullptr) {
589     free_non_null(grpc_channel_get_target(channel()));
590   } else {
591     return Result::kFailed;
592   }
593   return Result::kComplete;
594 }
595 
ResizeResourceQuota(uint32_t resize_resource_quota)596 BasicFuzzer::Result BasicFuzzer::ResizeResourceQuota(
597     uint32_t resize_resource_quota) {
598   ExecCtx exec_ctx;
599   resource_quota_->memory_quota()->SetSize(resize_resource_quota);
600   return Result::kComplete;
601 }
602 
CloseChannel()603 BasicFuzzer::Result BasicFuzzer::CloseChannel() {
604   if (channel() != nullptr) {
605     DestroyChannel();
606   } else {
607     return Result::kFailed;
608   }
609   return Result::kComplete;
610 }
611 
ActiveCall()612 Call* BasicFuzzer::ActiveCall() {
613   while (!calls_.empty()) {
614     if (active_call_ >= calls_.size()) {
615       active_call_ = 0;
616     }
617     if (calls_[active_call_] != nullptr && !calls_[active_call_]->done()) {
618       return calls_[active_call_].get();
619     }
620     calls_.erase(calls_.begin() + active_call_);
621   }
622   return nullptr;
623 }
624 
ShutdownCalls()625 void BasicFuzzer::ShutdownCalls() {
626   for (auto& call : calls_) {
627     if (call == nullptr) continue;
628     if (call->type() == CallType::PENDING_SERVER) continue;
629     call->Shutdown();
630   }
631 }
632 
Continue()633 bool BasicFuzzer::Continue() {
634   return channel() != nullptr || server() != nullptr ||
635          pending_channel_watches_ > 0 || ActiveCall() != nullptr || paused_;
636 }
637 
ExecuteAction(const api_fuzzer::Action & action)638 BasicFuzzer::Result BasicFuzzer::ExecuteAction(
639     const api_fuzzer::Action& action) {
640   gpr_log(GPR_DEBUG, "EXECUTE_ACTION: %s", action.DebugString().c_str());
641   switch (action.type_case()) {
642     case api_fuzzer::Action::TYPE_NOT_SET:
643       return BasicFuzzer::Result::kFailed;
644     // tickle completion queue
645     case api_fuzzer::Action::kPollCq:
646       return PollCq();
647     // create an insecure channel
648     case api_fuzzer::Action::kCreateChannel:
649       return CreateChannel(action.create_channel());
650     // destroy a channel
651     case api_fuzzer::Action::kCloseChannel:
652       return CloseChannel();
653     // bring up a server
654     case api_fuzzer::Action::kCreateServer:
655       return CreateServer(action.create_server());
656     // begin server shutdown
657     case api_fuzzer::Action::kShutdownServer:
658       return ShutdownServer();
659     // cancel all calls if server is shutdown
660     case api_fuzzer::Action::kCancelAllCallsIfShutdown:
661       return CancelAllCallsIfShutdown();
662     // destroy server
663     case api_fuzzer::Action::kDestroyServerIfReady:
664       return DestroyServerIfReady();
665     // check connectivity
666     case api_fuzzer::Action::kCheckConnectivity:
667       return CheckConnectivity(action.check_connectivity());
668     // watch connectivity
669     case api_fuzzer::Action::kWatchConnectivity:
670       return WatchConnectivity(action.watch_connectivity());
671     // create a call
672     case api_fuzzer::Action::kCreateCall:
673       return CreateCall(action.create_call());
674     // switch the 'current' call
675     case api_fuzzer::Action::kChangeActiveCall:
676       return ChangeActiveCall();
677     // queue some ops on a call
678     case api_fuzzer::Action::kQueueBatch:
679       return QueueBatchForActiveCall(action.queue_batch());
680     // cancel current call
681     case api_fuzzer::Action::kCancelCall:
682       return CancelActiveCall();
683     // get a calls peer
684     case api_fuzzer::Action::kGetPeer:
685       return ValidatePeerForActiveCall();
686     // get a channels target
687     case api_fuzzer::Action::kGetTarget:
688       return ValidateChannelTarget();
689     // send a ping on a channel
690     case api_fuzzer::Action::kPing:
691       // Ping is no longer a part of the API
692       return BasicFuzzer::Result::kNotSupported;
693     // enable a tracer
694     case api_fuzzer::Action::kEnableTracer: {
695       grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
696       break;
697     }
698     // disable a tracer
699     case api_fuzzer::Action::kDisableTracer: {
700       grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
701       break;
702     }
703     // request a server call
704     case api_fuzzer::Action::kRequestCall:
705       return ServerRequestCall();
706     // destroy a call
707     case api_fuzzer::Action::kDestroyCall:
708       return DestroyActiveCall();
709     // resize the buffer pool
710     case api_fuzzer::Action::kResizeResourceQuota:
711       return ResizeResourceQuota(action.resize_resource_quota());
712     case api_fuzzer::Action::kSleepMs:
713       return Pause(std::min(Duration::Milliseconds(action.sleep_ms()),
714                             Duration::Minutes(1)));
715     default:
716       Crash(absl::StrCat("Unsupported Fuzzing Action of type: ",
717                          action.type_case()));
718   }
719   return BasicFuzzer::Result::kComplete;
720 }
721 
TryShutdown()722 void BasicFuzzer::TryShutdown() {
723   engine()->FuzzingDone();
724   if (channel() != nullptr) {
725     DestroyChannel();
726   }
727   if (server() != nullptr) {
728     if (!server_shutdown_called()) {
729       ShutdownServer();
730     }
731     if (server_finished_shutting_down()) {
732       DestroyServer();
733     }
734   }
735   ShutdownCalls();
736 
737   grpc_timer_manager_tick();
738   GPR_ASSERT(PollCq() == Result::kPending);
739 }
740 
Run(absl::Span<const api_fuzzer::Action * const> actions)741 void BasicFuzzer::Run(absl::Span<const api_fuzzer::Action* const> actions) {
742   size_t action_index = 0;
743   auto allow_forced_shutdown = std::make_shared<bool>(false);
744   auto no_more_actions = [&]() { action_index = actions.size(); };
745 
746   engine()->RunAfterExactly(minimum_run_time_, [allow_forced_shutdown] {
747     *allow_forced_shutdown = true;
748   });
749 
750   while (action_index < actions.size() || Continue()) {
751     Tick();
752 
753     if (paused_) continue;
754 
755     if (action_index == actions.size()) {
756       if (*allow_forced_shutdown) TryShutdown();
757       continue;
758     }
759 
760     auto result = ExecuteAction(*actions[action_index++]);
761     if (result == Result::kFailed) {
762       no_more_actions();
763     }
764   }
765 }
766 
767 }  // namespace testing
768 }  // namespace grpc_core
769