xref: /aosp_15_r20/external/pigweed/pw_channel/forwarding_channel_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/forwarding_channel.h"
16 
17 #include <algorithm>
18 #include <array>
19 
20 #include "pw_allocator/testing.h"
21 #include "pw_multibuf/header_chunk_region_tracker.h"
22 #include "pw_multibuf/simple_allocator.h"
23 #include "pw_string/string.h"
24 #include "pw_unit_test/framework.h"
25 
26 namespace {
27 
28 using ::pw::Result;
29 using ::pw::async2::Context;
30 using ::pw::async2::Pending;
31 using ::pw::async2::Poll;
32 using ::pw::async2::Ready;
33 using ::pw::async2::Task;
34 using ::pw::async2::Waker;
35 using ::pw::channel::ByteReader;
36 using ::pw::channel::DatagramReader;
37 using ::pw::multibuf::MultiBuf;
38 
39 // Creates and initializes a MultiBuf to the specified value.
40 class InitializedMultiBuf {
41  public:
InitializedMultiBuf(std::string_view contents)42   InitializedMultiBuf(std::string_view contents) {
43     std::optional<pw::multibuf::OwnedChunk> chunk =
44         pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
45             allocator_, contents.size());
46     std::memcpy(chunk.value().data(), contents.data(), contents.size());
47     buf_.PushFrontChunk(std::move(*chunk));
48   }
49 
Take()50   pw::multibuf::MultiBuf Take() { return std::move(buf_); }
51 
52  private:
53   pw::allocator::test::AllocatorForTest<2048> allocator_;
54   pw::multibuf::MultiBuf buf_;
55 };
56 
// Copies every byte of `mb` into a pw::InlineString<128> and returns it.
//
// The string is first sized to mb.size() (filled with '\0') and then
// overwritten in place with the MultiBuf's bytes.
pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
  pw::InlineString<128> text(mb.size(), '\0');
  // The MultiBuf is copied byte-wise, so view the character storage as bytes.
  auto* const destination = reinterpret_cast<std::byte*>(text.data());
  std::copy(mb.begin(), mb.end(), destination);
  return text;
}
63 
64 template <pw::channel::DataType kType,
65           size_t kDataSize = 128,
66           size_t kMetaSize = 128>
67 class TestChannelPair {
68  public:
TestChannelPair()69   TestChannelPair()
70       : first_out_alloc_(first_out_data_area_, meta_alloc_),
71         second_out_alloc_(second_out_data_area_, meta_alloc_),
72         pair_(first_out_alloc_, second_out_alloc_) {}
73 
operator ->()74   pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
operator *()75   pw::channel::ForwardingChannelPair<kType>& operator*() { return pair_; }
76 
77  private:
78   std::array<std::byte, kDataSize> first_out_data_area_;
79   std::array<std::byte, kDataSize> second_out_data_area_;
80   pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
81   pw::multibuf::SimpleAllocator first_out_alloc_;
82   pw::multibuf::SimpleAllocator second_out_alloc_;
83 
84   pw::channel::ForwardingChannelPair<kType> pair_;
85 };
86 
87 // TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
88 //     wakers are stored / woken properly by ForwardingChannel.
TEST(ForwardingDatagramChannel,ForwardsEmptyDatagrams)89 TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
90   pw::async2::Dispatcher dispatcher;
91 
92   class : public pw::async2::Task {
93    public:
94     TestChannelPair<pw::channel::DataType::kDatagram> pair;
95 
96     int test_completed = 0;
97 
98    private:
99     pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
100       // No data yet
101       EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
102       EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
103 
104       // Send datagram first->second
105       EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
106                 pair->first().PendReadyToWrite(cx));
107       PW_TEST_EXPECT_OK(pair->first().StageWrite({}));  // Write empty datagram
108 
109       EXPECT_EQ(pw::async2::Pending(), pair->first().PendReadyToWrite(cx));
110       EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
111 
112       auto empty_chunk_result = pair->second().PendRead(cx);
113       EXPECT_TRUE(empty_chunk_result.IsReady());
114       EXPECT_TRUE(empty_chunk_result->ok());
115       EXPECT_EQ((*empty_chunk_result)->size(), 0u);
116 
117       EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
118 
119       // Send datagram second->first
120       EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
121                 pair->second().PendReadyToWrite(cx));
122       PW_TEST_EXPECT_OK(pair->second().StageWrite({}));  // Write empty datagram
123 
124       EXPECT_EQ(pw::async2::Pending(), pair->second().PendReadyToWrite(cx));
125       EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
126 
127       empty_chunk_result = pair->first().PendRead(cx);
128       EXPECT_TRUE(empty_chunk_result.IsReady());
129       EXPECT_TRUE(empty_chunk_result->ok());
130       EXPECT_EQ((*empty_chunk_result)->size(), 0u);
131 
132       EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
133 
134       test_completed += 1;
135       return pw::async2::Ready();
136     }
137   } test_task;
138 
139   dispatcher.Post(test_task);
140 
141   EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
142   EXPECT_EQ(test_task.test_completed, 1);
143 }
144 
// Sends two non-empty datagrams from first to second and checks that each one
// arrives intact and that write readiness follows the reads.
TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once when DoPend runs through to its end.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      InitializedMultiBuf hello("Hello");
      InitializedMultiBuf world("world!");

      auto& first = pair->first();
      auto& second = pair->second();

      // first -> second: stage the first datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), first.PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(first.StageWrite(hello.Take()));

      // Until second reads it, first is not ready for another write.
      EXPECT_EQ(Pending(), first.PendReadyToWrite(cx));

      EXPECT_EQ(CopyToString(second.PendRead(cx).value().value()), "Hello");

      // Reading freed the channel: first may write, second has nothing left.
      EXPECT_EQ(Ready(pw::OkStatus()), first.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), second.PendRead(cx));

      // Second datagram, same direction.
      PW_TEST_EXPECT_OK(first.StageWrite(world.Take()));
      EXPECT_EQ(CopyToString(second.PendRead(cx).value().value()), "world!");

      EXPECT_EQ(Pending(), second.PendRead(cx));
      EXPECT_EQ(Ready(pw::OkStatus()), first.PendReadyToWrite(cx));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
