xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/combiner_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18 
19 #include "src/core/lib/iomgr/combiner.h"
20 
21 #include <thread>
22 
23 #include <gtest/gtest.h>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/gprpp/notification.h"
32 #include "src/core/lib/gprpp/thd.h"
33 #include "test/core/util/test_config.h"
34 
// Creates a combiner and releases it again without ever scheduling a closure
// on it.
TEST(CombinerTest, TestNoOp) {
  grpc_core::ExecCtx exec_ctx;
  grpc_core::Combiner* combiner = grpc_combiner_create(
      grpc_event_engine::experimental::CreateEventEngine());
  GRPC_COMBINER_UNREF(combiner, "test_no_op");
}
41 
set_event_to_true(void * value,grpc_error_handle)42 static void set_event_to_true(void* value, grpc_error_handle /*error*/) {
43   gpr_event_set(static_cast<gpr_event*>(value), reinterpret_cast<void*>(1));
44 }
45 
// Schedules a single closure on a fresh combiner and verifies that it has run
// (i.e. the event it sets becomes observable) within five seconds.
TEST(CombinerTest, TestExecuteOne) {
  grpc_core::Combiner* combiner = grpc_combiner_create(
      grpc_event_engine::experimental::CreateEventEngine());
  gpr_event executed;
  gpr_event_init(&executed);
  grpc_core::ExecCtx exec_ctx;
  auto* closure = GRPC_CLOSURE_CREATE(set_event_to_true, &executed, nullptr);
  combiner->Run(closure, absl::OkStatus());
  // Flush the exec ctx so the scheduled work runs before we wait on the event.
  grpc_core::ExecCtx::Get()->Flush();
  ASSERT_NE(gpr_event_wait(&executed, grpc_timeout_seconds_to_deadline(5)),
            nullptr);
  GRPC_COMBINER_UNREF(combiner, "test_execute_one");
}
59 
60 typedef struct {
61   size_t ctr;
62   grpc_core::Combiner* lock;
63   gpr_event done;
64 } thd_args;
65 
// Argument for a single check_one() closure.
struct ex_args {
  // Counter to verify and advance; points at the owning thread's
  // thd_args::ctr.
  size_t* ctr;
  // Value the counter is expected to take after this closure runs.
  size_t value;
};
70 
check_one(void * a,grpc_error_handle)71 static void check_one(void* a, grpc_error_handle /*error*/) {
72   ex_args* args = static_cast<ex_args*>(a);
73   ASSERT_EQ(*args->ctr, args->value - 1);
74   *args->ctr = args->value;
75   gpr_free(a);
76 }
77 
execute_many_loop(void * a)78 static void execute_many_loop(void* a) {
79   thd_args* args = static_cast<thd_args*>(a);
80   grpc_core::ExecCtx exec_ctx;
81   size_t n = 1;
82   for (size_t i = 0; i < 10; i++) {
83     for (size_t j = 0; j < 10000; j++) {
84       ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
85       c->ctr = &args->ctr;
86       c->value = n++;
87       args->lock->Run(GRPC_CLOSURE_CREATE(check_one, c, nullptr),
88                       absl::OkStatus());
89       grpc_core::ExecCtx::Get()->Flush();
90     }
91     // sleep for a little bit, to test a combiner draining and another thread
92     // picking it up
93     gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
94   }
95   args->lock->Run(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, nullptr),
96                   absl::OkStatus());
97 }
98 
// Starts ten threads that all schedule closures on one shared combiner (see
// execute_many_loop), then waits for each thread's `done` event and joins it.
TEST(CombinerTest, TestExecuteMany) {
  grpc_core::Combiner* combiner = grpc_combiner_create(
      grpc_event_engine::experimental::CreateEventEngine());
  constexpr size_t kThreadCount = 10;
  grpc_core::Thread workers[kThreadCount];
  thd_args worker_args[kThreadCount];
  for (size_t t = 0; t != kThreadCount; ++t) {
    thd_args& state = worker_args[t];
    state.ctr = 0;
    state.lock = combiner;
    gpr_event_init(&state.done);
    workers[t] =
        grpc_core::Thread("grpc_execute_many", execute_many_loop, &state);
    workers[t].Start();
  }
  for (size_t t = 0; t != kThreadCount; ++t) {
    ASSERT_NE(gpr_event_wait(&worker_args[t].done,
                             gpr_inf_future(GPR_CLOCK_REALTIME)),
              nullptr);
    workers[t].Join();
  }
  grpc_core::ExecCtx exec_ctx;
  GRPC_COMBINER_UNREF(combiner, "test_execute_many");
}
119 
120 static gpr_event got_in_finally;
121 
in_finally(void *,grpc_error_handle)122 static void in_finally(void* /*arg*/, grpc_error_handle /*error*/) {
123   gpr_event_set(&got_in_finally, reinterpret_cast<void*>(1));
124 }
125 
add_finally(void * arg,grpc_error_handle)126 static void add_finally(void* arg, grpc_error_handle /*error*/) {
127   static_cast<grpc_core::Combiner*>(arg)->FinallyRun(
128       GRPC_CLOSURE_CREATE(in_finally, arg, nullptr), absl::OkStatus());
129 }
130 
// Runs a closure that registers a "finally" closure on the combiner and
// verifies that the finally closure has run within five seconds.
TEST(CombinerTest, TestExecuteFinally) {
  grpc_core::Combiner* combiner = grpc_combiner_create(
      grpc_event_engine::experimental::CreateEventEngine());
  grpc_core::ExecCtx exec_ctx;
  gpr_event_init(&got_in_finally);
  auto* registrar = GRPC_CLOSURE_CREATE(add_finally, combiner, nullptr);
  combiner->Run(registrar, absl::OkStatus());
  grpc_core::ExecCtx::Get()->Flush();
  ASSERT_NE(
      gpr_event_wait(&got_in_finally, grpc_timeout_seconds_to_deadline(5)),
      nullptr);
  GRPC_COMBINER_UNREF(combiner, "test_execute_finally");
}
143 
// Chains three closures on one combiner: the first two are expected to run on
// the thread that flushes the exec ctx, and the third, scheduled right after
// ForceOffload(), is expected to run on a different thread.
TEST(CombinerTest, TestForceOffload) {
  grpc_core::Combiner* combiner = grpc_combiner_create(
      grpc_event_engine::experimental::CreateEventEngine());
  grpc_core::ExecCtx exec_ctx;
  grpc_core::Notification offloaded;
  const auto origin_thread = std::this_thread::get_id();

  // Stage 3: scheduled after the forced offload.
  auto after_offload = [&](grpc_error_handle) {
    // We should see *not* the starting thread being the executor now.
    EXPECT_NE(origin_thread, std::this_thread::get_id());
    offloaded.Notify();
  };

  // Stage 2: scheduled from inside stage 1.
  auto second_step = [&](grpc_error_handle) {
    // Next one should stick to the same thread too (proves we're not
    // offloading all the time).
    EXPECT_EQ(origin_thread, std::this_thread::get_id());
    // Force the offload.
    combiner->ForceOffload();
    combiner->Run(grpc_core::NewClosure(after_offload), absl::OkStatus());
  };

  // Stage 1: scheduled directly by the test body.
  auto first_step = [&](grpc_error_handle) {
    // Initial execution should get done in the exec ctx flush below, so
    // thread stays the same.
    EXPECT_EQ(origin_thread, std::this_thread::get_id());
    combiner->Run(grpc_core::NewClosure(second_step), absl::OkStatus());
  };

  combiner->Run(grpc_core::NewClosure(first_step), absl::OkStatus());
  exec_ctx.Flush();
  offloaded.WaitForNotification();
  GRPC_COMBINER_UNREF(combiner, "test_force_offload");
}
177 
main(int argc,char ** argv)178 int main(int argc, char** argv) {
179   grpc::testing::TestEnvironment env(&argc, argv);
180   ::testing::InitGoogleTest(&argc, argv);
181   grpc::testing::TestGrpcScope grpc_scope;
182   return RUN_ALL_TESTS();
183 }
184