//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H

#include <grpc/support/port_platform.h>

#include <limits>

#if __APPLE__
// Provides TARGET_OS_IPHONE
#include <TargetConditionals.h>
#endif

#include <grpc/impl/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"

#if !defined(_WIN32) || !defined(_DLL)
#define EXEC_CTX exec_ctx_
#define CALLBACK_EXEC_CTX callback_exec_ctx_
#else
#define EXEC_CTX exec_ctx()
#define CALLBACK_EXEC_CTX callback_exec_ctx()
#endif

/// A combiner represents a list of work to be executed later.
/// Forward declared here to avoid a circular dependency with combiner.h.
typedef struct grpc_combiner grpc_combiner;

// This exec_ctx is ready to return: either pre-populated, or cached as soon as
// the finish_check returns true.
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
// The exec_ctx's thread is (potentially) owned by a call or channel: care
// should be given to not delete said call/channel from this exec_ctx.
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
// This exec ctx was initialized by an internal thread and should not be
// counted by fork handlers.
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4

// This application callback exec ctx was initialized by an internal thread and
// should not be counted by fork handlers.
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1

namespace grpc_core {
class Combiner;
/// Execution context.
/// A bag of data that collects information along a callstack.
/// It is created on the stack at core entry points (public API or iomgr), and
/// stored internally as a thread-local variable.
///
/// Generally, to create an exec_ctx instance, add the following line at the
/// top of the public API entry point or at the start of a thread's work
/// function:
///
///   ExecCtx exec_ctx;
///
/// Access the created ExecCtx instance using:
///   ExecCtx::Get()
///
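/// For example, a minimal sketch of an entry point (illustrative only;
/// HypotheticalEntryPoint and on_done are not part of this header, and
/// grpc_error_handle is assumed to be absl::Status):
///
///   void HypotheticalEntryPoint(grpc_closure* on_done) {
///     ExecCtx exec_ctx;  // constructed on the stack
///     // Deferred: on_done joins this exec_ctx's closure list and runs when
///     // the exec_ctx is flushed (at the latest, in ~ExecCtx).
///     ExecCtx::Run(DEBUG_LOCATION, on_done, absl::OkStatus());
///   }
///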
/// Specific responsibilities (this may grow in the future):
/// - track a list of core work that needs to be delayed until the base of the
///   call stack (this provides a convenient mechanism to run callbacks
///   without worrying about locking issues)
/// - provide a decision maker (via IsReadyToFinish) that provides a signal as
///   to whether a borrowed thread should continue to do work or should
///   actively try to finish up and get this thread back to its owner, as
///   sketched just below
///
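/// For example, a borrowed thread might use that decision maker roughly as
/// follows (an illustrative sketch; PollOneEventNonBlocking is hypothetical
/// and not part of this header):
///
///   ExecCtx exec_ctx;
///   while (!ExecCtx::Get()->IsReadyToFinish()) {
///     PollOneEventNonBlocking();  // may enqueue closures on the exec_ctx
///     ExecCtx::Get()->Flush();    // run whatever was enqueued
///   }
///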
/// CONVENTIONS:
/// - Instances of this must ALWAYS be constructed on the stack, never
///   heap allocated.
/// - Do not pass exec_ctx as a parameter to a function. Always access it using
///   ExecCtx::Get().
/// - NOTE: In the future, the convention is likely to change to allow only one
///   ExecCtx on a thread's stack at the same time. The TODO below discusses
///   this plan in more detail.
///
/// TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same
/// time.
///   Stage 1: If a new one is created on the stack, it should just
///            pass-through to the underlying ExecCtx deeper in the thread's
///            stack.
///   Stage 2: Assert if a 2nd one is ever created on the stack
///            since that implies a core re-entry outside of application
///            callbacks.
///
class GRPC_DLL ExecCtx {
 public:
  /// Default Constructor
  ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
    Fork::IncExecCtxCount();
    Set(this);
  }

  /// Parameterised Constructor
  explicit ExecCtx(uintptr_t fl) : flags_(fl) {
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      Fork::IncExecCtxCount();
    }
    Set(this);
  }

  /// Destructor
  virtual ~ExecCtx() {
    flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
    Flush();
    Set(last_exec_ctx_);
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      Fork::DecExecCtxCount();
    }
  }

  /// Disallow copy and assignment operators
  ExecCtx(const ExecCtx&) = delete;
  ExecCtx& operator=(const ExecCtx&) = delete;

  struct CombinerData {
    // currently active combiner: updated only via combiner.c
    Combiner* active_combiner;
    // last active combiner in the active combiner list
    Combiner* last_combiner;
  };

  /// Only to be used by grpc-combiner code
  CombinerData* combiner_data() { return &combiner_data_; }

  /// Return pointer to grpc_closure_list
  grpc_closure_list* closure_list() { return &closure_list_; }

  /// Return flags
  uintptr_t flags() { return flags_; }

  /// Checks if there is work to be done
  bool HasWork() {
    return combiner_data_.active_combiner != nullptr ||
           !grpc_closure_list_empty(closure_list_);
  }

  /// Flush any work that has been enqueued onto this grpc_exec_ctx.
  /// Caller must guarantee that no interfering locks are held.
  /// Returns true if work was performed, false otherwise.
  bool Flush();

  /// Returns true if we'd like to leave this execution context as soon as
  /// possible: useful for deciding whether to do something more or not
  /// depending on outside context.
  bool IsReadyToFinish() {
    if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
      if (CheckReadyToFinish()) {
        flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
        return true;
      }
      return false;
    } else {
      return true;
    }
  }

  /// Marks this exec_ctx as finished, so IsReadyToFinish() returns true.
  void SetReadyToFinishFlag() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; }

  /// Returns the current time via Timestamp::Now().
  Timestamp Now() { return Timestamp::Now(); }

  /// Invalidates the scoped time cache (no-op on iOS, where the cache is not
  /// used).
  void InvalidateNow() {
#if !TARGET_OS_IPHONE
    time_cache_.InvalidateCache();
#endif
  }

  void SetNowIomgrShutdown() {
#if !TARGET_OS_IPHONE
    // We do a test-only set-now on this path just because iomgr is being
    // removed and there is no point adding more interfaces for it.
    time_cache_.TestOnlySetNow(Timestamp::InfFuture());
#endif
  }

  /// Test-only: forces the scoped time cache to report `now`.
  void TestOnlySetNow(Timestamp now) {
#if !TARGET_OS_IPHONE
    time_cache_.TestOnlySetNow(now);
#endif
  }

  /// Gets pointer to current exec_ctx.
  static ExecCtx* Get() { return EXEC_CTX; }

  /// Schedules `closure` (with `error`) to run when the current exec_ctx is
  /// flushed.
  static void Run(const DebugLocation& location, grpc_closure* closure,
                  grpc_error_handle error);

  /// Schedules every closure in `list` to run when the current exec_ctx is
  /// flushed.
  static void RunList(const DebugLocation& location, grpc_closure_list* list);

 protected:
  /// Check if ready to finish.
  virtual bool CheckReadyToFinish() { return false; }

  /// Disallow delete on ExecCtx.
  static void operator delete(void* /* p */) { abort(); }

 private:
  /// Set EXEC_CTX to ctx.
  static void Set(ExecCtx* ctx) { EXEC_CTX = ctx; }

  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
  CombinerData combiner_data_ = {nullptr, nullptr};
  uintptr_t flags_;

#if !TARGET_OS_IPHONE
  ScopedTimeCache time_cache_;
#endif

#if !defined(_WIN32) || !defined(_DLL)
  static thread_local ExecCtx* exec_ctx_;
#else
  // cannot be thread_local data member (e.g. exec_ctx_) on windows
  static ExecCtx*& exec_ctx();
#endif
  ExecCtx* last_exec_ctx_ = Get();
};

/// Application-callback execution context.
/// A bag of data that collects information along a callstack.
/// It is created on the stack at core entry points, and stored internally
/// as a thread-local variable.
///
/// There are three key differences between this structure and ExecCtx:
/// 1. ApplicationCallbackExecCtx builds a list of application-level
///    callbacks, but ExecCtx builds a list of internal callbacks to invoke.
/// 2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
///    there is no explicit Flush method.
/// 3. If more than one ApplicationCallbackExecCtx is created on the thread's
///    stack, only the one closest to the base of the stack is actually
///    active, and it is the only one that enqueues application callbacks.
///    (Unlike ExecCtx, it is not feasible to prevent multiple of these on the
///    stack since the executing application callback may itself enter core.
///    However, the new one created will just pass callbacks through to the
///    base one, and those will not be executed until control returns to the
///    destructor of the base one, preventing unlimited stack growth.)
///
/// This structure exists because application callbacks may themselves cause a
/// core re-entry (e.g., through a public API call) and if that call in turn
/// causes another application callback, there could be arbitrarily growing
/// stacks of core re-entries. Instead, application callbacks should not be
/// invoked until other core work is done and other application callbacks have
/// completed. To accomplish this, any application callback should be enqueued
/// using ApplicationCallbackExecCtx::Enqueue.
///
/// CONVENTIONS:
/// - Instances of this must ALWAYS be constructed on the stack, never
///   heap allocated.
/// - Instances of this are generally constructed before ExecCtx when needed.
///   The only exception is for ExecCtxs that are explicitly flushed and that
///   survive beyond the scope of the function that can cause application
///   callbacks to be invoked (e.g., in the timer thread).
///
/// Generally, core entry points that may trigger application-level callbacks
/// will have the following declarations:
///
///   ApplicationCallbackExecCtx callback_exec_ctx;
///   ExecCtx exec_ctx;
///
/// This ordering is important to make sure that the ApplicationCallbackExecCtx
/// is destroyed after the ExecCtx (to prevent the re-entry problem described
/// above, as well as making sure that ExecCtx core callbacks are invoked
/// first).
///
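/// As a minimal illustrative sketch (my_functor stands in for a
/// grpc_completion_queue_functor provided by the application; it is not part
/// of this header):
///
///   ApplicationCallbackExecCtx callback_exec_ctx;
///   ExecCtx exec_ctx;
///   // ... core work that produces an application-level completion ...
///   ApplicationCallbackExecCtx::Enqueue(my_functor, /*is_success=*/1);
///   // my_functor->functor_run runs when callback_exec_ctx (the instance
///   // closest to the base of the stack) is destroyed, after exec_ctx has
///   // flushed its internal closures.
///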

class GRPC_DLL ApplicationCallbackExecCtx {
 public:
  /// Default Constructor
  ApplicationCallbackExecCtx() { Set(this, flags_); }

  /// Parameterised Constructor
  explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
    Set(this, flags_);
  }

  /// Destructor: if this is the active (base-of-stack) instance, runs all
  /// enqueued application callbacks and clears the thread-local pointer.
  ~ApplicationCallbackExecCtx() {
    if (Get() == this) {
      while (head_ != nullptr) {
        auto* f = head_;
        head_ = f->internal_next;
        if (f->internal_next == nullptr) {
          tail_ = nullptr;
        }
        (*f->functor_run)(f, f->internal_success);
      }
      CALLBACK_EXEC_CTX = nullptr;
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
        Fork::DecExecCtxCount();
      }
    } else {
      GPR_DEBUG_ASSERT(head_ == nullptr);
      GPR_DEBUG_ASSERT(tail_ == nullptr);
    }
  }

  /// Return flags
  uintptr_t Flags() { return flags_; }

  /// Gets pointer to the currently active ApplicationCallbackExecCtx.
  static ApplicationCallbackExecCtx* Get() { return CALLBACK_EXEC_CTX; }

  /// Registers `exec_ctx` as this thread's active instance, but only if no
  /// other instance is already active.
  static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
    if (Get() == nullptr) {
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
        Fork::IncExecCtxCount();
      }
      CALLBACK_EXEC_CTX = exec_ctx;
    }
  }

  /// Appends `functor` to the active instance's callback list; it is invoked
  /// when that instance is destroyed.
  static void Enqueue(grpc_completion_queue_functor* functor, int is_success) {
    functor->internal_success = is_success;
    functor->internal_next = nullptr;

    ApplicationCallbackExecCtx* ctx = Get();

    if (ctx->head_ == nullptr) {
      ctx->head_ = functor;
    }
    if (ctx->tail_ != nullptr) {
      ctx->tail_->internal_next = functor;
    }
    ctx->tail_ = functor;
  }

  /// Returns true if an ApplicationCallbackExecCtx is active on this thread.
  static bool Available() { return Get() != nullptr; }

 private:
  uintptr_t flags_{0u};
  grpc_completion_queue_functor* head_{nullptr};
  grpc_completion_queue_functor* tail_{nullptr};

#if !defined(_WIN32) || !defined(_DLL)
  static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_;
#else
  // cannot be thread_local data member (e.g. callback_exec_ctx_) on windows
  static ApplicationCallbackExecCtx*& callback_exec_ctx();
#endif
};

/// Ensures that `f` runs with an ExecCtx available on the calling thread: if
/// one already exists it is used as-is; otherwise an ApplicationCallbackExecCtx
/// and an ExecCtx are constructed (and torn down) around the call.
template <typename F>
void EnsureRunInExecCtx(F f) {
  if (ExecCtx::Get() == nullptr) {
    ApplicationCallbackExecCtx app_ctx;
    ExecCtx exec_ctx;
    f();
  } else {
    f();
  }
}
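
/// Illustrative usage (DoWorkThatNeedsExecCtx is hypothetical, not part of
/// this header):
///
///   EnsureRunInExecCtx([] {
///     // ExecCtx::Get() is guaranteed to be non-null in here.
///     DoWorkThatNeedsExecCtx();
///   });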

#undef EXEC_CTX
#undef CALLBACK_EXEC_CTX

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H