1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/observable.h"
16
17 #include <cstdint>
18 #include <limits>
19 #include <thread>
20 #include <vector>
21
22 #include "absl/strings/str_join.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25
26 #include "src/core/lib/gprpp/notification.h"
27 #include "src/core/lib/promise/loop.h"
28 #include "src/core/lib/promise/map.h"
29
30 using testing::Mock;
31 using testing::StrictMock;
32
33 namespace grpc_core {
34 namespace {
35
36 class MockActivity : public Activity, public Wakeable {
37 public:
38 MOCK_METHOD(void, WakeupRequested, ());
39
ForceImmediateRepoll(WakeupMask)40 void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()41 void Orphan() override {}
MakeOwningWaker()42 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()43 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)44 void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)45 void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)46 void Drop(WakeupMask) override {}
DebugTag() const47 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const48 std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
49
Activate()50 void Activate() {
51 if (scoped_activity_ != nullptr) return;
52 scoped_activity_ = std::make_unique<ScopedActivity>(this);
53 }
54
Deactivate()55 void Deactivate() { scoped_activity_.reset(); }
56
57 private:
58 std::unique_ptr<ScopedActivity> scoped_activity_;
59 };
60
61 MATCHER(IsPending, "") {
62 if (arg.ready()) {
63 *result_listener << "is ready";
64 return false;
65 }
66 return true;
67 }
68
69 MATCHER(IsReady, "") {
70 if (arg.pending()) {
71 *result_listener << "is pending";
72 return false;
73 }
74 return true;
75 }
76
77 MATCHER_P(IsReady, value, "") {
78 if (arg.pending()) {
79 *result_listener << "is pending";
80 return false;
81 }
82 if (arg.value() != value) {
83 *result_listener << "is " << ::testing::PrintToString(arg.value());
84 return false;
85 }
86 return true;
87 }
88
// Next(0) on an observable constructed with 1 resolves on the first poll and
// yields 1.
TEST(ObservableTest, ImmediateNext) {
  Observable<int> obs(1);
  auto poll_next = obs.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
94
// The Next(0) promise is created first and Set(1) happens afterwards, before
// any poll; the first poll must then already be ready with 1.
TEST(ObservableTest, SetBecomesImmediateNext1) {
  Observable<int> obs(0);
  auto poll_next = obs.Next(0);
  obs.Set(1);
  EXPECT_THAT(poll_next(), IsReady(1));
}
101
// Set(1) happens before the Next(0) promise is even created; the first poll
// must be ready with 1.
TEST(ObservableTest, SetBecomesImmediateNext2) {
  Observable<int> obs(0);
  obs.Set(1);
  auto poll_next = obs.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
108
// Next(1) on an observable that already holds 1 must stay pending however
// often it is polled. The StrictMock turns any WakeupRequested() call into a
// test failure, since no expectation is set.
TEST(ObservableTest, SameValueGetsPending) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  Observable<int> obs(1);
  auto poll_next = obs.Next(1);
  constexpr int kPolls = 4;
  for (int poll = 0; poll < kPolls; ++poll) {
    EXPECT_THAT(poll_next(), IsPending());
  }
}
119
// A pending Next(1) must request exactly one wakeup when the value changes to
// 2, and the following poll must be ready with 2.
TEST(ObservableTest, ChangeValueWakesUp) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  Observable<int> obs(1);
  auto poll_next = obs.Next(1);
  EXPECT_THAT(poll_next(), IsPending());

  // The wakeup has to be requested by Set() itself, so verify straight after.
  EXPECT_CALL(mock_activity, WakeupRequested());
  obs.Set(2);
  Mock::VerifyAndClearExpectations(&mock_activity);

  EXPECT_THAT(poll_next(), IsReady(2));
}
131
// NextWhen(pred) stays pending until a value satisfying the predicate is set.
// Each Set() on a pending observer must still request one wakeup, including
// the Set(2) whose value the predicate rejects.
TEST(ObservableTest, NextWhen) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  Observable<int> observable(1);
  auto next = observable.NextWhen([](int i) { return i == 3; });
  EXPECT_THAT(next(), IsPending());

  // 2 does not satisfy the predicate: a wakeup is requested, but the re-poll
  // is still pending.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(2);
  // Verify here so the wakeup is attributed to Set(2), and so the next
  // EXPECT_CALL is not placed on a mock with a live, already-used expectation
  // (gMock leaves interleaving expectations and mock calls undefined).
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsPending());

  // 3 satisfies the predicate: wakeup requested, then ready with 3.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(3);
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsReady(3));
}
146
// Two observers, each polled right after its own activity was activated, must
// both have their activity woken by a single Set(), and both must then
// resolve to the new value.
TEST(ObservableTest, MultipleActivitiesWakeUp) {
  StrictMock<MockActivity> first_activity;
  StrictMock<MockActivity> second_activity;
  Observable<int> obs(1);
  auto first_next = obs.Next(1);
  auto second_next = obs.Next(1);

  // Poll each observer once, directly after activating its activity.
  first_activity.Activate();
  EXPECT_THAT(first_next(), IsPending());
  second_activity.Activate();
  EXPECT_THAT(second_next(), IsPending());

  // One Set() must request a wakeup on each activity.
  EXPECT_CALL(first_activity, WakeupRequested());
  EXPECT_CALL(second_activity, WakeupRequested());
  obs.Set(2);
  Mock::VerifyAndClearExpectations(&first_activity);
  Mock::VerifyAndClearExpectations(&second_activity);

  EXPECT_THAT(first_next(), IsReady(2));
  EXPECT_THAT(second_next(), IsReady(2));
}
169
// Wakeup scheduler for tests: each scheduled wakeup runs the activity's
// RunScheduledWakeup() on a newly spawned, detached thread.
class ThreadWakeupScheduler {
 public:
  // Scheduler half that ActivityType is expected to derive from; `this` is
  // downcast to ActivityType to reach RunScheduledWakeup().
  template <typename ActivityType>
  class BoundScheduler {
   public:
    // The scheduler argument carries no state and is ignored.
    explicit BoundScheduler(ThreadWakeupScheduler) {}

