1*61c4878aSAndroid Build Coastguard Worker // Copyright 2022 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/raw/client_testing.h"
16*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
17*61c4878aSAndroid Build Coastguard Worker #include "pw_sync/binary_semaphore.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/non_portable_test_thread_options.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/sleep.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/yield.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
23*61c4878aSAndroid Build Coastguard Worker
24*61c4878aSAndroid Build Coastguard Worker namespace pw::rpc {
25*61c4878aSAndroid Build Coastguard Worker namespace {
26*61c4878aSAndroid Build Coastguard Worker
27*61c4878aSAndroid Build Coastguard Worker using namespace std::chrono_literals;
28*61c4878aSAndroid Build Coastguard Worker
29*61c4878aSAndroid Build Coastguard Worker using test::pw_rpc::raw::TestService;
30*61c4878aSAndroid Build Coastguard Worker
31*61c4878aSAndroid Build Coastguard Worker // These tests cover interactions between a thread moving or destroying an RPC
32*61c4878aSAndroid Build Coastguard Worker // call object and a thread running callbacks for that call. In order to test
33*61c4878aSAndroid Build Coastguard Worker // that the first thread waits for callbacks to complete when trying to move or
34*61c4878aSAndroid Build Coastguard Worker // destroy the call, it is necessary to have the callback thread yield to the
35*61c4878aSAndroid Build Coastguard Worker // other thread. There isn't a good way to synchronize these threads without
36*61c4878aSAndroid Build Coastguard Worker // changing the code under test.
YieldToOtherThread()37*61c4878aSAndroid Build Coastguard Worker void YieldToOtherThread() {
38*61c4878aSAndroid Build Coastguard Worker // Sleep for a while and then yield just to be sure the other thread runs.
39*61c4878aSAndroid Build Coastguard Worker this_thread::sleep_for(100ms);
40*61c4878aSAndroid Build Coastguard Worker this_thread::yield();
41*61c4878aSAndroid Build Coastguard Worker }
42*61c4878aSAndroid Build Coastguard Worker
// Test fixture that owns a raw RPC client test context and a helper thread.
// The helper ("callback") thread waits on callback_thread_sem_ and then sends
// one server stream packet for the call selected by respond_to_call_, which
// results in that call's on_next callback being invoked on the helper thread.
class CallbacksTest : public ::testing::Test {
 protected:
  CallbacksTest()
      // TODO: b/290860904 - Replace TestOptionsThread0 with
      // TestThreadContext.
      : callback_thread_(thread::test::TestOptionsThread0(),
                         [this] { SendResponseAfterSemaphore(); }) {}

  ~CallbacksTest() override {
    EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
  }

  // Selects the call whose ID the callback thread will respond to. Only the
  // address is stored, so `call` must remain alive until the callback thread
  // has read its ID in SendResponseAfterSemaphore().
  void RespondToCall(const RawClientReaderWriter& call) {
    respond_to_call_ = &call;
  }

  RawClientTestContext<> context_;
  // Released by the test body to let the callback thread send its packet.
  sync::BinarySemaphore callback_thread_sem_;
  // Released by RPC callbacks to signal that the callback has started running.
  sync::BinarySemaphore main_thread_sem_;

  // Runs SendResponseAfterSemaphore(). Declared after context_ and the
  // semaphores because the thread function uses them.
  Thread callback_thread_;

  // Must be incremented exactly once by the RPC callback in each test.
  volatile int callback_executed_ = 0;

  // Variables optionally used by tests. These are in this object so lambdas
  // only need to capture [this] to access them.
  volatile bool call_is_in_scope_ = false;

  RawClientReaderWriter call_1_;
  RawClientReaderWriter call_2_;

 private:
  // Body of callback_thread_: blocks until released, then sends a single
  // server stream packet addressed to the currently selected call.
  void SendResponseAfterSemaphore() {
    // Wait until the main thread says to send the response.
    callback_thread_sem_.acquire();

    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
        {}, respond_to_call_->id());
  }

  // Call that the callback thread responds to. Defaults to call_1_; tests may
  // redirect it with RespondToCall().
  const RawClientReaderWriter* respond_to_call_ = &call_1_;
};
86*61c4878aSAndroid Build Coastguard Worker
// Checks that destroying a call object blocks while that call's on_next
// callback is still executing on the callback thread.
TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    // Unblock and join the callback thread before skipping; the fixture's
    // destructor expects the thread to have been joined.
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP()
        << "Skipping because locks are disabled, so this thread cannot yield.";
  }

  {
    RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
        context_.client(), context_.channel().id());
    // Have the callback thread respond to the local call instead of call_1_.
    RespondToCall(local_call);

    call_is_in_scope_ = true;

    local_call.set_on_next([this](ConstByteSpan) {
      // Tell the main thread that the callback is now running.
      main_thread_sem_.release();

      // Wait for a while so the main thread tries to destroy the call.
      YieldToOtherThread();

      // Now, make sure the call is still in scope. The main thread should
      // block in the call's destructor until this callback completes.
      EXPECT_TRUE(call_is_in_scope_);

      callback_executed_ = callback_executed_ + 1;
    });

    // Start the callback thread so it can invoke the callback.
    callback_thread_sem_.release();

    // Wait until the callback thread starts.
    main_thread_sem_.acquire();

    // local_call is destroyed at the end of this scope, while the callback is
    // expected to still be sleeping in YieldToOtherThread().
  }

  // The callback thread will sleep for a bit. Meanwhile, let the call go out
  // of scope, and mark it as such.
  call_is_in_scope_ = false;

  // Wait for the callback thread to finish.
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
131*61c4878aSAndroid Build Coastguard Worker
// Checks that move-assigning from an active call blocks until the callback
// that is currently running for that call has finished.
TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    // Unblock and join the callback thread before skipping; the fixture's
    // destructor expects the thread to have been joined.
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP()
        << "Skipping because locks are disabled, so this thread cannot yield.";
  }

  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
        main_thread_sem_.release();  // Confirm that this thread started

        // Give the main thread time to attempt the move while this callback
        // is still running.
        YieldToOtherThread();

        callback_executed_ = callback_executed_ + 1;
      });

  // Start the callback thread so it can invoke the callback.
  callback_thread_sem_.release();

  // Confirm that the callback thread started.
  main_thread_sem_.acquire();

  // Move the call object. This thread should wait until the running callback
  // is done.
  EXPECT_TRUE(call_1_.active());
  call_2_ = std::move(call_1_);

  // The callback should already have finished. This thread should have waited
  // for it to finish during the move.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  callback_thread_.join();
}
168*61c4878aSAndroid Build Coastguard Worker
// Checks that a callback may move a different call (call_2_) into the call
// whose callback is currently running (call_1_).
TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
        // Nothing in this test acquires main_thread_sem_; the main thread
        // waits by joining the callback thread instead.
        main_thread_sem_.release();  // Confirm that this thread started

        // Overwrite the call that owns this callback with the other call.
        call_1_ = std::move(call_2_);

        callback_executed_ = callback_executed_ + 1;
      });

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Start the callback thread and wait for it to finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  // After the move inside the callback, call_1_ is expected to be active and
  // the moved-from call_2_ inactive.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_TRUE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
