1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef NET_BASE_PRIORITIZED_TASK_RUNNER_H_
6 #define NET_BASE_PRIORITIZED_TASK_RUNNER_H_
7
8 #include <stdint.h>
9
10 #include <utility>
11 #include <vector>
12
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/location.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/synchronization/lock.h"
18 #include "base/task/post_task_and_reply_with_result_internal.h"
19 #include "base/task/task_traits.h"
20 #include "base/thread_annotations.h"
21 #include "net/base/net_export.h"
22
23 namespace base {
24 class TaskRunner;
25 } // namespace base
26
27 namespace net {
28
29 namespace internal {
30 template <typename ReturnType>
ReturnAsParamAdapter(base::OnceCallback<ReturnType ()> func,ReturnType * result)31 void ReturnAsParamAdapter(base::OnceCallback<ReturnType()> func,
32 ReturnType* result) {
33 *result = std::move(func).Run();
34 }
35
36 // Adapts a T* result to a callblack that expects a T.
37 template <typename TaskReturnType, typename ReplyArgType>
ReplyAdapter(base::OnceCallback<void (ReplyArgType)> callback,TaskReturnType * result)38 void ReplyAdapter(base::OnceCallback<void(ReplyArgType)> callback,
39 TaskReturnType* result) {
40 std::move(callback).Run(std::move(*result));
41 }
42 } // namespace internal
43
44 // PrioritizedTaskRunner allows for prioritization of posted tasks and their
45 // replies. It provides up to 2^32 priority levels. All tasks posted via the
46 // PrioritizedTaskRunner will run in priority order. All replies from
47 // PostTaskAndReply will also run in priority order. Be careful, as it is
48 // possible to starve a task.
49 class NET_EXPORT_PRIVATE PrioritizedTaskRunner
50 : public base::RefCountedThreadSafe<PrioritizedTaskRunner> {
51 public:
52 enum class ReplyRunnerType { kStandard, kPrioritized };
53 explicit PrioritizedTaskRunner(const base::TaskTraits& task_traits);
54 PrioritizedTaskRunner(const PrioritizedTaskRunner&) = delete;
55 PrioritizedTaskRunner& operator=(const PrioritizedTaskRunner&) = delete;
56
57 // Similar to TaskRunner::PostTaskAndReply, except that the task runs at
58 // |priority|. Priority 0 is the highest priority and will run before other
59 // priority values. Multiple tasks with the same |priority| value are run in
60 // order of posting. The replies are also run in prioritized order on the
61 // calling taskrunner.
62 void PostTaskAndReply(const base::Location& from_here,
63 base::OnceClosure task,
64 base::OnceClosure reply,
65 uint32_t priority);
66
67 // Similar to TaskRunner::PostTaskAndReplyWithResult, except that the task
68 // runs at |priority|. See PostTaskAndReply for a description of |priority|.
69 template <typename TaskReturnType, typename ReplyArgType>
PostTaskAndReplyWithResult(const base::Location & from_here,base::OnceCallback<TaskReturnType ()> task,base::OnceCallback<void (ReplyArgType)> reply,uint32_t priority)70 void PostTaskAndReplyWithResult(const base::Location& from_here,
71 base::OnceCallback<TaskReturnType()> task,
72 base::OnceCallback<void(ReplyArgType)> reply,
73 uint32_t priority) {
74 TaskReturnType* result = new TaskReturnType();
75 return PostTaskAndReply(
76 from_here,
77 base::BindOnce(&internal::ReturnAsParamAdapter<TaskReturnType>,
78 std::move(task), result),
79 base::BindOnce(&internal::ReplyAdapter<TaskReturnType, ReplyArgType>,
80 std::move(reply), base::Owned(result)),
81 priority);
82 }
83
SetTaskRunnerForTesting(scoped_refptr<base::TaskRunner> task_runner)84 void SetTaskRunnerForTesting(scoped_refptr<base::TaskRunner> task_runner) {
85 task_runner_for_testing_ = std::move(task_runner);
86 }
87
88 private:
89 friend class base::RefCountedThreadSafe<PrioritizedTaskRunner>;
90
91 struct Job {
92 Job(const base::Location& from_here,
93 base::OnceClosure task,
94 base::OnceClosure reply,
95 uint32_t priority,
96 uint32_t task_count);
97 Job();
98 Job(const Job&) = delete;
99 Job& operator=(const Job&) = delete;
100 ~Job();
101
102 Job(Job&& other);
103 Job& operator=(Job&& other);
104
105 base::Location from_here;
106 base::OnceClosure task;
107 base::OnceClosure reply;
108 uint32_t priority = 0;
109 uint32_t task_count = 0;
110 };
111
112 struct JobComparer;
113
114 // A heap of Jobs. Thread-safe.
115 class JobPriorityQueue {
116 public:
117 JobPriorityQueue();
118 ~JobPriorityQueue();
119
120 JobPriorityQueue(const JobPriorityQueue&) = delete;
121 JobPriorityQueue& operator=(const JobPriorityQueue&) = delete;
122
123 // Add a Job to the heap.
124 void Push(Job job);
125
126 // Return the current highest-priority job and remove it from the heap.
127 Job Pop();
128
129 private:
130 // This cannot be a std::priority_queue because there is no way to extract
131 // a move-only type from a std::priority_queue.
132 std::vector<Job> heap_ GUARDED_BY(lock_);
133 base::Lock lock_;
134 };
135
136 void RunTaskAndPostReply();
137 void RunReply();
138
139 ~PrioritizedTaskRunner();
140
141 // Accessed on both task_runner_ and the reply task runner.
142 JobPriorityQueue task_jobs_;
143 JobPriorityQueue reply_jobs_;
144
145 const base::TaskTraits task_traits_;
146 scoped_refptr<base::TaskRunner> task_runner_for_testing_;
147
148 // Used to preserve order of jobs of equal priority. This can overflow and
149 // cause periodic priority inversion. This should be infrequent enough to be
150 // of negligible impact.
151 uint32_t task_count_ = 0;
152 };
153
154 } // namespace net
155
156 #endif // NET_BASE_PRIORITIZED_TASK_RUNNER_H_
157