191 
// Exchanges one empty datagram in each direction between the two ends.
//
// NOTE(review): this performs the same sequence of calls and expectations as
// the ForwardsEmptyDatagrams test in this file; consider merging the two or
// making this one cover a different scenario.
TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
  pw::async2::Dispatcher dispatcher;

  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once when DoPend runs through to its end.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      auto& first = pair->first();
      auto& second = pair->second();

      // Before any write, both ends report that no data is available.
      EXPECT_EQ(Pending(), first.PendRead(cx));
      EXPECT_EQ(Pending(), second.PendRead(cx));

      // first -> second, using an empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), first.PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(first.StageWrite({}));

      // The write is outstanding until second reads it.
      EXPECT_EQ(Pending(), first.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), first.PendRead(cx));

      auto datagram = second.PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_EQ(Pending(), second.PendRead(cx));

      // second -> first, again using an empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), second.PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(second.StageWrite({}));

      EXPECT_EQ(Pending(), second.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), second.PendRead(cx));

      datagram = first.PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_EQ(Pending(), first.PendRead(cx));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
247 
TEST(ForwardingDatagramchannel,PendCloseAwakensAndClosesPeer)248 TEST(ForwardingDatagramchannel, PendCloseAwakensAndClosesPeer) {
249   class TryToReadUntilClosed : public Task {
250    public:
251     TryToReadUntilClosed(DatagramReader& reader) : reader_(reader) {}
252 
253     int packets_read = 0;
254     Waker waker;
255 
256    private:
257     pw::async2::Poll<> DoPend(Context& cx) final {
258       Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
259       if (read.IsPending()) {
260         PW_ASYNC_STORE_WAKER(
261             cx, waker, "TryToReadUntilClosed is waiting for reader");
262         return Pending();
263       }
264 
265       if (read->ok()) {
266         packets_read += 1;
267         EXPECT_TRUE(read->value().empty());
268         return Pending();
269       }
270       EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
271       return Ready();
272     }
273 
274     DatagramReader& reader_;
275   };
276 
277   pw::async2::Dispatcher dispatcher;
278   TestChannelPair<pw::channel::DataType::kDatagram> pair;
279   TryToReadUntilClosed read_task(pair->first());
280   dispatcher.Post(read_task);
281 
282   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
283   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
284 
285   Waker empty_waker;
286   Context empty_cx(dispatcher, empty_waker);
287 
288   // Write a datagram, but close before the datagram is read.
289   EXPECT_EQ(pair->second().PendReadyToWrite(empty_cx), Ready(pw::OkStatus()));
290   PW_TEST_EXPECT_OK(pair->second().StageWrite({}));
291   EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));
292   EXPECT_FALSE(pair->second().is_read_or_write_open());
293 
294   // Closed second, so first is closed for writes, but still open for reads.
295   EXPECT_TRUE(pair->first().is_read_open());
296   EXPECT_FALSE(pair->first().is_write_open());
297 
298   // First should read the packet and immediately be marked closed.
299   EXPECT_EQ(read_task.packets_read, 0);
300   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
301   EXPECT_EQ(read_task.packets_read, 1);
302 
303   EXPECT_FALSE(pair->first().is_read_or_write_open());
304 
305   std::move(read_task.waker).Wake();  // wake the task so it runs again
306   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());  // runs to completion
307 
308   EXPECT_FALSE(pair->first().is_read_or_write_open());
309   EXPECT_EQ(read_task.packets_read, 1);
310 }
311 
// Verifies that staging an empty write on a byte channel, from either end,
// does not make any data readable on either end.
TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
  pw::async2::Dispatcher dispatcher;

  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once when DoPend runs through to its end.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(pair->first().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing second->first. This leg must write through second();
      // writing through first() again would only repeat the step above and
      // leave the reverse direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(pair->second().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
355 
// Runs a reader task and a writer task on one dispatcher: the reader stalls
// until the writer stages "hello world" on the first channel, then reads it
// from the second channel.
TEST(ForwardingByteChannel, WriteData) {
  using BytePair = pw::channel::ForwardingByteChannelPair;

  // Completes once the second channel yields data, which must be
  // "hello world". The first channel is expected to have nothing to read.
  class ReadTask : public Task {
   public:
    ReadTask(BytePair& pair) : pair_(pair) {}

   private:
    Poll<> DoPend(Context& cx) override {
      EXPECT_EQ(Pending(), pair_.first().PendRead(cx));

      auto received = pair_.second().PendRead(cx);
      if (!received.IsReady()) {
        return Pending();
      }

      EXPECT_EQ(CopyToString(received->value()), "hello world");
      return Ready();
    }

    BytePair& pair_;
  };

  // Stages `data` on the first channel in a single write and completes.
  class WriteTask : public Task {
   public:
    WriteTask(BytePair& pair, MultiBuf&& data)
        : pair_(pair), data_(std::move(data)) {}

   private:
    Poll<> DoPend(Context& cx) override {
      EXPECT_EQ(Ready(pw::OkStatus()), pair_.first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair_.first().StageWrite(std::move(data_)));
      return Ready();
    }

    BytePair& pair_;
    MultiBuf data_;
  };

  InitializedMultiBuf data("hello world");

  TestChannelPair<pw::channel::DataType::kByte> pair;
  ReadTask read_task(*pair);
  WriteTask write_task(*pair, data.Take());

  pw::async2::Dispatcher dispatcher;

  // With only the reader posted there is no data, so it stalls each time.
  dispatcher.Post(read_task);
  ASSERT_FALSE(dispatcher.RunUntilStalled().IsReady());
  ASSERT_FALSE(dispatcher.RunUntilStalled().IsReady());

  // Once the writer has been posted, the dispatcher must run to completion.
  dispatcher.Post(write_task);
  ASSERT_TRUE(dispatcher.RunUntilStalled().IsReady());
}
410 
// Stages three separate writes on the first channel and verifies that the
// second channel returns them as one contiguous read, then verifies that an
// empty write in the opposite direction produces no readable data.
TEST(ForwardingByteChannel, WriteDataInMultiplePieces) {
  pw::async2::Dispatcher dispatcher;

  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once when DoPend runs through to its end.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      InitializedMultiBuf b1("hello");
      InitializedMultiBuf b2(" ");
      InitializedMultiBuf b3("world");

      // Send "hello world" first->second in three pieces. The channel is
      // expected to stay ready for writes even though nothing was read yet.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b1.Take()));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b2.Take()));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b3.Take()));

      // Data written by first is not readable from first itself.
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));

      // A single read on second returns all three pieces joined together.
      auto hello_world_result = pair->second().PendRead(cx);
      EXPECT_TRUE(hello_world_result.IsReady());

      EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");

      // Send nothing second->first. This leg must write through second();
      // writing through first() would only repeat the first->second
      // direction and leave the reverse direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
