xref: /aosp_15_r20/external/grpc-grpc/test/core/promise/observable_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/observable.h"
16 
17 #include <cstdint>
18 #include <limits>
19 #include <thread>
20 #include <vector>
21 
22 #include "absl/strings/str_join.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 
26 #include "src/core/lib/gprpp/notification.h"
27 #include "src/core/lib/promise/loop.h"
28 #include "src/core/lib/promise/map.h"
29 
30 using testing::Mock;
31 using testing::StrictMock;
32 
33 namespace grpc_core {
34 namespace {
35 
36 class MockActivity : public Activity, public Wakeable {
37  public:
38   MOCK_METHOD(void, WakeupRequested, ());
39 
ForceImmediateRepoll(WakeupMask)40   void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()41   void Orphan() override {}
MakeOwningWaker()42   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()43   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)44   void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)45   void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)46   void Drop(WakeupMask) override {}
DebugTag() const47   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const48   std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
49 
Activate()50   void Activate() {
51     if (scoped_activity_ != nullptr) return;
52     scoped_activity_ = std::make_unique<ScopedActivity>(this);
53   }
54 
Deactivate()55   void Deactivate() { scoped_activity_.reset(); }
56 
57  private:
58   std::unique_ptr<ScopedActivity> scoped_activity_;
59 };
60 
61 MATCHER(IsPending, "") {
62   if (arg.ready()) {
63     *result_listener << "is ready";
64     return false;
65   }
66   return true;
67 }
68 
69 MATCHER(IsReady, "") {
70   if (arg.pending()) {
71     *result_listener << "is pending";
72     return false;
73   }
74   return true;
75 }
76 
77 MATCHER_P(IsReady, value, "") {
78   if (arg.pending()) {
79     *result_listener << "is pending";
80     return false;
81   }
82   if (arg.value() != value) {
83     *result_listener << "is " << ::testing::PrintToString(arg.value());
84     return false;
85   }
86   return true;
87 }
88 
// The stored value (1) already differs from the value passed to Next() (0),
// so the very first poll resolves with the stored value.
TEST(ObservableTest, ImmediateNext) {
  Observable<int> subject(1);
  auto poll_next = subject.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
94 
// A Set() that lands after Next() was created but before it is first polled
// is seen by that first poll.
TEST(ObservableTest, SetBecomesImmediateNext1) {
  Observable<int> subject(0);
  auto poll_next = subject.Next(0);
  subject.Set(1);
  EXPECT_THAT(poll_next(), IsReady(1));
}
101 
// Same as SetBecomesImmediateNext1, but the Set() happens before Next() is
// even created; the first poll still resolves with the new value.
TEST(ObservableTest, SetBecomesImmediateNext2) {
  Observable<int> subject(0);
  subject.Set(1);
  auto poll_next = subject.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
108 
// While the stored value equals the value passed to Next(), every poll stays
// pending. The StrictMock also guarantees no wakeup is requested meanwhile.
TEST(ObservableTest, SameValueGetsPending) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  Observable<int> subject(1);
  auto poll_next = subject.Next(1);
  // Poll repeatedly to show that re-polling does not change the outcome.
  for (int attempt = 0; attempt < 4; ++attempt) {
    EXPECT_THAT(poll_next(), IsPending());
  }
}
119 
// A pending observer's activity is woken when the value changes, and the
// following poll resolves with the new value.
TEST(ObservableTest, ChangeValueWakesUp) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  Observable<int> subject(1);
  auto poll_next = subject.Next(1);
  // The first poll must go pending before a wakeup can be expected.
  EXPECT_THAT(poll_next(), IsPending());

  EXPECT_CALL(activity, WakeupRequested());
  subject.Set(2);
  // Verify now so the wakeup is attributed to Set() rather than to the
  // re-poll below.
  Mock::VerifyAndClearExpectations(&activity);

  EXPECT_THAT(poll_next(), IsReady(2));
}
131 
// NextWhen() only resolves once the predicate accepts the stored value. Each
// Set() still wakes the pending activity, but a poll after a non-matching
// value goes back to pending.
TEST(ObservableTest, NextWhen) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  Observable<int> observable(1);
  auto next = observable.NextWhen([](int i) { return i == 3; });
  EXPECT_THAT(next(), IsPending());

  // First update: wakes the activity, but 2 does not satisfy the predicate.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(2);
  // Verify before setting the next expectation. gMock requires expectations
  // to be set before the mock is exercised, and this also pins the first
  // wakeup to Set(2) rather than letting Set(3) satisfy both expectations.
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsPending());

  // Second update: wakes the activity again, and 3 satisfies the predicate.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(3);
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsReady(3));
}
146 
// Two observers of the same Observable, each polled under a different
// activity, are both woken by a single Set() and both resolve with the new
// value.
TEST(ObservableTest, MultipleActivitiesWakeUp) {
  StrictMock<MockActivity> first_activity;
  StrictMock<MockActivity> second_activity;
  Observable<int> subject(1);
  auto first_next = subject.Next(1);
  auto second_next = subject.Next(1);

  // Poll each observer right after activating "its" activity. The second
  // Activate() happens while the first ScopedActivity is still held.
  first_activity.Activate();
  EXPECT_THAT(first_next(), IsPending());
  second_activity.Activate();
  EXPECT_THAT(second_next(), IsPending());

  EXPECT_CALL(first_activity, WakeupRequested());
  EXPECT_CALL(second_activity, WakeupRequested());
  subject.Set(2);
  Mock::VerifyAndClearExpectations(&first_activity);
  Mock::VerifyAndClearExpectations(&second_activity);

  EXPECT_THAT(first_next(), IsReady(2));
  EXPECT_THAT(second_next(), IsReady(2));
}
169 
// Wakeup scheduler for the stress test: every scheduled wakeup runs on its own
// freshly spawned, detached thread.
class ThreadWakeupScheduler {
 public:
  // CRTP-style mix-in: ActivityType is expected to derive from
  // BoundScheduler<ActivityType> and to provide RunScheduledWakeup().
  template <typename ActivityType>
  class BoundScheduler {
   public:
    // The scheduler object carries no state, so the argument is discarded.
    explicit BoundScheduler(ThreadWakeupScheduler) {}

