1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_assert/check.h"
16 #include "pw_rpc/benchmark.rpc.pwpb.h"
17 #include "pw_rpc/integration_testing.h"
18 #include "pw_sync/binary_semaphore.h"
19 #include "pw_unit_test/framework.h"
20
21 namespace pwpb_rpc_test {
22 namespace {
23
24 using namespace std::chrono_literals;
25 using pw::ByteSpan;
26 using pw::ConstByteSpan;
27 using pw::Function;
28 using pw::OkStatus;
29 using pw::Status;
30
31 using pw::rpc::pw_rpc::pwpb::Benchmark;
32
33 constexpr int kIterations = 10;
34
35 class PayloadReceiver {
36 public:
Wait()37 const char* Wait() {
38 constexpr auto kWaitTimeout = 1500ms;
39 PW_CHECK(sem_.try_acquire_for(kWaitTimeout));
40 return reinterpret_cast<const char*>(payload_.payload.data());
41 }
42
UnaryOnCompleted()43 Function<void(const pw::rpc::Payload::Message&, Status)> UnaryOnCompleted() {
44 return [this](const pw::rpc::Payload::Message& data, Status) {
45 CopyPayload(data);
46 };
47 }
48
OnNext()49 Function<void(const pw::rpc::Payload::Message&)> OnNext() {
50 return [this](const pw::rpc::Payload::Message& data) { CopyPayload(data); };
51 }
52
53 private:
CopyPayload(const pw::rpc::Payload::Message & data)54 void CopyPayload(const pw::rpc::Payload::Message& data) {
55 payload_ = data;
56 sem_.release();
57 }
58
59 pw::sync::BinarySemaphore sem_;
60 pw::rpc::Payload::Message payload_ = {};
61 };
62
63 template <size_t kSize>
Payload(const char (& string)[kSize])64 pw::rpc::Payload::Message Payload(const char (&string)[kSize]) {
65 static_assert(kSize <= sizeof(pw::rpc::Payload::Message::payload));
66 pw::rpc::Payload::Message payload{};
67 payload.payload.resize(kSize);
68 std::memcpy(payload.payload.data(), string, kSize);
69 return payload;
70 }
71
72 const Benchmark::Client kClient(pw::rpc::integration_test::client(),
73 pw::rpc::integration_test::kChannelId);
74
TEST(PwpbRpcIntegrationTest,Unary)75 TEST(PwpbRpcIntegrationTest, Unary) {
76 char value[] = {"hello, world!"};
77
78 for (int i = 0; i < kIterations; ++i) {
79 PayloadReceiver receiver;
80
81 value[0] = static_cast<char>(i);
82 pw::rpc::PwpbUnaryReceiver call =
83 kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
84 ASSERT_STREQ(receiver.Wait(), value);
85 }
86 }
87
TEST(PwpbRpcIntegrationTest,Unary_ReuseCall)88 TEST(PwpbRpcIntegrationTest, Unary_ReuseCall) {
89 pw::rpc::PwpbUnaryReceiver<pw::rpc::Payload::Message> call;
90 char value[] = {"O_o "};
91
92 for (int i = 0; i < kIterations; ++i) {
93 PayloadReceiver receiver;
94
95 value[sizeof(value) - 2] = static_cast<char>(i);
96 call = kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
97 ASSERT_STREQ(receiver.Wait(), value);
98 }
99 }
100
TEST(PwpbRpcIntegrationTest,Unary_DiscardCalls)101 TEST(PwpbRpcIntegrationTest, Unary_DiscardCalls) {
102 constexpr int iterations = PW_RPC_USE_GLOBAL_MUTEX ? 10000 : 1;
103 for (int i = 0; i < iterations; ++i) {
104 kClient.UnaryEcho(Payload("O_o"));
105 }
106 }
107
TEST(PwpbRpcIntegrationTest,BidirectionalStreaming_MoveCalls)108 TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_MoveCalls) {
109 for (int i = 0; i < kIterations; ++i) {
110 PayloadReceiver receiver;
111 pw::rpc::PwpbClientReaderWriter call =
112 kClient.BidirectionalEcho(receiver.OnNext());
113
114 ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
115 ASSERT_STREQ(receiver.Wait(), "Yello");
116
117 pw::rpc::PwpbClientReaderWriter<pw::rpc::Payload::Message,
118 pw::rpc::Payload::Message>
119 new_call = std::move(call);
120
121 // NOLINTNEXTLINE(bugprone-use-after-move)
122 EXPECT_EQ(Status::FailedPrecondition(), call.Write(Payload("Dello")));
123
124 ASSERT_EQ(OkStatus(), new_call.Write(Payload("Dello")));
125 ASSERT_STREQ(receiver.Wait(), "Dello");
126
127 call = std::move(new_call);
128
129 // NOLINTNEXTLINE(bugprone-use-after-move)
130 EXPECT_EQ(Status::FailedPrecondition(), new_call.Write(Payload("Dello")));
131
132 ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
133 ASSERT_STREQ(receiver.Wait(), "???");
134
135 EXPECT_EQ(OkStatus(), call.Cancel());
136 EXPECT_EQ(Status::FailedPrecondition(), new_call.Cancel());
137 }
138 }
139
TEST(PwpbRpcIntegrationTest,BidirectionalStreaming_ReuseCall)140 TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_ReuseCall) {
141 pw::rpc::PwpbClientReaderWriter<pw::rpc::Payload::Message,
142 pw::rpc::Payload::Message>
143 call;
144
145 for (int i = 0; i < kIterations; ++i) {
146 PayloadReceiver receiver;
147 call = kClient.BidirectionalEcho(receiver.OnNext());
148
149 ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
150 ASSERT_STREQ(receiver.Wait(), "Yello");
151
152 ASSERT_EQ(OkStatus(), call.Write(Payload("Dello")));
153 ASSERT_STREQ(receiver.Wait(), "Dello");
154
155 ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
156 ASSERT_STREQ(receiver.Wait(), "???");
157 }
158 }
159
160 } // namespace
161 } // namespace pwpb_rpc_test
162