xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/iomgr/exec_ctx.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
20 #define GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <limits>
25 
26 #if __APPLE__
27 // Provides TARGET_OS_IPHONE
28 #include <TargetConditionals.h>
29 #endif
30 
31 #include <grpc/impl/grpc_types.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/cpu.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/gpr/time_precise.h"
38 #include "src/core/lib/gprpp/crash.h"
39 #include "src/core/lib/gprpp/debug_location.h"
40 #include "src/core/lib/gprpp/fork.h"
41 #include "src/core/lib/gprpp/time.h"
42 #include "src/core/lib/iomgr/closure.h"
43 
44 #if !defined(_WIN32) || !defined(_DLL)
45 #define EXEC_CTX exec_ctx_
46 #define CALLBACK_EXEC_CTX callback_exec_ctx_
47 #else
48 #define EXEC_CTX exec_ctx()
49 #define CALLBACK_EXEC_CTX callback_exec_ctx()
50 #endif
51 
52 /// A combiner represents a list of work to be executed later.
53 /// Forward declared here to avoid a circular dependency with combiner.h.
54 typedef struct grpc_combiner grpc_combiner;
55 
56 // This exec_ctx is ready to return: either pre-populated, or cached as soon as
57 // the finish_check returns true
58 #define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
59 // The exec_ctx's thread is (potentially) owned by a call or channel: care
60 // should be given to not delete said call/channel from this exec_ctx
61 #define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
62 // This exec ctx was initialized by an internal thread, and should not
63 // be counted by fork handlers
64 #define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4
65 
66 // This application callback exec ctx was initialized by an internal thread, and
67 // should not be counted by fork handlers
68 #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1
69 
70 namespace grpc_core {
71 class Combiner;
72 /// Execution context.
73 /// A bag of data that collects information along a callstack.
74 /// It is created on the stack at core entry points (public API or iomgr), and
75 /// stored internally as a thread-local variable.
76 ///
77 /// Generally, to create an exec_ctx instance, add the following line at the top
78 /// of the public API entry point or at the start of a thread's work function :
79 ///
80 /// ExecCtx exec_ctx;
81 ///
82 /// Access the created ExecCtx instance using :
83 /// ExecCtx::Get()
84 ///
85 /// Specific responsibilities (this may grow in the future):
86 /// - track a list of core work that needs to be delayed until the base of the
87 ///   call stack (this provides a convenient mechanism to run callbacks
88 ///   without worrying about locking issues)
89 /// - provide a decision maker (via IsReadyToFinish) that provides a
90 ///   signal as to whether a borrowed thread should continue to do work or
91 ///   should actively try to finish up and get this thread back to its owner
92 ///
93 /// CONVENTIONS:
94 /// - Instance of this must ALWAYS be constructed on the stack, never
95 ///   heap allocated.
96 /// - Do not pass exec_ctx as a parameter to a function. Always access it using
97 ///   ExecCtx::Get().
98 /// - NOTE: In the future, the convention is likely to change to allow only one
99 ///         ExecCtx on a thread's stack at the same time. The TODO below
100 ///         discusses this plan in more detail.
101 ///
102 /// TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same time.
103 ///               Stage 1: If a new one is created on the stack, it should just
104 ///               pass-through to the underlying ExecCtx deeper in the thread's
105 ///               stack.
106 ///               Stage 2: Assert if a 2nd one is ever created on the stack
107 ///               since that implies a core re-entry outside of application
108 ///               callbacks.
109 ///
110 class GRPC_DLL ExecCtx {
111  public:
112   /// Default Constructor
113 
ExecCtx()114   ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
115     Fork::IncExecCtxCount();
116     Set(this);
117   }
118 
119   /// Parameterised Constructor
ExecCtx(uintptr_t fl)120   explicit ExecCtx(uintptr_t fl) : flags_(fl) {
121     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
122       Fork::IncExecCtxCount();
123     }
124     Set(this);
125   }
126 
127   /// Destructor
~ExecCtx()128   virtual ~ExecCtx() {
129     flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
130     Flush();
131     Set(last_exec_ctx_);
132     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
133       Fork::DecExecCtxCount();
134     }
135   }
136 
137   /// Disallow copy and assignment operators
138   ExecCtx(const ExecCtx&) = delete;
139   ExecCtx& operator=(const ExecCtx&) = delete;
140 
141   struct CombinerData {
142     // currently active combiner: updated only via combiner.c
143     Combiner* active_combiner;
144     // last active combiner in the active combiner list
145     Combiner* last_combiner;
146   };
147 
148   /// Only to be used by grpc-combiner code
combiner_data()149   CombinerData* combiner_data() { return &combiner_data_; }
150 
151   /// Return pointer to grpc_closure_list
closure_list()152   grpc_closure_list* closure_list() { return &closure_list_; }
153 
154   /// Return flags
flags()155   uintptr_t flags() { return flags_; }
156 
157   /// Checks if there is work to be done
HasWork()158   bool HasWork() {
159     return combiner_data_.active_combiner != nullptr ||
160            !grpc_closure_list_empty(closure_list_);
161   }
162 
163   /// Flush any work that has been enqueued onto this grpc_exec_ctx.
164   /// Caller must guarantee that no interfering locks are held.
165   /// Returns true if work was performed, false otherwise.
166   ///
167   bool Flush();
168 
169   /// Returns true if we'd like to leave this execution context as soon as
170   /// possible: useful for deciding whether to do something more or not
171   /// depending on outside context.
172   ///
IsReadyToFinish()173   bool IsReadyToFinish() {
174     if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
175       if (CheckReadyToFinish()) {
176         flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
177         return true;
178       }
179       return false;
180     } else {
181       return true;
182     }
183   }
184 
SetReadyToFinishFlag()185   void SetReadyToFinishFlag() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; }
186 
Now()187   Timestamp Now() { return Timestamp::Now(); }
188 
InvalidateNow()189   void InvalidateNow() {
190 #if !TARGET_OS_IPHONE
191     time_cache_.InvalidateCache();
192 #endif
193   }
194 
SetNowIomgrShutdown()195   void SetNowIomgrShutdown() {
196 #if !TARGET_OS_IPHONE
197     // We get to do a test only set now on this path just because iomgr
198     // is getting removed and no point adding more interfaces for it.
199     time_cache_.TestOnlySetNow(Timestamp::InfFuture());
200 #endif
201   }
202 
TestOnlySetNow(Timestamp now)203   void TestOnlySetNow(Timestamp now) {
204 #if !TARGET_OS_IPHONE
205     time_cache_.TestOnlySetNow(now);
206 #endif
207   }
208 
209   /// Gets pointer to current exec_ctx.
Get()210   static ExecCtx* Get() { return EXEC_CTX; }
211 
212   static void Run(const DebugLocation& location, grpc_closure* closure,
213                   grpc_error_handle error);
214 
215   static void RunList(const DebugLocation& location, grpc_closure_list* list);
216 
217  protected:
218   /// Check if ready to finish.
CheckReadyToFinish()219   virtual bool CheckReadyToFinish() { return false; }
220 
221   /// Disallow delete on ExecCtx.
delete(void *)222   static void operator delete(void* /* p */) { abort(); }
223 
224  private:
225   /// Set EXEC_CTX to ctx.
Set(ExecCtx * ctx)226   static void Set(ExecCtx* ctx) { EXEC_CTX = ctx; }
227 
228   grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
229   CombinerData combiner_data_ = {nullptr, nullptr};
230   uintptr_t flags_;
231 
232 #if !TARGET_OS_IPHONE
233   ScopedTimeCache time_cache_;
234 #endif
235 
236 #if !defined(_WIN32) || !defined(_DLL)
237   static thread_local ExecCtx* exec_ctx_;
238 #else
239   // cannot be thread_local data member (e.g. exec_ctx_) on windows
240   static ExecCtx*& exec_ctx();
241 #endif
242   ExecCtx* last_exec_ctx_ = Get();
243 };
244 
245 /// Application-callback execution context.
246 /// A bag of data that collects information along a callstack.
247 /// It is created on the stack at core entry points, and stored internally
248 /// as a thread-local variable.
249 ///
250 /// There are three key differences between this structure and ExecCtx:
251 ///   1. ApplicationCallbackExecCtx builds a list of application-level
252 ///      callbacks, but ExecCtx builds a list of internal callbacks to invoke.
253 ///   2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
254 ///      there is no explicit Flush method.
255 ///   3. If more than one ApplicationCallbackExecCtx is created on the thread's
256 ///      stack, only the one closest to the base of the stack is actually
257 ///      active and this is the only one that enqueues application callbacks.
258 ///      (Unlike ExecCtx, it is not feasible to prevent multiple of these on the
259 ///      stack since the executing application callback may itself enter core.
260 ///      However, the new one created will just pass callbacks through to the
261 ///      base one and those will not be executed until the return to the
262 ///      destructor of the base one, preventing unlimited stack growth.)
263 ///
264 /// This structure exists because application callbacks may themselves cause a
265 /// core re-entry (e.g., through a public API call) and if that call in turn
266 /// causes another application-callback, there could be arbitrarily growing
267 /// stacks of core re-entries. Instead, any application callbacks instead should
268 /// not be invoked until other core work is done and other application callbacks
269 /// have completed. To accomplish this, any application callback should be
270 /// enqueued using ApplicationCallbackExecCtx::Enqueue .
271 ///
272 /// CONVENTIONS:
273 /// - Instances of this must ALWAYS be constructed on the stack, never
274 ///   heap allocated.
275 /// - Instances of this are generally constructed before ExecCtx when needed.
276 ///   The only exception is for ExecCtx's that are explicitly flushed and
277 ///   that survive beyond the scope of the function that can cause application
278 ///   callbacks to be invoked (e.g., in the timer thread).
279 ///
280 /// Generally, core entry points that may trigger application-level callbacks
281 /// will have the following declarations:
282 ///
283 /// ApplicationCallbackExecCtx callback_exec_ctx;
284 /// ExecCtx exec_ctx;
285 ///
286 /// This ordering is important to make sure that the ApplicationCallbackExecCtx
287 /// is destroyed after the ExecCtx (to prevent the re-entry problem described
288 /// above, as well as making sure that ExecCtx core callbacks are invoked first)
289 ///
290 ///
291 
292 class GRPC_DLL ApplicationCallbackExecCtx {
293  public:
294   /// Default Constructor
ApplicationCallbackExecCtx()295   ApplicationCallbackExecCtx() { Set(this, flags_); }
296 
297   /// Parameterised Constructor
ApplicationCallbackExecCtx(uintptr_t fl)298   explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
299     Set(this, flags_);
300   }
301 
~ApplicationCallbackExecCtx()302   ~ApplicationCallbackExecCtx() {
303     if (Get() == this) {
304       while (head_ != nullptr) {
305         auto* f = head_;
306         head_ = f->internal_next;
307         if (f->internal_next == nullptr) {
308           tail_ = nullptr;
309         }
310         (*f->functor_run)(f, f->internal_success);
311       }
312       CALLBACK_EXEC_CTX = nullptr;
313       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
314         Fork::DecExecCtxCount();
315       }
316     } else {
317       GPR_DEBUG_ASSERT(head_ == nullptr);
318       GPR_DEBUG_ASSERT(tail_ == nullptr);
319     }
320   }
321 
Flags()322   uintptr_t Flags() { return flags_; }
323 
Get()324   static ApplicationCallbackExecCtx* Get() { return CALLBACK_EXEC_CTX; }
325 
Set(ApplicationCallbackExecCtx * exec_ctx,uintptr_t flags)326   static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
327     if (Get() == nullptr) {
328       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
329         Fork::IncExecCtxCount();
330       }
331       CALLBACK_EXEC_CTX = exec_ctx;
332     }
333   }
334 
Enqueue(grpc_completion_queue_functor * functor,int is_success)335   static void Enqueue(grpc_completion_queue_functor* functor, int is_success) {
336     functor->internal_success = is_success;
337     functor->internal_next = nullptr;
338 
339     ApplicationCallbackExecCtx* ctx = Get();
340 
341     if (ctx->head_ == nullptr) {
342       ctx->head_ = functor;
343     }
344     if (ctx->tail_ != nullptr) {
345       ctx->tail_->internal_next = functor;
346     }
347     ctx->tail_ = functor;
348   }
349 
Available()350   static bool Available() { return Get() != nullptr; }
351 
352  private:
353   uintptr_t flags_{0u};
354   grpc_completion_queue_functor* head_{nullptr};
355   grpc_completion_queue_functor* tail_{nullptr};
356 
357 #if !defined(_WIN32) || !defined(_DLL)
358   static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_;
359 #else
360   // cannot be thread_local data member (e.g. callback_exec_ctx_) on windows
361   static ApplicationCallbackExecCtx*& callback_exec_ctx();
362 #endif
363 };
364 
365 template <typename F>
EnsureRunInExecCtx(F f)366 void EnsureRunInExecCtx(F f) {
367   if (ExecCtx::Get() == nullptr) {
368     ApplicationCallbackExecCtx app_ctx;
369     ExecCtx exec_ctx;
370     f();
371   } else {
372     f();
373   }
374 }
375 
376 #undef EXEC_CTX
377 #undef CALLBACK_EXEC_CTX
378 
379 }  // namespace grpc_core
380 
381 #endif  // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
382