//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/timer_manager.h"

#include <inttypes.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/timer.h"

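// A timer thread that has finished its main loop: it queues itself on
// g_completed_threads so that another thread (holding g_mu) can Join() it and
// free the allocation.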
struct completed_thread {
  grpc_core::Thread thd;
  completed_thread* next;
};

extern grpc_core::TraceFlag grpc_timer_check_trace;

// global mutex
static gpr_mu g_mu;
// are we multi-threaded
static bool g_threaded;
// should we start multi-threaded
static bool g_start_threaded = true;
// cv to wait until a thread is needed
static gpr_cv g_cv_wait;
// cv for notification when threading ends
static gpr_cv g_cv_shutdown;
// number of threads in the system
static int g_thread_count;
// number of threads sitting around waiting
static int g_waiter_count;
// linked list of threads that have completed (and need joining)
static completed_thread* g_completed_threads;
// was the manager kicked by the timer system
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
static grpc_core::Timestamp g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
// number of timer wakeups
static uint64_t g_wakeups;

static void timer_thread(void* completed_thread_ptr);

static void gc_completed_threads(void) {
  if (g_completed_threads != nullptr) {
    completed_thread* to_gc = g_completed_threads;
    g_completed_threads = nullptr;
    gpr_mu_unlock(&g_mu);
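    // join the detached threads without holding g_mu: Join() may block, and
    // other timer threads may need the lock in the meantime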
    while (to_gc != nullptr) {
      to_gc->thd.Join();
      completed_thread* next = to_gc->next;
      gpr_free(to_gc);
      to_gc = next;
    }
    gpr_mu_lock(&g_mu);
  }
}

static void start_timer_thread_and_unlock(void) {
  GPR_ASSERT(g_threaded);
  ++g_waiter_count;
  ++g_thread_count;
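  // the counters are bumped while still holding g_mu so the new thread is
  // already accounted for before the lock is dropped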
  gpr_mu_unlock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "Spawn timer thread");
  }
  completed_thread* ct =
      static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
  ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
  ct->thd.Start();
}

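// Explicitly perform one tick of the timer system on the caller's thread
// (e.g. for use when the timer manager is not running its own threads).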
void grpc_timer_manager_tick() {
  grpc_core::ExecCtx exec_ctx;
  grpc_timer_check(nullptr);
}

static void run_some_timers() {
  // In the case of timers, the ExecCtx for the thread is declared
  // in the timer thread itself, but this is the point where we
  // could start seeing application-level callbacks. No need to
  // create a new ExecCtx, though, since there already is one and it is
  // flushed (but not destructed) in this function itself
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  // if there's something to execute...
  gpr_mu_lock(&g_mu);
  // remove a waiter from the pool, and start another thread if necessary
  --g_waiter_count;
  if (g_waiter_count == 0 && g_threaded) {
    // The number of timer threads is always increasing until all the threads
    // are stopped. In rare cases, if a large number of timers fire
    // simultaneously, we may end up using a large number of threads.
    start_timer_thread_and_unlock();
  } else {
    // if there's no thread waiting with a timeout, kick an existing untimed
    // waiter so that the next deadline is not missed
    if (!g_has_timed_waiter) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, "kick untimed waiter");
      }
      gpr_cv_signal(&g_cv_wait);
    }
    gpr_mu_unlock(&g_mu);
  }
  // without our lock, flush the exec_ctx
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "flush exec_ctx");
  }
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&g_mu);
  // garbage collect any threads that are dead
  gc_completed_threads();
  // get ready to wait again
  ++g_waiter_count;
  gpr_mu_unlock(&g_mu);
}

// wait until 'next' (or forever if there is already a timed waiter in the pool)
// returns true if the thread should continue executing (false if it should
// shut down)
static bool wait_until(grpc_core::Timestamp next) {
  gpr_mu_lock(&g_mu);
  // if we're not threaded anymore, leave
  if (!g_threaded) {
    gpr_mu_unlock(&g_mu);
    return false;
  }

  // If g_kicked is true at this point, it means there was a kick from the
  // timer system that the timer-manager threads here missed. We cannot trust
  // 'next' here any longer (since there might be an earlier deadline). So if
  // g_kicked is true at this point, we should quickly exit this and get the
  // next deadline from the timer system

  if (!g_kicked) {
    // If there's no timed waiter, we should become one: that waiter waits
    // only until the next timer should expire. All other timer threads wait
    // forever unless their 'next' is earlier than the current timed-waiter's
    // deadline (in which case the thread with the earlier 'next' takes over
    // as the new timed waiter).
    //
    // 'g_timed_waiter_generation' is a global generation counter. The idea
    // here is that the thread becoming a timed-waiter increments and stores
    // this global counter locally in 'my_timed_waiter_generation' before
    // going to sleep. After waking up, if my_timed_waiter_generation ==
    // g_timed_waiter_generation, it can be sure that it was the timed-waiter
    // thread (and that no other thread took over while it was asleep).
    //
    // Initialize my_timed_waiter_generation to some value that is NOT equal
    // to g_timed_waiter_generation
    uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;

    if (next != grpc_core::Timestamp::InfFuture()) {
      if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
        my_timed_waiter_generation = ++g_timed_waiter_generation;
        g_has_timed_waiter = true;
        g_timed_waiter_deadline = next;

        if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
          grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now();
          gpr_log(GPR_INFO, "sleep for %" PRId64 " milliseconds",
                  wait_time.millis());
        }
      } else {
        // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
        next = grpc_core::Timestamp::InfFuture();
      }
    }

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace) &&
        next == grpc_core::Timestamp::InfFuture()) {
      gpr_log(GPR_INFO, "sleep until kicked");
    }

    gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
              my_timed_waiter_generation == g_timed_waiter_generation,
              g_kicked);
    }
    // if this was the timed waiter, then we need to check timers, and flag
    // that there's now no timed waiter... we'll look for a replacement if
    // there's work to do after checking timers (code above)
    if (my_timed_waiter_generation == g_timed_waiter_generation) {
      ++g_wakeups;
      g_has_timed_waiter = false;
      g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
    }
  }

  // if this was a kick from the timer system, consume it (and don't stop
  // this thread yet)
  if (g_kicked) {
    grpc_timer_consume_kick();
    g_kicked = false;
  }

  gpr_mu_unlock(&g_mu);
  return true;
}

static void timer_main_loop() {
  for (;;) {
    grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
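    // drop the ExecCtx's cached clock so that grpc_timer_check below sees a
    // freshly sampled Timestamp::Now()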
    grpc_core::ExecCtx::Get()->InvalidateNow();

    // check timer state, updates next to the next time to run a check
    switch (grpc_timer_check(&next)) {
      case GRPC_TIMERS_FIRED:
        run_some_timers();
        break;
      case GRPC_TIMERS_NOT_CHECKED:
        // This case only happens under contention, meaning more than one
        // timer manager thread checked timers concurrently.

        // If that happens, we're guaranteed that some other thread has just
        // checked timers, and this will avalanche into some other thread
        // seeing empty timers and doing a timed sleep.

        // Consequently, we can just sleep forever here and be happy at some
        // saved wakeup cycles.
        if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
          gpr_log(GPR_INFO, "timers not checked: expect another thread to");
        }
        next = grpc_core::Timestamp::InfFuture();
        ABSL_FALLTHROUGH_INTENDED;
      case GRPC_TIMERS_CHECKED_AND_EMPTY:
        if (!wait_until(next)) {
          return;
        }
        break;
    }
  }
}

static void timer_thread_cleanup(completed_thread* ct) {
  gpr_mu_lock(&g_mu);
  // terminate the thread: drop the waiter and thread counts, and let whoever
  // stopped the threading know that we're done
  --g_waiter_count;
  --g_thread_count;
  if (0 == g_thread_count) {
    gpr_cv_signal(&g_cv_shutdown);
  }
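  // a thread cannot Join() itself, so queue this thread for a later
  // gc_completed_threads() pass to reclaim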
  ct->next = g_completed_threads;
  g_completed_threads = ct;
  gpr_mu_unlock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "End timer thread");
  }
}

static void timer_thread(void* completed_thread_ptr) {
  // this thread's exec_ctx: we try to run things through to completion here
  // since it's easy to spin up new threads
  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
  timer_main_loop();

  timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
}

static void start_threads(void) {
  gpr_mu_lock(&g_mu);
  if (!g_threaded) {
    g_threaded = true;
    start_timer_thread_and_unlock();
  } else {
    gpr_mu_unlock(&g_mu);
  }
}

void grpc_timer_manager_init(void) {
  gpr_mu_init(&g_mu);
  gpr_cv_init(&g_cv_wait);
  gpr_cv_init(&g_cv_shutdown);
  g_threaded = false;
  g_thread_count = 0;
  g_waiter_count = 0;
  g_completed_threads = nullptr;

  g_has_timed_waiter = false;
  g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();

  if (g_start_threaded) start_threads();
}

static void stop_threads(void) {
  gpr_mu_lock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
  }
  if (g_threaded) {
    g_threaded = false;
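    // wake every waiter (timed or not) so each observes g_threaded == false
    // and exits its main loop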
    gpr_cv_broadcast(&g_cv_wait);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
    }
    while (g_thread_count > 0) {
      gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
      }
      gc_completed_threads();
    }
  }
  g_wakeups = 0;
  gpr_mu_unlock(&g_mu);
}

void grpc_timer_manager_shutdown(void) {
  stop_threads();

  gpr_mu_destroy(&g_mu);
  gpr_cv_destroy(&g_cv_wait);
  gpr_cv_destroy(&g_cv_shutdown);
}

void grpc_timer_manager_set_threading(bool enabled) {
  if (enabled) {
    start_threads();
  } else {
    stop_threads();
  }
}

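// Note: g_start_threaded is only consulted by grpc_timer_manager_init(), so
// calling this has no effect on a timer manager that is already initialized.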
void grpc_timer_manager_set_start_threaded(bool enabled) {
  g_start_threaded = enabled;
}

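// Kick from the timer system: invalidate the current timed waiter (bump the
// generation and clear its deadline), then wake a waiter so the next deadline
// is re-fetched via grpc_timer_check().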
void grpc_kick_poller(void) {
  gpr_mu_lock(&g_mu);
  g_kicked = true;
  g_has_timed_waiter = false;
  g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
  ++g_timed_waiter_generation;
  gpr_cv_signal(&g_cv_wait);
  gpr_mu_unlock(&g_mu);
}

uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }