xref: /aosp_15_r20/external/cronet/base/task/post_job.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_POST_JOB_H_
6 #define BASE_TASK_POST_JOB_H_
7 
8 #include <limits>
9 
10 #include "base/base_export.h"
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/callback.h"
13 #include "base/location.h"
14 #include "base/memory/stack_allocated.h"
15 
16 namespace base {
// Forward declarations of the scheduler-internal types that are only used by
// pointer/reference in this header.
namespace internal {
class JobTaskSource;
class PooledTaskRunnerDelegate;
}  // namespace internal

class TaskTraits;
// NOTE(review): uint8_t is not provided by any header included directly above;
// presumably it arrives transitively -- consider including <stdint.h>.
enum class TaskPriority : uint8_t;
24 
25 // Delegate that's passed to Job's worker task, providing an entry point to
26 // communicate with the scheduler. To prevent deadlocks, JobDelegate methods
27 // should never be called while holding a user lock.
28 class BASE_EXPORT JobDelegate {
29   STACK_ALLOCATED();
30 
31  public:
32   // A JobDelegate is instantiated for each worker task that is run.
33   // |task_source| is the task source whose worker task is running with this
34   // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to
35   // check whether the pool wants this worker task to yield (null if this worker
36   // should never yield -- e.g. when the main thread is a worker).
37   JobDelegate(internal::JobTaskSource* task_source,
38               internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate);
39 
40   JobDelegate(const JobDelegate&) = delete;
41   JobDelegate& operator=(const JobDelegate&) = delete;
42 
43   ~JobDelegate();
44 
45   // Returns true if this thread *must* return from the worker task on the
46   // current thread ASAP. Workers should periodically invoke ShouldYield (or
47   // YieldIfNeeded()) as often as is reasonable.
48   bool ShouldYield();
49 
50   // If ShouldYield(), this will pause the current thread (allowing it to be
51   // replaced in the pool); no-ops otherwise. If it pauses, it will resume and
52   // return from this call whenever higher priority work completes.
53   // Prefer ShouldYield() over this (only use YieldIfNeeded() when unwinding
54   // the stack is not possible).
55   void YieldIfNeeded();
56 
57   // Notifies the scheduler that max concurrency was increased, and the number
58   // of worker should be adjusted accordingly. See PostJob() for more details.
59   void NotifyConcurrencyIncrease();
60 
61   // Returns a task_id unique among threads currently running this job, such
62   // that GetTaskId() < worker count. To achieve this, the same task_id may be
63   // reused by a different thread after a worker_task returns.
64   uint8_t GetTaskId();
65 
66   // Returns true if the current task is called from the thread currently
67   // running JobHandle::Join().
IsJoiningThread()68   bool IsJoiningThread() const {
69     return pooled_task_runner_delegate_ == nullptr;
70   }
71 
72  private:
73   static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max();
74 
75   internal::JobTaskSource* task_source_ = nullptr;
76   internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate_ = nullptr;
77   uint8_t task_id_ = kInvalidTaskId;
78 
79 #if DCHECK_IS_ON()
80   // Value returned by the last call to ShouldYield().
81   bool last_should_yield_ = false;
82 #endif
83 };
84 
85 // Handle returned when posting a Job. Provides methods to control execution of
86 // the posted Job. To prevent deadlocks, JobHandle methods should never be
87 // called while holding a user lock.
88 class BASE_EXPORT JobHandle {
89  public:
90   JobHandle();
91 
92   JobHandle(const JobHandle&) = delete;
93   JobHandle& operator=(const JobHandle&) = delete;
94 
95   // A job must either be joined, canceled or detached before the JobHandle is
96   // destroyed.
97   ~JobHandle();
98 
99   JobHandle(JobHandle&&);
100   JobHandle& operator=(JobHandle&&);
101 
102   // Returns true if associated with a Job.
103   explicit operator bool() const { return task_source_ != nullptr; }
104 
105   // Returns true if there's any work pending or any worker running.
106   bool IsActive() const;
107 
108   // Update this Job's priority.
109   void UpdatePriority(TaskPriority new_priority);
110 
111   // Notifies the scheduler that max concurrency was increased, and the number
112   // of workers should be adjusted accordingly. See PostJob() for more details.
113   void NotifyConcurrencyIncrease();
114 
115   // Contributes to the job on this thread. Doesn't return until all tasks have
116   // completed and max concurrency becomes 0. This also promotes this Job's
117   // priority to be at least as high as the calling thread's priority. When
118   // called immediately, prefer CreateJob(...).Join() over PostJob(...).Join()
119   // to avoid having too many workers scheduled for executing the workload.
120   void Join();
121 
122   // Forces all existing workers to yield ASAP. Waits until they have all
123   // returned from the Job's callback before returning.
124   void Cancel();
125 
126   // Forces all existing workers to yield ASAP but doesn’t wait for them.
127   // Warning, this is dangerous if the Job's callback is bound to or has access
128   // to state which may be deleted after this call.
129   void CancelAndDetach();
130 
131   // Can be invoked before ~JobHandle() to avoid waiting on the job completing.
132   void Detach();
133 
134  private:
135   friend class internal::JobTaskSource;
136 
137   explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source);
138 
139   scoped_refptr<internal::JobTaskSource> task_source_;
140 };
141 
142 // Callback used in PostJob() to control the maximum number of threads calling
143 // the worker task concurrently.
144 
145 // Returns the maximum number of threads which may call a job's worker task
146 // concurrently. |worker_count| is the number of threads currently assigned to
147 // this job which some callers may need to determine their return value.
148 using MaxConcurrencyCallback =
149     RepeatingCallback<size_t(size_t /*worker_count*/)>;
150 
151 // Posts a repeating |worker_task| with specific |traits| to run in parallel on
152 // base::ThreadPool.
153 // Returns a JobHandle associated with the Job, which can be joined, canceled or
154 // detached.
155 // ThreadPool APIs, including PostJob() and methods of the returned JobHandle,
156 // must never be called while holding a lock that could be acquired by
157 // |worker_task| or |max_concurrency_callback| -- that could result in a
158 // deadlock. This is because [1] |max_concurrency_callback| may be invoked while
159 // holding internal ThreadPool lock (A), hence |max_concurrency_callback| can
160 // only use a lock (B) if that lock is *never* held while calling back into a
161 // ThreadPool entry point from any thread (A=>B/B=>A deadlock) and [2]
162 // |worker_task| or |max_concurrency_callback| is invoked synchronously from
163 // JobHandle::Join() (A=>JobHandle::Join()=>A deadlock).
164 // To avoid scheduling overhead, |worker_task| should do as much work as
165 // possible in a loop when invoked, and JobDelegate::ShouldYield() should be
166 // periodically invoked to conditionally exit and let the scheduler prioritize
167 // work.
168 //
169 // A canonical implementation of |worker_task| looks like:
170 //   void WorkerTask(JobDelegate* job_delegate) {
171 //     while (!job_delegate->ShouldYield()) {
172 //       auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work.
173 //       if (!work_item)
174 //         return:
175 //       ProcessWork(work_item);
176 //     }
177 //   }
178 //
179 // |max_concurrency_callback| controls the maximum number of threads calling
180 // |worker_task| concurrently. |worker_task| is only invoked if the number of
181 // threads previously running |worker_task| was less than the value returned by
182 // |max_concurrency_callback|. In general, |max_concurrency_callback| should
183 // return the latest number of incomplete work items (smallest unit of work)
184 // left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must*
185 // be invoked shortly after |max_concurrency_callback| starts returning a value
186 // larger than previously returned values. This usually happens when new work
187 // items are added and the API user wants additional threads to invoke
188 // |worker_task| concurrently. The callbacks may be called concurrently on any
189 // thread until the job is complete. If the job handle is detached, the
190 // callbacks may still be called, so they must not access global state that
191 // could be destroyed.
192 //
193 // |traits| requirements:
194 // - base::ThreadPolicy must be specified if the priority of the task runner
195 //   will ever be increased from BEST_EFFORT.
196 JobHandle BASE_EXPORT PostJob(const Location& from_here,
197                               const TaskTraits& traits,
198                               RepeatingCallback<void(JobDelegate*)> worker_task,
199                               MaxConcurrencyCallback max_concurrency_callback);
200 
201 // Creates and returns a JobHandle associated with a Job. Unlike PostJob(), this
202 // doesn't immediately schedules |worker_task| to run on base::ThreadPool
203 // workers; the Job is then scheduled by calling either
204 // NotifyConcurrencyIncrease() or Join().
205 JobHandle BASE_EXPORT
206 CreateJob(const Location& from_here,
207           const TaskTraits& traits,
208           RepeatingCallback<void(JobDelegate*)> worker_task,
209           MaxConcurrencyCallback max_concurrency_callback);
210 
211 }  // namespace base
212 
213 #endif  // BASE_TASK_POST_JOB_H_
214