1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/nanopb/client_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/nanopb/client_testing.h"
20 #include "pw_rpc_test_protos/test.rpc.pb.h"
21 #include "pw_unit_test/framework.h"
22
23 PW_MODIFY_DIAGNOSTICS_PUSH();
24 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
25
26 namespace pw::rpc {
27 namespace {
28
29 using test::pw_rpc::nanopb::TestService;
30
FailIfCalled(Status)31 void FailIfCalled(Status) { FAIL(); }
32 template <typename T>
FailIfOnNextCalled(const T &)33 void FailIfOnNextCalled(const T&) {
34 FAIL();
35 }
36 template <typename T>
FailIfOnCompletedCalled(const T &,Status)37 void FailIfOnCompletedCalled(const T&, Status) {
38 FAIL();
39 }
40
// A default-constructed unary receiver reports itself inactive, has no channel
// assigned, and rejects Cancel() with FAILED_PRECONDITION.
TEST(NanopbUnaryReceiver, DefaultConstructed) {
  using Receiver = NanopbUnaryReceiver<pw_rpc_test_TestResponse>;
  Receiver receiver;

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), receiver.Cancel());

  // The callback setters are still exercised on the inactive call object.
  receiver.set_on_completed([](const pw_rpc_test_TestResponse&, Status) {});
  receiver.set_on_error([](Status) {});
}
52
// A default-constructed client writer reports itself inactive, has no channel
// assigned, and rejects Write(), Cancel() and RequestCompletion() with
// FAILED_PRECONDITION.
TEST(NanopbClientWriter, DefaultConstructed) {
  using Writer = NanopbClientWriter<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;
  Writer writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), writer.RequestCompletion());

  // The callback setters are still exercised on the inactive call object.
  writer.set_on_completed([](const pw_rpc_test_TestStreamResponse&, Status) {});
  writer.set_on_error([](Status) {});
}
67
// A default-constructed client reader reports itself inactive, has no channel
// assigned, and rejects Cancel() and RequestCompletion() with
// FAILED_PRECONDITION.
TEST(NanopbClientReader, DefaultConstructed) {
  using Reader = NanopbClientReader<pw_rpc_test_TestStreamResponse>;
  Reader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), reader.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), reader.RequestCompletion());

  // The callback setters are still exercised on the inactive call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  reader.set_on_error([](Status) {});
}
81
// A default-constructed reader/writer reports itself inactive, has no channel
// assigned, and rejects Write(), Cancel() and RequestCompletion() with
// FAILED_PRECONDITION.
TEST(NanopbClientReaderWriter, DefaultConstructed) {
  using ReaderWriter = NanopbClientReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;
  ReaderWriter stream;

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), stream.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), stream.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), stream.RequestCompletion());

  // The callback setters are still exercised on the inactive call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  stream.set_on_error([](Status) {});
}
98
// After RequestCompletion() on a client-streaming call, the call is still
// active on its channel and Write(), RequestCompletion() and Cancel() all
// return OK. The callbacks passed at start fail the test if they run.
TEST(NanopbClientWriter, RequestCompletion) {
  using Writer = NanopbClientWriter<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  Writer writer = TestService::TestClientStreamRpc(
      ctx.client(),
      channel_id,
      FailIfOnCompletedCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), writer.RequestCompletion());

  ASSERT_TRUE(writer.active());
  EXPECT_EQ(writer.channel_id(), channel_id);

  EXPECT_EQ(OkStatus(), writer.Write({}));
  EXPECT_EQ(OkStatus(), writer.RequestCompletion());
  EXPECT_EQ(OkStatus(), writer.Cancel());

  // Replace the callbacks once the call has been cancelled.
  writer.set_on_completed([](const pw_rpc_test_TestStreamResponse&, Status) {});
  writer.set_on_error([](Status) {});
}
119
// After RequestCompletion() on a server-streaming call, the call is still
// active on its channel and RequestCompletion() and Cancel() both return OK.
// The callbacks passed at start fail the test if they run.
TEST(NanopbClientReader, RequestCompletion) {
  using Reader = NanopbClientReader<pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  Reader reader = TestService::TestServerStreamRpc(
      ctx.client(),
      channel_id,
      {},
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), reader.RequestCompletion());

  ASSERT_TRUE(reader.active());
  EXPECT_EQ(reader.channel_id(), channel_id);

  EXPECT_EQ(OkStatus(), reader.RequestCompletion());
  EXPECT_EQ(OkStatus(), reader.Cancel());

  // Replace the callbacks once the call has been cancelled.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  reader.set_on_error([](Status) {});
}
142
// After RequestCompletion() on a bidirectional-streaming call, the call is
// still active on its channel and Write(), RequestCompletion() and Cancel()
// all return OK. The callbacks passed at start fail the test if they run.
TEST(NanopbClientReaderWriter, RequestCompletion) {
  using ReaderWriter = NanopbClientReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  ReaderWriter stream = TestService::TestBidirectionalStreamRpc(
      ctx.client(),
      channel_id,
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), stream.RequestCompletion());

  ASSERT_TRUE(stream.active());
  EXPECT_EQ(stream.channel_id(), channel_id);

  EXPECT_EQ(OkStatus(), stream.Write({}));
  EXPECT_EQ(OkStatus(), stream.RequestCompletion());
  EXPECT_EQ(OkStatus(), stream.Cancel());

  // Replace the callbacks once the call has been cancelled.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  stream.set_on_error([](Status) {});
}
166
// Once a unary call has been cancelled it is inactive, has no channel
// assigned, and a second Cancel() returns FAILED_PRECONDITION.
TEST(NanopbUnaryReceiver, Closed) {
  using Receiver = NanopbUnaryReceiver<pw_rpc_test_TestResponse>;

  NanopbClientTestContext ctx;
  Receiver receiver = TestService::TestUnaryRpc(
      ctx.client(),
      ctx.channel().id(),
      {},
      FailIfOnCompletedCalled<pw_rpc_test_TestResponse>,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), receiver.Cancel());

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), receiver.Cancel());

  // The callback setters are still exercised on the closed call object.
  receiver.set_on_completed([](const pw_rpc_test_TestResponse&, Status) {});
  receiver.set_on_error([](Status) {});
}
186
// Once a client-streaming call has been cancelled it is inactive, has no
// channel assigned, and Write(), Cancel() and RequestCompletion() return
// FAILED_PRECONDITION.
TEST(NanopbClientWriter, Closed) {
  using Writer = NanopbClientWriter<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  Writer writer = TestService::TestClientStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      FailIfOnCompletedCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), writer.Cancel());

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), writer.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  writer.set_on_completed([](const pw_rpc_test_TestStreamResponse&, Status) {});
  writer.set_on_error([](Status) {});
}
207
// Once a server-streaming call has been cancelled it is inactive, has no
// channel assigned, and Cancel() and RequestCompletion() return
// FAILED_PRECONDITION.
TEST(NanopbClientReader, Closed) {
  using Reader = NanopbClientReader<pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  Reader reader = TestService::TestServerStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      {},
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), reader.Cancel());

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), reader.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), reader.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  reader.set_on_error([](Status) {});
}
230
// Once a bidirectional-streaming call has been cancelled it is inactive, has
// no channel assigned, and Write(), Cancel() and RequestCompletion() return
// FAILED_PRECONDITION.
TEST(NanopbClientReaderWriter, Closed) {
  using ReaderWriter = NanopbClientReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  ReaderWriter stream = TestService::TestBidirectionalStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(OkStatus(), stream.Cancel());

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), stream.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), stream.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), stream.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  stream.set_on_error([](Status) {});
}
254
// After CloseAndWaitForCallbacks() a unary call is inactive, has no channel
// assigned, and Cancel() returns FAILED_PRECONDITION.
TEST(NanopbUnaryReceiver, CloseAndWaitForCallbacks) {
  using Receiver = NanopbUnaryReceiver<pw_rpc_test_TestResponse>;

  NanopbClientTestContext ctx;
  Receiver receiver = TestService::TestUnaryRpc(
      ctx.client(),
      ctx.channel().id(),
      {},
      FailIfOnCompletedCalled<pw_rpc_test_TestResponse>,
      FailIfCalled);
  receiver.CloseAndWaitForCallbacks();

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), receiver.Cancel());

  // The callback setters are still exercised on the closed call object.
  receiver.set_on_completed([](const pw_rpc_test_TestResponse&, Status) {});
  receiver.set_on_error([](Status) {});
}
274
// After CloseAndWaitForCallbacks() a client-streaming call is inactive, has no
// channel assigned, and Write(), Cancel() and RequestCompletion() return
// FAILED_PRECONDITION.
TEST(NanopbClientWriter, CloseAndWaitForCallbacks) {
  using Writer = NanopbClientWriter<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  Writer writer = TestService::TestClientStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      FailIfOnCompletedCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled);
  writer.CloseAndWaitForCallbacks();

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), writer.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  writer.set_on_completed([](const pw_rpc_test_TestStreamResponse&, Status) {});
  writer.set_on_error([](Status) {});
}
295
// After CloseAndWaitForCallbacks() a server-streaming call is inactive, has no
// channel assigned, and Cancel() and RequestCompletion() return
// FAILED_PRECONDITION.
TEST(NanopbClientReader, CloseAndWaitForCallbacks) {
  using Reader = NanopbClientReader<pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  Reader reader = TestService::TestServerStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      {},
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  reader.CloseAndWaitForCallbacks();

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), reader.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), reader.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  reader.set_on_error([](Status) {});
}
318
// After CloseAndWaitForCallbacks() a bidirectional-streaming call is inactive,
// has no channel assigned, and Write(), Cancel() and RequestCompletion()
// return FAILED_PRECONDITION.
TEST(NanopbClientReaderWriter, CloseAndWaitForCallbacks) {
  using ReaderWriter = NanopbClientReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;
  ReaderWriter stream = TestService::TestBidirectionalStreamRpc(
      ctx.client(),
      ctx.channel().id(),
      FailIfOnNextCalled<pw_rpc_test_TestStreamResponse>,
      FailIfCalled,
      FailIfCalled);
  stream.CloseAndWaitForCallbacks();

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), stream.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), stream.Cancel());
  EXPECT_EQ(Status::FailedPrecondition(), stream.RequestCompletion());

  // The callback setters are still exercised on the closed call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const pw_rpc_test_TestStreamResponse&) {});
  stream.set_on_error([](Status) {});
}
342
// Move-assigns a started unary call into another call object, lets the source
// go out of scope, and checks that a server response still reaches the
// completion callback supplied when the call was started.
TEST(NanopbUnaryReceiver, CallbacksMoveCorrectly) {
  NanopbClientTestContext ctx;

  // Captured by the completion callback; the initial payload differs from the
  // value the server sends below.
  struct {
    pw_rpc_test_TestResponse payload = {.value = 12345678};
    std::optional<Status> status;
  } received;

  NanopbUnaryReceiver<pw_rpc_test_TestResponse> destination;
  {
    NanopbUnaryReceiver source = TestService::TestUnaryRpc(
        ctx.client(),
        ctx.channel().id(),
        {},
        [&received](const pw_rpc_test_TestResponse& message, Status result) {
          received.payload = message;
          received.status = result;
        });

    destination = std::move(source);
  }  // `source` is destroyed here, before the response arrives.

  ctx.server().SendResponse<TestService::TestUnaryRpc>({.value = 9000},
                                                       Status::NotFound());
  EXPECT_EQ(received.payload.value, 9000);
  EXPECT_EQ(received.status, Status::NotFound());
}
370
// Move-assigns a started bidirectional-streaming call into another call
// object, lets the source go out of scope, and checks that a server stream
// message still reaches the on-next callback supplied when the call was
// started.
TEST(NanopbClientReaderWriter, CallbacksMoveCorrectly) {
  using ReaderWriter = NanopbClientReaderWriter<pw_rpc_test_TestRequest,
                                                pw_rpc_test_TestStreamResponse>;

  NanopbClientTestContext ctx;

  // Overwritten by the on-next callback; the initial number differs from the
  // value the server sends below.
  pw_rpc_test_TestStreamResponse last_received = {.chunk = {}, .number = 13579};

  ReaderWriter destination;
  {
    NanopbClientReaderWriter source = TestService::TestBidirectionalStreamRpc(
        ctx.client(),
        ctx.channel().id(),
        [&last_received](const pw_rpc_test_TestStreamResponse& message) {
          last_received = message;
        });

    destination = std::move(source);
  }  // `source` is destroyed here, before the stream message arrives.

  ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
      {.chunk = {}, .number = 5050});
  EXPECT_EQ(last_received.number, 5050u);
}
394
395 } // namespace
396 } // namespace pw::rpc
397
398 PW_MODIFY_DIAGNOSTICS_POP();
399