1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
#include "pw_rpc/raw/server_reader_writer.h"

#include <cstdint>
#include <cstring>
#include <optional>
#include <utility>

#include "pw_bytes/array.h"
#include "pw_bytes/span.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/raw/fake_channel_output.h"
#include "pw_rpc/service.h"
#include "pw_rpc/writer.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_unit_test/framework.h"
29
30 namespace pw::rpc {
31 namespace {
32
33 class TestServiceImpl final
34 : public test::pw_rpc::raw::TestService::Service<TestServiceImpl> {
35 public:
TestUnaryRpc(ConstByteSpan,RawUnaryResponder &)36 static void TestUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
37
TestAnotherUnaryRpc(ConstByteSpan,RawUnaryResponder &)38 void TestAnotherUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
39
TestServerStreamRpc(ConstByteSpan,RawServerWriter &)40 void TestServerStreamRpc(ConstByteSpan, RawServerWriter&) {}
41
TestClientStreamRpc(RawServerReader &)42 void TestClientStreamRpc(RawServerReader&) {}
43
TestBidirectionalStreamRpc(RawServerReaderWriter &)44 void TestBidirectionalStreamRpc(RawServerReaderWriter&) {}
45 };
46
47 struct ReaderWriterTestContext {
48 static constexpr uint32_t kChannelId = 1;
49
ReaderWriterTestContextpw::rpc::__anon3ecdfdc60111::ReaderWriterTestContext50 ReaderWriterTestContext()
51 : channel(Channel::Create<kChannelId>(&output)),
52 server(span(&channel, 1)) {}
53
54 TestServiceImpl service;
55 RawFakeChannelOutput<4> output;
56 Channel channel;
57 Server server;
58 };
59
60 using test::pw_rpc::raw::TestService;
61
// A default-constructed RawUnaryResponder is inactive, has no channel, and
// refuses to finish. Setting a callback on it is still permitted.
TEST(RawUnaryResponder, DefaultConstructed) {
  RawUnaryResponder responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_result = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_result);

  responder.set_on_error([](Status) {});
}
72
// A default-constructed RawServerWriter is inactive, has no channel, and
// rejects both Write() and Finish(). Setting a callback is still permitted.
TEST(RawServerWriter, DefaultConstructed) {
  RawServerWriter writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_result = writer.Write(ConstByteSpan());
  EXPECT_EQ(Status::FailedPrecondition(), write_result);

  const Status finish_result = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_result);

  writer.set_on_error([](Status) {});
}
84
// A default-constructed RawServerReader is inactive, has no channel, and
// refuses to finish. Its callbacks can still be assigned.
TEST(RawServerReader, DefaultConstructed) {
  RawServerReader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_result = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_result);

  reader.set_on_next([](ConstByteSpan) {});
  reader.set_on_error([](Status) {});
}
96
// A default-constructed RawServerReaderWriter is inactive, has no channel, and
// rejects both Write() and Finish(). Its callbacks can still be assigned.
TEST(RawServerReaderWriter, DefaultConstructed) {
  RawServerReaderWriter stream;

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status write_result = stream.Write(ConstByteSpan());
  EXPECT_EQ(Status::FailedPrecondition(), write_result);

  const Status finish_result = stream.Finish(Status::Cancelled());
  EXPECT_EQ(Status::FailedPrecondition(), finish_result);

  stream.set_on_next([](ConstByteSpan) {});
  stream.set_on_error([](Status) {});
}
109
// Once finished, a RawUnaryResponder behaves like a default-constructed one:
// inactive, unassigned channel, and a second Finish() is rejected.
TEST(RawUnaryResponder, Closed) {
  ReaderWriterTestContext ctx;
  auto responder = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const Status first_finish = responder.Finish({}, OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status second_finish = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  responder.set_on_error([](Status) {});
}
123
// TryFinish() reports the channel's send error and leaves the call open.
TEST(RawUnaryResponder, TryCloseFailed) {
  ReaderWriterTestContext ctx;
  auto responder = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output to report UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_result = responder.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), try_result);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(responder.active());
}
135
// A failed TryFinish() can be retried; the call closes once the send succeeds.
TEST(RawUnaryResponder, TryCloseSuccessful) {
  ReaderWriterTestContext ctx;
  auto responder = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output reports UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = responder.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(responder.active());

  // Second attempt: the fake channel output now reports OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = responder.TryFinish({}, OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The successful attempt closes the call.
  ASSERT_FALSE(responder.active());
}
153
// FinishCallback() lets the callback fill the response buffer, sends exactly
// the bytes the callback reported, and closes the call.
TEST(RawUnaryResponder, FinishCallback_Successful) {
  ReaderWriterTestContext ctx;
  RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  constexpr auto kData = bytes::Initialized<8>(0xff);
  ASSERT_EQ(OkStatus(),
            call.FinishCallback(
                [&kData](ByteSpan buffer) {
                  std::memcpy(buffer.data(), kData.data(), kData.size());
                  return StatusWithSize(kData.size());
                },
                OkStatus()));

  // Confirm that a payload was recorded and that it has the expected length
  // before comparing contents. Without these checks back() could be called on
  // an empty view, memcmp could read past the end of a short payload, and a
  // longer payload that merely starts with kData would be accepted.
  const auto payloads = ctx.output.payloads<TestService::TestUnaryRpc>();
  ASSERT_EQ(payloads.size(), 1u);
  const ConstByteSpan payload = payloads.back();
  ASSERT_EQ(payload.size(), kData.size());

  EXPECT_EQ(std::memcmp(payload.data(), kData.data(), kData.size()), 0);
  EXPECT_FALSE(call.active());
}
175
// TryFinishCallback() lets the callback fill the response buffer, sends
// exactly the bytes the callback reported, and closes the call on success.
TEST(RawUnaryResponder, TryFinishCallback_Successful) {
  ReaderWriterTestContext ctx;
  RawUnaryResponder call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  constexpr auto kData = bytes::Initialized<8>(0xdd);
  ASSERT_EQ(OkStatus(),
            call.TryFinishCallback(
                [&kData](ByteSpan buffer) {
                  std::memcpy(buffer.data(), kData.data(), kData.size());
                  return StatusWithSize(kData.size());
                },
                OkStatus()));

  // Confirm that a payload was recorded and that it has the expected length
  // before comparing contents. Without these checks back() could be called on
  // an empty view, memcmp could read past the end of a short payload, and a
  // longer payload that merely starts with kData would be accepted.
  const auto payloads = ctx.output.payloads<TestService::TestUnaryRpc>();
  ASSERT_EQ(payloads.size(), 1u);
  const ConstByteSpan payload = payloads.back();
  ASSERT_EQ(payload.size(), kData.size());

  EXPECT_EQ(std::memcmp(payload.data(), kData.data(), kData.size()), 0);
  EXPECT_FALSE(call.active());
}
197
// When the channel output fails to send, TryFinishCallback() returns that
// error and the call stays open.
TEST(RawUnaryResponder, TryFinishCallback_ChannelError) {
  ReaderWriterTestContext ctx;
  auto responder = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output to report UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());

  constexpr auto kData = bytes::Initialized<8>(0xdd);
  const Status try_result = responder.TryFinishCallback(
      [&kData](ByteSpan buffer) {
        std::memcpy(buffer.data(), kData.data(), kData.size());
        return StatusWithSize(kData.size());
      },
      OkStatus());
  ASSERT_EQ(Status::Unknown(), try_result);

  // The failed attempt must not have closed the call.
  EXPECT_TRUE(responder.active());
}
217
// Once finished, a RawServerWriter is inactive, has no channel, and rejects
// further Write() and Finish() calls.
TEST(RawServerWriter, Closed) {
  ReaderWriterTestContext ctx;
  auto writer = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const Status first_finish = writer.Finish(OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_result = writer.Write(ConstByteSpan());
  EXPECT_EQ(Status::FailedPrecondition(), write_result);

  const Status second_finish = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  writer.set_on_error([](Status) {});
}
233
// TryFinish() reports the channel's send error and leaves the call open.
TEST(RawServerWriter, TryCloseFailed) {
  ReaderWriterTestContext ctx;
  auto writer = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output to report UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_result = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_result);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());
}
246
// A failed TryFinish() can be retried; the call closes once the send succeeds.
TEST(RawServerWriter, TryCloseSuccessful) {
  ReaderWriterTestContext ctx;
  auto writer = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output reports UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());

  // Second attempt: the fake channel output now reports OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = writer.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The successful attempt closes the call.
  ASSERT_FALSE(writer.active());
}
265
// Once finished, a RawServerReader is inactive, has no channel, and rejects a
// second Finish(). Its callbacks can still be assigned.
TEST(RawServerReader, Closed) {
  ReaderWriterTestContext ctx;
  auto reader = RawServerReader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const Status first_finish = reader.Finish({}, OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status second_finish = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  reader.set_on_next([](ConstByteSpan) {});
  reader.set_on_error([](Status) {});
}
281
// TryFinish() reports the channel's send error and leaves the call open.
TEST(RawServerReader, TryCloseFailed) {
  ReaderWriterTestContext ctx;
  auto reader = RawServerReader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output to report UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_result = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), try_result);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());
}
294
// A failed TryFinish() can be retried; the call closes once the send succeeds.
TEST(RawServerReader, TryCloseSuccessful) {
  ReaderWriterTestContext ctx;
  auto reader = RawServerReader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output reports UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());

  // Second attempt: the fake channel output now reports OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The successful attempt closes the call.
  ASSERT_FALSE(reader.active());
}
313
// Once finished, a RawServerReaderWriter is inactive, has no channel, and
// rejects further Write() and Finish() calls. Callbacks can still be assigned.
TEST(RawServerReaderWriter, Closed) {
  ReaderWriterTestContext ctx;
  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  const Status first_finish = stream.Finish(OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status write_result = stream.Write(ConstByteSpan());
  EXPECT_EQ(Status::FailedPrecondition(), write_result);

  const Status second_finish = stream.Finish(Status::Cancelled());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  stream.set_on_next([](ConstByteSpan) {});
  stream.set_on_error([](Status) {});
}
330
// TryFinish() reports the channel's send error and leaves the call open.
TEST(RawServerReaderWriter, TryCloseFailed) {
  ReaderWriterTestContext ctx;
  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output to report UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_result = stream.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_result);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(stream.active());
}
343
// A failed TryFinish() can be retried; the call closes once the send succeeds.
TEST(RawServerReaderWriter, TryCloseSuccessful) {
  ReaderWriterTestContext ctx;
  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output reports UNKNOWN when sending.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = stream.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(stream.active());

  // Second attempt: the fake channel output now reports OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = stream.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The successful attempt closes the call.
  ASSERT_FALSE(stream.active());
}
362
// Open() yields a responder bound to the requested channel whose Finish()
// payload shows up in the fake channel output.
TEST(RawUnaryResponder, Open_ReturnsUsableResponder) {
  ReaderWriterTestContext ctx;
  auto responder = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  EXPECT_EQ(responder.channel_id(), ctx.channel.id());

  // The span covers the whole string literal, terminator included, which is
  // what allows the payload to be compared as a C string below.
  const Status finish_result =
      responder.Finish(as_bytes(span("hello from pw_rpc")));
  EXPECT_EQ(OkStatus(), finish_result);

  const auto sent = ctx.output.payloads<TestService::TestUnaryRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()),
               "hello from pw_rpc");
}
376
// A call may be opened on a channel ID the server does not currently have.
// Writes fail with UNAVAILABLE until the channel is opened, after which the
// call works normally.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  ReaderWriterTestContext ctx;

  // Remove the only channel so that the ID is unknown to the server.
  const Status close_result = ctx.server.CloseChannel(ctx.kChannelId);
  ASSERT_EQ(OkStatus(), close_result);

  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_TRUE(stream.active());
  EXPECT_EQ(stream.channel_id(), ctx.kChannelId);

  const Status write_without_channel = stream.Write(ConstByteSpan());
  EXPECT_EQ(Status::Unavailable(), write_without_channel);

  // Bring the channel back; the same call object should now be able to send.
  const Status open_result = ctx.server.OpenChannel(ctx.kChannelId, ctx.output);
  ASSERT_EQ(OkStatus(), open_result);

  const Status write_with_channel = stream.Write(ConstByteSpan());
  EXPECT_EQ(OkStatus(), write_with_channel);
  EXPECT_TRUE(stream.active());

  const Status finish_result = stream.Finish();
  EXPECT_EQ(OkStatus(), finish_result);
  EXPECT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);
}
398
// Opening the same RPC a second time deactivates the first call object and
// invokes its on_error callback with CANCELLED.
TEST(RawUnaryResponder, Open_MultipleTimes_CancelsPrevious) {
  ReaderWriterTestContext ctx;

  auto first = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Stays empty unless the on_error callback runs.
  std::optional<Status> reported_error;
  first.set_on_error(
      [&reported_error](Status status) { reported_error = status; });

  ASSERT_TRUE(first.active());

  auto second = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_FALSE(first.active());
  ASSERT_TRUE(second.active());

  EXPECT_EQ(Status::Cancelled(), reported_error);
}
418
// Open() yields a writer bound to the requested channel whose Write() payload
// shows up in the fake channel output.
TEST(RawServerWriter, Open_ReturnsUsableWriter) {
  ReaderWriterTestContext ctx;
  auto writer = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  EXPECT_EQ(writer.channel_id(), ctx.channel.id());

  // The span covers the whole string literal, terminator included, which is
  // what allows the payload to be compared as a C string below.
  const Status write_result = writer.Write(as_bytes(span("321")));
  EXPECT_EQ(OkStatus(), write_result);

  const auto sent = ctx.output.payloads<TestService::TestServerStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), "321");
}
434
// Open() yields a reader bound to the requested channel whose Finish()
// payload shows up in the fake channel output.
TEST(RawServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext ctx;
  auto reader = RawServerReader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  EXPECT_EQ(reader.channel_id(), ctx.channel.id());

  // The span covers the whole string literal, terminator included, which is
  // what allows the payload to be compared as a C string below.
  const Status finish_result =
      reader.Finish(as_bytes(span("This is a message")));
  EXPECT_EQ(OkStatus(), finish_result);

  const auto sent = ctx.output.payloads<TestService::TestClientStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()),
               "This is a message");
}
450
// Open() yields a reader/writer bound to the requested channel whose Write()
// payload shows up in the fake channel output.
TEST(RawServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  ReaderWriterTestContext ctx;
  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  EXPECT_EQ(stream.channel_id(), ctx.channel.id());

  // The span covers the whole string literal, terminator included, which is
  // what allows the payload to be compared as a C string below.
  const Status write_result = stream.Write(as_bytes(span("321")));
  EXPECT_EQ(OkStatus(), write_result);

  const auto sent =
      ctx.output.payloads<TestService::TestBidirectionalStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), "321");
}
467
// Move-assigning an inactive responder over an active one finishes the active
// call; exactly one completion with OK status is recorded.
TEST(RawUnaryResponder, Move_FinishesActiveCall) {
  ReaderWriterTestContext ctx;
  auto open_call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  RawUnaryResponder empty_call;

  open_call = std::move(empty_call);

  const auto finished = ctx.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finished.size(), 1u);
  EXPECT_EQ(finished.back(), OkStatus());
}
482
// Move-assigning an active call for a different RPC over an active call
// finishes only the overwritten call; the moved-in call is not completed.
TEST(RawUnaryResponder, Move_DifferentActiveCalls_ClosesFirstOnly) {
  ReaderWriterTestContext ctx;
  auto target = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  auto source = RawUnaryResponder::Open<TestService::TestAnotherUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // The two calls are for different methods, so both are active at once.
  EXPECT_TRUE(target.active());
  EXPECT_TRUE(source.active());

  target = std::move(source);

  // The overwritten TestUnaryRpc call was finished with OK...
  const auto finished = ctx.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finished.size(), 1u);
  EXPECT_EQ(finished.back(), OkStatus());

  // ...while nothing was sent for the TestAnotherUnaryRpc call.
  const auto other_finished =
      ctx.output.completions<TestService::TestAnotherUnaryRpc>();
  EXPECT_TRUE(other_finished.empty());
}
505
// Re-opening the same RPC and move-assigning the new call over the old object
// sends no completion; the call completes only when Finish() is called.
TEST(RawUnaryResponder, ReplaceActiveCall_DoesNotFinishCall) {
  ReaderWriterTestContext ctx;
  auto target = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  auto replacement = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  target = std::move(replacement);

  // Neither the second Open() nor the move produced a completion.
  ASSERT_TRUE(ctx.output.completions<TestService::TestUnaryRpc>().empty());

  constexpr const char kData[] = "Some data!";
  const Status finish_result =
      target.Finish(as_bytes(span(kData)), Status::InvalidArgument());
  EXPECT_EQ(OkStatus(), finish_result);

  // kData is sent with its terminator, so it can be compared as a C string.
  const auto sent = ctx.output.payloads<TestService::TestUnaryRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), kData);

  const auto finished = ctx.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finished.size(), 1u);
  EXPECT_EQ(finished.back(), Status::InvalidArgument());
}
534
// Destroying an active responder finishes its call with OK status.
TEST(RawUnaryResponder, OutOfScope_FinishesActiveCall) {
  ReaderWriterTestContext ctx;

  {
    auto scoped_call = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
        ctx.server, ctx.channel.id(), ctx.service);

    // Nothing has been completed while the responder is still alive.
    ASSERT_TRUE(ctx.output.completions<TestService::TestUnaryRpc>().empty());
  }

  // Leaving the scope above destroyed the responder.
  const auto finished = ctx.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finished.size(), 1u);
  EXPECT_EQ(finished.back(), OkStatus());
}
548
// Move-assigning an inactive writer over an active one finishes the active
// call; exactly one completion with OK status is recorded.
TEST(RawServerWriter, Move_InactiveToActive_FinishesActiveCall) {
  ReaderWriterTestContext ctx;
  auto open_call = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  RawServerWriter empty_call;

  open_call = std::move(empty_call);

  const auto finished =
      ctx.output.completions<TestService::TestServerStreamRpc>();
  ASSERT_EQ(finished.size(), 1u);
  EXPECT_EQ(finished.back(), OkStatus());
}
564
// Re-opening the same RPC and move-assigning the new call over the old object
// sends no completion, and the resulting writer can still send data.
TEST(RawServerWriter, ReplaceActiveCall_DoesNotFinishCall) {
  ReaderWriterTestContext ctx;
  auto target = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  auto replacement = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  target = std::move(replacement);

  // Neither the second Open() nor the move produced a completion.
  ASSERT_TRUE(
      ctx.output.completions<TestService::TestServerStreamRpc>().empty());

  constexpr const char kData[] = "Some data!";
  const Status write_result = target.Write(as_bytes(span(kData)));
  EXPECT_EQ(OkStatus(), write_result);

  // kData is sent with its terminator, so it can be compared as a C string.
  const auto sent = ctx.output.payloads<TestService::TestServerStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), kData);
}
589
590 constexpr const char kWriterData[] = "20X6";
591
WriteAsWriter(Writer & writer)592 void WriteAsWriter(Writer& writer) {
593 ASSERT_TRUE(writer.active());
594 ASSERT_EQ(writer.channel_id(), ReaderWriterTestContext::kChannelId);
595
596 EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
597 }
598
// A RawServerWriter can be handed out as a generic Writer, and data written
// through that interface reaches the channel output.
TEST(RawServerWriter, UsableAsWriter) {
  ReaderWriterTestContext ctx;
  auto server_writer = RawServerWriter::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  WriteAsWriter(server_writer.as_writer());

  const auto sent = ctx.output.payloads<TestService::TestServerStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), kWriterData);
}
613
// A RawServerReaderWriter can be handed out as a generic Writer, and data
// written through that interface reaches the channel output.
TEST(RawServerReaderWriter, UsableAsWriter) {
  ReaderWriterTestContext ctx;
  auto stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  WriteAsWriter(stream.as_writer());

  const auto sent =
      ctx.output.payloads<TestService::TestBidirectionalStreamRpc>();
  EXPECT_STREQ(reinterpret_cast<const char*>(sent.back().data()), kWriterData);
}
629
630 } // namespace
631 } // namespace pw::rpc
632