1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/forwarding_channel.h"
16
17 #include <algorithm>
18 #include <array>
19
20 #include "pw_allocator/testing.h"
21 #include "pw_multibuf/header_chunk_region_tracker.h"
22 #include "pw_multibuf/simple_allocator.h"
23 #include "pw_string/string.h"
24 #include "pw_unit_test/framework.h"
25
26 namespace {
27
28 using ::pw::Result;
29 using ::pw::async2::Context;
30 using ::pw::async2::Pending;
31 using ::pw::async2::Poll;
32 using ::pw::async2::Ready;
33 using ::pw::async2::Task;
34 using ::pw::async2::Waker;
35 using ::pw::channel::ByteReader;
36 using ::pw::channel::DatagramReader;
37 using ::pw::multibuf::MultiBuf;
38
39 // Creates and initializes a MultiBuf to the specified value.
40 class InitializedMultiBuf {
41 public:
InitializedMultiBuf(std::string_view contents)42 InitializedMultiBuf(std::string_view contents) {
43 std::optional<pw::multibuf::OwnedChunk> chunk =
44 pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
45 allocator_, contents.size());
46 std::memcpy(chunk.value().data(), contents.data(), contents.size());
47 buf_.PushFrontChunk(std::move(*chunk));
48 }
49
Take()50 pw::multibuf::MultiBuf Take() { return std::move(buf_); }
51
52 private:
53 pw::allocator::test::AllocatorForTest<2048> allocator_;
54 pw::multibuf::MultiBuf buf_;
55 };
56
// Returns the bytes of `mb` as a string. The string is first sized to
// mb.size() NUL characters and then overwritten byte by byte; the
// 128-character capacity is fixed by the return type.
pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
  pw::InlineString<128> text(mb.size(), '\0');
  std::byte* dest = reinterpret_cast<std::byte*>(text.data());
  for (const std::byte value : mb) {
    *dest++ = value;
  }
  return text;
}
63
64 template <pw::channel::DataType kType,
65 size_t kDataSize = 128,
66 size_t kMetaSize = 128>
67 class TestChannelPair {
68 public:
TestChannelPair()69 TestChannelPair()
70 : first_out_alloc_(first_out_data_area_, meta_alloc_),
71 second_out_alloc_(second_out_data_area_, meta_alloc_),
72 pair_(first_out_alloc_, second_out_alloc_) {}
73
operator ->()74 pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
operator *()75 pw::channel::ForwardingChannelPair<kType>& operator*() { return pair_; }
76
77 private:
78 std::array<std::byte, kDataSize> first_out_data_area_;
79 std::array<std::byte, kDataSize> second_out_data_area_;
80 pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
81 pw::multibuf::SimpleAllocator first_out_alloc_;
82 pw::multibuf::SimpleAllocator second_out_alloc_;
83
84 pw::channel::ForwardingChannelPair<kType> pair_;
85 };
86
87 // TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
88 // wakers are stored / woken properly by ForwardingChannel.
TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // Exercises both ends of a datagram channel pair from within one poll of a
  // task, sending one empty datagram in each direction.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Bumped once DoPend() has run through every expectation.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      auto& first = pair->first();
      auto& second = pair->second();

      // Nothing has been written, so neither end has anything to read.
      EXPECT_TRUE(first.PendRead(cx).IsPending());
      EXPECT_TRUE(second.PendRead(cx).IsPending());

      // first -> second: stage a single empty datagram.
      EXPECT_EQ(first.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(first.StageWrite({}));

      // Until second reads it, first can neither write again nor read.
      EXPECT_TRUE(first.PendReadyToWrite(cx).IsPending());
      EXPECT_TRUE(first.PendRead(cx).IsPending());

      auto received = second.PendRead(cx);
      EXPECT_TRUE(received.IsReady());
      EXPECT_TRUE(received->ok());
      EXPECT_EQ((*received)->size(), 0u);

      // The datagram has been consumed.
      EXPECT_TRUE(second.PendRead(cx).IsPending());

      // second -> first: the same exchange in the other direction.
      EXPECT_EQ(second.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(second.StageWrite({}));

      EXPECT_TRUE(second.PendReadyToWrite(cx).IsPending());
      EXPECT_TRUE(second.PendRead(cx).IsPending());

      received = first.PendRead(cx);
      EXPECT_TRUE(received.IsReady());
      EXPECT_TRUE(received->ok());
      EXPECT_EQ((*received)->size(), 0u);

      EXPECT_TRUE(first.PendRead(cx).IsPending());

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task completes in a single poll.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
144
TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // Sends two non-empty datagrams first -> second and checks that each one
  // arrives intact and that write readiness follows the reads.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Bumped once DoPend() has run through every expectation.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      InitializedMultiBuf hello("Hello");
      InitializedMultiBuf world("world!");

      auto& sender = pair->first();
      auto& receiver = pair->second();

      // Stage the first datagram.
      EXPECT_EQ(sender.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(sender.StageWrite(hello.Take()));

      // The sender is not ready to write again while the datagram is unread.
      EXPECT_TRUE(sender.PendReadyToWrite(cx).IsPending());

      EXPECT_EQ(CopyToString(receiver.PendRead(cx).value().value()), "Hello");

      // Reading the datagram makes the sender writable again and leaves
      // nothing further to read.
      EXPECT_EQ(sender.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      EXPECT_TRUE(receiver.PendRead(cx).IsPending());

      // Second datagram.
      PW_TEST_EXPECT_OK(sender.StageWrite(world.Take()));
      EXPECT_EQ(CopyToString(receiver.PendRead(cx).value().value()),
                "world!");

      EXPECT_TRUE(receiver.PendRead(cx).IsPending());
      EXPECT_EQ(sender.PendReadyToWrite(cx), Ready(pw::OkStatus()));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task completes in a single poll.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
191
// NOTE(review): this test performs the same sequence of operations as
// ForwardsEmptyDatagrams (only empty datagrams are sent). Consider removing
// one of the two or making this one cover non-empty traffic in both
// directions.
TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // Passes one empty datagram in each direction across a datagram channel
  // pair, all within a single poll of the task.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Bumped once DoPend() has run through every expectation.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      auto& first = pair->first();
      auto& second = pair->second();

      // Before any write, both reads are pending.
      EXPECT_TRUE(first.PendRead(cx).IsPending());
      EXPECT_TRUE(second.PendRead(cx).IsPending());

      // Direction 1: first writes, second reads.
      EXPECT_EQ(first.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(first.StageWrite({}));  // empty datagram

      EXPECT_TRUE(first.PendReadyToWrite(cx).IsPending());
      EXPECT_TRUE(first.PendRead(cx).IsPending());

      auto datagram = second.PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_TRUE(second.PendRead(cx).IsPending());

      // Direction 2: second writes, first reads.
      EXPECT_EQ(second.PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(second.StageWrite({}));  // empty datagram

      EXPECT_TRUE(second.PendReadyToWrite(cx).IsPending());
      EXPECT_TRUE(second.PendRead(cx).IsPending());

      datagram = first.PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_TRUE(first.PendRead(cx).IsPending());

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task completes in a single poll.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
247
// Suite name spelled "ForwardingDatagramChannel" to match the other datagram
// tests, so suite-level filters pick this test up as well.
TEST(ForwardingDatagramChannel, PendCloseAwakensAndClosesPeer) {
  // Task that keeps reading from `reader` and completes once a read reports
  // FailedPrecondition. Every successful read must yield an empty datagram.
  class TryToReadUntilClosed : public Task {
   public:
    explicit TryToReadUntilClosed(DatagramReader& reader) : reader_(reader) {}

    // Number of successful reads so far.
    int packets_read = 0;
    // Stored whenever a read is pending; the test body wakes the task through
    // it.
    Waker waker;

   private:
    pw::async2::Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
      if (read.IsPending()) {
        PW_ASYNC_STORE_WAKER(
            cx, waker, "TryToReadUntilClosed is waiting for reader");
        return Pending();
      }

      if (read->ok()) {
        packets_read += 1;
        EXPECT_TRUE(read->value().empty());
        // No waker is stored on this path; the test body re-wakes the task
        // via the previously stored `waker`.
        return Pending();
      }
      EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
      return Ready();
    }

    DatagramReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kDatagram> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // Nothing to read yet: the task stalls, and keeps stalling.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());

  // Context used to drive second() directly from the test body.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);

  // Write a datagram, but close before the datagram is read.
  EXPECT_EQ(pair->second().PendReadyToWrite(empty_cx), Ready(pw::OkStatus()));
  PW_TEST_EXPECT_OK(pair->second().StageWrite({}));
  EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));
  EXPECT_FALSE(pair->second().is_read_or_write_open());

  // Closed second, so first is closed for writes, but still open for reads.
  EXPECT_TRUE(pair->first().is_read_open());
  EXPECT_FALSE(pair->first().is_write_open());

  // First should read the packet and immediately be marked closed.
  EXPECT_EQ(read_task.packets_read, 0);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(read_task.packets_read, 1);

  EXPECT_FALSE(pair->first().is_read_or_write_open());

  std::move(read_task.waker).Wake();  // wake the task so it runs again
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());  // runs to completion

  EXPECT_FALSE(pair->first().is_read_or_write_open());
  EXPECT_EQ(read_task.packets_read, 1);
}
311
TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
  pw::async2::Dispatcher dispatcher;

  // Stages an empty write from each end of a byte channel pair and checks
  // that neither end has data to read afterwards.
  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once DoPend() has run through every expectation.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(pair->first().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing second->first. This has to write through second();
      // writing through first() again would only repeat the case above.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      PW_TEST_EXPECT_OK(pair->second().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task completes in a single poll.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
355
TEST(ForwardingByteChannel, WriteData) {
  // Reads from second() and completes once "hello world" has arrived. On
  // every poll it also checks that first() has nothing to read.
  class ReadTask : public Task {
   public:
    ReadTask(pw::channel::ForwardingByteChannelPair& pair) : pair_(pair) {}

   private:
    Poll<> DoPend(Context& cx) override {
      EXPECT_TRUE(pair_.first().PendRead(cx).IsPending());

      auto incoming = pair_.second().PendRead(cx);
      if (!incoming.IsReady()) {
        return Pending();
      }

      EXPECT_EQ(CopyToString(incoming->value()), "hello world");
      return Ready();
    }

    pw::channel::ForwardingByteChannelPair& pair_;
  };

  // Writes the MultiBuf it was constructed with to first() in a single poll.
  class WriteTask : public Task {
   public:
    WriteTask(pw::channel::ForwardingByteChannelPair& pair, MultiBuf&& data)
        : pair_(pair), data_(std::move(data)) {}

   private:
    Poll<> DoPend(Context& cx) override {
      EXPECT_EQ(pair_.first().PendReadyToWrite(cx), Ready(pw::OkStatus()));
      PW_TEST_EXPECT_OK(pair_.first().StageWrite(std::move(data_)));
      return Ready();
    }

    pw::channel::ForwardingByteChannelPair& pair_;
    MultiBuf data_;
  };

  InitializedMultiBuf data("hello world");

  TestChannelPair<pw::channel::DataType::kByte> pair;
  ReadTask read_task(*pair);
  WriteTask write_task(*pair, data.Take());

  pw::async2::Dispatcher dispatcher;

  // With only the reader posted there is nothing to read, so the dispatcher
  // stalls, and keeps stalling when run again.
  dispatcher.Post(read_task);
  ASSERT_TRUE(dispatcher.RunUntilStalled().IsPending());
  ASSERT_TRUE(dispatcher.RunUntilStalled().IsPending());

  // Once the writer has run, both tasks finish.
  dispatcher.Post(write_task);
  ASSERT_TRUE(dispatcher.RunUntilStalled().IsReady());
}
410
TEST(ForwardingByteChannel, WriteDataInMultiplePieces) {
  pw::async2::Dispatcher dispatcher;

  // Writes "hello", " " and "world" to first() as three separate writes and
  // checks that second() reads them back as one "hello world" buffer.
  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once DoPend() has run through every expectation.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      InitializedMultiBuf b1("hello");
      InitializedMultiBuf b2(" ");
      InitializedMultiBuf b3("world");

      // Send "hello world" first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b1.Take()));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b2.Take()));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().StageWrite(b3.Take()));

      // Writes made through first() are not readable from first().
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));

      auto hello_world_result = pair->second().PendRead(cx);
      EXPECT_TRUE(hello_world_result.IsReady());

      EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");

      // Send nothing second->first. This has to write through second();
      // writing through first() would not exercise this direction at all.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().StageWrite({}));

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task completes in a single poll.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
467
TEST(ForwardingByteChannel, PendCloseAwakensAndClosesPeer) {
  // Task that keeps reading from `reader` and completes once a read reports
  // FailedPrecondition. Every successful read is expected to carry 5 bytes.
  class TryToReadUntilClosed : public Task {
   public:
    TryToReadUntilClosed(ByteReader& reader) : reader_(reader) {}

    // Running total of bytes returned by successful reads.
    int bytes_read = 0;
    // Stored whenever a read is pending; the test body wakes the task through
    // it.
    Waker waker;

   private:
    Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> poll = reader_.PendRead(cx);
      if (!poll.IsReady()) {
        PW_ASYNC_STORE_WAKER(
            cx, waker, "TryToReadUntilClosed is waiting for reader");
        return Pending();
      }

      Result<MultiBuf>& result = *poll;
      if (!result.ok()) {
        EXPECT_EQ(result.status(), pw::Status::FailedPrecondition());
        return Ready();
      }

      const auto received = result.value().size();
      bytes_read += received;
      EXPECT_EQ(received, 5u);
      // No waker is stored on this path; the test body re-wakes the task via
      // the previously stored `waker`.
      return Pending();
    }

    ByteReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kByte> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // Nothing to read yet: the task stalls, and keeps stalling.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsPending());
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsPending());

  // Context used to drive second() directly from the test body.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);

  InitializedMultiBuf data("hello");

  auto& first = pair->first();
  auto& second = pair->second();

  // Write some bytes, but close before they are read.
  EXPECT_EQ(second.PendReadyToWrite(empty_cx), Ready(pw::OkStatus()));
  PW_TEST_EXPECT_OK(second.StageWrite(data.Take()));
  EXPECT_EQ(second.PendClose(empty_cx), Ready(pw::OkStatus()));
  EXPECT_FALSE(second.is_read_or_write_open());

  // Closed second, so first is closed for writes, but still open for reads.
  EXPECT_TRUE(first.is_read_open());
  EXPECT_FALSE(first.is_write_open());

  // First should read the bytes and immediately be marked closed.
  EXPECT_EQ(read_task.bytes_read, 0);
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsPending());
  EXPECT_EQ(read_task.bytes_read, 5);

  // NOTE(review): the datagram variant of this test checks first() at this
  // point; second() was already verified closed above -- confirm which end
  // is intended here.
  EXPECT_FALSE(second.is_read_or_write_open());

  std::move(read_task.waker).Wake();  // wake the task so it runs again
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());  // runs to completion

  EXPECT_FALSE(first.is_read_or_write_open());
  EXPECT_EQ(read_task.bytes_read, 5);
}
533
534 } // namespace
535