193*61c4878aSAndroid Build Coastguard Worker
// Checks that a callback may move its own call (call_1_) into another call
// (call_2_), provided it cancels its own call first.
TEST_F(CallbacksTest, MoveOwnCallInCallback) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
        // Nothing in this test acquires main_thread_sem_; the main thread
        // waits by joining the callback thread instead.
        main_thread_sem_.release();  // Confirm that this thread started

        // Cancel this call first, or the move will deadlock: moving an active
        // call waits for the thread running its callback to finish, and here
        // the moving thread and the callback thread are the same thread.
        EXPECT_EQ(OkStatus(), call_1_.Cancel());
        call_2_ = std::move(call_1_);

        callback_executed_ = callback_executed_ + 1;
      });

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Start the callback thread and wait for it to finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  // call_1_ was cancelled and then moved from; call_2_ received the cancelled
  // call, so neither is expected to be active.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
222*61c4878aSAndroid Build Coastguard Worker
// Checks that packets arriving for a call while its on_next callback is still
// running are dropped rather than triggering further callback invocations.
TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), [this](ConstByteSpan) {
        main_thread_sem_.release();  // Confirm that this thread started

        // Keep the callback busy until the main thread has sent its packets.
        // This reuses the semaphore that started the callback thread.
        callback_thread_sem_.acquire();  // Wait for the main thread to release

        callback_executed_ = callback_executed_ + 1;
      });

  // Start the callback thread.
  callback_thread_sem_.release();

  main_thread_sem_.acquire();  // Confirm that the callback is running

  // Handle a few packets for this call, which should be dropped since on_next
  // is busy. callback_executed_ should remain at 1.
  for (int i = 0; i < 5; ++i) {
    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
        {}, call_1_.id());
  }

  // Let the blocked callback return, then wait for the callback thread to
  // finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
251*61c4878aSAndroid Build Coastguard Worker
252*61c4878aSAndroid Build Coastguard Worker } // namespace
253*61c4878aSAndroid Build Coastguard Worker } // namespace pw::rpc
254