xref: /aosp_15_r20/external/pigweed/pw_rpc/fuzz/engine.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // clang-format off
16 #include "pw_rpc/internal/log_config.h"  // PW_LOG_* macros must be first.
17 
18 #include "pw_rpc/fuzz/engine.h"
19 // clang-format on
20 
21 #include <algorithm>
22 #include <cctype>
23 #include <chrono>
24 #include <cinttypes>
25 #include <limits>
26 #include <mutex>
27 
28 #include "pw_assert/check.h"
29 #include "pw_bytes/span.h"
30 #include "pw_log/log.h"
31 #include "pw_span/span.h"
32 #include "pw_status/status.h"
33 #include "pw_string/format.h"
34 
35 namespace pw::rpc::fuzz {
36 namespace {
37 
38 using namespace std::chrono_literals;
39 
40 // Maximum number of bytes written in a single unary or stream request.
41 constexpr size_t kMaxWriteLen = MaxSafePayloadSize();
42 static_assert(kMaxWriteLen * 0x7E <= std::numeric_limits<uint16_t>::max());
43 
44 struct ActiveVisitor final {
45   using result_type = bool;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor46   result_type operator()(std::monostate&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor47   result_type operator()(pw::rpc::RawUnaryReceiver& call) {
48     return call.active();
49   }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::ActiveVisitor50   result_type operator()(pw::rpc::RawClientReaderWriter& call) {
51     return call.active();
52   }
53 };
54 
55 struct CloseClientStreamVisitor final {
56   using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor57   result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor58   result_type operator()(pw::rpc::RawUnaryReceiver&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CloseClientStreamVisitor59   result_type operator()(pw::rpc::RawClientReaderWriter& call) {
60     call.RequestCompletion().IgnoreError();
61   }
62 };
63 
64 struct WriteVisitor final {
65   using result_type = bool;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor66   result_type operator()(std::monostate&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor67   result_type operator()(pw::rpc::RawUnaryReceiver&) { return false; }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::WriteVisitor68   result_type operator()(pw::rpc::RawClientReaderWriter& call) {
69     if (!call.active()) {
70       return false;
71     }
72     call.Write(data).IgnoreError();
73     return true;
74   }
75   ConstByteSpan data;
76 };
77 
78 struct CancelVisitor final {
79   using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor80   result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor81   result_type operator()(pw::rpc::RawUnaryReceiver& call) {
82     call.Cancel().IgnoreError();
83   }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::CancelVisitor84   result_type operator()(pw::rpc::RawClientReaderWriter& call) {
85     call.Cancel().IgnoreError();
86   }
87 };
88 
89 struct AbandonVisitor final {
90   using result_type = void;
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor91   result_type operator()(std::monostate&) {}
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor92   result_type operator()(pw::rpc::RawUnaryReceiver& call) { call.Abandon(); }
operator ()pw::rpc::fuzz::__anon735a4fdd0111::AbandonVisitor93   result_type operator()(pw::rpc::RawClientReaderWriter& call) {
94     call.Abandon();
95   }
96 };
97 
98 }  // namespace
99 
100 // `Action` methods.
101 
Action(uint32_t encoded)102 Action::Action(uint32_t encoded) {
103   // The first byte is used to determine the operation. The ranges used set the
104   // relative likelihood of each result, e.g. `kWait` is more likely than
105   // `kAbandon`.
106   uint32_t raw = encoded & 0xFF;
107   if (raw == 0) {
108     op = kSkip;
109   } else if (raw < 0x60) {
110     op = kWait;
111   } else if (raw < 0x80) {
112     op = kWriteUnary;
113   } else if (raw < 0xA0) {
114     op = kWriteStream;
115   } else if (raw < 0xC0) {
116     op = kCloseClientStream;
117   } else if (raw < 0xD0) {
118     op = kCancel;
119   } else if (raw < 0xE0) {
120     op = kAbandon;
121   } else if (raw < 0xF0) {
122     op = kSwap;
123   } else {
124     op = kDestroy;
125   }
126   target = ((encoded & 0xFF00) >> 8) % Fuzzer::kMaxConcurrentCalls;
127   value = encoded >> 16;
128 }
129 
Action(Op op_,size_t target_,uint16_t value_)130 Action::Action(Op op_, size_t target_, uint16_t value_)
131     : op(op_), target(target_), value(value_) {}
132 
Action(Op op_,size_t target_,char val,size_t len)133 Action::Action(Op op_, size_t target_, char val, size_t len)
134     : op(op_), target(target_) {
135   PW_ASSERT(op == kWriteUnary || op == kWriteStream);
136   value = static_cast<uint16_t>(((val % 0x80) * kMaxWriteLen) +
137                                 (len % kMaxWriteLen));
138 }
139 
DecodeWriteValue(uint16_t value)140 char Action::DecodeWriteValue(uint16_t value) {
141   return static_cast<char>((value / kMaxWriteLen) % 0x7F);
142 }
143 
DecodeWriteLength(uint16_t value)144 size_t Action::DecodeWriteLength(uint16_t value) {
145   return value % kMaxWriteLen;
146 }
147 
Encode() const148 uint32_t Action::Encode() const {
149   uint32_t encoded = 0;
150   switch (op) {
151     case kSkip:
152       encoded = 0x00;
153       break;
154     case kWait:
155       encoded = 0x5F;
156       break;
157     case kWriteUnary:
158       encoded = 0x7F;
159       break;
160     case kWriteStream:
161       encoded = 0x9F;
162       break;
163     case kCloseClientStream:
164       encoded = 0xBF;
165       break;
166     case kCancel:
167       encoded = 0xCF;
168       break;
169     case kAbandon:
170       encoded = 0xDF;
171       break;
172     case kSwap:
173       encoded = 0xEF;
174       break;
175     case kDestroy:
176       encoded = 0xFF;
177       break;
178   }
179   encoded |=
180       ((target < Fuzzer::kMaxConcurrentCalls ? target
181                                              : Fuzzer::kMaxConcurrentCalls) %
182        0xFF)
183       << 8;
184   encoded |= (static_cast<uint32_t>(value) << 16);
185   return encoded;
186 }
187 
Log(bool verbose,size_t num_actions,const char * fmt,...) const188 void Action::Log(bool verbose, size_t num_actions, const char* fmt, ...) const {
189   if (!verbose) {
190     return;
191   }
192   char s1[16];
193   auto result = callback_id < Fuzzer::kMaxConcurrentCalls
194                     ? string::Format(s1, "%-3zu", callback_id)
195                     : string::Format(s1, "n/a");
196   va_list ap;
197   va_start(ap, fmt);
198   char s2[128];
199   if (result.ok()) {
200     result = string::FormatVaList(s2, fmt, ap);
201   }
202   va_end(ap);
203   if (result.ok()) {
204     PW_LOG_INFO("#%-12zu\tthread: %zu\tcallback for: %s\ttarget call: %zu\t%s",
205                 num_actions,
206                 thread_id,
207                 s1,
208                 target,
209                 s2);
210   } else {
211     LogFailure(verbose, num_actions, result.status());
212   }
213 }
214 
LogFailure(bool verbose,size_t num_actions,Status status) const215 void Action::LogFailure(bool verbose, size_t num_actions, Status status) const {
216   if (verbose && !status.ok()) {
217     PW_LOG_INFO("#%-12zu\tthread: %zu\tFailed to log action: %s",
218                 num_actions,
219                 thread_id,
220                 pw_StatusString(status));
221   }
222 }
223 
// `FuzzyCall` methods.
225 
RecordWrite(size_t num,bool append)226 void FuzzyCall::RecordWrite(size_t num, bool append) {
227   std::lock_guard lock(mutex_);
228   if (append) {
229     last_write_ += num;
230   } else {
231     last_write_ = num;
232   }
233   total_written_ += num;
234   pending_ = true;
235 }
236 
Await()237 void FuzzyCall::Await() {
238   std::unique_lock<sync::Mutex> lock(mutex_);
239   cv_.wait(lock, [this]() PW_NO_LOCK_SAFETY_ANALYSIS { return !pending_; });
240 }
241 
Notify()242 void FuzzyCall::Notify() {
243   if (pending_.exchange(false)) {
244     cv_.notify_all();
245   }
246 }
247 
Swap(FuzzyCall & other)248 void FuzzyCall::Swap(FuzzyCall& other) {
249   if (index_ == other.index_) {
250     return;
251   }
252   // Manually acquire locks in an order based on call IDs to prevent deadlock.
253   if (index_ < other.index_) {
254     mutex_.lock();
255     other.mutex_.lock();
256   } else {
257     other.mutex_.lock();
258     mutex_.lock();
259   }
260   call_.swap(other.call_);
261   std::swap(id_, other.id_);
262   pending_ = other.pending_.exchange(pending_);
263   std::swap(last_write_, other.last_write_);
264   std::swap(total_written_, other.total_written_);
265   mutex_.unlock();
266   other.mutex_.unlock();
267   cv_.notify_all();
268   other.cv_.notify_all();
269 }
270 
Reset(Variant call)271 void FuzzyCall::Reset(Variant call) {
272   {
273     std::lock_guard lock(mutex_);
274     call_ = std::move(call);
275   }
276   cv_.notify_all();
277 }
278 
Log()279 void FuzzyCall::Log() {
280   if (mutex_.try_lock_for(100ms)) {
281     PW_LOG_INFO("call %zu:", index_);
282     PW_LOG_INFO("           active: %s",
283                 std::visit(ActiveVisitor(), call_) ? "true" : "false");
284     PW_LOG_INFO("  request pending: %s ", pending_ ? "true" : "false");
285     PW_LOG_INFO("       last write: %zu bytes", last_write_);
286     PW_LOG_INFO("    total written: %zu bytes", total_written_);
287     mutex_.unlock();
288   } else {
289     PW_LOG_WARN("call %zu: failed to acquire lock", index_);
290   }
291 }
292 
293 // `Fuzzer` methods.
294 
// Logs at INFO level, but only when the `verbose_` flag in scope at the point
// of use is set.
//
// The body is wrapped in `do { ... } while (0)` so that the macro expands to
// exactly one statement: it needs a trailing semicolon and cannot capture an
// `else` that follows it.
#define FUZZ_LOG_VERBOSE(...)   \
  do {                          \
    if (verbose_) {             \
      PW_LOG_INFO(__VA_ARGS__); \
    }                           \
  } while (0)
299 
Fuzzer(Client & client,uint32_t channel_id)300 Fuzzer::Fuzzer(Client& client, uint32_t channel_id)
301     : client_(client, channel_id),
302       timer_([this](chrono::SystemClock::time_point) {
303         PW_LOG_ERROR(
304             "Workers performed %zu actions before timing out without an "
305             "update.",
306             num_actions_.load());
307         PW_LOG_INFO("Additional call details:");
308         for (auto& call : fuzzy_calls_) {
309           call.Log();
310         }
311         PW_CRASH("Fuzzer found a fatal error condition: TIMEOUT.");
312       }) {
313   for (size_t index = 0; index < kMaxConcurrentCalls; ++index) {
314     fuzzy_calls_.emplace_back(index);
315     indices_.push_back(index);
316     contexts_.push_back(CallbackContext{.id = index, .fuzzer = this});
317   }
318 }
319 
Run(uint64_t seed,size_t num_actions)320 void Fuzzer::Run(uint64_t seed, size_t num_actions) {
321   FUZZ_LOG_VERBOSE("Fuzzing RPC client with:");
322   FUZZ_LOG_VERBOSE("  num_actions: %zu", num_actions);
323   FUZZ_LOG_VERBOSE("         seed: %" PRIu64, seed);
324   num_actions_.store(0);
325   random::XorShiftStarRng64 rng(seed);
326   while (true) {
327     {
328       size_t actions_done = num_actions_.load();
329       if (actions_done >= num_actions) {
330         FUZZ_LOG_VERBOSE("Fuzzing complete; %zu actions performed.",
331                          actions_done);
332         break;
333       }
334       FUZZ_LOG_VERBOSE("%zu actions remaining.", num_actions - actions_done);
335     }
336     FUZZ_LOG_VERBOSE("Generating %zu random actions.", kMaxActions);
337     pw::Vector<uint32_t, kMaxActions> actions;
338     for (size_t i = 0; i < kNumThreads; ++i) {
339       size_t num_actions_for_thread;
340       rng.GetInt(num_actions_for_thread, kMaxActionsPerThread + 1);
341       for (size_t j = 0; j < num_actions_for_thread; ++j) {
342         uint32_t encoded = 0;
343         while (!encoded) {
344           rng.GetInt(encoded);
345         }
346         actions.push_back(encoded);
347       }
348       actions.push_back(0);
349     }
350     Run(actions);
351   }
352 }
353 
Run(const pw::Vector<uint32_t> & actions)354 void Fuzzer::Run(const pw::Vector<uint32_t>& actions) {
355   FUZZ_LOG_VERBOSE("Starting %zu threads to perform %zu actions:",
356                    kNumThreads - 1,
357                    actions.size());
358   FUZZ_LOG_VERBOSE("    timeout: %lldms", timer_.timeout() / 1ms);
359   auto iter = actions.begin();
360   timer_.Restart();
361   for (size_t thread_id = 0; thread_id < kNumThreads; ++thread_id) {
362     pw::Vector<uint32_t, kMaxActionsPerThread> thread_actions;
363     while (thread_actions.size() < kMaxActionsPerThread &&
364            iter != actions.end()) {
365       uint32_t encoded = *iter++;
366       if (!encoded) {
367         break;
368       }
369       thread_actions.push_back(encoded);
370     }
371     if (thread_id == 0) {
372       std::lock_guard lock(mutex_);
373       callback_actions_ = std::move(thread_actions);
374       callback_iterator_ = callback_actions_.begin();
375     } else {
376       threads_.emplace_back(
377           [this, thread_id, actions_to_perform = std::move(thread_actions)]() {
378             for (const auto& encoded : actions_to_perform) {
379               Action action(encoded);
380               action.set_thread_id(thread_id);
381               Perform(action);
382             }
383           });
384     }
385   }
386   for (auto& t : threads_) {
387     t.join();
388   }
389   for (auto& fuzzy_call : fuzzy_calls_) {
390     fuzzy_call.Reset();
391   }
392   timer_.Cancel();
393 }
394 
Perform(const Action & action)395 void Fuzzer::Perform(const Action& action) {
396   FuzzyCall& fuzzy_call = FindCall(action.target);
397   switch (action.op) {
398     case Action::kSkip: {
399       if (action.thread_id == 0) {
400         action.Log(verbose_, ++num_actions_, "Callback chain completed");
401       }
402       break;
403     }
404     case Action::kWait: {
405       if (action.callback_id == action.target) {
406         // Don't wait in a callback of the target call.
407         break;
408       }
409       if (fuzzy_call.pending()) {
410         action.Log(verbose_, ++num_actions_, "Waiting for call.");
411         fuzzy_call.Await();
412       }
413       break;
414     }
415     case Action::kWriteUnary:
416     case Action::kWriteStream: {
417       if (action.callback_id == action.target) {
418         // Don't create a new call from the call's own callback.
419         break;
420       }
421       char buf[kMaxWriteLen];
422       char val = Action::DecodeWriteValue(action.value);
423       size_t len = Action::DecodeWriteLength(action.value);
424       memset(buf, val, len);
425       if (verbose_) {
426         char msg_buf[64];
427         span msg(msg_buf);
428         auto result = string::Format(
429             msg,
430             "Writing %s request of ",
431             action.op == Action::kWriteUnary ? "unary" : "stream");
432         if (result.ok()) {
433           size_t off = result.size();
434           result = string::Format(
435               msg.subspan(off),
436               isprint(val) ? "['%c'; %zu]." : "['\\x%02x'; %zu].",
437               val,
438               len);
439         }
440         size_t num_actions = ++num_actions_;
441         if (result.ok()) {
442           action.Log(verbose_, num_actions, "%s", msg.data());
443         } else if (verbose_) {
444           action.LogFailure(verbose_, num_actions, result.status());
445         }
446       }
447       bool append = false;
448       if (action.op == Action::kWriteUnary) {
449         // Send a unary request.
450         fuzzy_call.Reset(client_.UnaryEcho(
451             as_bytes(span(buf, len)),
452             /* on completed */
453             [context = GetContext(action.target)](ConstByteSpan, Status) {
454               context->fuzzer->OnCompleted(context->id);
455             },
456             /* on error */
457             [context = GetContext(action.target)](Status status) {
458               context->fuzzer->OnError(context->id, status);
459             }));
460 
461       } else if (fuzzy_call.Visit(
462                      WriteVisitor{.data = as_bytes(span(buf, len))})) {
463         // Append to an existing stream
464         append = true;
465       } else {
466         // .Open a new stream.
467         fuzzy_call.Reset(client_.BidirectionalEcho(
468             /* on next */
469             [context = GetContext(action.target)](ConstByteSpan) {
470               context->fuzzer->OnNext(context->id);
471             },
472             /* on completed */
473             [context = GetContext(action.target)](Status) {
474               context->fuzzer->OnCompleted(context->id);
475             },
476             /* on error */
477             [context = GetContext(action.target)](Status status) {
478               context->fuzzer->OnError(context->id, status);
479             }));
480       }
481       fuzzy_call.RecordWrite(len, append);
482       break;
483     }
484     case Action::kCloseClientStream:
485       action.Log(verbose_, ++num_actions_, "Closing stream.");
486       fuzzy_call.Visit(CloseClientStreamVisitor());
487       break;
488     case Action::kCancel:
489       action.Log(verbose_, ++num_actions_, "Canceling call.");
490       fuzzy_call.Visit(CancelVisitor());
491       break;
492     case Action::kAbandon: {
493       action.Log(verbose_, ++num_actions_, "Abandoning call.");
494       fuzzy_call.Visit(AbandonVisitor());
495       break;
496     }
497     case Action::kSwap: {
498       size_t other_target = action.value % kMaxConcurrentCalls;
499       if (action.callback_id == action.target ||
500           action.callback_id == other_target) {
501         // Don't move a call from within its own callback.
502         break;
503       }
504       action.Log(verbose_,
505                  ++num_actions_,
506                  "Swapping call with call %zu.",
507                  other_target);
508       std::lock_guard lock(mutex_);
509       FuzzyCall& other = FindCallLocked(other_target);
510       std::swap(indices_[fuzzy_call.id()], indices_[other.id()]);
511       fuzzy_call.Swap(other);
512       break;
513     }
514     case Action::kDestroy: {
515       if (action.callback_id == action.target) {
516         // Don't destroy a call from within its own callback.
517         break;
518       }
519       action.Log(verbose_, ++num_actions_, "Destroying call.");
520       fuzzy_call.Reset();
521       break;
522     }
523     default:
524       break;
525   }
526   timer_.Restart();
527 }
528 
OnNext(size_t callback_id)529 void Fuzzer::OnNext(size_t callback_id) { FindCall(callback_id).Notify(); }
530 
OnCompleted(size_t callback_id)531 void Fuzzer::OnCompleted(size_t callback_id) {
532   uint32_t encoded = 0;
533   {
534     std::lock_guard lock(mutex_);
535     if (callback_iterator_ != callback_actions_.end()) {
536       encoded = *callback_iterator_++;
537     }
538   }
539   Action action(encoded);
540   action.set_callback_id(callback_id);
541   Perform(action);
542   FindCall(callback_id).Notify();
543 }
544 
OnError(size_t callback_id,Status status)545 void Fuzzer::OnError(size_t callback_id, Status status) {
546   FuzzyCall& call = FindCall(callback_id);
547   PW_LOG_WARN("Call %zu received an error from the server: %s",
548               call.id(),
549               pw_StatusString(status));
550   call.Notify();
551 }
552 
553 }  // namespace pw::rpc::fuzz
554