xref: /aosp_15_r20/external/pigweed/pw_uart/blocking_adapter_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_uart/blocking_adapter.h"
16 
17 #include <array>
18 #include <memory>
19 #include <mutex>
20 #include <optional>
21 #include <utility>
22 
23 #include "pw_assert/check.h"
24 #include "pw_bytes/array.h"
25 #include "pw_log/log.h"
26 #include "pw_sync/lock_annotations.h"
27 #include "pw_sync/mutex.h"
28 #include "pw_sync/timed_thread_notification.h"
29 #include "pw_thread/test_thread_context.h"
30 #include "pw_unit_test/framework.h"
31 #include "pw_work_queue/work_queue.h"
32 
33 // Waits for something critical for test execution.
34 // We use PW_CHECK to ensure we crash on timeout instead of hanging forever.
35 // This is a macro so the crash points to the invocation site.
36 #define ASSERT_WAIT(waitable) PW_CHECK(waitable.try_acquire_for(1000ms))
37 
38 namespace pw::uart {
39 namespace {
40 
41 using namespace std::chrono_literals;
42 
43 // A mock UartNonBlocking for testing the blocking adapter.
44 class UartNonBlockingMock : public UartNonBlocking {
45  public:
enabled() const46   bool enabled() const { return enabled_; }
47 
WaitAndCompleteRead(Status status,ConstByteSpan data)48   void WaitAndCompleteRead(Status status, ConstByteSpan data) {
49     // Wait for a read to start.
50     ASSERT_WAIT(read_started_);
51 
52     std::optional<ReadTransaction> read = ConsumeCurrentRead();
53     PW_CHECK(read.has_value());
54 
55     // Copy data into rx buffer;
56     PW_CHECK_UINT_GE(read->rx_buffer.size(), data.size());
57     std::copy(data.begin(), data.end(), read->rx_buffer.begin());
58 
59     read->Complete(status, data.size());
60   }
61 
WaitForWrite()62   ConstByteSpan WaitForWrite() PW_LOCKS_EXCLUDED(mutex_) {
63     // Wait for a write to start.
64     ASSERT_WAIT(write_started_);
65 
66     std::lock_guard lock(mutex_);
67     PW_CHECK(current_write_.has_value());
68     return current_write_->tx_buffer;
69   }
70 
CompleteWrite(StatusWithSize status_size)71   void CompleteWrite(StatusWithSize status_size) {
72     std::optional<WriteTransaction> write = ConsumeCurrentWrite();
73     PW_CHECK(write.has_value());
74     write->Complete(status_size);
75   }
76 
WaitAndCompleteFlush(Status status)77   void WaitAndCompleteFlush(Status status) {
78     // Wait for a flush to start.
79     ASSERT_WAIT(flush_started_);
80 
81     std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
82     PW_CHECK(flush.has_value());
83 
84     flush->Complete(status);
85   }
86 
87  private:
88   sync::Mutex mutex_;
89   bool enabled_ = false;
90 
91   //
92   // UartNonBlocking impl.
93   //
DoEnable(bool enabled)94   Status DoEnable(bool enabled) override {
95     enabled_ = enabled;
96     return OkStatus();
97   }
98 
DoSetBaudRate(uint32_t)99   Status DoSetBaudRate(uint32_t) override { return OkStatus(); }
DoConservativeReadAvailable()100   size_t DoConservativeReadAvailable() override { return 0; }
DoClearPendingReceiveBytes()101   Status DoClearPendingReceiveBytes() override { return OkStatus(); }
102 
103   // Read
104   struct ReadTransaction {
105     ByteSpan rx_buffer;
106     size_t min_bytes;
107     Function<void(Status, ConstByteSpan buffer)> callback;
108 
Completepw::uart::__anonea452bc80111::UartNonBlockingMock::ReadTransaction109     void Complete(Status status, size_t num_bytes) {
110       callback(status, rx_buffer.first(num_bytes));
111     }
112   };
113   std::optional<ReadTransaction> current_read_ PW_GUARDED_BY(mutex_);
114   sync::TimedThreadNotification read_started_;
115 
ConsumeCurrentRead()116   std::optional<ReadTransaction> ConsumeCurrentRead()
117       PW_LOCKS_EXCLUDED(mutex_) {
118     std::lock_guard lock(mutex_);
119     return std::exchange(current_read_, std::nullopt);
120   }
121 
DoRead(ByteSpan rx_buffer,size_t min_bytes,Function<void (Status,ConstByteSpan buffer)> && callback)122   Status DoRead(ByteSpan rx_buffer,
123                 size_t min_bytes,
124                 Function<void(Status, ConstByteSpan buffer)>&& callback)
125       override PW_LOCKS_EXCLUDED(mutex_) {
126     {
127       std::lock_guard lock(mutex_);
128 
129       if (current_read_) {
130         return Status::Unavailable();
131       }
132 
133       current_read_.emplace(ReadTransaction{
134           .rx_buffer = rx_buffer,
135           .min_bytes = min_bytes,
136           .callback = std::move(callback),
137       });
138     }
139 
140     read_started_.release();
141     return OkStatus();
142   }
143 
DoCancelRead()144   bool DoCancelRead() override {
145     std::optional<ReadTransaction> read = ConsumeCurrentRead();
146     if (!read.has_value()) {
147       return false;
148     }
149     read->Complete(Status::Cancelled(), 0);
150     return true;
151   }
152 
153   // Write
154   struct WriteTransaction {
155     ConstByteSpan tx_buffer;
156     Function<void(StatusWithSize)> callback;
157 
Completepw::uart::__anonea452bc80111::UartNonBlockingMock::WriteTransaction158     void Complete(StatusWithSize status_size) { callback(status_size); }
159   };
160   std::optional<WriteTransaction> current_write_ PW_GUARDED_BY(mutex_);
161   sync::TimedThreadNotification write_started_;
162 
ConsumeCurrentWrite()163   std::optional<WriteTransaction> ConsumeCurrentWrite()
164       PW_LOCKS_EXCLUDED(mutex_) {
165     std::lock_guard lock(mutex_);
166     return std::exchange(current_write_, std::nullopt);
167   }
168 
DoWrite(ConstByteSpan tx_buffer,Function<void (StatusWithSize status)> && callback)169   Status DoWrite(ConstByteSpan tx_buffer,
170                  Function<void(StatusWithSize status)>&& callback) override
171       PW_LOCKS_EXCLUDED(mutex_) {
172     {
173       std::lock_guard lock(mutex_);
174 
175       if (current_write_) {
176         return Status::Unavailable();
177       }
178 
179       current_write_.emplace(WriteTransaction{
180           .tx_buffer = tx_buffer,
181           .callback = std::move(callback),
182       });
183     }
184 
185     write_started_.release();
186     return OkStatus();
187   }
188 
DoCancelWrite()189   bool DoCancelWrite() override {
190     std::optional<WriteTransaction> write = ConsumeCurrentWrite();
191     if (!write.has_value()) {
192       return false;
193     }
194     write->Complete(StatusWithSize::Cancelled(0));
195     return true;
196   }
197 
198   // Flush
199   struct FlushTransaction {
200     Function<void(Status)> callback;
201 
Completepw::uart::__anonea452bc80111::UartNonBlockingMock::FlushTransaction202     void Complete(Status status) { callback(status); }
203   };
204   std::optional<FlushTransaction> current_flush_ PW_GUARDED_BY(mutex_);
205   sync::TimedThreadNotification flush_started_;
206 
ConsumeCurrentFlush()207   std::optional<FlushTransaction> ConsumeCurrentFlush()
208       PW_LOCKS_EXCLUDED(mutex_) {
209     std::lock_guard lock(mutex_);
210     return std::exchange(current_flush_, std::nullopt);
211   }
212 
DoFlushOutput(Function<void (Status status)> && callback)213   Status DoFlushOutput(Function<void(Status status)>&& callback) override
214       PW_LOCKS_EXCLUDED(mutex_) {
215     {
216       std::lock_guard lock(mutex_);
217 
218       if (current_flush_) {
219         return Status::Unavailable();
220       }
221 
222       current_flush_.emplace(FlushTransaction{
223           .callback = std::move(callback),
224       });
225     }
226 
227     flush_started_.release();
228     return OkStatus();
229   }
230 
DoCancelFlushOutput()231   bool DoCancelFlushOutput() override {
232     std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
233     if (!flush.has_value()) {
234       return false;
235     }
236     flush->Complete(Status::Cancelled());
237     return true;
238   }
239 };
240 
241 // Test fixture
242 class BlockingAdapterTest : public ::testing::Test {
243  protected:
BlockingAdapterTest()244   BlockingAdapterTest() : adapter(underlying) {}
245 
246   UartNonBlockingMock underlying;
247   UartBlockingAdapter adapter;
248 
249   work_queue::WorkQueueWithBuffer<2> work_queue;
250 
251   // State used by tests.
252   // Ideally these would be locals, but that would require capturing more than
253   // one pointer worth of data, exceeding PW_FUNCTION_INLINE_CALLABLE_SIZE.
254   sync::TimedThreadNotification blocking_action_complete;
255   static constexpr auto kReadBufferSize = 16;
256   std::array<std::byte, kReadBufferSize> read_buffer;
257   StatusWithSize read_result;
258   Status write_result;
259 
SetUp()260   void SetUp() override { StartWorkQueueThread(); }
261 
TearDown()262   void TearDown() override { StopWorkQueueThread(); }
263 
StartWorkQueueThread()264   void StartWorkQueueThread() {
265     PW_CHECK(!work_queue_thread_, "WorkQueue thread already started");
266     work_queue_thread_context_ =
267         std::make_unique<thread::test::TestThreadContext>();
268     work_queue_thread_.emplace(work_queue_thread_context_->options(),
269                                work_queue);
270   }
271 
StopWorkQueueThread()272   void StopWorkQueueThread() {
273     if (work_queue_thread_) {
274       PW_LOG_DEBUG("Stopping work queue...");
275       work_queue.RequestStop();
276 #if PW_THREAD_JOINING_ENABLED
277       work_queue_thread_->join();
278 #else
279       work_queue_thread_->detach();
280 #endif
281       // Once stopped, the WorkQueue cannot be started again (stop_requested_
282       // latches), so we don't set work_queue_thread_ to std::nullopt here.
283       // work_queue_thread_ = std::nullopt;
284     }
285   }
286 
287  private:
288   std::unique_ptr<thread::test::TestThreadContext> work_queue_thread_context_;
289   std::optional<thread::Thread> work_queue_thread_;
290 };
291 
292 //
293 // Enable
294 //
295 
TEST_F(BlockingAdapterTest,EnableWorks)296 TEST_F(BlockingAdapterTest, EnableWorks) {
297   // Start out disabled
298   ASSERT_FALSE(underlying.enabled());
299 
300   // Can enable
301   PW_TEST_EXPECT_OK(adapter.Enable());
302   EXPECT_TRUE(underlying.enabled());
303 }
304 
TEST_F(BlockingAdapterTest,DisableWorks)305 TEST_F(BlockingAdapterTest, DisableWorks) {
306   // Start out enabled
307   PW_TEST_ASSERT_OK(underlying.Enable());
308   ASSERT_TRUE(underlying.enabled());
309 
310   // Can disable
311   PW_TEST_EXPECT_OK(adapter.Disable());
312   EXPECT_FALSE(underlying.enabled());
313 }
314 
315 //
316 // Read
317 //
318 
TEST_F(BlockingAdapterTest,ReadWorks)319 TEST_F(BlockingAdapterTest, ReadWorks) {
320   // Call blocking ReadExactly on the work queue.
321   work_queue.CheckPushWork([this]() {
322     PW_LOG_DEBUG("Calling adapter.ReadExactly()...");
323     read_result = adapter.ReadExactly(read_buffer);
324     blocking_action_complete.release();
325   });
326 
327   constexpr auto kRxData = bytes::Array<0x12, 0x34, 0x56>();
328   static_assert(kRxData.size() <= kReadBufferSize);
329 
330   underlying.WaitAndCompleteRead(OkStatus(), kRxData);
331 
332   // Wait for the read to complete.
333   ASSERT_WAIT(blocking_action_complete);
334 
335   PW_TEST_EXPECT_OK(read_result.status());
336   EXPECT_EQ(read_result.size(), kRxData.size());
337   EXPECT_TRUE(std::equal(kRxData.begin(), kRxData.end(), read_buffer.begin()));
338 }
339 
TEST_F(BlockingAdapterTest,ReadHandlesTimeouts)340 TEST_F(BlockingAdapterTest, ReadHandlesTimeouts) {
341   // Call blocking TryReadExactlyFor on the work queue.
342   work_queue.CheckPushWork([this]() {
343     PW_LOG_DEBUG("Calling adapter.TryReadExactlyFor()...");
344     read_result = adapter.TryReadExactlyFor(read_buffer, 100ms);
345     blocking_action_complete.release();
346   });
347 
348   // Don't complete the transaction; let it time out.
349 
350   // Wait for the read to complete.
351   ASSERT_WAIT(blocking_action_complete);
352 
353   EXPECT_EQ(read_result.status(), Status::DeadlineExceeded());
354 }
355 
356 //
357 // Write
358 //
TEST_F(BlockingAdapterTest,WriteWorks)359 TEST_F(BlockingAdapterTest, WriteWorks) {
360   static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();
361 
362   // Call blocking Write on the work queue.
363   work_queue.CheckPushWork([this]() {
364     PW_LOG_DEBUG("Calling adapter.Write()...");
365     write_result = adapter.Write(kTxData);
366     blocking_action_complete.release();
367   });
368 
369   ConstByteSpan tx_buffer = underlying.WaitForWrite();
370   EXPECT_EQ(tx_buffer.size(), kTxData.size());
371   EXPECT_TRUE(std::equal(tx_buffer.begin(), tx_buffer.end(), kTxData.begin()));
372 
373   underlying.CompleteWrite(StatusWithSize(tx_buffer.size()));
374 
375   // Wait for the write to complete.
376   ASSERT_WAIT(blocking_action_complete);
377   PW_TEST_EXPECT_OK(write_result);
378 }
379 
TEST_F(BlockingAdapterTest,WriteHandlesTimeouts)380 TEST_F(BlockingAdapterTest, WriteHandlesTimeouts) {
381   static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();
382 
383   // Call blocking TryWriteFor on the work queue.
384   work_queue.CheckPushWork([this]() {
385     PW_LOG_DEBUG("Calling adapter.TryWriteFor()...");
386     write_result = adapter.TryWriteFor(kTxData, 100ms).status();
387     blocking_action_complete.release();
388   });
389 
390   // Don't complete the transaction; let it time out.
391 
392   // Wait for the write to complete.
393   ASSERT_WAIT(blocking_action_complete);
394   EXPECT_EQ(write_result, Status::DeadlineExceeded());
395 }
396 
397 //
398 // FlushOutput
399 //
TEST_F(BlockingAdapterTest,FlushOutputWorks)400 TEST_F(BlockingAdapterTest, FlushOutputWorks) {
401   // Call blocking FlushOutput on the work queue.
402   work_queue.CheckPushWork([this]() {
403     PW_LOG_DEBUG("Calling adapter.FlushOutput()...");
404     write_result = adapter.FlushOutput();
405     blocking_action_complete.release();
406   });
407 
408   underlying.WaitAndCompleteFlush(OkStatus());
409 
410   // Wait for the flush to complete.
411   ASSERT_WAIT(blocking_action_complete);
412   PW_TEST_EXPECT_OK(write_result);
413 }
414 
415 // FlushOutput does not provide a variant with timeout.
416 
417 }  // namespace
418 }  // namespace pw::uart
419