1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 // clang-format off
16 #include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
17
18 #include "pw_rpc/fuzz/engine.h"
19 // clang-format on
20
21 #include <algorithm>
22 #include <cctype>
23 #include <chrono>
24 #include <cinttypes>
25 #include <limits>
26 #include <mutex>
27
28 #include "pw_assert/check.h"
29 #include "pw_bytes/span.h"
30 #include "pw_log/log.h"
31 #include "pw_span/span.h"
32 #include "pw_status/status.h"
33 #include "pw_string/format.h"
34
35 namespace pw::rpc::fuzz {
36 namespace {
37
38 using namespace std::chrono_literals;
39
40 // Maximum number of bytes written in a single unary or stream request.
41 constexpr size_t kMaxWriteLen = MaxSafePayloadSize();
42 static_assert(kMaxWriteLen * 0x7E <= std::numeric_limits<uint16_t>::max());
43
44 struct ActiveVisitor final {
45 using result_type = bool;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor46 result_type operator()(std::monostate&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor47 result_type operator()(pw::rpc::RawUnaryReceiver& call) {
48 return call.active();
49 }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor50 result_type operator()(pw::rpc::RawClientReaderWriter& call) {
51 return call.active();
52 }
53 };
54
55 struct CloseClientStreamVisitor final {
56 using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor57 result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor58 result_type operator()(pw::rpc::RawUnaryReceiver&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor59 result_type operator()(pw::rpc::RawClientReaderWriter& call) {
60 call.RequestCompletion().IgnoreError();
61 }
62 };
63
64 struct WriteVisitor final {
65 using result_type = bool;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor66 result_type operator()(std::monostate&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor67 result_type operator()(pw::rpc::RawUnaryReceiver&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor68 result_type operator()(pw::rpc::RawClientReaderWriter& call) {
69 if (!call.active()) {
70 return false;
71 }
72 call.Write(data).IgnoreError();
73 return true;
74 }
75 ConstByteSpan data;
76 };
77
78 struct CancelVisitor final {
79 using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor80 result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor81 result_type operator()(pw::rpc::RawUnaryReceiver& call) {
82 call.Cancel().IgnoreError();
83 }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor84 result_type operator()(pw::rpc::RawClientReaderWriter& call) {
85 call.Cancel().IgnoreError();
86 }
87 };
88
89 struct AbandonVisitor final {
90 using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor91 result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor92 result_type operator()(pw::rpc::RawUnaryReceiver& call) { call.Abandon(); }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor93 result_type operator()(pw::rpc::RawClientReaderWriter& call) {
94 call.Abandon();
95 }
96 };
97
98 } // namespace
99
100 // `Action` methods.
101
Action(uint32_t encoded)102 Action::Action(uint32_t encoded) {
103 // The first byte is used to determine the operation. The ranges used set the
104 // relative likelihood of each result, e.g. `kWait` is more likely than
105 // `kAbandon`.
106 uint32_t raw = encoded & 0xFF;
107 if (raw == 0) {
108 op = kSkip;
109 } else if (raw < 0x60) {
110 op = kWait;
111 } else if (raw < 0x80) {
112 op = kWriteUnary;
113 } else if (raw < 0xA0) {
114 op = kWriteStream;
115 } else if (raw < 0xC0) {
116 op = kCloseClientStream;
117 } else if (raw < 0xD0) {
118 op = kCancel;
119 } else if (raw < 0xE0) {
120 op = kAbandon;
121 } else if (raw < 0xF0) {
122 op = kSwap;
123 } else {
124 op = kDestroy;
125 }
126 target = ((encoded & 0xFF00) >> 8) % Fuzzer::kMaxConcurrentCalls;
127 value = encoded >> 16;
128 }
129
Action(Op op_,size_t target_,uint16_t value_)130 Action::Action(Op op_, size_t target_, uint16_t value_)
131 : op(op_), target(target_), value(value_) {}
132
Action(Op op_,size_t target_,char val,size_t len)133 Action::Action(Op op_, size_t target_, char val, size_t len)
134 : op(op_), target(target_) {
135 PW_ASSERT(op == kWriteUnary || op == kWriteStream);
136 value = static_cast<uint16_t>(((val % 0x80) * kMaxWriteLen) +
137 (len % kMaxWriteLen));
138 }
139
DecodeWriteValue(uint16_t value)140 char Action::DecodeWriteValue(uint16_t value) {
141 return static_cast<char>((value / kMaxWriteLen) % 0x7F);
142 }
143
DecodeWriteLength(uint16_t value)144 size_t Action::DecodeWriteLength(uint16_t value) {
145 return value % kMaxWriteLen;
146 }
147
Encode() const148 uint32_t Action::Encode() const {
149 uint32_t encoded = 0;
150 switch (op) {
151 case kSkip:
152 encoded = 0x00;
153 break;
154 case kWait:
155 encoded = 0x5F;
156 break;
157 case kWriteUnary:
158 encoded = 0x7F;
159 break;
160 case kWriteStream:
161 encoded = 0x9F;
162 break;
163 case kCloseClientStream:
164 encoded = 0xBF;
165 break;
166 case kCancel:
167 encoded = 0xCF;
168 break;
169 case kAbandon:
170 encoded = 0xDF;
171 break;
172 case kSwap:
173 encoded = 0xEF;
174 break;
175 case kDestroy:
176 encoded = 0xFF;
177 break;
178 }
179 encoded |=
180 ((target < Fuzzer::kMaxConcurrentCalls ? target
181 : Fuzzer::kMaxConcurrentCalls) %
182 0xFF)
183 << 8;
184 encoded |= (static_cast<uint32_t>(value) << 16);
185 return encoded;
186 }
187
Log(bool verbose,size_t num_actions,const char * fmt,...) const188 void Action::Log(bool verbose, size_t num_actions, const char* fmt, ...) const {
189 if (!verbose) {
190 return;
191 }
192 char s1[16];
193 auto result = callback_id < Fuzzer::kMaxConcurrentCalls
194 ? string::Format(s1, "%-3zu", callback_id)
195 : string::Format(s1, "n/a");
196 va_list ap;
197 va_start(ap, fmt);
198 char s2[128];
199 if (result.ok()) {
200 result = string::FormatVaList(s2, fmt, ap);
201 }
202 va_end(ap);
203 if (result.ok()) {
204 PW_LOG_INFO("#%-12zu\tthread: %zu\tcallback for: %s\ttarget call: %zu\t%s",
205 num_actions,
206 thread_id,
207 s1,
208 target,
209 s2);
210 } else {
211 LogFailure(verbose, num_actions, result.status());
212 }
213 }
214
LogFailure(bool verbose,size_t num_actions,Status status) const215 void Action::LogFailure(bool verbose, size_t num_actions, Status status) const {
216 if (verbose && !status.ok()) {
217 PW_LOG_INFO("#%-12zu\tthread: %zu\tFailed to log action: %s",
218 num_actions,
219 thread_id,
220 pw_StatusString(status));
221 }
222 }
223
224 // FuzzyCall methods.
225
RecordWrite(size_t num,bool append)226 void FuzzyCall::RecordWrite(size_t num, bool append) {
227 std::lock_guard lock(mutex_);
228 if (append) {
229 last_write_ += num;
230 } else {
231 last_write_ = num;
232 }
233 total_written_ += num;
234 pending_ = true;
235 }
236
Await()237 void FuzzyCall::Await() {
238 std::unique_lock<sync::Mutex> lock(mutex_);
239 cv_.wait(lock, [this]() PW_NO_LOCK_SAFETY_ANALYSIS { return !pending_; });
240 }
241
Notify()242 void FuzzyCall::Notify() {
243 if (pending_.exchange(false)) {
244 cv_.notify_all();
245 }
246 }
247
Swap(FuzzyCall & other)248 void FuzzyCall::Swap(FuzzyCall& other) {
249 if (index_ == other.index_) {
250 return;
251 }
252 // Manually acquire locks in an order based on call IDs to prevent deadlock.
253 if (index_ < other.index_) {
254 mutex_.lock();
255 other.mutex_.lock();
256 } else {
257 other.mutex_.lock();
258 mutex_.lock();
259 }
260 call_.swap(other.call_);
261 std::swap(id_, other.id_);
262 pending_ = other.pending_.exchange(pending_);
263 std::swap(last_write_, other.last_write_);
264 std::swap(total_written_, other.total_written_);
265 mutex_.unlock();
266 other.mutex_.unlock();
267 cv_.notify_all();
268 other.cv_.notify_all();
269 }
270
Reset(Variant call)271 void FuzzyCall::Reset(Variant call) {
272 {
273 std::lock_guard lock(mutex_);
274 call_ = std::move(call);
275 }
276 cv_.notify_all();
277 }
278
Log()279 void FuzzyCall::Log() {
280 if (mutex_.try_lock_for(100ms)) {
281 PW_LOG_INFO("call %zu:", index_);
282 PW_LOG_INFO(" active: %s",
283 std::visit(ActiveVisitor(), call_) ? "true" : "false");
284 PW_LOG_INFO(" request pending: %s ", pending_ ? "true" : "false");
285 PW_LOG_INFO(" last write: %zu bytes", last_write_);
286 PW_LOG_INFO(" total written: %zu bytes", total_written_);
287 mutex_.unlock();
288 } else {
289 PW_LOG_WARN("call %zu: failed to acquire lock", index_);
290 }
291 }
292
293 // `Fuzzer` methods.
294
295 #define FUZZ_LOG_VERBOSE(...) \
296 if (verbose_) { \
297 PW_LOG_INFO(__VA_ARGS__); \
298 }
299
Fuzzer(Client & client,uint32_t channel_id)300 Fuzzer::Fuzzer(Client& client, uint32_t channel_id)
301 : client_(client, channel_id),
302 timer_([this](chrono::SystemClock::time_point) {
303 PW_LOG_ERROR(
304 "Workers performed %zu actions before timing out without an "
305 "update.",
306 num_actions_.load());
307 PW_LOG_INFO("Additional call details:");
308 for (auto& call : fuzzy_calls_) {
309 call.Log();
310 }
311 PW_CRASH("Fuzzer found a fatal error condition: TIMEOUT.");
312 }) {
313 for (size_t index = 0; index < kMaxConcurrentCalls; ++index) {
314 fuzzy_calls_.emplace_back(index);
315 indices_.push_back(index);
316 contexts_.push_back(CallbackContext{.id = index, .fuzzer = this});
317 }
318 }
319
Run(uint64_t seed,size_t num_actions)320 void Fuzzer::Run(uint64_t seed, size_t num_actions) {
321 FUZZ_LOG_VERBOSE("Fuzzing RPC client with:");
322 FUZZ_LOG_VERBOSE(" num_actions: %zu", num_actions);
323 FUZZ_LOG_VERBOSE(" seed: %" PRIu64, seed);
324 num_actions_.store(0);
325 random::XorShiftStarRng64 rng(seed);
326 while (true) {
327 {
328 size_t actions_done = num_actions_.load();
329 if (actions_done >= num_actions) {
330 FUZZ_LOG_VERBOSE("Fuzzing complete; %zu actions performed.",
331 actions_done);
332 break;
333 }
334 FUZZ_LOG_VERBOSE("%zu actions remaining.", num_actions - actions_done);
335 }
336 FUZZ_LOG_VERBOSE("Generating %zu random actions.", kMaxActions);
337 pw::Vector<uint32_t, kMaxActions> actions;
338 for (size_t i = 0; i < kNumThreads; ++i) {
339 size_t num_actions_for_thread;
340 rng.GetInt(num_actions_for_thread, kMaxActionsPerThread + 1);
341 for (size_t j = 0; j < num_actions_for_thread; ++j) {
342 uint32_t encoded = 0;
343 while (!encoded) {
344 rng.GetInt(encoded);
345 }
346 actions.push_back(encoded);
347 }
348 actions.push_back(0);
349 }
350 Run(actions);
351 }
352 }
353
Run(const pw::Vector<uint32_t> & actions)354 void Fuzzer::Run(const pw::Vector<uint32_t>& actions) {
355 FUZZ_LOG_VERBOSE("Starting %zu threads to perform %zu actions:",
356 kNumThreads - 1,
357 actions.size());
358 FUZZ_LOG_VERBOSE(" timeout: %lldms", timer_.timeout() / 1ms);
359 auto iter = actions.begin();
360 timer_.Restart();
361 for (size_t thread_id = 0; thread_id < kNumThreads; ++thread_id) {
362 pw::Vector<uint32_t, kMaxActionsPerThread> thread_actions;
363 while (thread_actions.size() < kMaxActionsPerThread &&
364 iter != actions.end()) {
365 uint32_t encoded = *iter++;
366 if (!encoded) {
367 break;
368 }
369 thread_actions.push_back(encoded);
370 }
371 if (thread_id == 0) {
372 std::lock_guard lock(mutex_);
373 callback_actions_ = std::move(thread_actions);
374 callback_iterator_ = callback_actions_.begin();
375 } else {
376 threads_.emplace_back(
377 [this, thread_id, actions_to_perform = std::move(thread_actions)]() {
378 for (const auto& encoded : actions_to_perform) {
379 Action action(encoded);
380 action.set_thread_id(thread_id);
381 Perform(action);
382 }
383 });
384 }
385 }
386 for (auto& t : threads_) {
387 t.join();
388 }
389 for (auto& fuzzy_call : fuzzy_calls_) {
390 fuzzy_call.Reset();
391 }
392 timer_.Cancel();
393 }
394
Perform(const Action & action)395 void Fuzzer::Perform(const Action& action) {
396 FuzzyCall& fuzzy_call = FindCall(action.target);
397 switch (action.op) {
398 case Action::kSkip: {
399 if (action.thread_id == 0) {
400 action.Log(verbose_, ++num_actions_, "Callback chain completed");
401 }
402 break;
403 }
404 case Action::kWait: {
405 if (action.callback_id == action.target) {
406 // Don't wait in a callback of the target call.
407 break;
408 }
409 if (fuzzy_call.pending()) {
410 action.Log(verbose_, ++num_actions_, "Waiting for call.");
411 fuzzy_call.Await();
412 }
413 break;
414 }
415 case Action::kWriteUnary:
416 case Action::kWriteStream: {
417 if (action.callback_id == action.target) {
418 // Don't create a new call from the call's own callback.
419 break;
420 }
421 char buf[kMaxWriteLen];
422 char val = Action::DecodeWriteValue(action.value);
423 size_t len = Action::DecodeWriteLength(action.value);
424 memset(buf, val, len);
425 if (verbose_) {
426 char msg_buf[64];
427 span msg(msg_buf);
428 auto result = string::Format(
429 msg,
430 "Writing %s request of ",
431 action.op == Action::kWriteUnary ? "unary" : "stream");
432 if (result.ok()) {
433 size_t off = result.size();
434 result = string::Format(
435 msg.subspan(off),
436 isprint(val) ? "['%c'; %zu]." : "['\\x%02x'; %zu].",
437 val,
438 len);
439 }
440 size_t num_actions = ++num_actions_;
441 if (result.ok()) {
442 action.Log(verbose_, num_actions, "%s", msg.data());
443 } else if (verbose_) {
444 action.LogFailure(verbose_, num_actions, result.status());
445 }
446 }
447 bool append = false;
448 if (action.op == Action::kWriteUnary) {
449 // Send a unary request.
450 fuzzy_call.Reset(client_.UnaryEcho(
451 as_bytes(span(buf, len)),
452 /* on completed */
453 [context = GetContext(action.target)](ConstByteSpan, Status) {
454 context->fuzzer->OnCompleted(context->id);
455 },
456 /* on error */
457 [context = GetContext(action.target)](Status status) {
458 context->fuzzer->OnError(context->id, status);
459 }));
460
461 } else if (fuzzy_call.Visit(
462 WriteVisitor{.data = as_bytes(span(buf, len))})) {
463 // Append to an existing stream
464 append = true;
465 } else {
466 // .Open a new stream.
467 fuzzy_call.Reset(client_.BidirectionalEcho(
468 /* on next */
469 [context = GetContext(action.target)](ConstByteSpan) {
470 context->fuzzer->OnNext(context->id);
471 },
472 /* on completed */
473 [context = GetContext(action.target)](Status) {
474 context->fuzzer->OnCompleted(context->id);
475 },
476 /* on error */
477 [context = GetContext(action.target)](Status status) {
478 context->fuzzer->OnError(context->id, status);
479 }));
480 }
481 fuzzy_call.RecordWrite(len, append);
482 break;
483 }
484 case Action::kCloseClientStream:
485 action.Log(verbose_, ++num_actions_, "Closing stream.");
486 fuzzy_call.Visit(CloseClientStreamVisitor());
487 break;
488 case Action::kCancel:
489 action.Log(verbose_, ++num_actions_, "Canceling call.");
490 fuzzy_call.Visit(CancelVisitor());
491 break;
492 case Action::kAbandon: {
493 action.Log(verbose_, ++num_actions_, "Abandoning call.");
494 fuzzy_call.Visit(AbandonVisitor());
495 break;
496 }
497 case Action::kSwap: {
498 size_t other_target = action.value % kMaxConcurrentCalls;
499 if (action.callback_id == action.target ||
500 action.callback_id == other_target) {
501 // Don't move a call from within its own callback.
502 break;
503 }
504 action.Log(verbose_,
505 ++num_actions_,
506 "Swapping call with call %zu.",
507 other_target);
508 std::lock_guard lock(mutex_);
509 FuzzyCall& other = FindCallLocked(other_target);
510 std::swap(indices_[fuzzy_call.id()], indices_[other.id()]);
511 fuzzy_call.Swap(other);
512 break;
513 }
514 case Action::kDestroy: {
515 if (action.callback_id == action.target) {
516 // Don't destroy a call from within its own callback.
517 break;
518 }
519 action.Log(verbose_, ++num_actions_, "Destroying call.");
520 fuzzy_call.Reset();
521 break;
522 }
523 default:
524 break;
525 }
526 timer_.Restart();
527 }
528
OnNext(size_t callback_id)529 void Fuzzer::OnNext(size_t callback_id) { FindCall(callback_id).Notify(); }
530
OnCompleted(size_t callback_id)531 void Fuzzer::OnCompleted(size_t callback_id) {
532 uint32_t encoded = 0;
533 {
534 std::lock_guard lock(mutex_);
535 if (callback_iterator_ != callback_actions_.end()) {
536 encoded = *callback_iterator_++;
537 }
538 }
539 Action action(encoded);
540 action.set_callback_id(callback_id);
541 Perform(action);
542 FindCall(callback_id).Notify();
543 }
544
OnError(size_t callback_id,Status status)545 void Fuzzer::OnError(size_t callback_id, Status status) {
546 FuzzyCall& call = FindCall(callback_id);
547 PW_LOG_WARN("Call %zu received an error from the server: %s",
548 call.id(),
549 pw_StatusString(status));
550 call.Notify();
551 }
552
553 } // namespace pw::rpc::fuzz
554