xref: /aosp_15_r20/external/pigweed/pw_multisink/multisink_threaded_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstddef>
16 #include <cstdint>
17 
18 #include "pw_containers/vector.h"
19 #include "pw_multisink/multisink.h"
20 #include "pw_multisink/test_thread.h"
21 #include "pw_span/span.h"
22 #include "pw_string/string_builder.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread/yield.h"
25 #include "pw_unit_test/framework.h"
26 
27 namespace pw::multisink {
28 namespace {
29 
30 constexpr size_t kEntryBufferSize = sizeof("message 000");
31 constexpr size_t kMaxMessageCount = 250;
32 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
33 
34 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
35 
36 // This function is unused if joining is not supported.
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)37 [[maybe_unused]] void CompareSentAndReceivedMessages(
38     const MessageSpan& sent_messages, const MessageSpan& received_messages) {
39   ASSERT_EQ(sent_messages.size(), received_messages.size());
40   for (size_t i = 0; i < sent_messages.size(); ++i) {
41     ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
42     EXPECT_EQ(std::string_view(sent_messages[i]),
43               std::string_view(received_messages[i]));
44   }
45 }
46 
47 }  // namespace
48 
49 // Static message pool to avoid recreating messages for every test and avoids
50 // using std::string.
51 class MessagePool {
52  public:
Instance()53   static MessagePool& Instance() {
54     static MessagePool instance;
55     return instance;
56   }
57 
58   MessagePool(const MessagePool&) = delete;
59   MessagePool& operator=(const MessagePool&) = delete;
60   MessagePool(MessagePool&&) = delete;
61   MessagePool& operator=(MessagePool&&) = delete;
62 
GetMessages(size_t message_count) const63   MessageSpan GetMessages(size_t message_count) const {
64     PW_ASSERT(message_count <= messages_.size());
65     return MessageSpan(messages_.begin(), message_count);
66   }
67 
68  private:
MessagePool()69   MessagePool() {
70     for (size_t i = 0; i < kMaxMessageCount; ++i) {
71       messages_.emplace_back();
72       messages_.back() << "message %u" << static_cast<unsigned int>(i);
73     }
74   }
75 
76   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
77 };
78 
79 // Continuously reads logs from a multisink, using PopEntry() and stores copies
80 // of the retrieved messages for later verification. The thread stops when the
81 // the number of read messages and total drop count matches the expected count.
82 class LogPopReaderThread : public thread::ThreadCore {
83  public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)84   LogPopReaderThread(MultiSink& multisink,
85                      uint32_t expected_message_and_drop_count)
86       : multisink_(multisink),
87         total_drop_count_(0),
88         expected_message_and_drop_count_(expected_message_and_drop_count) {
89     PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
90   }
91 
drop_count()92   uint32_t drop_count() { return total_drop_count_; }
93 
received_messages()94   const MessageSpan received_messages() {
95     return MessageSpan(received_messages_.begin(), received_messages_.size());
96   }
97 
Run()98   void Run() override {
99     multisink_.AttachDrain(drain_);
100     ReadAllEntries();
101   }
102 
ReadAllEntries()103   virtual void ReadAllEntries() {
104     do {
105       uint32_t drop_count = 0;
106       uint32_t ingress_drop_count = 0;
107       const Result<ConstByteSpan> possible_entry =
108           drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109       total_drop_count_ += drop_count + ingress_drop_count;
110       if (possible_entry.status().IsOutOfRange()) {
111         pw::this_thread::yield();
112         continue;
113       }
114       ASSERT_EQ(possible_entry.status(), OkStatus());
115       if (received_messages_.full()) {
116         return;
117       }
118       received_messages_.emplace_back();
119       received_messages_.back() << std::string_view(
120           reinterpret_cast<const char*>(possible_entry.value().data()),
121           possible_entry.value().size());
122       pw::this_thread::yield();
123     } while (total_drop_count_ + received_messages_.size() <
124              expected_message_and_drop_count_);
125   }
126 
127  protected:
128   MultiSink::Drain drain_;
129   MultiSink& multisink_;
130   std::array<std::byte, kEntryBufferSize> entry_buffer_;
131   uint32_t total_drop_count_;
132   const uint32_t expected_message_and_drop_count_;
133   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
134 };
135 
136 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
137  public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)138   LogPeekAndCommitReaderThread(MultiSink& multisink,
139                                uint32_t expected_message_and_drop_count)
140       : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
141 
ReadAllEntries()142   void ReadAllEntries() override {
143     do {
144       uint32_t drop_count = 0;
145       uint32_t ingress_drop_count = 0;
146       const Result<MultiSink::Drain::PeekedEntry> possible_entry =
147           drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
148       total_drop_count_ += drop_count + ingress_drop_count;
149       if (possible_entry.status().IsOutOfRange()) {
150         pw::this_thread::yield();
151         continue;
152       }
153       ASSERT_EQ(possible_entry.status(), OkStatus());
154       if (received_messages_.full()) {
155         return;
156       }
157       pw::this_thread::yield();
158       received_messages_.emplace_back();
159       received_messages_.back() << std::string_view(
160           reinterpret_cast<const char*>(possible_entry.value().entry().data()),
161           possible_entry.value().entry().size());
162       ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
163       pw::this_thread::yield();
164     } while (total_drop_count_ + received_messages_.size() <
165              expected_message_and_drop_count_);
166   }
167 };
168 
169 // Adds the provided messages to the shared multisink.
170 class LogWriterThread : public thread::ThreadCore {
171  public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)172   LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
173       : multisink_(multisink), message_stack_(message_stack) {}
174 
Run()175   void Run() override {
176     for (const auto& message : message_stack_) {
177       multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
178       pw::this_thread::yield();
179     }
180   }
181 
182  private:
183   MultiSink& multisink_;
184   const MessageSpan& message_stack_;
185 };
186 
187 class MultiSinkTest : public ::testing::Test {
188  protected:
MultiSinkTest()189   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
190 
191   std::byte buffer_[kBufferSize];
192   MultiSink multisink_;
193 };
194 
195 #if PW_THREAD_JOINING_ENABLED
196 
TEST_F(MultiSinkTest,SingleWriterSingleReader)197 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
198   const uint32_t log_count = 100;
199   const uint32_t drop_count = 5;
200   const uint32_t expected_message_and_drop_count = log_count + drop_count;
201   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
202 
203   // Start reader thread.
204   LogPopReaderThread reader_thread_core(multisink_,
205                                         expected_message_and_drop_count);
206   Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core);
207   // Start writer thread.
208   LogWriterThread writer_thread_core(multisink_, message_stack);
209   Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
210 
211   // Wait for writer thread to end.
212   writer_thread.join();
213   multisink_.HandleDropped(drop_count);
214   reader_thread.join();
215 
216   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
217   CompareSentAndReceivedMessages(message_stack,
218                                  reader_thread_core.received_messages());
219 }
220 
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)221 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
222   const uint32_t log_count = 100;
223   const uint32_t drop_count = 5;
224   const uint32_t expected_message_and_drop_count = log_count + drop_count;
225   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
226 
227   // Start reader thread.
228   LogPeekAndCommitReaderThread reader_thread_core(
229       multisink_, expected_message_and_drop_count);
230   Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core);
231   // Start writer thread.
232   LogWriterThread writer_thread_core(multisink_, message_stack);
233   Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
234 
235   // Wait for writer thread to end.
236   writer_thread.join();
237   multisink_.HandleDropped(drop_count);
238   reader_thread.join();
239 
240   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
241   CompareSentAndReceivedMessages(message_stack,
242                                  reader_thread_core.received_messages());
243 }
244 
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)245 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
246   const uint32_t log_count = 100;
247   const uint32_t drop_count = 5;
248   const uint32_t expected_message_and_drop_count = log_count + drop_count;
249   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
250 
251   // Start reader threads.
252   LogPopReaderThread reader_thread_core1(multisink_,
253                                          expected_message_and_drop_count);
254   Thread reader_thread1(test::MultiSinkTestThreadOptions(),
255                         reader_thread_core1);
256   LogPopReaderThread reader_thread_core2(multisink_,
257                                          expected_message_and_drop_count);
258   Thread reader_thread2(test::MultiSinkTestThreadOptions(),
259                         reader_thread_core2);
260   LogPeekAndCommitReaderThread reader_thread_core3(
261       multisink_, expected_message_and_drop_count);
262   Thread reader_thread3(test::MultiSinkTestThreadOptions(),
263                         reader_thread_core3);
264   // Start writer thread.
265   LogWriterThread writer_thread_core(multisink_, message_stack);
266   Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
267 
268   // Wait for writer thread to end.
269   writer_thread.join();
270   multisink_.HandleDropped(drop_count);
271   reader_thread1.join();
272   reader_thread2.join();
273   reader_thread3.join();
274 
275   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
276   CompareSentAndReceivedMessages(message_stack,
277                                  reader_thread_core1.received_messages());
278   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
279   CompareSentAndReceivedMessages(message_stack,
280                                  reader_thread_core2.received_messages());
281   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
282   CompareSentAndReceivedMessages(message_stack,
283                                  reader_thread_core3.received_messages());
284 }
285 
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)286 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
287   const uint32_t log_count = 100;
288   const uint32_t drop_count = 7;
289   const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
290   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
291 
292   // Start reader threads.
293   LogPopReaderThread reader_thread_core1(multisink_,
294                                          expected_message_and_drop_count);
295   Thread reader_thread1(test::MultiSinkTestThreadOptions(),
296                         reader_thread_core1);
297   LogPopReaderThread reader_thread_core2(multisink_,
298                                          expected_message_and_drop_count);
299   Thread reader_thread2(test::MultiSinkTestThreadOptions(),
300                         reader_thread_core2);
301   LogPeekAndCommitReaderThread reader_thread_core3(
302       multisink_, expected_message_and_drop_count);
303   Thread reader_thread3(test::MultiSinkTestThreadOptions(),
304                         reader_thread_core3);
305   // Start writer threads.
306   LogWriterThread writer_thread_core1(multisink_, message_stack);
307   Thread writer_thread1(test::MultiSinkTestThreadOptions(),
308                         writer_thread_core1);
309   LogWriterThread writer_thread_core2(multisink_, message_stack);
310   Thread writer_thread2(test::MultiSinkTestThreadOptions(),
311                         writer_thread_core2);
312 
313   // Wait for writer thread to end.
314   writer_thread1.join();
315   writer_thread2.join();
316   multisink_.HandleDropped(drop_count);
317   reader_thread1.join();
318   reader_thread2.join();
319   reader_thread3.join();
320 
321   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
322   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
323   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
324   // Since we don't know the order that messages came in, we can't check them.
325   EXPECT_EQ(reader_thread_core1.received_messages().size(),
326             expected_message_and_drop_count - drop_count);
327   EXPECT_EQ(reader_thread_core2.received_messages().size(),
328             expected_message_and_drop_count - drop_count);
329   EXPECT_EQ(reader_thread_core3.received_messages().size(),
330             expected_message_and_drop_count - drop_count);
331 }
332 
TEST_F(MultiSinkTest,OverflowMultisink)333 TEST_F(MultiSinkTest, OverflowMultisink) {
334   // Expect the multisink to overflow and readers to not fail when poping, or
335   // peeking and commiting entries.
336   const size_t log_count = kMaxMessageCount;
337   const size_t max_buffer_entry_count = 20;
338   std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
339   MultiSink small_multisink(small_multisink_buffer);
340 
341   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
342 
343   // Start reader threads.
344   LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
345   Thread reader_thread1(test::MultiSinkTestThreadOptions(),
346                         reader_thread_core1);
347   LogPopReaderThread reader_thread_core2(small_multisink, log_count);
348   Thread reader_thread2(test::MultiSinkTestThreadOptions(),
349                         reader_thread_core2);
350 
351   // Start writer threads.
352   LogWriterThread writer_thread_core1(small_multisink, message_stack);
353   Thread writer_thread1(test::MultiSinkTestThreadOptions(),
354                         writer_thread_core1);
355   LogWriterThread writer_thread_core2(small_multisink, message_stack);
356   Thread writer_thread2(test::MultiSinkTestThreadOptions(),
357                         writer_thread_core2);
358 
359   // Wait for writer thread to end.
360   writer_thread1.join();
361   writer_thread2.join();
362   reader_thread1.join();
363   reader_thread2.join();
364 
365   // Verifying received messages and drop message counts is unreliable as we
366   // can't control the order threads will operate.
367 }
368 
369 #endif  // PW_THREAD_JOINING_ENABLED
370 
371 }  // namespace pw::multisink
372