1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/core/end2end/fuzzers/fuzzing_common.h"
20
21 #include <string.h>
22
23 #include <memory>
24 #include <new>
25
26 #include "absl/strings/str_cat.h"
27 #include "absl/types/optional.h"
28
29 #include <grpc/byte_buffer.h>
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/grpc.h>
32 #include <grpc/slice.h>
33 #include <grpc/status.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/event_engine/default_event_engine.h"
38 #include "src/core/lib/experiments/config.h"
39 #include "src/core/lib/gprpp/crash.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/iomgr/executor.h"
42 #include "src/core/lib/iomgr/timer_manager.h"
43 #include "src/core/lib/resource_quota/memory_quota.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
47
48 namespace grpc_core {
49
50 namespace {
__anoneb274ae20202() 51 int force_experiments = []() {
52 ForceEnableExperiment("event_engine_client", true);
53 ForceEnableExperiment("event_engine_listener", true);
54 return 1;
55 }();
56 } // namespace
57
58 namespace testing {
59
free_non_null(void * p)60 static void free_non_null(void* p) {
61 GPR_ASSERT(p != nullptr);
62 gpr_free(p);
63 }
64
65 enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
66
67 class Call : public std::enable_shared_from_this<Call> {
68 public:
Call(CallType type)69 explicit Call(CallType type) : type_(type) {
70 grpc_metadata_array_init(&recv_initial_metadata_);
71 grpc_metadata_array_init(&recv_trailing_metadata_);
72 grpc_call_details_init(&call_details_);
73 }
74
75 ~Call();
76
type() const77 CallType type() const { return type_; }
78
done() const79 bool done() const {
80 if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) {
81 return true;
82 }
83 if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true;
84 return false;
85 }
86
Shutdown()87 void Shutdown() {
88 if (call_ != nullptr) {
89 grpc_call_cancel(call_, nullptr);
90 type_ = CallType::TOMBSTONED;
91 }
92 }
93
SetCall(grpc_call * call)94 void SetCall(grpc_call* call) {
95 GPR_ASSERT(call_ == nullptr);
96 call_ = call;
97 }
98
call() const99 grpc_call* call() const { return call_; }
100
RequestCall(grpc_server * server,grpc_completion_queue * cq)101 void RequestCall(grpc_server* server, grpc_completion_queue* cq) {
102 auto* v = FinishedRequestCall();
103 grpc_call_error error = grpc_server_request_call(
104 server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v);
105 if (error != GRPC_CALL_OK) {
106 v->Run(false);
107 }
108 }
109
Allocate(size_t size)110 void* Allocate(size_t size) {
111 void* p = gpr_malloc(size);
112 free_pointers_.push_back(p);
113 return p;
114 }
115
116 template <typename T>
AllocArray(size_t elems)117 T* AllocArray(size_t elems) {
118 return static_cast<T*>(Allocate(sizeof(T) * elems));
119 }
120
121 template <typename T>
NewCopy(T value)122 T* NewCopy(T value) {
123 T* p = AllocArray<T>(1);
124 new (p) T(value);
125 return p;
126 }
127
128 template <typename T>
ReadSlice(const T & s)129 grpc_slice ReadSlice(const T& s) {
130 grpc_slice slice = grpc_slice_from_cpp_string(s.value());
131 unref_slices_.push_back(slice);
132 return slice;
133 }
134
135 template <typename M>
ReadMetadata(const M & metadata)136 grpc_metadata_array ReadMetadata(const M& metadata) {
137 grpc_metadata* m = AllocArray<grpc_metadata>(metadata.size());
138 for (int i = 0; i < metadata.size(); ++i) {
139 m[i].key = ReadSlice(metadata[i].key());
140 m[i].value = ReadSlice(metadata[i].value());
141 }
142 return grpc_metadata_array{static_cast<size_t>(metadata.size()),
143 static_cast<size_t>(metadata.size()), m};
144 }
145
ReadOp(const api_fuzzer::BatchOp & batch_op,bool * batch_is_ok,uint8_t * batch_ops,std::vector<std::function<void ()>> * unwinders)146 absl::optional<grpc_op> ReadOp(
147 const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok,
148 uint8_t* batch_ops, std::vector<std::function<void()>>* unwinders) {
149 grpc_op op;
150 memset(&op, 0, sizeof(op));
151 switch (batch_op.op_case()) {
152 case api_fuzzer::BatchOp::OP_NOT_SET:
153 // invalid value
154 return {};
155 case api_fuzzer::BatchOp::kSendInitialMetadata:
156 if (sent_initial_metadata_) {
157 *batch_is_ok = false;
158 } else {
159 sent_initial_metadata_ = true;
160 op.op = GRPC_OP_SEND_INITIAL_METADATA;
161 *batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA;
162 auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata());
163 op.data.send_initial_metadata.count = ary.count;
164 op.data.send_initial_metadata.metadata = ary.metadata;
165 }
166 break;
167 case api_fuzzer::BatchOp::kSendMessage:
168 op.op = GRPC_OP_SEND_MESSAGE;
169 if (send_message_ != nullptr) {
170 *batch_is_ok = false;
171 } else {
172 *batch_ops |= 1 << GRPC_OP_SEND_MESSAGE;
173 std::vector<grpc_slice> slices;
174 for (const auto& m : batch_op.send_message().message()) {
175 slices.push_back(ReadSlice(m));
176 }
177 send_message_ = op.data.send_message.send_message =
178 grpc_raw_byte_buffer_create(slices.data(), slices.size());
179 unwinders->push_back([this]() {
180 grpc_byte_buffer_destroy(send_message_);
181 send_message_ = nullptr;
182 });
183 }
184 break;
185 case api_fuzzer::BatchOp::kSendCloseFromClient:
186 op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
187 *batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT;
188 break;
189 case api_fuzzer::BatchOp::kSendStatusFromServer: {
190 op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
191 *batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER;
192 auto ary = ReadMetadata(batch_op.send_status_from_server().metadata());
193 op.data.send_status_from_server.trailing_metadata_count = ary.count;
194 op.data.send_status_from_server.trailing_metadata = ary.metadata;
195 op.data.send_status_from_server.status = static_cast<grpc_status_code>(
196 batch_op.send_status_from_server().status_code());
197 op.data.send_status_from_server.status_details =
198 batch_op.send_status_from_server().has_status_details()
199 ? NewCopy(ReadSlice(
200 batch_op.send_status_from_server().status_details()))
201 : nullptr;
202 } break;
203 case api_fuzzer::BatchOp::kReceiveInitialMetadata:
204 if (enqueued_recv_initial_metadata_) {
205 *batch_is_ok = false;
206 } else {
207 enqueued_recv_initial_metadata_ = true;
208 op.op = GRPC_OP_RECV_INITIAL_METADATA;
209 *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
210 op.data.recv_initial_metadata.recv_initial_metadata =
211 &recv_initial_metadata_;
212 }
213 break;
214 case api_fuzzer::BatchOp::kReceiveMessage:
215 // Allow only one active pending_recv_message_op to exist. Otherwise if
216 // the previous enqueued recv_message_op is not complete by the time
217 // we get here, then under certain conditions, enqueuing this op will
218 // overwrite the internal call->receiving_buffer maintained by grpc
219 // leading to a memory leak.
220 if (call_closed_ || pending_recv_message_op_) {
221 *batch_is_ok = false;
222 } else {
223 op.op = GRPC_OP_RECV_MESSAGE;
224 *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
225 pending_recv_message_op_ = true;
226 op.data.recv_message.recv_message = &recv_message_;
227 unwinders->push_back([this]() { pending_recv_message_op_ = false; });
228 }
229 break;
230 case api_fuzzer::BatchOp::kReceiveStatusOnClient:
231 op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
232 op.data.recv_status_on_client.status = &status_;
233 op.data.recv_status_on_client.trailing_metadata =
234 &recv_trailing_metadata_;
235 op.data.recv_status_on_client.status_details = &recv_status_details_;
236 *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
237 break;
238 case api_fuzzer::BatchOp::kReceiveCloseOnServer:
239 op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
240 *batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER;
241 op.data.recv_close_on_server.cancelled = &cancelled_;
242 break;
243 }
244 op.reserved = nullptr;
245 op.flags = batch_op.flags();
246 return op;
247 }
248
FinishedBatchValidator(uint8_t has_ops)249 Validator* FinishedBatchValidator(uint8_t has_ops) {
250 ++pending_ops_;
251 auto self = shared_from_this();
252 return MakeValidator([self, has_ops](bool /*success*/) {
253 --self->pending_ops_;
254 if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
255 self->pending_recv_message_op_ = false;
256 if (self->recv_message_ != nullptr) {
257 grpc_byte_buffer_destroy(self->recv_message_);
258 self->recv_message_ = nullptr;
259 }
260 }
261 if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
262 grpc_byte_buffer_destroy(self->send_message_);
263 self->send_message_ = nullptr;
264 }
265 if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) ||
266 (has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) {
267 self->call_closed_ = true;
268 }
269 });
270 }
271
FinishedRequestCall()272 Validator* FinishedRequestCall() {
273 ++pending_ops_;
274 auto self = shared_from_this();
275 return MakeValidator([self](bool success) {
276 GPR_ASSERT(self->pending_ops_ > 0);
277 --self->pending_ops_;
278 if (success) {
279 GPR_ASSERT(self->call_ != nullptr);
280 self->type_ = CallType::SERVER;
281 } else {
282 self->type_ = CallType::TOMBSTONED;
283 }
284 });
285 }
286
287 private:
288 CallType type_;
289 grpc_call* call_ = nullptr;
290 grpc_byte_buffer* recv_message_ = nullptr;
291 grpc_status_code status_;
292 grpc_metadata_array recv_initial_metadata_{0, 0, nullptr};
293 grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr};
294 grpc_slice recv_status_details_ = grpc_empty_slice();
295 // set by receive close on server, unset here to trigger
296 // msan if misused
297 int cancelled_;
298 int pending_ops_ = 0;
299 bool sent_initial_metadata_ = false;
300 bool enqueued_recv_initial_metadata_ = false;
301 grpc_call_details call_details_{};
302 grpc_byte_buffer* send_message_ = nullptr;
303 bool call_closed_ = false;
304 bool pending_recv_message_op_ = false;
305
306 std::vector<void*> free_pointers_;
307 std::vector<grpc_slice> unref_slices_;
308 };
309
~Call()310 Call::~Call() {
311 if (call_ != nullptr) {
312 grpc_call_unref(call_);
313 }
314 grpc_slice_unref(recv_status_details_);
315 grpc_call_details_destroy(&call_details_);
316
317 for (auto* p : free_pointers_) {
318 gpr_free(p);
319 }
320 for (auto s : unref_slices_) {
321 grpc_slice_unref(s);
322 }
323
324 if (recv_message_ != nullptr) {
325 grpc_byte_buffer_destroy(recv_message_);
326 recv_message_ = nullptr;
327 }
328
329 grpc_metadata_array_destroy(&recv_initial_metadata_);
330 grpc_metadata_array_destroy(&recv_trailing_metadata_);
331 }
332
333 namespace {
ValidateConnectivityWatch(gpr_timespec deadline,int * counter)334 Validator* ValidateConnectivityWatch(gpr_timespec deadline, int* counter) {
335 return MakeValidator([deadline, counter](bool success) {
336 if (!success) {
337 auto now = gpr_now(deadline.clock_type);
338 GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0);
339 }
340 --*counter;
341 });
342 }
343 } // namespace
344
345 using ::grpc_event_engine::experimental::FuzzingEventEngine;
346 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
347 using ::grpc_event_engine::experimental::SetEventEngineFactory;
348
BasicFuzzer(const fuzzing_event_engine::Actions & actions)349 BasicFuzzer::BasicFuzzer(const fuzzing_event_engine::Actions& actions)
350 : engine_([actions]() {
351 SetEventEngineFactory(
352 [actions]() -> std::unique_ptr<
353 grpc_event_engine::experimental::EventEngine> {
354 return std::make_unique<FuzzingEventEngine>(
355 FuzzingEventEngine::Options(), actions);
356 });
357 return std::dynamic_pointer_cast<FuzzingEventEngine>(
358 GetDefaultEventEngine());
359 }()) {
360 grpc_timer_manager_set_start_threaded(false);
361 grpc_init();
362 {
363 ExecCtx exec_ctx;
364 Executor::SetThreadingAll(false);
365 }
366 resource_quota_ = MakeResourceQuota("fuzzer");
367 cq_ = grpc_completion_queue_create_for_next(nullptr);
368 }
369
~BasicFuzzer()370 BasicFuzzer::~BasicFuzzer() {
371 GPR_ASSERT(ActiveCall() == nullptr);
372 GPR_ASSERT(calls_.empty());
373
374 engine_->TickUntilIdle();
375
376 grpc_completion_queue_shutdown(cq_);
377 GPR_ASSERT(PollCq() == Result::kComplete);
378 grpc_completion_queue_destroy(cq_);
379
380 grpc_shutdown_blocking();
381 engine_->UnsetGlobalHooks();
382 }
383
Tick()384 void BasicFuzzer::Tick() {
385 engine_->Tick();
386 grpc_timer_manager_tick();
387 }
388
PollCq()389 BasicFuzzer::Result BasicFuzzer::PollCq() {
390 grpc_event ev = grpc_completion_queue_next(
391 cq_, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
392 switch (ev.type) {
393 case GRPC_OP_COMPLETE: {
394 static_cast<Validator*>(ev.tag)->Run(ev.success);
395 break;
396 }
397 case GRPC_QUEUE_TIMEOUT:
398 break;
399 case GRPC_QUEUE_SHUTDOWN:
400 return Result::kComplete;
401 }
402 return Result::kPending;
403 }
404
CheckConnectivity(bool try_to_connect)405 BasicFuzzer::Result BasicFuzzer::CheckConnectivity(bool try_to_connect) {
406 if (channel() != nullptr) {
407 grpc_channel_check_connectivity_state(channel(), try_to_connect);
408 } else {
409 return Result::kFailed;
410 }
411 return Result::kComplete;
412 }
413
WatchConnectivity(uint32_t duration_us)414 BasicFuzzer::Result BasicFuzzer::WatchConnectivity(uint32_t duration_us) {
415 if (channel() != nullptr) {
416 grpc_connectivity_state st =
417 grpc_channel_check_connectivity_state(channel(), 0);
418 if (st != GRPC_CHANNEL_SHUTDOWN) {
419 gpr_timespec deadline =
420 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
421 gpr_time_from_micros(duration_us, GPR_TIMESPAN));
422 grpc_channel_watch_connectivity_state(
423 channel(), st, deadline, cq_,
424 ValidateConnectivityWatch(deadline, &pending_channel_watches_));
425 pending_channel_watches_++;
426 }
427 } else {
428 return Result::kFailed;
429 }
430 return Result::kComplete;
431 }
432
CancelAllCallsIfShutdown()433 BasicFuzzer::Result BasicFuzzer::CancelAllCallsIfShutdown() {
434 if (server() != nullptr && server_shutdown_) {
435 grpc_server_cancel_all_calls(server());
436 } else {
437 return Result::kFailed;
438 }
439 return Result::kComplete;
440 }
441
ShutdownServer()442 BasicFuzzer::Result BasicFuzzer::ShutdownServer() {
443 if (server() != nullptr) {
444 grpc_server_shutdown_and_notify(
445 server(), cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_));
446 pending_server_shutdowns_++;
447 server_shutdown_ = true;
448 } else {
449 return Result::kFailed;
450 }
451 return Result::kComplete;
452 }
453
DestroyServerIfReady()454 BasicFuzzer::Result BasicFuzzer::DestroyServerIfReady() {
455 if (server() != nullptr && server_shutdown_ &&
456 pending_server_shutdowns_ == 0) {
457 DestroyServer();
458 } else {
459 return Result::kFailed;
460 }
461 return Result::kComplete;
462 }
463
CreateCall(const api_fuzzer::CreateCall & create_call)464 BasicFuzzer::Result BasicFuzzer::CreateCall(
465 const api_fuzzer::CreateCall& create_call) {
466 bool ok = true;
467 if (channel() == nullptr) ok = false;
468 // If the active call is a server call, then use it as the parent call
469 // to exercise the propagation logic.
470 Call* parent_call = ActiveCall();
471 if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
472 parent_call = nullptr;
473 }
474 calls_.emplace_back(new Call(CallType::CLIENT));
475 grpc_slice method = calls_.back()->ReadSlice(create_call.method());
476 if (GRPC_SLICE_LENGTH(method) == 0) {
477 ok = false;
478 }
479 grpc_slice host = calls_.back()->ReadSlice(create_call.host());
480 gpr_timespec deadline =
481 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
482 gpr_time_from_micros(create_call.timeout(), GPR_TIMESPAN));
483
484 if (ok) {
485 calls_.back()->SetCall(grpc_channel_create_call(
486 channel(), parent_call == nullptr ? nullptr : parent_call->call(),
487 create_call.propagation_mask(), cq_, method, &host, deadline, nullptr));
488 } else {
489 calls_.pop_back();
490 return Result::kFailed;
491 }
492 return Result::kComplete;
493 }
494
ChangeActiveCall()495 BasicFuzzer::Result BasicFuzzer::ChangeActiveCall() {
496 active_call_++;
497 ActiveCall();
498 return Result::kComplete;
499 }
500
QueueBatchForActiveCall(const api_fuzzer::Batch & queue_batch)501 BasicFuzzer::Result BasicFuzzer::QueueBatchForActiveCall(
502 const api_fuzzer::Batch& queue_batch) {
503 auto* active_call = ActiveCall();
504 if (active_call == nullptr ||
505 active_call->type() == CallType::PENDING_SERVER ||
506 active_call->call() == nullptr) {
507 return Result::kFailed;
508 }
509 const auto& batch = queue_batch.operations();
510 if (batch.size() > 6) {
511 return Result::kFailed;
512 }
513 std::vector<grpc_op> ops;
514 bool ok = true;
515 uint8_t has_ops = 0;
516 std::vector<std::function<void()>> unwinders;
517 for (const auto& batch_op : batch) {
518 auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
519 if (!op.has_value()) continue;
520 ops.push_back(*op);
521 }
522
523 if (channel() == nullptr) ok = false;
524 if (ok) {
525 auto* v = active_call->FinishedBatchValidator(has_ops);
526 grpc_call_error error = grpc_call_start_batch(
527 active_call->call(), ops.data(), ops.size(), v, nullptr);
528 if (error != GRPC_CALL_OK) {
529 v->Run(false);
530 }
531 } else {
532 for (auto& unwind : unwinders) {
533 unwind();
534 }
535 return Result::kFailed;
536 }
537 return Result::kComplete;
538 }
539
CancelActiveCall()540 BasicFuzzer::Result BasicFuzzer::CancelActiveCall() {
541 auto* active_call = ActiveCall();
542 if (active_call != nullptr && active_call->call() != nullptr) {
543 grpc_call_cancel(active_call->call(), nullptr);
544 } else {
545 return Result::kFailed;
546 }
547 return Result::kComplete;
548 }
549
Pause(Duration duration)550 BasicFuzzer::Result BasicFuzzer::Pause(Duration duration) {
551 ++paused_;
552 engine()->RunAfterExactly(duration, [this]() { --paused_; });
553 return Result::kComplete;
554 }
555
ServerRequestCall()556 BasicFuzzer::Result BasicFuzzer::ServerRequestCall() {
557 if (server() == nullptr) {
558 return Result::kFailed;
559 }
560 calls_.emplace_back(new Call(CallType::PENDING_SERVER));
561 calls_.back()->RequestCall(server(), cq_);
562 return Result::kComplete;
563 }
564
DestroyActiveCall()565 BasicFuzzer::Result BasicFuzzer::DestroyActiveCall() {
566 auto* active_call = ActiveCall();
567 if (active_call != nullptr &&
568 active_call->type() != CallType::PENDING_SERVER &&
569 active_call->call() != nullptr) {
570 calls_[active_call_]->Shutdown();
571 } else {
572 return Result::kFailed;
573 }
574 return Result::kComplete;
575 }
576
ValidatePeerForActiveCall()577 BasicFuzzer::Result BasicFuzzer::ValidatePeerForActiveCall() {
578 auto* active_call = ActiveCall();
579 if (active_call != nullptr && active_call->call() != nullptr) {
580 free_non_null(grpc_call_get_peer(active_call->call()));
581 } else {
582 return Result::kFailed;
583 }
584 return Result::kComplete;
585 }
586
ValidateChannelTarget()587 BasicFuzzer::Result BasicFuzzer::ValidateChannelTarget() {
588 if (channel() != nullptr) {
589 free_non_null(grpc_channel_get_target(channel()));
590 } else {
591 return Result::kFailed;
592 }
593 return Result::kComplete;
594 }
595
ResizeResourceQuota(uint32_t resize_resource_quota)596 BasicFuzzer::Result BasicFuzzer::ResizeResourceQuota(
597 uint32_t resize_resource_quota) {
598 ExecCtx exec_ctx;
599 resource_quota_->memory_quota()->SetSize(resize_resource_quota);
600 return Result::kComplete;
601 }
602
CloseChannel()603 BasicFuzzer::Result BasicFuzzer::CloseChannel() {
604 if (channel() != nullptr) {
605 DestroyChannel();
606 } else {
607 return Result::kFailed;
608 }
609 return Result::kComplete;
610 }
611
ActiveCall()612 Call* BasicFuzzer::ActiveCall() {
613 while (!calls_.empty()) {
614 if (active_call_ >= calls_.size()) {
615 active_call_ = 0;
616 }
617 if (calls_[active_call_] != nullptr && !calls_[active_call_]->done()) {
618 return calls_[active_call_].get();
619 }
620 calls_.erase(calls_.begin() + active_call_);
621 }
622 return nullptr;
623 }
624
ShutdownCalls()625 void BasicFuzzer::ShutdownCalls() {
626 for (auto& call : calls_) {
627 if (call == nullptr) continue;
628 if (call->type() == CallType::PENDING_SERVER) continue;
629 call->Shutdown();
630 }
631 }
632
Continue()633 bool BasicFuzzer::Continue() {
634 return channel() != nullptr || server() != nullptr ||
635 pending_channel_watches_ > 0 || ActiveCall() != nullptr || paused_;
636 }
637
ExecuteAction(const api_fuzzer::Action & action)638 BasicFuzzer::Result BasicFuzzer::ExecuteAction(
639 const api_fuzzer::Action& action) {
640 gpr_log(GPR_DEBUG, "EXECUTE_ACTION: %s", action.DebugString().c_str());
641 switch (action.type_case()) {
642 case api_fuzzer::Action::TYPE_NOT_SET:
643 return BasicFuzzer::Result::kFailed;
644 // tickle completion queue
645 case api_fuzzer::Action::kPollCq:
646 return PollCq();
647 // create an insecure channel
648 case api_fuzzer::Action::kCreateChannel:
649 return CreateChannel(action.create_channel());
650 // destroy a channel
651 case api_fuzzer::Action::kCloseChannel:
652 return CloseChannel();
653 // bring up a server
654 case api_fuzzer::Action::kCreateServer:
655 return CreateServer(action.create_server());
656 // begin server shutdown
657 case api_fuzzer::Action::kShutdownServer:
658 return ShutdownServer();
659 // cancel all calls if server is shutdown
660 case api_fuzzer::Action::kCancelAllCallsIfShutdown:
661 return CancelAllCallsIfShutdown();
662 // destroy server
663 case api_fuzzer::Action::kDestroyServerIfReady:
664 return DestroyServerIfReady();
665 // check connectivity
666 case api_fuzzer::Action::kCheckConnectivity:
667 return CheckConnectivity(action.check_connectivity());
668 // watch connectivity
669 case api_fuzzer::Action::kWatchConnectivity:
670 return WatchConnectivity(action.watch_connectivity());
671 // create a call
672 case api_fuzzer::Action::kCreateCall:
673 return CreateCall(action.create_call());
674 // switch the 'current' call
675 case api_fuzzer::Action::kChangeActiveCall:
676 return ChangeActiveCall();
677 // queue some ops on a call
678 case api_fuzzer::Action::kQueueBatch:
679 return QueueBatchForActiveCall(action.queue_batch());
680 // cancel current call
681 case api_fuzzer::Action::kCancelCall:
682 return CancelActiveCall();
683 // get a calls peer
684 case api_fuzzer::Action::kGetPeer:
685 return ValidatePeerForActiveCall();
686 // get a channels target
687 case api_fuzzer::Action::kGetTarget:
688 return ValidateChannelTarget();
689 // send a ping on a channel
690 case api_fuzzer::Action::kPing:
691 // Ping is no longer a part of the API
692 return BasicFuzzer::Result::kNotSupported;
693 // enable a tracer
694 case api_fuzzer::Action::kEnableTracer: {
695 grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
696 break;
697 }
698 // disable a tracer
699 case api_fuzzer::Action::kDisableTracer: {
700 grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
701 break;
702 }
703 // request a server call
704 case api_fuzzer::Action::kRequestCall:
705 return ServerRequestCall();
706 // destroy a call
707 case api_fuzzer::Action::kDestroyCall:
708 return DestroyActiveCall();
709 // resize the buffer pool
710 case api_fuzzer::Action::kResizeResourceQuota:
711 return ResizeResourceQuota(action.resize_resource_quota());
712 case api_fuzzer::Action::kSleepMs:
713 return Pause(std::min(Duration::Milliseconds(action.sleep_ms()),
714 Duration::Minutes(1)));
715 default:
716 Crash(absl::StrCat("Unsupported Fuzzing Action of type: ",
717 action.type_case()));
718 }
719 return BasicFuzzer::Result::kComplete;
720 }
721
TryShutdown()722 void BasicFuzzer::TryShutdown() {
723 engine()->FuzzingDone();
724 if (channel() != nullptr) {
725 DestroyChannel();
726 }
727 if (server() != nullptr) {
728 if (!server_shutdown_called()) {
729 ShutdownServer();
730 }
731 if (server_finished_shutting_down()) {
732 DestroyServer();
733 }
734 }
735 ShutdownCalls();
736
737 grpc_timer_manager_tick();
738 GPR_ASSERT(PollCq() == Result::kPending);
739 }
740
Run(absl::Span<const api_fuzzer::Action * const> actions)741 void BasicFuzzer::Run(absl::Span<const api_fuzzer::Action* const> actions) {
742 size_t action_index = 0;
743 auto allow_forced_shutdown = std::make_shared<bool>(false);
744 auto no_more_actions = [&]() { action_index = actions.size(); };
745
746 engine()->RunAfterExactly(minimum_run_time_, [allow_forced_shutdown] {
747 *allow_forced_shutdown = true;
748 });
749
750 while (action_index < actions.size() || Continue()) {
751 Tick();
752
753 if (paused_) continue;
754
755 if (action_index == actions.size()) {
756 if (*allow_forced_shutdown) TryShutdown();
757 continue;
758 }
759
760 auto result = ExecuteAction(*actions[action_index++]);
761 if (result == Result::kFailed) {
762 no_more_actions();
763 }
764 }
765 }
766
767 } // namespace testing
768 } // namespace grpc_core
769