1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <cstddef>
16 #include <cstdint>
17
18 #include "pw_containers/vector.h"
19 #include "pw_multisink/multisink.h"
20 #include "pw_multisink/test_thread.h"
21 #include "pw_span/span.h"
22 #include "pw_string/string_builder.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread/yield.h"
25 #include "pw_unit_test/framework.h"
26
27 namespace pw::multisink {
28 namespace {
29
30 constexpr size_t kEntryBufferSize = sizeof("message 000");
31 constexpr size_t kMaxMessageCount = 250;
32 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
33
34 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
35
36 // This function is unused if joining is not supported.
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)37 [[maybe_unused]] void CompareSentAndReceivedMessages(
38 const MessageSpan& sent_messages, const MessageSpan& received_messages) {
39 ASSERT_EQ(sent_messages.size(), received_messages.size());
40 for (size_t i = 0; i < sent_messages.size(); ++i) {
41 ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
42 EXPECT_EQ(std::string_view(sent_messages[i]),
43 std::string_view(received_messages[i]));
44 }
45 }
46
47 } // namespace
48
49 // Static message pool to avoid recreating messages for every test and avoids
50 // using std::string.
51 class MessagePool {
52 public:
Instance()53 static MessagePool& Instance() {
54 static MessagePool instance;
55 return instance;
56 }
57
58 MessagePool(const MessagePool&) = delete;
59 MessagePool& operator=(const MessagePool&) = delete;
60 MessagePool(MessagePool&&) = delete;
61 MessagePool& operator=(MessagePool&&) = delete;
62
GetMessages(size_t message_count) const63 MessageSpan GetMessages(size_t message_count) const {
64 PW_ASSERT(message_count <= messages_.size());
65 return MessageSpan(messages_.begin(), message_count);
66 }
67
68 private:
MessagePool()69 MessagePool() {
70 for (size_t i = 0; i < kMaxMessageCount; ++i) {
71 messages_.emplace_back();
72 messages_.back() << "message %u" << static_cast<unsigned int>(i);
73 }
74 }
75
76 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
77 };
78
79 // Continuously reads logs from a multisink, using PopEntry() and stores copies
80 // of the retrieved messages for later verification. The thread stops when the
81 // the number of read messages and total drop count matches the expected count.
82 class LogPopReaderThread : public thread::ThreadCore {
83 public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)84 LogPopReaderThread(MultiSink& multisink,
85 uint32_t expected_message_and_drop_count)
86 : multisink_(multisink),
87 total_drop_count_(0),
88 expected_message_and_drop_count_(expected_message_and_drop_count) {
89 PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
90 }
91
drop_count()92 uint32_t drop_count() { return total_drop_count_; }
93
received_messages()94 const MessageSpan received_messages() {
95 return MessageSpan(received_messages_.begin(), received_messages_.size());
96 }
97
Run()98 void Run() override {
99 multisink_.AttachDrain(drain_);
100 ReadAllEntries();
101 }
102
ReadAllEntries()103 virtual void ReadAllEntries() {
104 do {
105 uint32_t drop_count = 0;
106 uint32_t ingress_drop_count = 0;
107 const Result<ConstByteSpan> possible_entry =
108 drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109 total_drop_count_ += drop_count + ingress_drop_count;
110 if (possible_entry.status().IsOutOfRange()) {
111 pw::this_thread::yield();
112 continue;
113 }
114 ASSERT_EQ(possible_entry.status(), OkStatus());
115 if (received_messages_.full()) {
116 return;
117 }
118 received_messages_.emplace_back();
119 received_messages_.back() << std::string_view(
120 reinterpret_cast<const char*>(possible_entry.value().data()),
121 possible_entry.value().size());
122 pw::this_thread::yield();
123 } while (total_drop_count_ + received_messages_.size() <
124 expected_message_and_drop_count_);
125 }
126
127 protected:
128 MultiSink::Drain drain_;
129 MultiSink& multisink_;
130 std::array<std::byte, kEntryBufferSize> entry_buffer_;
131 uint32_t total_drop_count_;
132 const uint32_t expected_message_and_drop_count_;
133 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
134 };
135
136 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
137 public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)138 LogPeekAndCommitReaderThread(MultiSink& multisink,
139 uint32_t expected_message_and_drop_count)
140 : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
141
ReadAllEntries()142 void ReadAllEntries() override {
143 do {
144 uint32_t drop_count = 0;
145 uint32_t ingress_drop_count = 0;
146 const Result<MultiSink::Drain::PeekedEntry> possible_entry =
147 drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
148 total_drop_count_ += drop_count + ingress_drop_count;
149 if (possible_entry.status().IsOutOfRange()) {
150 pw::this_thread::yield();
151 continue;
152 }
153 ASSERT_EQ(possible_entry.status(), OkStatus());
154 if (received_messages_.full()) {
155 return;
156 }
157 pw::this_thread::yield();
158 received_messages_.emplace_back();
159 received_messages_.back() << std::string_view(
160 reinterpret_cast<const char*>(possible_entry.value().entry().data()),
161 possible_entry.value().entry().size());
162 ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
163 pw::this_thread::yield();
164 } while (total_drop_count_ + received_messages_.size() <
165 expected_message_and_drop_count_);
166 }
167 };
168
169 // Adds the provided messages to the shared multisink.
170 class LogWriterThread : public thread::ThreadCore {
171 public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)172 LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
173 : multisink_(multisink), message_stack_(message_stack) {}
174
Run()175 void Run() override {
176 for (const auto& message : message_stack_) {
177 multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
178 pw::this_thread::yield();
179 }
180 }
181
182 private:
183 MultiSink& multisink_;
184 const MessageSpan& message_stack_;
185 };
186
187 class MultiSinkTest : public ::testing::Test {
188 protected:
MultiSinkTest()189 MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
190
191 std::byte buffer_[kBufferSize];
192 MultiSink multisink_;
193 };
194
195 #if PW_THREAD_JOINING_ENABLED
196
TEST_F(MultiSinkTest,SingleWriterSingleReader)197 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
198 const uint32_t log_count = 100;
199 const uint32_t drop_count = 5;
200 const uint32_t expected_message_and_drop_count = log_count + drop_count;
201 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
202
203 // Start reader thread.
204 LogPopReaderThread reader_thread_core(multisink_,
205 expected_message_and_drop_count);
206 Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core);
207 // Start writer thread.
208 LogWriterThread writer_thread_core(multisink_, message_stack);
209 Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
210
211 // Wait for writer thread to end.
212 writer_thread.join();
213 multisink_.HandleDropped(drop_count);
214 reader_thread.join();
215
216 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
217 CompareSentAndReceivedMessages(message_stack,
218 reader_thread_core.received_messages());
219 }
220
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)221 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
222 const uint32_t log_count = 100;
223 const uint32_t drop_count = 5;
224 const uint32_t expected_message_and_drop_count = log_count + drop_count;
225 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
226
227 // Start reader thread.
228 LogPeekAndCommitReaderThread reader_thread_core(
229 multisink_, expected_message_and_drop_count);
230 Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core);
231 // Start writer thread.
232 LogWriterThread writer_thread_core(multisink_, message_stack);
233 Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
234
235 // Wait for writer thread to end.
236 writer_thread.join();
237 multisink_.HandleDropped(drop_count);
238 reader_thread.join();
239
240 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
241 CompareSentAndReceivedMessages(message_stack,
242 reader_thread_core.received_messages());
243 }
244
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)245 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
246 const uint32_t log_count = 100;
247 const uint32_t drop_count = 5;
248 const uint32_t expected_message_and_drop_count = log_count + drop_count;
249 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
250
251 // Start reader threads.
252 LogPopReaderThread reader_thread_core1(multisink_,
253 expected_message_and_drop_count);
254 Thread reader_thread1(test::MultiSinkTestThreadOptions(),
255 reader_thread_core1);
256 LogPopReaderThread reader_thread_core2(multisink_,
257 expected_message_and_drop_count);
258 Thread reader_thread2(test::MultiSinkTestThreadOptions(),
259 reader_thread_core2);
260 LogPeekAndCommitReaderThread reader_thread_core3(
261 multisink_, expected_message_and_drop_count);
262 Thread reader_thread3(test::MultiSinkTestThreadOptions(),
263 reader_thread_core3);
264 // Start writer thread.
265 LogWriterThread writer_thread_core(multisink_, message_stack);
266 Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core);
267
268 // Wait for writer thread to end.
269 writer_thread.join();
270 multisink_.HandleDropped(drop_count);
271 reader_thread1.join();
272 reader_thread2.join();
273 reader_thread3.join();
274
275 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
276 CompareSentAndReceivedMessages(message_stack,
277 reader_thread_core1.received_messages());
278 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
279 CompareSentAndReceivedMessages(message_stack,
280 reader_thread_core2.received_messages());
281 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
282 CompareSentAndReceivedMessages(message_stack,
283 reader_thread_core3.received_messages());
284 }
285
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)286 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
287 const uint32_t log_count = 100;
288 const uint32_t drop_count = 7;
289 const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
290 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
291
292 // Start reader threads.
293 LogPopReaderThread reader_thread_core1(multisink_,
294 expected_message_and_drop_count);
295 Thread reader_thread1(test::MultiSinkTestThreadOptions(),
296 reader_thread_core1);
297 LogPopReaderThread reader_thread_core2(multisink_,
298 expected_message_and_drop_count);
299 Thread reader_thread2(test::MultiSinkTestThreadOptions(),
300 reader_thread_core2);
301 LogPeekAndCommitReaderThread reader_thread_core3(
302 multisink_, expected_message_and_drop_count);
303 Thread reader_thread3(test::MultiSinkTestThreadOptions(),
304 reader_thread_core3);
305 // Start writer threads.
306 LogWriterThread writer_thread_core1(multisink_, message_stack);
307 Thread writer_thread1(test::MultiSinkTestThreadOptions(),
308 writer_thread_core1);
309 LogWriterThread writer_thread_core2(multisink_, message_stack);
310 Thread writer_thread2(test::MultiSinkTestThreadOptions(),
311 writer_thread_core2);
312
313 // Wait for writer thread to end.
314 writer_thread1.join();
315 writer_thread2.join();
316 multisink_.HandleDropped(drop_count);
317 reader_thread1.join();
318 reader_thread2.join();
319 reader_thread3.join();
320
321 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
322 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
323 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
324 // Since we don't know the order that messages came in, we can't check them.
325 EXPECT_EQ(reader_thread_core1.received_messages().size(),
326 expected_message_and_drop_count - drop_count);
327 EXPECT_EQ(reader_thread_core2.received_messages().size(),
328 expected_message_and_drop_count - drop_count);
329 EXPECT_EQ(reader_thread_core3.received_messages().size(),
330 expected_message_and_drop_count - drop_count);
331 }
332
TEST_F(MultiSinkTest,OverflowMultisink)333 TEST_F(MultiSinkTest, OverflowMultisink) {
334 // Expect the multisink to overflow and readers to not fail when poping, or
335 // peeking and commiting entries.
336 const size_t log_count = kMaxMessageCount;
337 const size_t max_buffer_entry_count = 20;
338 std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
339 MultiSink small_multisink(small_multisink_buffer);
340
341 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
342
343 // Start reader threads.
344 LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
345 Thread reader_thread1(test::MultiSinkTestThreadOptions(),
346 reader_thread_core1);
347 LogPopReaderThread reader_thread_core2(small_multisink, log_count);
348 Thread reader_thread2(test::MultiSinkTestThreadOptions(),
349 reader_thread_core2);
350
351 // Start writer threads.
352 LogWriterThread writer_thread_core1(small_multisink, message_stack);
353 Thread writer_thread1(test::MultiSinkTestThreadOptions(),
354 writer_thread_core1);
355 LogWriterThread writer_thread_core2(small_multisink, message_stack);
356 Thread writer_thread2(test::MultiSinkTestThreadOptions(),
357 writer_thread_core2);
358
359 // Wait for writer thread to end.
360 writer_thread1.join();
361 writer_thread2.join();
362 reader_thread1.join();
363 reader_thread2.join();
364
365 // Verifying received messages and drop message counts is unreliable as we
366 // can't control the order threads will operate.
367 }
368
369 #endif // PW_THREAD_JOINING_ENABLED
370
371 } // namespace pw::multisink
372