// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_

#include <stddef.h>

#include <atomic>
#include <limits>
#include <optional>
#include <utility>

#include "base/base_export.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/common/checked_lock.h"
#include "base/task/common/task_annotator.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"

namespace base {
namespace internal {

class PooledTaskRunnerDelegate;

// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// Derived classes control the intended concurrency with GetMaxConcurrency().
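//
// JobTaskSource is not usually instantiated directly: the public
// base::PostJob() API creates one internally and returns a JobHandle. A
// minimal usage sketch (illustrative; |work_queue| stands in for a
// hypothetical thread-safe queue owned by the caller):
//
//   JobHandle handle = base::PostJob(
//       FROM_HERE, {},
//       base::BindRepeating([](JobDelegate* delegate) {
//         while (!work_queue.IsEmpty() && !delegate->ShouldYield())
//           work_queue.ProcessOneItem();
//       }),
//       base::BindRepeating([](size_t /*worker_count*/) -> size_t {
//         return work_queue.GetSize();  // Upper bound on useful concurrency.
//       }));
//   handle.Join();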
class BASE_EXPORT JobTaskSource : public TaskSource {
 public:
  JobTaskSource(const Location& from_here,
                const TaskTraits& traits,
                RepeatingCallback<void(JobDelegate*)> worker_task,
                MaxConcurrencyCallback max_concurrency_callback,
                PooledTaskRunnerDelegate* delegate);
  JobTaskSource(const JobTaskSource&) = delete;
  JobTaskSource& operator=(const JobTaskSource&) = delete;

  // Wraps |task_source| in a JobHandle.
  static JobHandle CreateJobHandle(
      scoped_refptr<internal::JobTaskSource> task_source) {
    return JobHandle(std::move(task_source));
  }

  // Called before the task source is enqueued to initialize task metadata.
  void WillEnqueue(int sequence_num, TaskAnnotator& annotator);

  // Notifies this task source that max concurrency was increased, and that
  // the number of workers should be adjusted.
  void NotifyConcurrencyIncrease();

  // Informs this JobTaskSource that the current thread would like to join and
  // contribute to running |worker_task|. Returns true if the joining thread
  // can contribute (RunJoinTask() can be called), or false if joining was
  // completed and all other workers returned because either there's no work
  // remaining or the Job was canceled.
  bool WillJoin();

  // Contributes to running |worker_task| and returns true if the joining
  // thread can contribute again (RunJoinTask() can be called again), or false
  // if joining was completed and all other workers returned because either
  // there's no work remaining or the Job was canceled. This should be called
  // only after WillJoin() or RunJoinTask() previously returned true.
  bool RunJoinTask();
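
  // An illustrative join-loop sketch (how JobHandle::Join() is expected to
  // drive the two calls above; |task_source| is the joined JobTaskSource):
  //
  //   bool can_run = task_source->WillJoin();
  //   while (can_run)
  //     can_run = task_source->RunJoinTask();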

  // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
  // to return RunStatus::kDisallowed.
  void Cancel(TaskSource::Transaction* transaction = nullptr);

  // TaskSource:
  ExecutionEnvironment GetExecutionEnvironment() override;
  size_t GetRemainingConcurrency() const override;
  TaskSourceSortKey GetSortKey() const override;
  TimeTicks GetDelayedSortKey() const override;
  bool HasReadyTasks(TimeTicks now) const override;

  bool IsActive() const;
  size_t GetWorkerCount() const;

  // Returns the maximum number of tasks from this TaskSource that can run
  // concurrently.
  size_t GetMaxConcurrency() const;

  uint8_t AcquireTaskId();
  void ReleaseTaskId(uint8_t task_id);

  // Returns true if a worker should return from the worker task on the current
  // thread ASAP.
  bool ShouldYield();

  PooledTaskRunnerDelegate* delegate() const { return delegate_; }

 private:
  // Atomic internal state to track the number of workers running a task from
  // this JobTaskSource and whether this JobTaskSource is canceled. All
  // operations are performed with std::memory_order_relaxed as State is only
  // ever modified under a lock or read atomically (optimistic read).
  class State {
   public:
    static constexpr uint32_t kCanceledMask = 1;
    static constexpr int kWorkerCountBitOffset = 1;
    static constexpr uint32_t kWorkerCountIncrement = 1
                                                      << kWorkerCountBitOffset;

    struct Value {
      uint8_t worker_count() const {
        return static_cast<uint8_t>(value >> kWorkerCountBitOffset);
      }
      // Returns true if canceled.
      bool is_canceled() const { return value & kCanceledMask; }

      uint32_t value;
    };
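
    // Example encoding (illustrative): value == 0b101 decodes to
    // worker_count() == 2 (0b101 >> kWorkerCountBitOffset) and
    // is_canceled() == true (0b101 & kCanceledMask).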

    State();
    ~State();

    // Sets the canceled bit. Returns the state before the operation.
    Value Cancel();

    // Increments the worker count by 1. Returns the state before the
    // operation.
    Value IncrementWorkerCount();

    // Decrements the worker count by 1. Returns the state before the
    // operation.
    Value DecrementWorkerCount();

    // Loads and returns the state.
    Value Load() const;

   private:
    std::atomic<uint32_t> value_{0};
  };

  // Atomic flag that indicates if the joining thread is currently waiting on
  // another worker to yield or to signal.
  class JoinFlag {
   public:
    static constexpr uint32_t kNotWaiting = 0;
    static constexpr uint32_t kWaitingForWorkerToSignal = 1;
    static constexpr uint32_t kWaitingForWorkerToYield = 3;
    // kWaitingForWorkerToYield is 3 because the implementation relies on the
    // following property.
    static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
                      kWaitingForWorkerToSignal,
                  "");
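    // Concretely, 3 & 1 == 1: masking kWaitingForWorkerToYield with
    // kWaitingForWorkerToSignal yields kWaitingForWorkerToSignal, so a yield
    // request can degrade into a plain signal request once a worker has
    // agreed to yield (see ShouldWorkerYield() below).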

    JoinFlag();
    ~JoinFlag();

    // Returns true if the status is not kNotWaiting, using
    // std::memory_order_relaxed.
    bool IsWaiting() {
      return value_.load(std::memory_order_relaxed) != kNotWaiting;
    }

    // Resets the status to kNotWaiting, using std::memory_order_relaxed.
    void Reset();

    // Sets the status to kWaitingForWorkerToYield using
    // std::memory_order_relaxed.
    void SetWaiting();

    // If the flag is kWaitingForWorkerToYield, returns true indicating that the
    // worker should yield, and atomically updates to kWaitingForWorkerToSignal
    // (using std::memory_order_relaxed) to ensure that a single worker yields
    // in response to SetWaiting().
    bool ShouldWorkerYield();

    // If the flag is kWaiting*, returns true indicating that the worker should
    // signal, and atomically updates to kNotWaiting (using
    // std::memory_order_relaxed) to ensure that a single worker signals in
    // response to SetWaiting().
    bool ShouldWorkerSignal();

   private:
    std::atomic<uint32_t> value_{kNotWaiting};
  };

  ~JobTaskSource() override;

  // Called from the joining thread. Waits for the worker count to be at or
  // below max concurrency (which happens when a worker calls
  // DidProcessTask()). Returns true if the joining thread should run a task,
  // or false if joining was completed and all other workers returned because
  // either there's no work remaining or the Job was canceled.
  bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);

  size_t GetMaxConcurrency(size_t worker_count) const;

  // TaskSource:
  RunStatus WillRunTask() override;
  Task TakeTask(TaskSource::Transaction* transaction) override;
  std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
  bool DidProcessTask(TaskSource::Transaction* transaction) override;
  bool WillReEnqueue(TimeTicks now,
                     TaskSource::Transaction* transaction) override;
  bool OnBecomeReady() override;

  // Synchronizes access to the workers' state.
  mutable CheckedLock worker_lock_{UniversalSuccessor()};

  // Current atomic state (atomic despite the lock to allow optimistic reads
  // and cancellation without the lock).
  State state_ GUARDED_BY(worker_lock_);
  // Normally, |join_flag_| is protected by |worker_lock_|, except in
  // ShouldYield(), hence the use of atomics.
  JoinFlag join_flag_ GUARDED_BY(worker_lock_);
  // Signaled when |join_flag_| is kWaiting* and a worker returns.
  std::optional<ConditionVariable> worker_released_condition_
      GUARDED_BY(worker_lock_);

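  // Bitmask of task ids currently assigned to workers; see AcquireTaskId()
  // and ReleaseTaskId().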
  std::atomic<uint32_t> assigned_task_ids_{0};

  RepeatingCallback<size_t(size_t)> max_concurrency_callback_;

  // Worker task set by the job owner.
  RepeatingCallback<void(JobDelegate*)> worker_task_;
  // Task returned from TakeTask(); it calls |worker_task_| internally.
  RepeatingClosure primary_task_;

  TaskMetadata task_metadata_;

  const TimeTicks ready_time_;
  raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_