//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/timer_manager.h"

#include <inttypes.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/timer.h"

struct completed_thread {
  grpc_core::Thread thd;
  completed_thread* next;
};

extern grpc_core::TraceFlag grpc_timer_check_trace;

// global mutex
static gpr_mu g_mu;
// are we multi-threaded
static bool g_threaded;
// should we start multi-threaded
static bool g_start_threaded = true;
// cv to wait until a thread is needed
static gpr_cv g_cv_wait;
// cv for notification when threading ends
static gpr_cv g_cv_shutdown;
// number of threads in the system
static int g_thread_count;
// number of threads sitting around waiting
static int g_waiter_count;
// linked list of threads that have completed (and need joining)
static completed_thread* g_completed_threads;
// was the manager kicked by the timer system
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
static grpc_core::Timestamp g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
// number of timer wakeups
static uint64_t g_wakeups;

static void timer_thread(void* completed_thread_ptr);

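// garbage-collect timer threads that have finished their main loop: join and
// free them. Called with g_mu held; the lock is dropped while joining so the
// (potentially blocking) joins happen outside the critical section.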
static void gc_completed_threads(void) {
  if (g_completed_threads != nullptr) {
    completed_thread* to_gc = g_completed_threads;
    g_completed_threads = nullptr;
    gpr_mu_unlock(&g_mu);
    while (to_gc != nullptr) {
      to_gc->thd.Join();
      completed_thread* next = to_gc->next;
      gpr_free(to_gc);
      to_gc = next;
    }
    gpr_mu_lock(&g_mu);
  }
}

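// spawn a new timer thread. Called with g_mu held; bumps the thread and waiter
// counts under the lock, then releases it before actually starting the thread.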
static void start_timer_thread_and_unlock(void) {
  GPR_ASSERT(g_threaded);
  ++g_waiter_count;
  ++g_thread_count;
  gpr_mu_unlock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "Spawn timer thread");
  }
  completed_thread* ct =
      static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
  ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
  ct->thd.Start();
}

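// run a single timer check on the calling thread, inside its own ExecCtx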
void grpc_timer_manager_tick() {
  grpc_core::ExecCtx exec_ctx;
  grpc_timer_check(nullptr);
}

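// called after grpc_timer_check() reported GRPC_TIMERS_FIRED: flush the
// closures queued on this thread's ExecCtx, making sure another waiter (or a
// freshly spawned thread) keeps covering the timer list while we do so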
static void run_some_timers() {
  // In the case of timers, the ExecCtx for the thread is declared
  // in the timer thread itself, but this is the point where we
  // could start seeing application-level callbacks. No need to
  // create a new ExecCtx, though, since there already is one and it is
  // flushed (but not destructed) in this function itself
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  // if there's something to execute...
  gpr_mu_lock(&g_mu);
  // remove a waiter from the pool, and start another thread if necessary
  --g_waiter_count;
  if (g_waiter_count == 0 && g_threaded) {
    // The number of timer threads is always increasing until all the threads
    // are stopped. In rare cases, if a large number of timers fire
    // simultaneously, we may end up using a large number of threads.
    start_timer_thread_and_unlock();
  } else {
    // if there's no thread waiting with a timeout, kick an existing untimed
    // waiter so that the next deadline is not missed
    if (!g_has_timed_waiter) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, "kick untimed waiter");
      }
      gpr_cv_signal(&g_cv_wait);
    }
    gpr_mu_unlock(&g_mu);
  }
  // without our lock, flush the exec_ctx
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "flush exec_ctx");
  }
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&g_mu);
  // garbage collect any threads that are dead
  gc_completed_threads();
  // get ready to wait again
  ++g_waiter_count;
  gpr_mu_unlock(&g_mu);
}

// wait until 'next' (or forever if there is already a timed waiter in the pool)
// returns true if the thread should continue executing (false if it should
// shut down)
static bool wait_until(grpc_core::Timestamp next) {
  gpr_mu_lock(&g_mu);
  // if we're not threaded anymore, leave
  if (!g_threaded) {
    gpr_mu_unlock(&g_mu);
    return false;
  }

  // If g_kicked is true at this point, it means there was a kick from the
  // timer system that the timer-manager threads here missed. We cannot trust
  // 'next' any longer (since there might be an earlier deadline), so we should
  // exit quickly and get the next deadline from the timer system.

  if (!g_kicked) {
    // if there's no timed waiter, we should become one: that waiter waits
    // only until the next timer should expire. All other timer threads wait
    // forever
    //
    // 'g_timed_waiter_generation' is a global generation counter. The idea
    // here is that the thread becoming a timed-waiter increments and stores
    // this global counter locally in 'my_timed_waiter_generation' before going
    // to sleep. After waking up, if my_timed_waiter_generation ==
    // g_timed_waiter_generation, it can be sure that it was the timed_waiter
    // thread (and that no other thread took over while this was asleep)
    //
    // Initialize my_timed_waiter_generation to some value that is NOT equal to
    // g_timed_waiter_generation
    uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;

    // If there's no timed waiter, we should become one: that waiter waits only
    // until the next timer should expire. All other timer threads wait forever
    // unless their 'next' is earlier than the current timed-waiter's deadline
    // (in which case the thread with earlier 'next' takes over as the new
    // timed waiter)
    if (next != grpc_core::Timestamp::InfFuture()) {
      if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
        my_timed_waiter_generation = ++g_timed_waiter_generation;
        g_has_timed_waiter = true;
        g_timed_waiter_deadline = next;

        if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
          grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now();
          gpr_log(GPR_INFO, "sleep for %" PRId64 " milliseconds",
                  wait_time.millis());
        }
      } else {  // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
        next = grpc_core::Timestamp::InfFuture();
      }
    }

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace) &&
        next == grpc_core::Timestamp::InfFuture()) {
      gpr_log(GPR_INFO, "sleep until kicked");
    }

    gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
              my_timed_waiter_generation == g_timed_waiter_generation,
              g_kicked);
    }
    // if this was the timed waiter, then we need to check timers, and flag
    // that there's now no timed waiter... we'll look for a replacement if
    // there's work to do after checking timers (code above)
    if (my_timed_waiter_generation == g_timed_waiter_generation) {
      ++g_wakeups;
      g_has_timed_waiter = false;
      g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
    }
  }

  // if this was a kick from the timer system, consume it (and don't stop
  // this thread yet)
  if (g_kicked) {
    grpc_timer_consume_kick();
    g_kicked = false;
  }

  gpr_mu_unlock(&g_mu);
  return true;
}

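// the body of a timer manager thread: repeatedly check timers, run whatever
// fired, and otherwise sleep in wait_until() until the next deadline or a kick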
static void timer_main_loop() {
  for (;;) {
    grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
    grpc_core::ExecCtx::Get()->InvalidateNow();

    // check timer state, updates next to the next time to run a check
    switch (grpc_timer_check(&next)) {
      case GRPC_TIMERS_FIRED:
        run_some_timers();
        break;
      case GRPC_TIMERS_NOT_CHECKED:
        // This case only happens under contention, meaning more than one timer
        // manager thread checked timers concurrently.

        // If that happens, we're guaranteed that some other thread has just
        // checked timers, and this will avalanche into some other thread
        // seeing empty timers and doing a timed sleep.

        // Consequently, we can just sleep forever here and be happy at some
        // saved wakeup cycles.
        if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
          gpr_log(GPR_INFO, "timers not checked: expect another thread to");
        }
        next = grpc_core::Timestamp::InfFuture();
        ABSL_FALLTHROUGH_INTENDED;
      case GRPC_TIMERS_CHECKED_AND_EMPTY:
        if (!wait_until(next)) {
          return;
        }
        break;
    }
  }
}

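// bookkeeping for a timer thread that is exiting: drop it from the counts,
// queue it on g_completed_threads so another thread can join it, and signal
// the shutdown cv once the last thread is gone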
static void timer_thread_cleanup(completed_thread* ct) {
  gpr_mu_lock(&g_mu);
  // terminate the thread: drop the waiter count, thread count, and let whoever
  // stopped the threading stuff know that we're done
  --g_waiter_count;
  --g_thread_count;
  if (0 == g_thread_count) {
    gpr_cv_signal(&g_cv_shutdown);
  }
  ct->next = g_completed_threads;
  g_completed_threads = ct;
  gpr_mu_unlock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "End timer thread");
  }
}

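// entry point for each timer manager thread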
static void timer_thread(void* completed_thread_ptr) {
  // this thread's exec_ctx: we try to run things through to completion here
  // since it's easy to spin up new threads
  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
  timer_main_loop();

  timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
}

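// switch to threaded mode (no-op if already threaded) and spawn the first
// timer thread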
static void start_threads(void) {
  gpr_mu_lock(&g_mu);
  if (!g_threaded) {
    g_threaded = true;
    start_timer_thread_and_unlock();
  } else {
    gpr_mu_unlock(&g_mu);
  }
}

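// one-time initialization of the timer manager's global state; spawns the
// first timer thread unless that was disabled via
// grpc_timer_manager_set_start_threaded(false)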
void grpc_timer_manager_init(void) {
  gpr_mu_init(&g_mu);
  gpr_cv_init(&g_cv_wait);
  gpr_cv_init(&g_cv_shutdown);
  g_threaded = false;
  g_thread_count = 0;
  g_waiter_count = 0;
  g_completed_threads = nullptr;

  g_has_timed_waiter = false;
  g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();

  if (g_start_threaded) start_threads();
}

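// leave threaded mode: wake every waiter, then block on g_cv_shutdown until
// all timer threads have exited, joining them as they land on the completed
// list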
static void stop_threads(void) {
  gpr_mu_lock(&g_mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
  }
  if (g_threaded) {
    g_threaded = false;
    gpr_cv_broadcast(&g_cv_wait);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
    }
    while (g_thread_count > 0) {
      gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
      }
      gc_completed_threads();
    }
  }
  g_wakeups = 0;
  gpr_mu_unlock(&g_mu);
}

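// stop all timer threads and destroy the global synchronization primitives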
void grpc_timer_manager_shutdown(void) {
  stop_threads();

  gpr_mu_destroy(&g_mu);
  gpr_cv_destroy(&g_cv_wait);
  gpr_cv_destroy(&g_cv_shutdown);
}

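// start or stop the background timer threads at runtime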
void grpc_timer_manager_set_threading(bool enabled) {
  if (enabled) {
    start_threads();
  } else {
    stop_threads();
  }
}

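// control whether grpc_timer_manager_init() starts timer threads immediately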
void grpc_timer_manager_set_start_threaded(bool enabled) {
  g_start_threaded = enabled;
}

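// a kick from the timer system (see g_kicked): forget the current timed
// waiter's deadline and wake a waiter so the next deadline is re-fetched from
// the timer system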
void grpc_kick_poller(void) {
  gpr_mu_lock(&g_mu);
  g_kicked = true;
  g_has_timed_waiter = false;
  g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
  ++g_timed_waiter_generation;
  gpr_cv_signal(&g_cv_wait);
  gpr_mu_unlock(&g_mu);
}

uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }
371