467 
TEST(ForwardingByteChannel,PendCloseAwakensAndClosesPeer)468 TEST(ForwardingByteChannel, PendCloseAwakensAndClosesPeer) {
469   class TryToReadUntilClosed : public Task {
470    public:
471     TryToReadUntilClosed(ByteReader& reader) : reader_(reader) {}
472 
473     int bytes_read = 0;
474     Waker waker;
475 
476    private:
477     pw::async2::Poll<> DoPend(Context& cx) final {
478       Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
479       if (read.IsPending()) {
480         PW_ASYNC_STORE_WAKER(
481             cx, waker, "TryToReadUntilClosed is waiting for reader");
482         return Pending();
483       }
484 
485       if (read->ok()) {
486         bytes_read += read->value().size();
487         EXPECT_EQ(read->value().size(), 5u);
488         return Pending();
489       }
490 
491       EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
492       return Ready();
493     }
494     ByteReader& reader_;
495   };
496 
497   pw::async2::Dispatcher dispatcher;
498   TestChannelPair<pw::channel::DataType::kByte> pair;
499   TryToReadUntilClosed read_task(pair->first());
500   dispatcher.Post(read_task);
501 
502   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
503   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
504 
505   Waker empty_waker;
506   Context empty_cx(dispatcher, empty_waker);
507 
508   InitializedMultiBuf data("hello");
509 
510   // Write a datagram, but close before the datagram is read.
511   EXPECT_EQ(pair->second().PendReadyToWrite(empty_cx), Ready(pw::OkStatus()));
512   EXPECT_EQ(pair->second().StageWrite(data.Take()), pw::OkStatus());
513   EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));
514   EXPECT_FALSE(pair->second().is_read_or_write_open());
515 
516   // Closed second, so first is closed for writes, but still open for reads.
517   EXPECT_TRUE(pair->first().is_read_open());
518   EXPECT_FALSE(pair->first().is_write_open());
519 
520   // First should read the packet and immediately be marked closed.
521   EXPECT_EQ(read_task.bytes_read, 0);
522   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
523   EXPECT_EQ(read_task.bytes_read, 5);
524 
525   EXPECT_FALSE(pair->second().is_read_or_write_open());
526 
527   std::move(read_task.waker).Wake();  // wake the task so it runs again
528   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());  // runs to completion
529 
530   EXPECT_FALSE(pair->first().is_read_or_write_open());
531   EXPECT_EQ(read_task.bytes_read, 5);
532 }
533 
534 }  // namespace
535