xref: /aosp_15_r20/external/grpc-grpc/test/core/surface/completion_queue_threading_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <inttypes.h>
20 #include <stdlib.h>
21 
22 #include <memory>
23 
24 #include "absl/status/status.h"
25 #include "gtest/gtest.h"
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/sync.h>
31 #include <grpc/support/time.h>
32 
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/gprpp/crash.h"
35 #include "src/core/lib/gprpp/thd.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/surface/completion_queue.h"
38 #include "test/core/util/test_config.h"
39 
40 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
41 
create_test_tag(void)42 static void* create_test_tag(void) {
43   static intptr_t i = 0;
44   return reinterpret_cast<void*>(++i);
45 }
46 
47 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)48 static void shutdown_and_destroy(grpc_completion_queue* cc) {
49   grpc_event ev;
50   grpc_completion_queue_shutdown(cc);
51 
52   switch (grpc_get_cq_completion_type(cc)) {
53     case GRPC_CQ_NEXT: {
54       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
55                                       nullptr);
56       break;
57     }
58     case GRPC_CQ_PLUCK: {
59       ev = grpc_completion_queue_pluck(
60           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
61       break;
62     }
63     default: {
64       gpr_log(GPR_ERROR, "Unknown completion type");
65       break;
66     }
67   }
68 
69   ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
70   grpc_completion_queue_destroy(cc);
71 }
72 
do_nothing_end_completion(void *,grpc_cq_completion *)73 static void do_nothing_end_completion(void* /*arg*/,
74                                       grpc_cq_completion* /*c*/) {}
75 
76 struct thread_state {
77   grpc_completion_queue* cc;
78   void* tag;
79 };
80 
pluck_one(void * arg)81 static void pluck_one(void* arg) {
82   struct thread_state* state = static_cast<struct thread_state*>(arg);
83   grpc_completion_queue_pluck(state->cc, state->tag,
84                               gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
85 }
86 
test_too_many_plucks(void)87 static void test_too_many_plucks(void) {
88   grpc_event ev;
89   grpc_completion_queue* cc;
90   void* tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
91   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
92   grpc_core::Thread threads[GPR_ARRAY_SIZE(tags)];
93   struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
94   grpc_core::ExecCtx exec_ctx;
95   unsigned i, j;
96 
97   LOG_TEST("test_too_many_plucks");
98 
99   cc = grpc_completion_queue_create_for_pluck(nullptr);
100 
101   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
102     tags[i] = create_test_tag();
103     for (j = 0; j < i; j++) {
104       ASSERT_NE(tags[i], tags[j]);
105     }
106     thread_states[i].cc = cc;
107     thread_states[i].tag = tags[i];
108     threads[i] =
109         grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
110     threads[i].Start();
111   }
112 
113   // wait until all other threads are plucking
114   gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
115 
116   ev = grpc_completion_queue_pluck(cc, create_test_tag(),
117                                    gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
118   ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
119 
120   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
121     ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
122     grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
123                    nullptr, &completions[i]);
124   }
125 
126   for (auto& th : threads) {
127     th.Join();
128   }
129 
130   shutdown_and_destroy(cc);
131 }
132 
133 #define TEST_THREAD_EVENTS 10000
134 
135 typedef struct test_thread_options {
136   gpr_event on_started;
137   gpr_event* phase1;
138   gpr_event on_phase1_done;
139   gpr_event* phase2;
140   gpr_event on_finished;
141   size_t events_triggered;
142   int id;
143   grpc_completion_queue* cc;
144 } test_thread_options;
145 
ten_seconds_time(void)146 gpr_timespec ten_seconds_time(void) {
147   return grpc_timeout_seconds_to_deadline(10);
148 }
149 
free_completion(void *,grpc_cq_completion * completion)150 static void free_completion(void* /*arg*/, grpc_cq_completion* completion) {
151   gpr_free(completion);
152 }
153 
producer_thread(void * arg)154 static void producer_thread(void* arg) {
155   test_thread_options* opt = static_cast<test_thread_options*>(arg);
156   int i;
157 
158   gpr_log(GPR_INFO, "producer %d started", opt->id);
159   gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
160   ASSERT_TRUE(gpr_event_wait(opt->phase1, ten_seconds_time()));
161 
162   gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
163   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
164     ASSERT_TRUE(grpc_cq_begin_op(opt->cc, (void*)(intptr_t)1));
165   }
166 
167   gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
168   gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
169   ASSERT_TRUE(gpr_event_wait(opt->phase2, ten_seconds_time()));
170 
171   gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
172   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
173     grpc_core::ExecCtx exec_ctx;
174     grpc_cq_end_op(opt->cc, reinterpret_cast<void*>(1), absl::OkStatus(),
175                    free_completion, nullptr,
176                    static_cast<grpc_cq_completion*>(
177                        gpr_malloc(sizeof(grpc_cq_completion))));
178     opt->events_triggered++;
179   }
180 
181   gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
182   gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
183 }
184 
consumer_thread(void * arg)185 static void consumer_thread(void* arg) {
186   test_thread_options* opt = static_cast<test_thread_options*>(arg);
187   grpc_event ev;
188 
189   gpr_log(GPR_INFO, "consumer %d started", opt->id);
190   gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
191   ASSERT_TRUE(gpr_event_wait(opt->phase1, ten_seconds_time()));
192 
193   gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
194 
195   gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
196   gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
197   ASSERT_TRUE(gpr_event_wait(opt->phase2, ten_seconds_time()));
198 
199   gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
200   for (;;) {
201     ev = grpc_completion_queue_next(
202         opt->cc, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
203     switch (ev.type) {
204       case GRPC_OP_COMPLETE:
205         ASSERT_TRUE(ev.success);
206         opt->events_triggered++;
207         break;
208       case GRPC_QUEUE_SHUTDOWN:
209         gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
210         gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
211         return;
212       case GRPC_QUEUE_TIMEOUT:
213         grpc_core::Crash("Invalid timeout received");
214     }
215   }
216 }
217 
test_threading(size_t producers,size_t consumers)218 static void test_threading(size_t producers, size_t consumers) {
219   test_thread_options* options = static_cast<test_thread_options*>(
220       gpr_malloc((producers + consumers) * sizeof(test_thread_options)));
221   gpr_event phase1 = GPR_EVENT_INIT;
222   gpr_event phase2 = GPR_EVENT_INIT;
223   grpc_completion_queue* cc = grpc_completion_queue_create_for_next(nullptr);
224   size_t i;
225   size_t total_consumed = 0;
226   static int optid = 101;
227 
228   gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
229           "test_threading", producers, consumers);
230 
231   // start all threads: they will wait for phase1
232   grpc_core::Thread* threads = static_cast<grpc_core::Thread*>(
233       gpr_malloc(sizeof(*threads) * (producers + consumers)));
234   for (i = 0; i < producers + consumers; i++) {
235     gpr_event_init(&options[i].on_started);
236     gpr_event_init(&options[i].on_phase1_done);
237     gpr_event_init(&options[i].on_finished);
238     options[i].phase1 = &phase1;
239     options[i].phase2 = &phase2;
240     options[i].events_triggered = 0;
241     options[i].cc = cc;
242     options[i].id = optid++;
243 
244     bool ok;
245     threads[i] = grpc_core::Thread(
246         i < producers ? "grpc_producer" : "grpc_consumer",
247         i < producers ? producer_thread : consumer_thread, options + i, &ok);
248     ASSERT_TRUE(ok);
249     threads[i].Start();
250     gpr_event_wait(&options[i].on_started, ten_seconds_time());
251   }
252 
253   // start phase1: producers will pre-declare all operations they will
254   // complete
255   gpr_log(GPR_INFO, "start phase 1");
256   gpr_event_set(&phase1, reinterpret_cast<void*>(1));
257 
258   gpr_log(GPR_INFO, "wait phase 1");
259   for (i = 0; i < producers + consumers; i++) {
260     ASSERT_TRUE(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
261   }
262   gpr_log(GPR_INFO, "done phase 1");
263 
264   // start phase2: operations will complete, and consumers will consume them
265   gpr_log(GPR_INFO, "start phase 2");
266   gpr_event_set(&phase2, reinterpret_cast<void*>(1));
267 
268   // in parallel, we shutdown the completion channel - all events should still
269   // be consumed
270   grpc_completion_queue_shutdown(cc);
271 
272   // join all threads
273   gpr_log(GPR_INFO, "wait phase 2");
274   for (i = 0; i < producers + consumers; i++) {
275     ASSERT_TRUE(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
276   }
277   gpr_log(GPR_INFO, "done phase 2");
278 
279   // destroy the completion channel
280   grpc_completion_queue_destroy(cc);
281 
282   for (i = 0; i < producers + consumers; i++) {
283     threads[i].Join();
284   }
285   gpr_free(threads);
286 
287   // verify that everything was produced and consumed
288   for (i = 0; i < producers + consumers; i++) {
289     if (i < producers) {
290       ASSERT_EQ(options[i].events_triggered, TEST_THREAD_EVENTS);
291     } else {
292       total_consumed += options[i].events_triggered;
293     }
294   }
295   ASSERT_EQ(total_consumed, producers * TEST_THREAD_EVENTS);
296 
297   gpr_free(options);
298 }
299 
TEST(CompletionQueueThreadingTest,MainTest)300 TEST(CompletionQueueThreadingTest, MainTest) {
301   grpc_init();
302   test_too_many_plucks();
303   test_threading(1, 1);
304   test_threading(1, 10);
305   test_threading(10, 1);
306   test_threading(10, 10);
307   grpc_shutdown();
308 }
309 
main(int argc,char ** argv)310 int main(int argc, char** argv) {
311   grpc::testing::TestEnvironment env(&argc, argv);
312   ::testing::InitGoogleTest(&argc, argv);
313   grpc::testing::TestGrpcScope grpc_scope;
314   return RUN_ALL_TESTS();
315 }
316