xref: /aosp_15_r20/external/pigweed/pw_rpc/pwpb/client_integration_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_assert/check.h"
16 #include "pw_rpc/benchmark.rpc.pwpb.h"
17 #include "pw_rpc/integration_testing.h"
18 #include "pw_sync/binary_semaphore.h"
19 #include "pw_unit_test/framework.h"
20 
21 namespace pwpb_rpc_test {
22 namespace {
23 
24 using namespace std::chrono_literals;
25 using pw::ByteSpan;
26 using pw::ConstByteSpan;
27 using pw::Function;
28 using pw::OkStatus;
29 using pw::Status;
30 
31 using pw::rpc::pw_rpc::pwpb::Benchmark;
32 
// Loop bound shared by the test cases in this file that repeat an RPC exchange.
constexpr int kIterations{10};
34 
35 class PayloadReceiver {
36  public:
Wait()37   const char* Wait() {
38     constexpr auto kWaitTimeout = 1500ms;
39     PW_CHECK(sem_.try_acquire_for(kWaitTimeout));
40     return reinterpret_cast<const char*>(payload_.payload.data());
41   }
42 
UnaryOnCompleted()43   Function<void(const pw::rpc::Payload::Message&, Status)> UnaryOnCompleted() {
44     return [this](const pw::rpc::Payload::Message& data, Status) {
45       CopyPayload(data);
46     };
47   }
48 
OnNext()49   Function<void(const pw::rpc::Payload::Message&)> OnNext() {
50     return [this](const pw::rpc::Payload::Message& data) { CopyPayload(data); };
51   }
52 
53  private:
CopyPayload(const pw::rpc::Payload::Message & data)54   void CopyPayload(const pw::rpc::Payload::Message& data) {
55     payload_ = data;
56     sem_.release();
57   }
58 
59   pw::sync::BinarySemaphore sem_;
60   pw::rpc::Payload::Message payload_ = {};
61 };
62 
63 template <size_t kSize>
Payload(const char (& string)[kSize])64 pw::rpc::Payload::Message Payload(const char (&string)[kSize]) {
65   static_assert(kSize <= sizeof(pw::rpc::Payload::Message::payload));
66   pw::rpc::Payload::Message payload{};
67   payload.payload.resize(kSize);
68   std::memcpy(payload.payload.data(), string, kSize);
69   return payload;
70 }
71 
72 const Benchmark::Client kClient(pw::rpc::integration_test::client(),
73                                 pw::rpc::integration_test::kChannelId);
74 
// Issues kIterations UnaryEcho calls, varying the first character of the
// request each time, and asserts that the received payload equals the string
// that was sent.
TEST(PwpbRpcIntegrationTest, Unary) {
  char value[] = {"hello, world!"};

  for (int i = 0; i < kIterations; ++i) {
    // Declared before the call object so that it is destroyed after it.
    PayloadReceiver receiver;

    // Use a printable, never-null character for the varying byte. Writing the
    // raw loop index would store '\0' at the front on the first pass, which
    // makes ASSERT_STREQ compare two empty strings and leaves the rest of the
    // echoed payload unchecked.
    value[0] = static_cast<char>('a' + (i % 26));
    pw::rpc::PwpbUnaryReceiver call =
        kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
    ASSERT_STREQ(receiver.Wait(), value);
  }
}
87 
// Repeats UnaryEcho kIterations times, assigning every new call to the same
// call object, and asserts that each received payload equals the string sent.
TEST(PwpbRpcIntegrationTest, Unary_ReuseCall) {
  pw::rpc::PwpbUnaryReceiver<pw::rpc::Payload::Message> call;
  char value[] = {"O_o "};

  // Index of the last character before the null terminator.
  constexpr size_t kVaryingIndex = sizeof(value) - 2;

  for (int iteration = 0; iteration != kIterations; ++iteration) {
    PayloadReceiver receiver;

    // Make each request differ in its final character.
    value[kVaryingIndex] = static_cast<char>(iteration);
    call = kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
    ASSERT_STREQ(receiver.Wait(), value);
  }
}
100 
// Starts UnaryEcho calls without a completion callback and drops the returned
// call object immediately.
TEST(PwpbRpcIntegrationTest, Unary_DiscardCalls) {
  // 10000 calls when PW_RPC_USE_GLOBAL_MUTEX is nonzero, otherwise a single
  // call.
  constexpr int kDiscardedCalls = PW_RPC_USE_GLOBAL_MUTEX ? 10000 : 1;

  for (int started = 0; started != kDiscardedCalls; ++started) {
    kClient.UnaryEcho(Payload("O_o"));
  }
}
107 
// Checks that a BidirectionalEcho call keeps working after being moved into
// another call object and back again, and that the moved-from object returns
// FailedPrecondition from Write() and Cancel().
TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_MoveCalls) {
  using EchoCall = pw::rpc::PwpbClientReaderWriter<pw::rpc::Payload::Message,
                                                   pw::rpc::Payload::Message>;

  for (int round = 0; round != kIterations; ++round) {
    PayloadReceiver receiver;
    auto call = kClient.BidirectionalEcho(receiver.OnNext());

    // The freshly started call can write and gets the same text back.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
    ASSERT_STREQ(receiver.Wait(), "Yello");

    // Move the call into a second object.
    EchoCall new_call = std::move(call);

    // NOLINTNEXTLINE(bugprone-use-after-move)
    EXPECT_EQ(Status::FailedPrecondition(), call.Write(Payload("Dello")));

    ASSERT_EQ(OkStatus(), new_call.Write(Payload("Dello")));
    ASSERT_STREQ(receiver.Wait(), "Dello");

    // Move it back into the original object.
    call = std::move(new_call);

    // NOLINTNEXTLINE(bugprone-use-after-move)
    EXPECT_EQ(Status::FailedPrecondition(), new_call.Write(Payload("Dello")));

    ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
    ASSERT_STREQ(receiver.Wait(), "???");

    // Only the object that currently holds the call can cancel it.
    EXPECT_EQ(OkStatus(), call.Cancel());
    EXPECT_EQ(Status::FailedPrecondition(), new_call.Cancel());
  }
}
139 
// Starts a new BidirectionalEcho call kIterations times, always assigning it
// to the same call object, and exchanges three messages on each one. The test
// never cancels a call explicitly before assigning the next one over it.
TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_ReuseCall) {
  using EchoCall = pw::rpc::PwpbClientReaderWriter<pw::rpc::Payload::Message,
                                                   pw::rpc::Payload::Message>;
  EchoCall call;

  for (int round = 0; round != kIterations; ++round) {
    PayloadReceiver receiver;
    call = kClient.BidirectionalEcho(receiver.OnNext());

    // First message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
    ASSERT_STREQ(receiver.Wait(), "Yello");

    // Second message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Dello")));
    ASSERT_STREQ(receiver.Wait(), "Dello");

    // Third message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
    ASSERT_STREQ(receiver.Wait(), "???");
  }
}
159 
160 }  // namespace
161 }  // namespace pwpb_rpc_test
162