xref: /aosp_15_r20/external/pigweed/pw_rpc/raw/server_reader_writer_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/raw/server_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "pw_bytes/array.h"
20 #include "pw_bytes/span.h"
21 #include "pw_rpc/internal/lock.h"
22 #include "pw_rpc/raw/fake_channel_output.h"
23 #include "pw_rpc/service.h"
24 #include "pw_rpc/writer.h"
25 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
26 #include "pw_status/status.h"
27 #include "pw_status/status_with_size.h"
28 #include "pw_unit_test/framework.h"
29 
30 namespace pw::rpc {
31 namespace {
32 
33 class TestServiceImpl final
34     : public test::pw_rpc::raw::TestService::Service<TestServiceImpl> {
35  public:
TestUnaryRpc(ConstByteSpan,RawUnaryResponder &)36   static void TestUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
37 
TestAnotherUnaryRpc(ConstByteSpan,RawUnaryResponder &)38   void TestAnotherUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
39 
TestServerStreamRpc(ConstByteSpan,RawServerWriter &)40   void TestServerStreamRpc(ConstByteSpan, RawServerWriter&) {}
41 
TestClientStreamRpc(RawServerReader &)42   void TestClientStreamRpc(RawServerReader&) {}
43 
TestBidirectionalStreamRpc(RawServerReaderWriter &)44   void TestBidirectionalStreamRpc(RawServerReaderWriter&) {}
45 };
46 
47 struct ReaderWriterTestContext {
48   static constexpr uint32_t kChannelId = 1;
49 
ReaderWriterTestContextpw::rpc::__anon3ecdfdc60111::ReaderWriterTestContext50   ReaderWriterTestContext()
51       : channel(Channel::Create<kChannelId>(&output)),
52         server(span(&channel, 1)) {}
53 
54   TestServiceImpl service;
55   RawFakeChannelOutput<4> output;
56   Channel channel;
57   Server server;
58 };
59 
60 using test::pw_rpc::raw::TestService;
61 
TEST(RawUnaryResponder,DefaultConstructed)62 TEST(RawUnaryResponder, DefaultConstructed) {
63   RawUnaryResponder call;
64 
65   ASSERT_FALSE(call.active());
66   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
67 
68   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
69 
70   call.set_on_error([](Status) {});
71 }
72 
TEST(RawServerWriter,DefaultConstructed)73 TEST(RawServerWriter, DefaultConstructed) {
74   RawServerWriter call;
75 
76   ASSERT_FALSE(call.active());
77   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
78 
79   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
80   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
81 
82   call.set_on_error([](Status) {});
83 }
84 
TEST(RawServerReader,DefaultConstructed)85 TEST(RawServerReader, DefaultConstructed) {
86   RawServerReader call;
87 
88   ASSERT_FALSE(call.active());
89   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
90 
91   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
92 
93   call.set_on_next([](ConstByteSpan) {});
94   call.set_on_error([](Status) {});
95 }
96 
TEST(RawServerReaderWriter,DefaultConstructed)97 TEST(RawServerReaderWriter, DefaultConstructed) {
98   RawServerReaderWriter call;
99 
100   ASSERT_FALSE(call.active());
101   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
102 
103   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
104   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(Status::Cancelled()));
105 
106   call.set_on_next([](ConstByteSpan) {});
107   call.set_on_error([](Status) {});
108 }
109 
TEST(RawUnaryResponder,Closed)110 TEST(RawUnaryResponder, Closed) {
111   ReaderWriterTestContext ctx;
112   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
113       ctx.server, ctx.channel.id(), ctx.service);
114   ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
115 
116   ASSERT_FALSE(call.active());
117   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
118 
119   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
120 
121   call.set_on_error([](Status) {});
122 }
123 
TEST(RawUnaryResponder,TryCloseFailed)124 TEST(RawUnaryResponder, TryCloseFailed) {
125   ReaderWriterTestContext ctx;
126   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
127       ctx.server, ctx.channel.id(), ctx.service);
128   // Sets ChannelOutput to always return false.
129   ctx.output.set_send_status(Status::Unknown());
130   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
131 
132   // Call should be still alive.
133   ASSERT_TRUE(call.active());
134 }
135 
TEST(RawUnaryResponder,TryCloseSuccessful)136 TEST(RawUnaryResponder, TryCloseSuccessful) {
137   ReaderWriterTestContext ctx;
138   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
139       ctx.server, ctx.channel.id(), ctx.service);
140   // Sets ChannelOutput to always return false.
141   ctx.output.set_send_status(Status::Unknown());
142   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
143 
144   // Call should be still alive.
145   ASSERT_TRUE(call.active());
146 
147   // Tries to close the call again, with ChannelOutput set to return ok.
148   ctx.output.set_send_status(OkStatus());
149   ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
150   // Call should be closed.
151   ASSERT_FALSE(call.active());
152 }
153 
TEST(RawUnaryResponder,FinishCallback_Successful)154 TEST(RawUnaryResponder, FinishCallback_Successful) {
155   ReaderWriterTestContext ctx;
156   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
157       ctx.server, ctx.channel.id(), ctx.service);
158 
159   constexpr auto kData = bytes::Initialized<8>(0xff);
160   ASSERT_EQ(OkStatus(),
161             call.FinishCallback(
162                 [&kData](ByteSpan buffer) {
163                   std::memcpy(buffer.data(), kData.data(), kData.size());
164                   return StatusWithSize(kData.size());
165                 },
166                 OkStatus()));
167 
168   EXPECT_EQ(std::memcmp(
169                 ctx.output.payloads<TestService::TestUnaryRpc>().back().data(),
170                 kData.data(),
171                 kData.size()),
172             0);
173   EXPECT_FALSE(call.active());
174 }
175 
TEST(RawUnaryResponder,TryFinishCallback_Successful)176 TEST(RawUnaryResponder, TryFinishCallback_Successful) {
177   ReaderWriterTestContext ctx;
178   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
179       ctx.server, ctx.channel.id(), ctx.service);
180 
181   constexpr auto kData = bytes::Initialized<8>(0xdd);
182   ASSERT_EQ(OkStatus(),
183             call.TryFinishCallback(
184                 [&kData](ByteSpan buffer) {
185                   std::memcpy(buffer.data(), kData.data(), kData.size());
186                   return StatusWithSize(kData.size());
187                 },
188                 OkStatus()));
189 
190   EXPECT_EQ(std::memcmp(
191                 ctx.output.payloads<TestService::TestUnaryRpc>().back().data(),
192                 kData.data(),
193                 kData.size()),
194             0);
195   EXPECT_FALSE(call.active());
196 }
197 
TEST(RawUnaryResponder,TryFinishCallback_ChannelError)198 TEST(RawUnaryResponder, TryFinishCallback_ChannelError) {
199   ReaderWriterTestContext ctx;
200   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
201       ctx.server, ctx.channel.id(), ctx.service);
202 
203   ctx.output.set_send_status(Status::Unknown());
204 
205   constexpr auto kData = bytes::Initialized<8>(0xdd);
206   ASSERT_EQ(Status::Unknown(),
207             call.TryFinishCallback(
208                 [&kData](ByteSpan buffer) {
209                   std::memcpy(buffer.data(), kData.data(), kData.size());
210                   return StatusWithSize(kData.size());
211                 },
212                 OkStatus()));
213 
214   // Call should remain active.
215   EXPECT_TRUE(call.active());
216 }
217 
TEST(RawServerWriter,Closed)218 TEST(RawServerWriter, Closed) {
219   ReaderWriterTestContext ctx;
220   RawServerWriter call =
221       RawServerWriter::Open<TestService::TestServerStreamRpc>(
222           ctx.server, ctx.channel.id(), ctx.service);
223   ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
224 
225   ASSERT_FALSE(call.active());
226   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
227 
228   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
229   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
230 
231   call.set_on_error([](Status) {});
232 }
233 
TEST(RawServerWriter,TryCloseFailed)234 TEST(RawServerWriter, TryCloseFailed) {
235   ReaderWriterTestContext ctx;
236   RawServerWriter call =
237       RawServerWriter::Open<TestService::TestServerStreamRpc>(
238           ctx.server, ctx.channel.id(), ctx.service);
239   // Sets ChannelOutput to always return false.
240   ctx.output.set_send_status(Status::Unknown());
241   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
242 
243   // Call should be still alive.
244   ASSERT_TRUE(call.active());
245 }
246 
TEST(RawServerWriter,TryCloseSuccessful)247 TEST(RawServerWriter, TryCloseSuccessful) {
248   ReaderWriterTestContext ctx;
249   RawServerWriter call =
250       RawServerWriter::Open<TestService::TestServerStreamRpc>(
251           ctx.server, ctx.channel.id(), ctx.service);
252   // Sets ChannelOutput to always return false.
253   ctx.output.set_send_status(Status::Unknown());
254   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
255 
256   // Call should be still alive.
257   ASSERT_TRUE(call.active());
258 
259   // Tries to close the call again, with ChannelOutput set to return ok.
260   ctx.output.set_send_status(OkStatus());
261   ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
262   // Call should be closed.
263   ASSERT_FALSE(call.active());
264 }
265 
TEST(RawServerReader,Closed)266 TEST(RawServerReader, Closed) {
267   ReaderWriterTestContext ctx;
268   RawServerReader call =
269       RawServerReader::Open<TestService::TestClientStreamRpc>(
270           ctx.server, ctx.channel.id(), ctx.service);
271   ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
272 
273   ASSERT_FALSE(call.active());
274   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
275 
276   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
277 
278   call.set_on_next([](ConstByteSpan) {});
279   call.set_on_error([](Status) {});
280 }
281 
TEST(RawServerReader,TryCloseFailed)282 TEST(RawServerReader, TryCloseFailed) {
283   ReaderWriterTestContext ctx;
284   RawServerReader call =
285       RawServerReader::Open<TestService::TestClientStreamRpc>(
286           ctx.server, ctx.channel.id(), ctx.service);
287   // Sets ChannelOutput to always return false.
288   ctx.output.set_send_status(Status::Unknown());
289   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
290 
291   // Call should be still alive.
292   ASSERT_TRUE(call.active());
293 }
294 
TEST(RawServerReader,TryCloseSuccessful)295 TEST(RawServerReader, TryCloseSuccessful) {
296   ReaderWriterTestContext ctx;
297   RawServerReader call =
298       RawServerReader::Open<TestService::TestClientStreamRpc>(
299           ctx.server, ctx.channel.id(), ctx.service);
300   // Sets ChannelOutput to always return false.
301   ctx.output.set_send_status(Status::Unknown());
302   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
303 
304   // Call should be still alive.
305   ASSERT_TRUE(call.active());
306 
307   // Tries to close the call again, with ChannelOutput set to return ok.
308   ctx.output.set_send_status(OkStatus());
309   ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
310   // Call should be closed.
311   ASSERT_FALSE(call.active());
312 }
313 
TEST(RawServerReaderWriter,Closed)314 TEST(RawServerReaderWriter, Closed) {
315   ReaderWriterTestContext ctx;
316   RawServerReaderWriter call =
317       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
318           ctx.server, ctx.channel.id(), ctx.service);
319   ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
320 
321   ASSERT_FALSE(call.active());
322   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
323 
324   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
325   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(Status::Cancelled()));
326 
327   call.set_on_next([](ConstByteSpan) {});
328   call.set_on_error([](Status) {});
329 }
330 
TEST(RawServerReaderWriter,TryCloseFailed)331 TEST(RawServerReaderWriter, TryCloseFailed) {
332   ReaderWriterTestContext ctx;
333   RawServerReaderWriter call =
334       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
335           ctx.server, ctx.channel.id(), ctx.service);
336   // Sets ChannelOutput to always return false.
337   ctx.output.set_send_status(Status::Unknown());
338   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
339 
340   // Call should be still alive.
341   ASSERT_TRUE(call.active());
342 }
343 
TEST(RawServerReaderWriter,TryCloseSuccessful)344 TEST(RawServerReaderWriter, TryCloseSuccessful) {
345   ReaderWriterTestContext ctx;
346   RawServerReaderWriter call =
347       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
348           ctx.server, ctx.channel.id(), ctx.service);
349   // Sets ChannelOutput to always return false.
350   ctx.output.set_send_status(Status::Unknown());
351   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
352 
353   // Call should be still alive.
354   ASSERT_TRUE(call.active());
355 
356   // Tries to close the call again, with ChannelOutput set to return ok.
357   ctx.output.set_send_status(OkStatus());
358   ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
359   // Call should be closed.
360   ASSERT_FALSE(call.active());
361 }
362 
TEST(RawUnaryResponder,Open_ReturnsUsableResponder)363 TEST(RawUnaryResponder, Open_ReturnsUsableResponder) {
364   ReaderWriterTestContext ctx;
365   RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
366       ctx.server, ctx.channel.id(), ctx.service);
367 
368   EXPECT_EQ(call.channel_id(), ctx.channel.id());
369   EXPECT_EQ(OkStatus(), call.Finish(as_bytes(span("hello from pw_rpc"))));
370 
371   EXPECT_STREQ(
372       reinterpret_cast<const char*>(
373           ctx.output.payloads<TestService::TestUnaryRpc>().back().data()),
374       "hello from pw_rpc");
375 }
376 
TEST(RawServerReaderWriter,Open_UnknownChannel)377 TEST(RawServerReaderWriter, Open_UnknownChannel) {
378   ReaderWriterTestContext ctx;
379   ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));
380 
381   RawServerReaderWriter call =
382       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
383           ctx.server, ctx.kChannelId, ctx.service);
384 
385   EXPECT_TRUE(call.active());
386   EXPECT_EQ(call.channel_id(), ctx.kChannelId);
387   EXPECT_EQ(Status::Unavailable(), call.Write(ConstByteSpan()));
388 
389   ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));
390 
391   EXPECT_EQ(OkStatus(), call.Write(ConstByteSpan()));
392   EXPECT_TRUE(call.active());
393 
394   EXPECT_EQ(OkStatus(), call.Finish());
395   EXPECT_FALSE(call.active());
396   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
397 }
398 
TEST(RawUnaryResponder,Open_MultipleTimes_CancelsPrevious)399 TEST(RawUnaryResponder, Open_MultipleTimes_CancelsPrevious) {
400   ReaderWriterTestContext ctx;
401 
402   RawUnaryResponder one = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
403       ctx.server, ctx.channel.id(), ctx.service);
404 
405   std::optional<Status> error;
406   one.set_on_error([&error](Status status) { error = status; });
407 
408   ASSERT_TRUE(one.active());
409 
410   RawUnaryResponder two = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
411       ctx.server, ctx.channel.id(), ctx.service);
412 
413   ASSERT_FALSE(one.active());
414   ASSERT_TRUE(two.active());
415 
416   EXPECT_EQ(Status::Cancelled(), error);
417 }
418 
TEST(RawServerWriter,Open_ReturnsUsableWriter)419 TEST(RawServerWriter, Open_ReturnsUsableWriter) {
420   ReaderWriterTestContext ctx;
421   RawServerWriter call =
422       RawServerWriter::Open<TestService::TestServerStreamRpc>(
423           ctx.server, ctx.channel.id(), ctx.service);
424 
425   EXPECT_EQ(call.channel_id(), ctx.channel.id());
426   EXPECT_EQ(OkStatus(), call.Write(as_bytes(span("321"))));
427 
428   EXPECT_STREQ(reinterpret_cast<const char*>(
429                    ctx.output.payloads<TestService::TestServerStreamRpc>()
430                        .back()
431                        .data()),
432                "321");
433 }
434 
TEST(RawServerReader,Open_ReturnsUsableReader)435 TEST(RawServerReader, Open_ReturnsUsableReader) {
436   ReaderWriterTestContext ctx;
437   RawServerReader call =
438       RawServerReader::Open<TestService::TestClientStreamRpc>(
439           ctx.server, ctx.channel.id(), ctx.service);
440 
441   EXPECT_EQ(call.channel_id(), ctx.channel.id());
442   EXPECT_EQ(OkStatus(), call.Finish(as_bytes(span("This is a message"))));
443 
444   EXPECT_STREQ(reinterpret_cast<const char*>(
445                    ctx.output.payloads<TestService::TestClientStreamRpc>()
446                        .back()
447                        .data()),
448                "This is a message");
449 }
450 
TEST(RawServerReaderWriter,Open_ReturnsUsableReaderWriter)451 TEST(RawServerReaderWriter, Open_ReturnsUsableReaderWriter) {
452   ReaderWriterTestContext ctx;
453   RawServerReaderWriter call =
454       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
455           ctx.server, ctx.channel.id(), ctx.service);
456 
457   EXPECT_EQ(call.channel_id(), ctx.channel.id());
458   EXPECT_EQ(OkStatus(), call.Write(as_bytes(span("321"))));
459 
460   EXPECT_STREQ(
461       reinterpret_cast<const char*>(
462           ctx.output.payloads<TestService::TestBidirectionalStreamRpc>()
463               .back()
464               .data()),
465       "321");
466 }
467 
TEST(RawUnaryResponder,Move_FinishesActiveCall)468 TEST(RawUnaryResponder, Move_FinishesActiveCall) {
469   ReaderWriterTestContext ctx;
470   RawUnaryResponder active_call =
471       RawUnaryResponder::Open<TestService::TestUnaryRpc>(
472           ctx.server, ctx.channel.id(), ctx.service);
473 
474   RawUnaryResponder inactive_call;
475 
476   active_call = std::move(inactive_call);
477 
478   const auto completions = ctx.output.completions<TestService::TestUnaryRpc>();
479   ASSERT_EQ(completions.size(), 1u);
480   EXPECT_EQ(completions.back(), OkStatus());
481 }
482 
TEST(RawUnaryResponder,Move_DifferentActiveCalls_ClosesFirstOnly)483 TEST(RawUnaryResponder, Move_DifferentActiveCalls_ClosesFirstOnly) {
484   ReaderWriterTestContext ctx;
485   RawUnaryResponder active_call =
486       RawUnaryResponder::Open<TestService::TestUnaryRpc>(
487           ctx.server, ctx.channel.id(), ctx.service);
488 
489   RawUnaryResponder new_active_call =
490       RawUnaryResponder::Open<TestService::TestAnotherUnaryRpc>(
491           ctx.server, ctx.channel.id(), ctx.service);
492 
493   EXPECT_TRUE(active_call.active());
494   EXPECT_TRUE(new_active_call.active());
495 
496   active_call = std::move(new_active_call);
497 
498   const auto completions = ctx.output.completions<TestService::TestUnaryRpc>();
499   ASSERT_EQ(completions.size(), 1u);
500   EXPECT_EQ(completions.back(), OkStatus());
501 
502   EXPECT_TRUE(
503       ctx.output.completions<TestService::TestAnotherUnaryRpc>().empty());
504 }
505 
TEST(RawUnaryResponder,ReplaceActiveCall_DoesNotFinishCall)506 TEST(RawUnaryResponder, ReplaceActiveCall_DoesNotFinishCall) {
507   ReaderWriterTestContext ctx;
508   RawUnaryResponder active_call =
509       RawUnaryResponder::Open<TestService::TestUnaryRpc>(
510           ctx.server, ctx.channel.id(), ctx.service);
511 
512   RawUnaryResponder new_active_call =
513       RawUnaryResponder::Open<TestService::TestUnaryRpc>(
514           ctx.server, ctx.channel.id(), ctx.service);
515 
516   active_call = std::move(new_active_call);
517 
518   ASSERT_TRUE(ctx.output.completions<TestService::TestUnaryRpc>().empty());
519 
520   constexpr const char kData[] = "Some data!";
521   EXPECT_EQ(
522       OkStatus(),
523       active_call.Finish(as_bytes(span(kData)), Status::InvalidArgument()));
524 
525   EXPECT_STREQ(
526       reinterpret_cast<const char*>(
527           ctx.output.payloads<TestService::TestUnaryRpc>().back().data()),
528       kData);
529 
530   const auto completions = ctx.output.completions<TestService::TestUnaryRpc>();
531   ASSERT_EQ(completions.size(), 1u);
532   EXPECT_EQ(completions.back(), Status::InvalidArgument());
533 }
534 
TEST(RawUnaryResponder,OutOfScope_FinishesActiveCall)535 TEST(RawUnaryResponder, OutOfScope_FinishesActiveCall) {
536   ReaderWriterTestContext ctx;
537 
538   {
539     RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
540         ctx.server, ctx.channel.id(), ctx.service);
541     ASSERT_TRUE(ctx.output.completions<TestService::TestUnaryRpc>().empty());
542   }
543 
544   const auto completions = ctx.output.completions<TestService::TestUnaryRpc>();
545   ASSERT_EQ(completions.size(), 1u);
546   EXPECT_EQ(completions.back(), OkStatus());
547 }
548 
TEST(RawServerWriter,Move_InactiveToActive_FinishesActiveCall)549 TEST(RawServerWriter, Move_InactiveToActive_FinishesActiveCall) {
550   ReaderWriterTestContext ctx;
551   RawServerWriter active_call =
552       RawServerWriter::Open<TestService::TestServerStreamRpc>(
553           ctx.server, ctx.channel.id(), ctx.service);
554 
555   RawServerWriter inactive_call;
556 
557   active_call = std::move(inactive_call);
558 
559   const auto completions =
560       ctx.output.completions<TestService::TestServerStreamRpc>();
561   ASSERT_EQ(completions.size(), 1u);
562   EXPECT_EQ(completions.back(), OkStatus());
563 }
564 
TEST(RawServerWriter,ReplaceActiveCall_DoesNotFinishCall)565 TEST(RawServerWriter, ReplaceActiveCall_DoesNotFinishCall) {
566   ReaderWriterTestContext ctx;
567   RawServerWriter active_call =
568       RawServerWriter::Open<TestService::TestServerStreamRpc>(
569           ctx.server, ctx.channel.id(), ctx.service);
570 
571   RawServerWriter new_active_call =
572       RawServerWriter::Open<TestService::TestServerStreamRpc>(
573           ctx.server, ctx.channel.id(), ctx.service);
574 
575   active_call = std::move(new_active_call);
576 
577   ASSERT_TRUE(
578       ctx.output.completions<TestService::TestServerStreamRpc>().empty());
579 
580   constexpr const char kData[] = "Some data!";
581   EXPECT_EQ(OkStatus(), active_call.Write(as_bytes(span(kData))));
582 
583   EXPECT_STREQ(reinterpret_cast<const char*>(
584                    ctx.output.payloads<TestService::TestServerStreamRpc>()
585                        .back()
586                        .data()),
587                kData);
588 }
589 
590 constexpr const char kWriterData[] = "20X6";
591 
WriteAsWriter(Writer & writer)592 void WriteAsWriter(Writer& writer) {
593   ASSERT_TRUE(writer.active());
594   ASSERT_EQ(writer.channel_id(), ReaderWriterTestContext::kChannelId);
595 
596   EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
597 }
598 
TEST(RawServerWriter,UsableAsWriter)599 TEST(RawServerWriter, UsableAsWriter) {
600   ReaderWriterTestContext ctx;
601   RawServerWriter call =
602       RawServerWriter::Open<TestService::TestServerStreamRpc>(
603           ctx.server, ctx.channel.id(), ctx.service);
604 
605   WriteAsWriter(call.as_writer());
606 
607   EXPECT_STREQ(reinterpret_cast<const char*>(
608                    ctx.output.payloads<TestService::TestServerStreamRpc>()
609                        .back()
610                        .data()),
611                kWriterData);
612 }
613 
TEST(RawServerReaderWriter,UsableAsWriter)614 TEST(RawServerReaderWriter, UsableAsWriter) {
615   ReaderWriterTestContext ctx;
616   RawServerReaderWriter call =
617       RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
618           ctx.server, ctx.channel.id(), ctx.service);
619 
620   WriteAsWriter(call.as_writer());
621 
622   EXPECT_STREQ(
623       reinterpret_cast<const char*>(
624           ctx.output.payloads<TestService::TestBidirectionalStreamRpc>()
625               .back()
626               .data()),
627       kWriterData);
628 }
629 
630 }  // namespace
631 }  // namespace pw::rpc
632