    // Starts a new thread that calls RunScheduledWakeup() on the derived
    // activity. The thread is detached rather than joined, so this call
    // returns without waiting for the wakeup to run.
    void ScheduleWakeup() {
      std::thread wakeup_thread(
          [self = this] { static_cast<ActivityType*>(self)->RunScheduledWakeup(); });
      wakeup_thread.detach();
    }
  };
};
183 
// Pushes a long run of updates into one Observable from the test thread while
// two independent activities chase its value. The activities are rescheduled
// through ThreadWakeupScheduler, i.e. on freshly spawned threads. Neither
// activity is required to see every intermediate value, but both must
// eventually observe the final sentinel and finish with an OK status.
TEST(ObservableTest, Stress) {
  // Sentinel published last; seeing it ends an activity's loop.
  static constexpr uint64_t kEnd = std::numeric_limits<uint64_t>::max();
  // Number of ordinary updates published before the sentinel.
  static constexpr uint64_t kNumUpdates = 1000000;
  std::vector<uint64_t> values1;
  std::vector<uint64_t> values2;
  // Last value each activity saw; fed back into Next() so that the next wait
  // only resolves once the observable holds something different.
  uint64_t current1 = 0;
  uint64_t current2 = 0;
  Notification done1;
  Notification done2;
  Observable<uint64_t> observable(0);
  // The returned handles are otherwise unused; they are held in locals for
  // the remainder of the test.
  auto activity1 = MakeActivity(
      Loop([&observable, &current1, &values1] {
        return Map(
            observable.Next(current1),
            [&values1, &current1](uint64_t value) -> LoopCtl<absl::Status> {
              values1.push_back(value);
              current1 = value;
              if (value == kEnd) return absl::OkStatus();
              return Continue{};
            });
      }),
      ThreadWakeupScheduler(), [&done1](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status.ToString();
        done1.Notify();
      });
  auto activity2 = MakeActivity(
      Loop([&observable, &current2, &values2] {
        return Map(
            observable.Next(current2),
            [&values2, &current2](uint64_t value) -> LoopCtl<absl::Status> {
              values2.push_back(value);
              current2 = value;
              if (value == kEnd) return absl::OkStatus();
              return Continue{};
            });
      }),
      ThreadWakeupScheduler(), [&done2](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status.ToString();
        done2.Notify();
      });
  for (uint64_t i = 0; i < kNumUpdates; i++) {
    observable.Set(i);
  }
  observable.Set(kEnd);
  // The value vectors are only read after both completion callbacks ran.
  done1.WaitForNotification();
  done2.WaitForNotification();
  // ASSERT (not EXPECT) so back() below is never called on an empty vector.
  // empty() is used instead of comparing size() with the int literal 1, which
  // mixes signed and unsigned operands inside gtest's comparison helper.
  ASSERT_FALSE(values1.empty());
  ASSERT_FALSE(values2.empty());
  EXPECT_EQ(values1.back(), kEnd);
  EXPECT_EQ(values2.back(), kEnd);
}
234 
235 }  // namespace
236 }  // namespace grpc_core
237 
main(int argc,char ** argv)238 int main(int argc, char** argv) {
239   gpr_log_verbosity_init();
240   ::testing::InitGoogleTest(&argc, argv);
241   return RUN_ALL_TESTS();
242 }
243