1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/pwpb/client_reader_writer.h"
16
17 #include <optional>
18
19 #include "public/pw_rpc/pwpb/client_reader_writer.h"
20 #include "pw_rpc/pwpb/client_testing.h"
21 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
22 #include "pw_unit_test/framework.h"
23
// Ignore -Wmissing-field-initializers from here until the matching
// PW_MODIFY_DIAGNOSTICS_POP() at the end of this file.
// NOTE(review): presumably needed because the tests below brace-initialize
// pwpb messages while naming only some of their fields (e.g.
// `{.value = 9000}`) -- confirm.
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");

namespace pw::rpc {
namespace {

// Service whose RPC methods are invoked by the tests in this file.
using test::pw_rpc::pwpb::TestService;

// Short aliases for the proto namespaces; the tests refer to the message
// structs inside them as TestRequest::Message, TestResponse::Message, and
// TestStreamResponse::Message.
namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
35
FailIfCalled(Status)36 void FailIfCalled(Status) { FAIL(); }
37 template <typename T>
FailIfOnNextCalled(const T &)38 void FailIfOnNextCalled(const T&) {
39 FAIL();
40 }
41 template <typename T>
FailIfOnCompletedCalled(const T &,Status)42 void FailIfOnCompletedCalled(const T&, Status) {
43 FAIL();
44 }
45
// A default-constructed unary receiver is inactive, reports the unassigned
// channel ID, and rejects Cancel() with FAILED_PRECONDITION. Its callback
// setters can still be called.
TEST(PwpbUnaryReceiver, DefaultConstructed) {
  PwpbUnaryReceiver<TestResponse::Message> receiver;

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  const Status not_active = Status::FailedPrecondition();
  EXPECT_EQ(not_active, receiver.Cancel());

  // Exercise the callback setters on the inactive call object.
  receiver.set_on_completed([](const TestResponse::Message&, Status) {});
  receiver.set_on_error([](Status) {});
}
57
// A default-constructed client writer is inactive, reports the unassigned
// channel ID, and rejects Write(), Cancel(), and RequestCompletion() with
// FAILED_PRECONDITION. Its callback setters can still be called.
TEST(PwpbClientWriter, DefaultConstructed) {
  PwpbClientWriter<TestRequest::Message, TestStreamResponse::Message> writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status not_active = Status::FailedPrecondition();
  EXPECT_EQ(not_active, writer.Write({}));
  EXPECT_EQ(not_active, writer.Cancel());
  EXPECT_EQ(not_active, writer.RequestCompletion());

  // Exercise the callback setters on the inactive call object.
  writer.set_on_completed([](const TestStreamResponse::Message&, Status) {});
  writer.set_on_error([](Status) {});
}
71
// A default-constructed client reader is inactive, reports the unassigned
// channel ID, and rejects Cancel() and RequestCompletion() with
// FAILED_PRECONDITION. Its callback setters can still be called.
TEST(PwpbClientReader, DefaultConstructed) {
  PwpbClientReader<TestStreamResponse::Message> reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status not_active = Status::FailedPrecondition();
  EXPECT_EQ(not_active, reader.Cancel());
  EXPECT_EQ(not_active, reader.RequestCompletion());

  // Exercise the callback setters on the inactive call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const TestStreamResponse::Message&) {});
  reader.set_on_error([](Status) {});
}
85
// A default-constructed bidirectional call is inactive, reports the
// unassigned channel ID, and rejects Write(), Cancel(), and
// RequestCompletion() with FAILED_PRECONDITION. Its callback setters can
// still be called.
TEST(PwpbClientReaderWriter, DefaultConstructed) {
  PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
      stream;

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status not_active = Status::FailedPrecondition();
  EXPECT_EQ(not_active, stream.Write({}));
  EXPECT_EQ(not_active, stream.Cancel());
  EXPECT_EQ(not_active, stream.RequestCompletion());

  // Exercise the callback setters on the inactive call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const TestStreamResponse::Message&) {});
  stream.set_on_error([](Status) {});
}
101
// RequestCompletion() on a live client-streaming call returns OK and does not
// close it: the call stays active on its channel, and a following Write(),
// second RequestCompletion(), and Cancel() each return OK. The FailIf*
// callbacks fail the test if any callback fires.
TEST(PwpbClientWriter, RequestCompletion) {
  PwpbClientTestContext ctx;
  const auto channel = ctx.channel().id();

  PwpbClientWriter<TestRequest::Message, TestStreamResponse::Message> writer =
      TestService::TestClientStreamRpc(
          ctx.client(),
          channel,
          FailIfOnCompletedCalled<TestStreamResponse::Message>,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), writer.RequestCompletion());

