1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/nanopb/server_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/nanopb/fake_channel_output.h"
20 #include "pw_rpc/nanopb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pb.h"
23 #include "pw_unit_test/framework.h"
24
25 namespace pw::rpc {
26 namespace {
27
28 class TestServiceImpl final
29 : public test::pw_rpc::nanopb::TestService::Service<TestServiceImpl> {
30 public:
TestUnaryRpc(const pw_rpc_test_TestRequest &,pw_rpc_test_TestResponse &)31 Status TestUnaryRpc(const pw_rpc_test_TestRequest&,
32 pw_rpc_test_TestResponse&) {
33 return OkStatus();
34 }
35
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest &,NanopbUnaryResponder<pw_rpc_test_TestResponse> &)36 void TestAnotherUnaryRpc(const pw_rpc_test_TestRequest&,
37 NanopbUnaryResponder<pw_rpc_test_TestResponse>&) {}
38
TestServerStreamRpc(const pw_rpc_test_TestRequest &,NanopbServerWriter<pw_rpc_test_TestStreamResponse> &)39 void TestServerStreamRpc(
40 const pw_rpc_test_TestRequest&,
41 NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
42
TestClientStreamRpc(NanopbServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)43 void TestClientStreamRpc(
44 NanopbServerReader<pw_rpc_test_TestRequest,
45 pw_rpc_test_TestStreamResponse>&) {}
46
TestBidirectionalStreamRpc(NanopbServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)47 void TestBidirectionalStreamRpc(
48 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
49 pw_rpc_test_TestStreamResponse>&) {}
50 };
51
52 template <auto kMethod>
53 struct ReaderWriterTestContext {
54 using Info = internal::MethodInfo<kMethod>;
55
56 static constexpr uint32_t kChannelId = 1;
57
ReaderWriterTestContextpw::rpc::__anonfa544afa0111::ReaderWriterTestContext58 ReaderWriterTestContext()
59 : channel(Channel::Create<kChannelId>(&output)),
60 server(span(&channel, 1)) {}
61
62 TestServiceImpl service;
63 NanopbFakeChannelOutput<4> output;
64 Channel channel;
65 Server server;
66 };
67
68 using test::pw_rpc::nanopb::TestService;
69
TEST(NanopbUnaryResponder,DefaultConstructed)70 TEST(NanopbUnaryResponder, DefaultConstructed) {
71 NanopbUnaryResponder<pw_rpc_test_TestResponse> call;
72
73 ASSERT_FALSE(call.active());
74 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
75
76 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
77
78 call.set_on_error([](Status) {});
79 }
80
TEST(NanopbServerWriter,DefaultConstructed)81 TEST(NanopbServerWriter, DefaultConstructed) {
82 NanopbServerWriter<pw_rpc_test_TestStreamResponse> call;
83
84 ASSERT_FALSE(call.active());
85 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
86
87 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
88 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
89
90 call.set_on_error([](Status) {});
91 }
92
TEST(NanopbServerReader,DefaultConstructed)93 TEST(NanopbServerReader, DefaultConstructed) {
94 NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
95 call;
96
97 ASSERT_FALSE(call.active());
98 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
99
100 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
101
102 call.set_on_next([](const pw_rpc_test_TestRequest&) {});
103 call.set_on_error([](Status) {});
104 }
105
TEST(NanopbServerReaderWriter,DefaultConstructed)106 TEST(NanopbServerReaderWriter, DefaultConstructed) {
107 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
108 pw_rpc_test_TestStreamResponse>
109 call;
110
111 ASSERT_FALSE(call.active());
112 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
113
114 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
115 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
116
117 call.set_on_next([](const pw_rpc_test_TestRequest&) {});
118 call.set_on_error([](Status) {});
119 }
120
TEST(NanopbUnaryResponder,Closed)121 TEST(NanopbUnaryResponder, Closed) {
122 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
123 NanopbUnaryResponder call =
124 NanopbUnaryResponder<pw_rpc_test_TestResponse>::Open<
125 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
126 ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
127
128 ASSERT_FALSE(call.active());
129 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
130
131 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
132
133 call.set_on_error([](Status) {});
134 }
135
TEST(NanopbServerWriter,Closed)136 TEST(NanopbServerWriter, Closed) {
137 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
138 NanopbServerWriter call =
139 NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
140 TestService::TestServerStreamRpc>(
141 ctx.server, ctx.channel.id(), ctx.service);
142 ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
143
144 ASSERT_FALSE(call.active());
145 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
146
147 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
148 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
149
150 call.set_on_error([](Status) {});
151 }
152
TEST(NanopbServerWriter,TryClosedFailed)153 TEST(NanopbServerWriter, TryClosedFailed) {
154 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
155 NanopbServerWriter call =
156 NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
157 TestService::TestServerStreamRpc>(
158 ctx.server, ctx.channel.id(), ctx.service);
159 // Sets ChannelOutput to always return false.
160 ctx.output.set_send_status(Status::Unknown());
161 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
162
163 // Call should be still alive.
164 ASSERT_TRUE(call.active());
165 }
166
TEST(NanopbServerWriter,TryCloseSuccessful)167 TEST(NanopbServerWriter, TryCloseSuccessful) {
168 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
169 NanopbServerWriter call =
170 NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
171 TestService::TestServerStreamRpc>(
172 ctx.server, ctx.channel.id(), ctx.service);
173 // Sets ChannelOutput to always return false.
174 ctx.output.set_send_status(Status::Unknown());
175 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
176
177 // Call should be still alive.
178 ASSERT_TRUE(call.active());
179
180 // Tries to close the call again, with ChannelOutput set to return ok.
181 ctx.output.set_send_status(OkStatus());
182 ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
183 // Call should be closed.
184 ASSERT_FALSE(call.active());
185 }
186
TEST(NanopbServerReader,Closed)187 TEST(NanopbServerReader, Closed) {
188 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
189 NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
190 pw_rpc_test_TestStreamResponse>::
191 Open<TestService::TestClientStreamRpc>(
192 ctx.server, ctx.channel.id(), ctx.service);
193 ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
194
195 ASSERT_FALSE(call.active());
196 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
197
198 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
199
200 call.set_on_next([](const pw_rpc_test_TestRequest&) {});
201 call.set_on_error([](Status) {});
202 }
203
TEST(NanopbServerReader,TryClosedFailed)204 TEST(NanopbServerReader, TryClosedFailed) {
205 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
206 NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
207 pw_rpc_test_TestStreamResponse>::
208 Open<TestService::TestClientStreamRpc>(
209 ctx.server, ctx.channel.id(), ctx.service);
210 // Sets ChannelOutput to always return false.
211 ctx.output.set_send_status(Status::Unknown());
212 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
213
214 // Call should be still alive.
215 ASSERT_TRUE(call.active());
216 }
217
TEST(NanopbServerReader,TryCloseSuccessful)218 TEST(NanopbServerReader, TryCloseSuccessful) {
219 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
220 NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
221 pw_rpc_test_TestStreamResponse>::
222 Open<TestService::TestClientStreamRpc>(
223 ctx.server, ctx.channel.id(), ctx.service);
224 // Sets ChannelOutput to always return false.
225 ctx.output.set_send_status(Status::Unknown());
226 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
227
228 // Call should be still alive.
229 ASSERT_TRUE(call.active());
230
231 // Tries to close the call again, with ChannelOutput set to return ok.
232 ctx.output.set_send_status(OkStatus());
233 ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
234 // Call should be closed.
235 ASSERT_FALSE(call.active());
236 }
237
TEST(NanopbServerReaderWriter,Closed)238 TEST(NanopbServerReaderWriter, Closed) {
239 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
240 NanopbServerReaderWriter call =
241 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
242 pw_rpc_test_TestStreamResponse>::
243 Open<TestService::TestBidirectionalStreamRpc>(
244 ctx.server, ctx.channel.id(), ctx.service);
245 ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
246
247 ASSERT_FALSE(call.active());
248 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
249
250 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
251 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
252
253 call.set_on_next([](const pw_rpc_test_TestRequest&) {});
254 call.set_on_error([](Status) {});
255 }
256
TEST(NanopbServerReaderWriter,TryClosedFailed)257 TEST(NanopbServerReaderWriter, TryClosedFailed) {
258 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
259 NanopbServerReaderWriter call =
260 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
261 pw_rpc_test_TestStreamResponse>::
262 Open<TestService::TestBidirectionalStreamRpc>(
263 ctx.server, ctx.channel.id(), ctx.service);
264 // Sets ChannelOutput to always return false.
265 ctx.output.set_send_status(Status::Unknown());
266 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
267
268 // Call should be still alive.
269 ASSERT_TRUE(call.active());
270 }
271
TEST(NanopbServerReaderWriter,TryCloseSuccessful)272 TEST(NanopbServerReaderWriter, TryCloseSuccessful) {
273 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
274 NanopbServerReaderWriter call =
275 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
276 pw_rpc_test_TestStreamResponse>::
277 Open<TestService::TestBidirectionalStreamRpc>(
278 ctx.server, ctx.channel.id(), ctx.service);
279 // Sets ChannelOutput to always return false.
280 ctx.output.set_send_status(Status::Unknown());
281 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
282
283 // Call should be still alive.
284 ASSERT_TRUE(call.active());
285
286 // Tries to close the call again, with ChannelOutput set to return ok.
287 ctx.output.set_send_status(OkStatus());
288 ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
289 // Call should be closed.
290 ASSERT_FALSE(call.active());
291 }
292
TEST(NanopbUnaryResponder,Open_ReturnsUsableResponder)293 TEST(NanopbUnaryResponder, Open_ReturnsUsableResponder) {
294 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
295 NanopbUnaryResponder responder =
296 NanopbUnaryResponder<pw_rpc_test_TestResponse>::Open<
297 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
298
299 ASSERT_EQ(OkStatus(),
300 responder.Finish({.value = 4321, .repeated_field = {}}));
301
302 EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
303 EXPECT_EQ(ctx.output.last_status(), OkStatus());
304 }
305
TEST(NanopbServerWriter,Open_ReturnsUsableWriter)306 TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
307 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
308 NanopbServerWriter responder =
309 NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
310 TestService::TestServerStreamRpc>(
311 ctx.server, ctx.channel.id(), ctx.service);
312
313 ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
314 ASSERT_EQ(OkStatus(), responder.Finish());
315
316 EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
317 321u);
318 EXPECT_EQ(ctx.output.last_status(), OkStatus());
319 }
320
TEST(NanopbServerReader,Open_ReturnsUsableReader)321 TEST(NanopbServerReader, Open_ReturnsUsableReader) {
322 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
323 NanopbServerReader responder =
324 NanopbServerReader<pw_rpc_test_TestRequest,
325 pw_rpc_test_TestStreamResponse>::
326 Open<TestService::TestClientStreamRpc>(
327 ctx.server, ctx.channel.id(), ctx.service);
328
329 ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));
330
331 EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
332 321u);
333 }
334
TEST(NanopbServerReaderWriter,Open_ReturnsUsableReaderWriter)335 TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
336 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
337 NanopbServerReaderWriter responder =
338 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
339 pw_rpc_test_TestStreamResponse>::
340 Open<TestService::TestBidirectionalStreamRpc>(
341 ctx.server, ctx.channel.id(), ctx.service);
342
343 ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
344 ASSERT_EQ(OkStatus(), responder.Finish(Status::NotFound()));
345
346 EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
347 .number,
348 321u);
349 EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
350 }
351
TEST(RawServerReaderWriter,Open_UnknownChannel)352 TEST(RawServerReaderWriter, Open_UnknownChannel) {
353 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
354 ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));
355
356 NanopbServerReaderWriter call =
357 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
358 pw_rpc_test_TestStreamResponse>::
359 Open<TestService::TestBidirectionalStreamRpc>(
360 ctx.server, ctx.kChannelId, ctx.service);
361
362 EXPECT_TRUE(call.active());
363 EXPECT_EQ(call.channel_id(), ctx.kChannelId);
364 EXPECT_EQ(Status::Unavailable(), call.Write({}));
365
366 ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));
367
368 EXPECT_EQ(OkStatus(), call.Write({}));
369 EXPECT_TRUE(call.active());
370
371 EXPECT_EQ(OkStatus(), call.Finish());
372 EXPECT_FALSE(call.active());
373 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
374 }
375
TEST(RawServerReaderWriter,Open_MultipleTimes_CancelsPrevious)376 TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
377 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
378
379 NanopbServerReaderWriter one =
380 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
381 pw_rpc_test_TestStreamResponse>::
382 Open<TestService::TestBidirectionalStreamRpc>(
383 ctx.server, ctx.kChannelId, ctx.service);
384
385 std::optional<Status> error;
386 one.set_on_error([&error](Status status) { error = status; });
387
388 ASSERT_TRUE(one.active());
389
390 NanopbServerReaderWriter two =
391 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
392 pw_rpc_test_TestStreamResponse>::
393 Open<TestService::TestBidirectionalStreamRpc>(
394 ctx.server, ctx.kChannelId, ctx.service);
395
396 EXPECT_FALSE(one.active());
397 EXPECT_TRUE(two.active());
398
399 EXPECT_EQ(Status::Cancelled(), error);
400 }
401
TEST(NanopbServerReader,CallbacksMoveCorrectly)402 TEST(NanopbServerReader, CallbacksMoveCorrectly) {
403 PW_NANOPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;
404
405 NanopbServerReader call_1 = ctx.reader();
406
407 ASSERT_TRUE(call_1.active());
408
409 pw_rpc_test_TestRequest received_request = {.integer = 12345678,
410 .status_code = 1};
411
412 call_1.set_on_next([&received_request](const pw_rpc_test_TestRequest& value) {
413 received_request = value;
414 });
415
416 NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
417 call_2;
418 call_2 = std::move(call_1);
419
420 constexpr pw_rpc_test_TestRequest request{.integer = 600613,
421 .status_code = 2};
422 ctx.SendClientStream(request);
423 EXPECT_EQ(request.integer, received_request.integer);
424 EXPECT_EQ(request.status_code, received_request.status_code);
425 }
426
427 } // namespace
428 } // namespace pw::rpc
429