    // Starts a thread that calls RunScheduledWakeup() and detaches it; the
    // thread is never joined.
    void ScheduleWakeup() {
      auto* const activity = static_cast<ActivityType*>(this);
      std::thread([activity] { activity->RunScheduledWakeup(); }).detach();
    }
  };
};
183
// Stress test: two activities, woken through ThreadWakeupScheduler (one
// detached thread per wakeup), repeatedly observe the same Observable while
// the test thread publishes 0..999999 followed by the kEnd sentinel.
//
// The test only requires that each observer recorded at least one value and
// that the last value it recorded is kEnd.
TEST(ObservableTest, Stress) {
  static constexpr uint64_t kEnd = std::numeric_limits<uint64_t>::max();
  std::vector<uint64_t> values1;
  std::vector<uint64_t> values2;
  // Last value seen by each observer; passed to Next() on every iteration so
  // the promise resolves once the observable holds something different.
  uint64_t current1 = 0;
  uint64_t current2 = 0;
  Notification done1;
  Notification done2;
  Observable<uint64_t> observable(0);
  auto activity1 = MakeActivity(
      Loop([&observable, &current1, &values1] {
        return Map(
            observable.Next(current1),
            [&values1, &current1](uint64_t value) -> LoopCtl<absl::Status> {
              values1.push_back(value);
              current1 = value;
              // The sentinel ends the loop; anything else observes again.
              if (value == kEnd) return absl::OkStatus();
              return Continue{};
            });
      }),
      ThreadWakeupScheduler(), [&done1](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status.ToString();
        done1.Notify();
      });
  auto activity2 = MakeActivity(
      Loop([&observable, &current2, &values2] {
        return Map(
            observable.Next(current2),
            [&values2, &current2](uint64_t value) -> LoopCtl<absl::Status> {
              values2.push_back(value);
              current2 = value;
              if (value == kEnd) return absl::OkStatus();
              return Continue{};
            });
      }),
      ThreadWakeupScheduler(), [&done2](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status.ToString();
        done2.Notify();
      });
  for (uint64_t i = 0; i < 1000000; i++) {
    observable.Set(i);
  }
  observable.Set(kEnd);
  // The vectors are only read after both activities have signalled completion.
  done1.WaitForNotification();
  done2.WaitForNotification();
  ASSERT_FALSE(values1.empty());
  ASSERT_FALSE(values2.empty());
  EXPECT_EQ(values1.back(), kEnd);
  EXPECT_EQ(values2.back(), kEnd);
}
234
235 } // namespace
236 } // namespace grpc_core
237
main(int argc,char ** argv)238 int main(int argc, char** argv) {
239 gpr_log_verbosity_init();
240 ::testing::InitGoogleTest(&argc, argv);
241 return RUN_ALL_TESTS();
242 }
243