// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "pw_uart/blocking_adapter.h"

#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_log/log.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"
#include "pw_sync/timed_thread_notification.h"
#include "pw_thread/test_thread_context.h"
#include "pw_unit_test/framework.h"
#include "pw_work_queue/work_queue.h"

// Waits for something critical for test execution.
// We use PW_CHECK to ensure we crash on timeout instead of hanging forever.
// This is a macro so the crash points to the invocation site.
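// For example, ASSERT_WAIT(blocking_action_complete) waits up to one second
// for the notification to be released and crashes the test otherwise.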
#define ASSERT_WAIT(waitable) PW_CHECK(waitable.try_acquire_for(1000ms))

namespace pw::uart {
namespace {

using namespace std::chrono_literals;

// A mock UartNonBlocking for testing the blocking adapter.
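// Each non-blocking operation (read, write, flush) is captured as a pending
// transaction guarded by mutex_ and announced via a TimedThreadNotification.
// Tests wait on the notification, then complete the transaction by invoking
// its stored callback, which in turn unblocks the adapter call under test.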
class UartNonBlockingMock : public UartNonBlocking {
 public:
  bool enabled() const { return enabled_; }

  void WaitAndCompleteRead(Status status, ConstByteSpan data) {
    // Wait for a read to start.
    ASSERT_WAIT(read_started_);

    std::optional<ReadTransaction> read = ConsumeCurrentRead();
    PW_CHECK(read.has_value());

    // Copy data into the rx buffer.
    PW_CHECK_UINT_GE(read->rx_buffer.size(), data.size());
    std::copy(data.begin(), data.end(), read->rx_buffer.begin());

    read->Complete(status, data.size());
  }

  ConstByteSpan WaitForWrite() PW_LOCKS_EXCLUDED(mutex_) {
    // Wait for a write to start.
    ASSERT_WAIT(write_started_);

    std::lock_guard lock(mutex_);
    PW_CHECK(current_write_.has_value());
    return current_write_->tx_buffer;
  }

  void CompleteWrite(StatusWithSize status_size) {
    std::optional<WriteTransaction> write = ConsumeCurrentWrite();
    PW_CHECK(write.has_value());
    write->Complete(status_size);
  }

  void WaitAndCompleteFlush(Status status) {
    // Wait for a flush to start.
    ASSERT_WAIT(flush_started_);

    std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
    PW_CHECK(flush.has_value());

    flush->Complete(status);
  }

 private:
  sync::Mutex mutex_;
  bool enabled_ = false;

  //
  // UartNonBlocking impl.
  //
  Status DoEnable(bool enabled) override {
    enabled_ = enabled;
    return OkStatus();
  }

  Status DoSetBaudRate(uint32_t) override { return OkStatus(); }
  size_t DoConservativeReadAvailable() override { return 0; }
  Status DoClearPendingReceiveBytes() override { return OkStatus(); }

  // Read
  struct ReadTransaction {
    ByteSpan rx_buffer;
    size_t min_bytes;
    Function<void(Status, ConstByteSpan buffer)> callback;

    void Complete(Status status, size_t num_bytes) {
      callback(status, rx_buffer.first(num_bytes));
    }
  };
  std::optional<ReadTransaction> current_read_ PW_GUARDED_BY(mutex_);
  sync::TimedThreadNotification read_started_;

  std::optional<ReadTransaction> ConsumeCurrentRead()
      PW_LOCKS_EXCLUDED(mutex_) {
    std::lock_guard lock(mutex_);
    return std::exchange(current_read_, std::nullopt);
  }

  Status DoRead(ByteSpan rx_buffer,
                size_t min_bytes,
                Function<void(Status, ConstByteSpan buffer)>&& callback)
      override PW_LOCKS_EXCLUDED(mutex_) {
    {
      std::lock_guard lock(mutex_);

      if (current_read_) {
        return Status::Unavailable();
      }

      current_read_.emplace(ReadTransaction{
          .rx_buffer = rx_buffer,
          .min_bytes = min_bytes,
          .callback = std::move(callback),
      });
    }

    read_started_.release();
    return OkStatus();
  }

  bool DoCancelRead() override {
    std::optional<ReadTransaction> read = ConsumeCurrentRead();
    if (!read.has_value()) {
      return false;
    }
    read->Complete(Status::Cancelled(), 0);
    return true;
  }

  // Write
  struct WriteTransaction {
    ConstByteSpan tx_buffer;
    Function<void(StatusWithSize)> callback;

    void Complete(StatusWithSize status_size) { callback(status_size); }
  };
  std::optional<WriteTransaction> current_write_ PW_GUARDED_BY(mutex_);
  sync::TimedThreadNotification write_started_;

  std::optional<WriteTransaction> ConsumeCurrentWrite()
      PW_LOCKS_EXCLUDED(mutex_) {
    std::lock_guard lock(mutex_);
    return std::exchange(current_write_, std::nullopt);
  }

  Status DoWrite(ConstByteSpan tx_buffer,
                 Function<void(StatusWithSize status)>&& callback) override
      PW_LOCKS_EXCLUDED(mutex_) {
    {
      std::lock_guard lock(mutex_);

      if (current_write_) {
        return Status::Unavailable();
      }

      current_write_.emplace(WriteTransaction{
          .tx_buffer = tx_buffer,
          .callback = std::move(callback),
      });
    }

    write_started_.release();
    return OkStatus();
  }

  bool DoCancelWrite() override {
    std::optional<WriteTransaction> write = ConsumeCurrentWrite();
    if (!write.has_value()) {
      return false;
    }
    write->Complete(StatusWithSize::Cancelled(0));
    return true;
  }

  // Flush
  struct FlushTransaction {
    Function<void(Status)> callback;

    void Complete(Status status) { callback(status); }
  };
  std::optional<FlushTransaction> current_flush_ PW_GUARDED_BY(mutex_);
  sync::TimedThreadNotification flush_started_;

  std::optional<FlushTransaction> ConsumeCurrentFlush()
      PW_LOCKS_EXCLUDED(mutex_) {
    std::lock_guard lock(mutex_);
    return std::exchange(current_flush_, std::nullopt);
  }

  Status DoFlushOutput(Function<void(Status status)>&& callback) override
      PW_LOCKS_EXCLUDED(mutex_) {
    {
      std::lock_guard lock(mutex_);

      if (current_flush_) {
        return Status::Unavailable();
      }

      current_flush_.emplace(FlushTransaction{
          .callback = std::move(callback),
      });
    }

    flush_started_.release();
    return OkStatus();
  }

  bool DoCancelFlushOutput() override {
    std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
    if (!flush.has_value()) {
      return false;
    }
    flush->Complete(Status::Cancelled());
    return true;
  }
};

// Test fixture
class BlockingAdapterTest : public ::testing::Test {
 protected:
  BlockingAdapterTest() : adapter(underlying) {}

  UartNonBlockingMock underlying;
  UartBlockingAdapter adapter;

  work_queue::WorkQueueWithBuffer<2> work_queue;

  // State used by tests.
  // Ideally these would be locals, but that would require capturing more than
  // one pointer worth of data, exceeding PW_FUNCTION_INLINE_CALLABLE_SIZE.
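  // For example, the lambdas below capture only [this] (one pointer), which
  // fits inline; capturing [this, &read_buffer, &read_result] would likely
  // exceed the default inline storage of pw::Function and fail to build when
  // dynamic allocation is disabled.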
  sync::TimedThreadNotification blocking_action_complete;
  static constexpr auto kReadBufferSize = 16;
  std::array<std::byte, kReadBufferSize> read_buffer;
  StatusWithSize read_result;
  Status write_result;

  void SetUp() override { StartWorkQueueThread(); }

  void TearDown() override { StopWorkQueueThread(); }

  void StartWorkQueueThread() {
    PW_CHECK(!work_queue_thread_, "WorkQueue thread already started");
    work_queue_thread_context_ =
        std::make_unique<thread::test::TestThreadContext>();
    work_queue_thread_.emplace(work_queue_thread_context_->options(),
                               work_queue);
  }

  void StopWorkQueueThread() {
    if (work_queue_thread_) {
      PW_LOG_DEBUG("Stopping work queue...");
      work_queue.RequestStop();
#if PW_THREAD_JOINING_ENABLED
      work_queue_thread_->join();
#else
      work_queue_thread_->detach();
#endif
      // Once stopped, the WorkQueue cannot be started again (stop_requested_
      // latches), so we don't set work_queue_thread_ to std::nullopt here.
      // work_queue_thread_ = std::nullopt;
    }
  }

 private:
  std::unique_ptr<thread::test::TestThreadContext> work_queue_thread_context_;
  std::optional<thread::Thread> work_queue_thread_;
};
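
// Each test below follows the same pattern: the blocking adapter call runs on
// the work queue thread while the test's main thread drives the mock (or lets
// the call time out), then waits on blocking_action_complete for the result.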

//
// Enable
//

TEST_F(BlockingAdapterTest, EnableWorks) {
  // Start out disabled.
  ASSERT_FALSE(underlying.enabled());

  // Can enable.
  PW_TEST_EXPECT_OK(adapter.Enable());
  EXPECT_TRUE(underlying.enabled());
}

TEST_F(BlockingAdapterTest, DisableWorks) {
  // Start out enabled.
  PW_TEST_ASSERT_OK(underlying.Enable());
  ASSERT_TRUE(underlying.enabled());

  // Can disable.
  PW_TEST_EXPECT_OK(adapter.Disable());
  EXPECT_FALSE(underlying.enabled());
}

//
// Read
//

TEST_F(BlockingAdapterTest, ReadWorks) {
  // Call blocking ReadExactly on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.ReadExactly()...");
    read_result = adapter.ReadExactly(read_buffer);
    blocking_action_complete.release();
  });

  constexpr auto kRxData = bytes::Array<0x12, 0x34, 0x56>();
  static_assert(kRxData.size() <= kReadBufferSize);

  underlying.WaitAndCompleteRead(OkStatus(), kRxData);

  // Wait for the read to complete.
  ASSERT_WAIT(blocking_action_complete);

  PW_TEST_EXPECT_OK(read_result.status());
  EXPECT_EQ(read_result.size(), kRxData.size());
  EXPECT_TRUE(std::equal(kRxData.begin(), kRxData.end(), read_buffer.begin()));
}

TEST_F(BlockingAdapterTest, ReadHandlesTimeouts) {
  // Call blocking TryReadExactlyFor on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.TryReadExactlyFor()...");
    read_result = adapter.TryReadExactlyFor(read_buffer, 100ms);
    blocking_action_complete.release();
  });

  // Don't complete the transaction; let it time out.

  // Wait for the read to complete.
  ASSERT_WAIT(blocking_action_complete);

  EXPECT_EQ(read_result.status(), Status::DeadlineExceeded());
}

//
// Write
//
TEST_F(BlockingAdapterTest, WriteWorks) {
  static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();

  // Call blocking Write on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.Write()...");
    write_result = adapter.Write(kTxData);
    blocking_action_complete.release();
  });

  ConstByteSpan tx_buffer = underlying.WaitForWrite();
  EXPECT_EQ(tx_buffer.size(), kTxData.size());
  EXPECT_TRUE(std::equal(tx_buffer.begin(), tx_buffer.end(), kTxData.begin()));

  underlying.CompleteWrite(StatusWithSize(tx_buffer.size()));

  // Wait for the write to complete.
  ASSERT_WAIT(blocking_action_complete);
  PW_TEST_EXPECT_OK(write_result);
}

TEST_F(BlockingAdapterTest, WriteHandlesTimeouts) {
  static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();

  // Call blocking TryWriteFor on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.TryWriteFor()...");
    write_result = adapter.TryWriteFor(kTxData, 100ms).status();
    blocking_action_complete.release();
  });

  // Don't complete the transaction; let it time out.

  // Wait for the write to complete.
  ASSERT_WAIT(blocking_action_complete);
  EXPECT_EQ(write_result, Status::DeadlineExceeded());
}

//
// FlushOutput
//
TEST_F(BlockingAdapterTest, FlushOutputWorks) {
  // Call blocking FlushOutput on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.FlushOutput()...");
    write_result = adapter.FlushOutput();
    blocking_action_complete.release();
  });

  underlying.WaitAndCompleteFlush(OkStatus());

  // Wait for the flush to complete.
  ASSERT_WAIT(blocking_action_complete);
  PW_TEST_EXPECT_OK(write_result);
}

// FlushOutput does not provide a variant with timeout.

}  // namespace
}  // namespace pw::uart