  ASSERT_TRUE(writer.active());
  EXPECT_EQ(writer.channel_id(), channel);

  const Status ok = OkStatus();
  EXPECT_EQ(ok, writer.Write({}));
  EXPECT_EQ(ok, writer.RequestCompletion());
  EXPECT_EQ(ok, writer.Cancel());

  // Exercise the callback setters after the call has been cancelled.
  writer.set_on_completed([](const TestStreamResponse::Message&, Status) {});
  writer.set_on_error([](Status) {});
}
122
// RequestCompletion() on a live server-streaming call returns OK and does not
// close it: the call stays active on its channel, and a second
// RequestCompletion() and Cancel() each return OK. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientReader, RequestCompletion) {
  PwpbClientTestContext ctx;
  const auto channel = ctx.channel().id();

  PwpbClientReader<TestStreamResponse::Message> reader =
      TestService::TestServerStreamRpc(
          ctx.client(),
          channel,
          {},
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), reader.RequestCompletion());

  ASSERT_TRUE(reader.active());
  EXPECT_EQ(reader.channel_id(), channel);

  const Status ok = OkStatus();
  EXPECT_EQ(ok, reader.RequestCompletion());
  EXPECT_EQ(ok, reader.Cancel());

  // Exercise the callback setters after the call has been cancelled.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const TestStreamResponse::Message&) {});
  reader.set_on_error([](Status) {});
}
145
// RequestCompletion() on a live bidirectional call returns OK and does not
// close it: the call stays active on its channel, and a following Write(),
// second RequestCompletion(), and Cancel() each return OK. The FailIf*
// callbacks fail the test if any callback fires.
TEST(PwpbClientReaderWriter, RequestCompletion) {
  PwpbClientTestContext ctx;
  const auto channel = ctx.channel().id();

  PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
      stream = TestService::TestBidirectionalStreamRpc(
          ctx.client(),
          channel,
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), stream.RequestCompletion());

  ASSERT_TRUE(stream.active());
  EXPECT_EQ(stream.channel_id(), channel);

  const Status ok = OkStatus();
  EXPECT_EQ(ok, stream.Write({}));
  EXPECT_EQ(ok, stream.RequestCompletion());
  EXPECT_EQ(ok, stream.Cancel());

  // Exercise the callback setters after the call has been cancelled.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const TestStreamResponse::Message&) {});
  stream.set_on_error([](Status) {});
}
168
// Cancelling a unary call closes it: afterwards it is inactive, reports the
// unassigned channel ID, and a second Cancel() returns FAILED_PRECONDITION.
// The FailIf* callbacks fail the test if any callback fires.
TEST(PwpbUnaryReceiver, Closed) {
  PwpbClientTestContext ctx;
  PwpbUnaryReceiver<TestResponse::Message> receiver =
      TestService::TestUnaryRpc(ctx.client(),
                                ctx.channel().id(),
                                {},
                                FailIfOnCompletedCalled<TestResponse::Message>,
                                FailIfCalled);
  ASSERT_EQ(OkStatus(), receiver.Cancel());

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, receiver.Cancel());

  // Exercise the callback setters on the closed call object.
  receiver.set_on_completed([](const TestResponse::Message&, Status) {});
  receiver.set_on_error([](Status) {});
}
187
// Cancelling a client-streaming call closes it: afterwards it is inactive,
// reports the unassigned channel ID, and Write(), Cancel(), and
// RequestCompletion() return FAILED_PRECONDITION. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientWriter, Closed) {
  PwpbClientTestContext ctx;
  PwpbClientWriter<TestRequest::Message, TestStreamResponse::Message> writer =
      TestService::TestClientStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          FailIfOnCompletedCalled<TestStreamResponse::Message>,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), writer.Cancel());

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, writer.Write({}));
  EXPECT_EQ(closed, writer.Cancel());
  EXPECT_EQ(closed, writer.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  writer.set_on_completed([](const TestStreamResponse::Message&, Status) {});
  writer.set_on_error([](Status) {});
}
208
// Cancelling a server-streaming call closes it: afterwards it is inactive,
// reports the unassigned channel ID, and Cancel() and RequestCompletion()
// return FAILED_PRECONDITION. The FailIf* callbacks fail the test if any
// callback fires.
TEST(PwpbClientReader, Closed) {
  PwpbClientTestContext ctx;
  PwpbClientReader<TestStreamResponse::Message> reader =
      TestService::TestServerStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          {},
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), reader.Cancel());

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, reader.Cancel());
  EXPECT_EQ(closed, reader.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const TestStreamResponse::Message&) {});
  reader.set_on_error([](Status) {});
}
231
// Cancelling a bidirectional call closes it: afterwards it is inactive,
// reports the unassigned channel ID, and Write(), Cancel(), and
// RequestCompletion() return FAILED_PRECONDITION. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientReaderWriter, Closed) {
  PwpbClientTestContext ctx;
  PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
      stream = TestService::TestBidirectionalStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  ASSERT_EQ(OkStatus(), stream.Cancel());

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, stream.Write({}));
  EXPECT_EQ(closed, stream.Cancel());
  EXPECT_EQ(closed, stream.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const TestStreamResponse::Message&) {});
  stream.set_on_error([](Status) {});
}
254
// CloseAndWaitForCallbacks() closes a unary call: afterwards it is inactive,
// reports the unassigned channel ID, and Cancel() returns
// FAILED_PRECONDITION. The FailIf* callbacks fail the test if any callback
// fires.
TEST(PwpbUnaryReceiver, CloseAndWaitForCallbacks) {
  PwpbClientTestContext ctx;
  PwpbUnaryReceiver<TestResponse::Message> receiver =
      TestService::TestUnaryRpc(ctx.client(),
                                ctx.channel().id(),
                                {},
                                FailIfOnCompletedCalled<TestResponse::Message>,
                                FailIfCalled);
  receiver.CloseAndWaitForCallbacks();

  ASSERT_FALSE(receiver.active());
  EXPECT_EQ(receiver.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, receiver.Cancel());

  // Exercise the callback setters on the closed call object.
  receiver.set_on_completed([](const TestResponse::Message&, Status) {});
  receiver.set_on_error([](Status) {});
}
273
// CloseAndWaitForCallbacks() closes a client-streaming call: afterwards it is
// inactive, reports the unassigned channel ID, and Write(), Cancel(), and
// RequestCompletion() return FAILED_PRECONDITION. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientWriter, CloseAndWaitForCallbacks) {
  PwpbClientTestContext ctx;
  PwpbClientWriter<TestRequest::Message, TestStreamResponse::Message> writer =
      TestService::TestClientStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          FailIfOnCompletedCalled<TestStreamResponse::Message>,
          FailIfCalled);
  writer.CloseAndWaitForCallbacks();

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, writer.Write({}));
  EXPECT_EQ(closed, writer.Cancel());
  EXPECT_EQ(closed, writer.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  writer.set_on_completed([](const TestStreamResponse::Message&, Status) {});
  writer.set_on_error([](Status) {});
}
294
// CloseAndWaitForCallbacks() closes a server-streaming call: afterwards it is
// inactive, reports the unassigned channel ID, and Cancel() and
// RequestCompletion() return FAILED_PRECONDITION. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientReader, CloseAndWaitForCallbacks) {
  PwpbClientTestContext ctx;
  PwpbClientReader<TestStreamResponse::Message> reader =
      TestService::TestServerStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          {},
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  reader.CloseAndWaitForCallbacks();

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, reader.Cancel());
  EXPECT_EQ(closed, reader.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  reader.set_on_completed([](Status) {});
  reader.set_on_next([](const TestStreamResponse::Message&) {});
  reader.set_on_error([](Status) {});
}
317
// CloseAndWaitForCallbacks() closes a bidirectional call: afterwards it is
// inactive, reports the unassigned channel ID, and Write(), Cancel(), and
// RequestCompletion() return FAILED_PRECONDITION. The FailIf* callbacks fail
// the test if any callback fires.
TEST(PwpbClientReaderWriter, CloseAndWaitForCallbacks) {
  PwpbClientTestContext ctx;
  PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
      stream = TestService::TestBidirectionalStreamRpc(
          ctx.client(),
          ctx.channel().id(),
          FailIfOnNextCalled<TestStreamResponse::Message>,
          FailIfCalled,
          FailIfCalled);
  stream.CloseAndWaitForCallbacks();

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status closed = Status::FailedPrecondition();
  EXPECT_EQ(closed, stream.Write({}));
  EXPECT_EQ(closed, stream.Cancel());
  EXPECT_EQ(closed, stream.RequestCompletion());

  // Exercise the callback setters on the closed call object.
  stream.set_on_completed([](Status) {});
  stream.set_on_next([](const TestStreamResponse::Message&) {});
  stream.set_on_error([](Status) {});
}
340
// Move-assigning a unary call into another call object carries its
// on_completed callback along: after the source object goes out of scope, a
// server response still reaches the callback registered on the original call.
TEST(PwpbUnaryReceiver, CallbacksMoveCorrectly) {
  PwpbClientTestContext ctx;

  // Everything the callback records lives in one object so that the lambda
  // captures a single reference.
  struct Received {
    TestResponse::Message payload = {.value = 12345678};
    std::optional<Status> status;
  };
  Received received;

  PwpbUnaryReceiver<TestResponse::Message> survivor;
  {
    PwpbUnaryReceiver<TestResponse::Message> original =
        TestService::TestUnaryRpc(
            ctx.client(),
            ctx.channel().id(),
            {},
            [&received](const TestResponse::Message& response, Status status) {
              received.payload = response;
              received.status = status;
            });

    survivor = std::move(original);
  }  // `original` is destroyed here; `survivor` now holds the call.

  ctx.server().SendResponse<TestService::TestUnaryRpc>({.value = 9000},
                                                       Status::NotFound());
  EXPECT_EQ(received.payload.value, 9000);
  EXPECT_EQ(received.status, Status::NotFound());
}
368
// Move-assigning a bidirectional call into another call object carries its
// on_next callback along: after the source object goes out of scope, a
// server stream message still reaches the callback registered on the
// original call.
TEST(PwpbClientReaderWriter, CallbacksMoveCorrectly) {
  PwpbClientTestContext ctx;

  // Starts at a sentinel value that the callback is expected to overwrite.
  TestStreamResponse::Message last_message = {.chunk = {}, .number = 13579};

  PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
      survivor;
  {
    PwpbClientReaderWriter<TestRequest::Message, TestStreamResponse::Message>
        original = TestService::TestBidirectionalStreamRpc(
            ctx.client(),
            ctx.channel().id(),
            [&last_message](const TestStreamResponse::Message& response) {
              last_message = response;
            });

    survivor = std::move(original);
  }  // `original` is destroyed here; `survivor` now holds the call.

  ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
      {.chunk = {}, .number = 5050});
  EXPECT_EQ(last_message.number, 5050u);
}
391
392 } // namespace
393 } // namespace pw::rpc
394
395 PW_MODIFY_DIAGNOSTICS_POP();
396