xref: /aosp_15_r20/external/pigweed/pw_rpc/nanopb/server_reader_writer_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/nanopb/server_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "pw_rpc/nanopb/fake_channel_output.h"
20 #include "pw_rpc/nanopb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pb.h"
23 #include "pw_unit_test/framework.h"
24 
25 namespace pw::rpc {
26 namespace {
27 
28 class TestServiceImpl final
29     : public test::pw_rpc::nanopb::TestService::Service<TestServiceImpl> {
30  public:
TestUnaryRpc(const pw_rpc_test_TestRequest &,pw_rpc_test_TestResponse &)31   Status TestUnaryRpc(const pw_rpc_test_TestRequest&,
32                       pw_rpc_test_TestResponse&) {
33     return OkStatus();
34   }
35 
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest &,NanopbUnaryResponder<pw_rpc_test_TestResponse> &)36   void TestAnotherUnaryRpc(const pw_rpc_test_TestRequest&,
37                            NanopbUnaryResponder<pw_rpc_test_TestResponse>&) {}
38 
TestServerStreamRpc(const pw_rpc_test_TestRequest &,NanopbServerWriter<pw_rpc_test_TestStreamResponse> &)39   void TestServerStreamRpc(
40       const pw_rpc_test_TestRequest&,
41       NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
42 
TestClientStreamRpc(NanopbServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)43   void TestClientStreamRpc(
44       NanopbServerReader<pw_rpc_test_TestRequest,
45                          pw_rpc_test_TestStreamResponse>&) {}
46 
TestBidirectionalStreamRpc(NanopbServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)47   void TestBidirectionalStreamRpc(
48       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
49                                pw_rpc_test_TestStreamResponse>&) {}
50 };
51 
52 template <auto kMethod>
53 struct ReaderWriterTestContext {
54   using Info = internal::MethodInfo<kMethod>;
55 
56   static constexpr uint32_t kChannelId = 1;
57 
ReaderWriterTestContextpw::rpc::__anonfa544afa0111::ReaderWriterTestContext58   ReaderWriterTestContext()
59       : channel(Channel::Create<kChannelId>(&output)),
60         server(span(&channel, 1)) {}
61 
62   TestServiceImpl service;
63   NanopbFakeChannelOutput<4> output;
64   Channel channel;
65   Server server;
66 };
67 
68 using test::pw_rpc::nanopb::TestService;
69 
// A default-constructed unary responder is inactive, has no channel assigned,
// and rejects Finish() with FAILED_PRECONDITION. Installing an error callback
// on it is also exercised.
TEST(NanopbUnaryResponder, DefaultConstructed) {
  using Responder = NanopbUnaryResponder<pw_rpc_test_TestResponse>;
  Responder responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  responder.set_on_error([](Status) {});
}
80 
// A default-constructed server writer is inactive, has no channel assigned,
// and rejects both Write() and Finish() with FAILED_PRECONDITION. Installing
// an error callback on it is also exercised.
TEST(NanopbServerWriter, DefaultConstructed) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;
  Writer writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  writer.set_on_error([](Status) {});
}
92 
// A default-constructed server reader is inactive, has no channel assigned,
// and rejects Finish() with FAILED_PRECONDITION. Installing on_next and
// on_error callbacks on it is also exercised.
TEST(NanopbServerReader, DefaultConstructed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;
  Reader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  reader.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader.set_on_error([](Status) {});
}
105 
// A default-constructed reader/writer is inactive, has no channel assigned,
// and rejects both Write() and Finish() with FAILED_PRECONDITION. Installing
// on_next and on_error callbacks on it is also exercised.
TEST(NanopbServerReaderWriter, DefaultConstructed) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;
  ReaderWriter reader_writer;

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = reader_writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = reader_writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  reader_writer.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader_writer.set_on_error([](Status) {});
}
120 
// After an opened unary responder is finished once, it becomes inactive, loses
// its channel assignment, and a second Finish() yields FAILED_PRECONDITION.
TEST(NanopbUnaryResponder, Closed) {
  using Responder = NanopbUnaryResponder<pw_rpc_test_TestResponse>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder responder = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  const Status first_finish = responder.Finish({}, OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // Finishing an already-closed call is rejected.
  const Status second_finish = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  responder.set_on_error([](Status) {});
}
135 
// After an opened server writer is finished once, it becomes inactive, loses
// its channel assignment, and further Write()/Finish() calls yield
// FAILED_PRECONDITION.
TEST(NanopbServerWriter, Closed) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  const Status first_finish = writer.Finish(OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // Operations on the closed call are rejected.
  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  writer.set_on_error([](Status) {});
}
152 
// When the channel output reports a send failure, TryFinish() returns that
// failure and the server writer stays active.
TEST(NanopbServerWriter, TryClosedFailed) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());
}
166 
// A server writer whose first TryFinish() fails because of a send error stays
// active and can be closed by a later TryFinish() once sends succeed.
TEST(NanopbServerWriter, TryCloseSuccessful) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = writer.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The call is now closed.
  ASSERT_FALSE(writer.active());
}
186 
// After an opened server reader is finished once, it becomes inactive, loses
// its channel assignment, and a second Finish() yields FAILED_PRECONDITION.
TEST(NanopbServerReader, Closed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  const Status first_finish = reader.Finish({}, OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // Finishing an already-closed call is rejected.
  const Status second_finish = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  reader.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader.set_on_error([](Status) {});
}
203 
// When the channel output reports a send failure, TryFinish() returns that
// failure and the server reader stays active.
TEST(NanopbServerReader, TryClosedFailed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());
}
217 
// A server reader whose first TryFinish() fails because of a send error stays
// active and can be closed by a later TryFinish() once sends succeed.
TEST(NanopbServerReader, TryCloseSuccessful) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The call is now closed.
  ASSERT_FALSE(reader.active());
}
237 
// After an opened reader/writer is finished once, it becomes inactive, loses
// its channel assignment, and further Write()/Finish() calls yield
// FAILED_PRECONDITION.
TEST(NanopbServerReaderWriter, Closed) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  const Status first_finish = reader_writer.Finish(OkStatus());
  ASSERT_EQ(OkStatus(), first_finish);

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // Operations on the closed call are rejected.
  const Status write_status = reader_writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = reader_writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  reader_writer.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader_writer.set_on_error([](Status) {});
}
256 
// When the channel output reports a send failure, TryFinish() returns that
// failure and the reader/writer stays active.
TEST(NanopbServerReaderWriter, TryClosedFailed) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader_writer.active());
}
271 
// A reader/writer whose first TryFinish() fails because of a send error stays
// active and can be closed by a later TryFinish() once sends succeed.
TEST(NanopbServerReaderWriter, TryCloseSuccessful) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  const Status failed_attempt = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), failed_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader_writer.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status retry = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), retry);

  // The call is now closed.
  ASSERT_FALSE(reader_writer.active());
}
292 
// A responder obtained from Open() can send a response: the payload and OK
// status passed to Finish() are observed on the fake channel output.
TEST(NanopbUnaryResponder, Open_ReturnsUsableResponder) {
  using Responder = NanopbUnaryResponder<pw_rpc_test_TestResponse>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder opened = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const pw_rpc_test_TestResponse payload{.value = 4321, .repeated_field = {}};
  ASSERT_EQ(OkStatus(), opened.Finish(payload));

  const auto& sent = ctx.output.last_response<TestService::TestUnaryRpc>();
  EXPECT_EQ(sent.value, 4321);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
305 
// A writer obtained from Open() can stream a response and then finish: the
// written payload and the final OK status are observed on the fake channel
// output.
TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer opened = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const pw_rpc_test_TestStreamResponse payload{.chunk = {}, .number = 321};
  ASSERT_EQ(OkStatus(), opened.Write(payload));
  ASSERT_EQ(OkStatus(), opened.Finish());

  const auto& sent =
      ctx.output.last_response<TestService::TestServerStreamRpc>();
  EXPECT_EQ(sent.number, 321u);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
320 
// A reader obtained from Open() can finish the call with a response: the
// payload passed to Finish() is observed on the fake channel output.
//
// The status check mirrors the sibling Open_ReturnsUsable* tests in this file,
// which all verify the status sent alongside the final response. Finish() is
// called without an explicit status here, so OK is expected.
TEST(NanopbServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  NanopbServerReader responder =
      NanopbServerReader<pw_rpc_test_TestRequest,
                         pw_rpc_test_TestStreamResponse>::
          Open<TestService::TestClientStreamRpc>(
              ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
334 
// A reader/writer obtained from Open() can stream a response and finish with
// a non-OK status: the written payload and the NOT_FOUND status are observed
// on the fake channel output.
TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter opened =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  const pw_rpc_test_TestStreamResponse payload{.chunk = {}, .number = 321};
  ASSERT_EQ(OkStatus(), opened.Write(payload));
  ASSERT_EQ(OkStatus(), opened.Finish(Status::NotFound()));

  const auto& sent =
      ctx.output.last_response<TestService::TestBidirectionalStreamRpc>();
  EXPECT_EQ(sent.number, 321u);
  EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
}
351 
// Opening a call on a channel ID the server has closed still yields an active
// call bound to that ID. Writes return UNAVAILABLE until the channel is opened
// again, after which the call can write and finish normally.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  // Remove the channel before any call is opened on it.
  ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));

  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_TRUE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), ctx.kChannelId);

  const Status write_without_channel = reader_writer.Write({});
  EXPECT_EQ(Status::Unavailable(), write_without_channel);

  // Re-open the channel; the same call object becomes usable.
  ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));

  const Status write_with_channel = reader_writer.Write({});
  EXPECT_EQ(OkStatus(), write_with_channel);
  EXPECT_TRUE(reader_writer.active());

  const Status finish_status = reader_writer.Finish();
  EXPECT_EQ(OkStatus(), finish_status);
  EXPECT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);
}
375 
// Opening the same method on the same channel a second time deactivates the
// first call and invokes its on_error callback with CANCELLED, while the
// second call is active.
TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
  using ReaderWriter = NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  ReaderWriter first =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  // Records the status delivered to the first call's error callback, if any.
  std::optional<Status> reported_error;
  first.set_on_error(
      [&reported_error](Status status) { reported_error = status; });

  ASSERT_TRUE(first.active());

  ReaderWriter second =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_FALSE(first.active());
  EXPECT_TRUE(second.active());

  EXPECT_EQ(Status::Cancelled(), reported_error);
}
401 
// An on_next callback installed on a reader keeps working after the reader is
// move-assigned into another object: a client stream message sent afterwards
// is delivered to the callback registered on the original reader.
TEST(NanopbServerReader, CallbacksMoveCorrectly) {
  PW_NANOPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;

  auto source_reader = ctx.reader();
  ASSERT_TRUE(source_reader.active());

  // Starts with values that differ from the request sent below, so the final
  // checks only pass if the callback actually ran.
  pw_rpc_test_TestRequest captured = {.integer = 12345678, .status_code = 1};

  source_reader.set_on_next(
      [&captured](const pw_rpc_test_TestRequest& incoming) {
        captured = incoming;
      });

  NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
      destination_reader;
  destination_reader = std::move(source_reader);

  constexpr pw_rpc_test_TestRequest request{.integer = 600613,
                                            .status_code = 2};
  ctx.SendClientStream(request);

  EXPECT_EQ(request.integer, captured.integer);
  EXPECT_EQ(request.status_code, captured.status_code);
}
426 
427 }  // namespace
428 }  // namespace pw::rpc
429