xref: /aosp_15_r20/external/openscreen/platform/impl/task_runner.h (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1*3f982cf4SFabien Sanglard // Copyright 2019 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard 
5*3f982cf4SFabien Sanglard #ifndef PLATFORM_IMPL_TASK_RUNNER_H_
6*3f982cf4SFabien Sanglard #define PLATFORM_IMPL_TASK_RUNNER_H_
7*3f982cf4SFabien Sanglard 
8*3f982cf4SFabien Sanglard #include <condition_variable>  // NOLINT
9*3f982cf4SFabien Sanglard #include <map>
10*3f982cf4SFabien Sanglard #include <memory>
11*3f982cf4SFabien Sanglard #include <mutex>
12*3f982cf4SFabien Sanglard #include <thread>
13*3f982cf4SFabien Sanglard #include <utility>
14*3f982cf4SFabien Sanglard #include <vector>
15*3f982cf4SFabien Sanglard 
16*3f982cf4SFabien Sanglard #include "absl/base/thread_annotations.h"
17*3f982cf4SFabien Sanglard #include "absl/types/optional.h"
18*3f982cf4SFabien Sanglard #include "platform/api/task_runner.h"
19*3f982cf4SFabien Sanglard #include "platform/api/time.h"
20*3f982cf4SFabien Sanglard #include "platform/base/error.h"
21*3f982cf4SFabien Sanglard #include "util/trace_logging.h"
22*3f982cf4SFabien Sanglard 
23*3f982cf4SFabien Sanglard namespace openscreen {
24*3f982cf4SFabien Sanglard 
25*3f982cf4SFabien Sanglard class TaskRunnerImpl final : public TaskRunner {
26*3f982cf4SFabien Sanglard  public:
27*3f982cf4SFabien Sanglard   using Task = TaskRunner::Task;
28*3f982cf4SFabien Sanglard 
29*3f982cf4SFabien Sanglard   class TaskWaiter {
30*3f982cf4SFabien Sanglard    public:
31*3f982cf4SFabien Sanglard     virtual ~TaskWaiter() = default;
32*3f982cf4SFabien Sanglard 
33*3f982cf4SFabien Sanglard     // These calls should be thread-safe.  The absolute minimum is that
34*3f982cf4SFabien Sanglard     // OnTaskPosted must be safe to call from another thread while this is
35*3f982cf4SFabien Sanglard     // inside WaitForTaskToBePosted.  NOTE: There may be spurious wakeups from
36*3f982cf4SFabien Sanglard     // WaitForTaskToBePosted depending on whether the specific implementation
37*3f982cf4SFabien Sanglard     // chooses to clear queued WakeUps before entering WaitForTaskToBePosted.
38*3f982cf4SFabien Sanglard 
39*3f982cf4SFabien Sanglard     // Blocks until some event occurs, which means new tasks may have been
40*3f982cf4SFabien Sanglard     // posted.  Wait may only block up to |timeout| where 0 means don't block at
41*3f982cf4SFabien Sanglard     // all (not block forever).
42*3f982cf4SFabien Sanglard     virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0;
43*3f982cf4SFabien Sanglard 
44*3f982cf4SFabien Sanglard     // If a WaitForTaskToBePosted call is currently blocking, unblock it
45*3f982cf4SFabien Sanglard     // immediately.
46*3f982cf4SFabien Sanglard     virtual void OnTaskPosted() = 0;
47*3f982cf4SFabien Sanglard   };
48*3f982cf4SFabien Sanglard 
49*3f982cf4SFabien Sanglard   explicit TaskRunnerImpl(
50*3f982cf4SFabien Sanglard       ClockNowFunctionPtr now_function,
51*3f982cf4SFabien Sanglard       TaskWaiter* event_waiter = nullptr,
52*3f982cf4SFabien Sanglard       Clock::duration waiter_timeout = std::chrono::milliseconds(100));
53*3f982cf4SFabien Sanglard 
54*3f982cf4SFabien Sanglard   // TaskRunner overrides
55*3f982cf4SFabien Sanglard   ~TaskRunnerImpl() final;
56*3f982cf4SFabien Sanglard   void PostPackagedTask(Task task) final;
57*3f982cf4SFabien Sanglard   void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final;
58*3f982cf4SFabien Sanglard   bool IsRunningOnTaskRunner() final;
59*3f982cf4SFabien Sanglard 
60*3f982cf4SFabien Sanglard   // Blocks the current thread, executing tasks from the queue with the desired
61*3f982cf4SFabien Sanglard   // timing; and does not return until some time after RequestStopSoon() is
62*3f982cf4SFabien Sanglard   // called.
63*3f982cf4SFabien Sanglard   void RunUntilStopped();
64*3f982cf4SFabien Sanglard 
65*3f982cf4SFabien Sanglard   // Blocks the current thread, executing tasks from the queue with the desired
66*3f982cf4SFabien Sanglard   // timing; and does not return until some time after the current process is
67*3f982cf4SFabien Sanglard   // signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called.
68*3f982cf4SFabien Sanglard   void RunUntilSignaled();
69*3f982cf4SFabien Sanglard 
70*3f982cf4SFabien Sanglard   // Thread-safe method for requesting the TaskRunner to stop running after all
71*3f982cf4SFabien Sanglard   // non-delayed tasks in the queue have run. This behavior allows final
72*3f982cf4SFabien Sanglard   // clean-up tasks to be executed before the TaskRunner stops.
73*3f982cf4SFabien Sanglard   //
74*3f982cf4SFabien Sanglard   // If any non-delayed tasks post additional non-delayed tasks, those will be
75*3f982cf4SFabien Sanglard   // run as well before returning.
76*3f982cf4SFabien Sanglard   void RequestStopSoon();
77*3f982cf4SFabien Sanglard 
78*3f982cf4SFabien Sanglard  private:
79*3f982cf4SFabien Sanglard #if defined(ENABLE_TRACE_LOGGING)
80*3f982cf4SFabien Sanglard   // Wrapper around a Task used to store the TraceId Metadata along with the
81*3f982cf4SFabien Sanglard   // task itself, and to set the current TraceIdHierarchy before executing the
82*3f982cf4SFabien Sanglard   // task.
83*3f982cf4SFabien Sanglard   class TaskWithMetadata {
84*3f982cf4SFabien Sanglard    public:
85*3f982cf4SFabien Sanglard     // NOTE: 'explicit' keyword omitted so that conversion construtor can be
86*3f982cf4SFabien Sanglard     // used. This simplifies switching between 'Task' and 'TaskWithMetadata'
87*3f982cf4SFabien Sanglard     // based on the compilation flag.
TaskWithMetadata(Task task)88*3f982cf4SFabien Sanglard     TaskWithMetadata(Task task)  // NOLINT
89*3f982cf4SFabien Sanglard         : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}
90*3f982cf4SFabien Sanglard 
operator()91*3f982cf4SFabien Sanglard     void operator()() {
92*3f982cf4SFabien Sanglard       TRACE_SET_HIERARCHY(trace_ids_);
93*3f982cf4SFabien Sanglard       std::move(task_)();
94*3f982cf4SFabien Sanglard     }
95*3f982cf4SFabien Sanglard 
96*3f982cf4SFabien Sanglard    private:
97*3f982cf4SFabien Sanglard     Task task_;
98*3f982cf4SFabien Sanglard     TraceIdHierarchy trace_ids_;
99*3f982cf4SFabien Sanglard   };
100*3f982cf4SFabien Sanglard #else   // !defined(ENABLE_TRACE_LOGGING)
101*3f982cf4SFabien Sanglard   using TaskWithMetadata = Task;
102*3f982cf4SFabien Sanglard #endif  // defined(ENABLE_TRACE_LOGGING)
103*3f982cf4SFabien Sanglard 
104*3f982cf4SFabien Sanglard   // Helper that runs all tasks in |running_tasks_| and then clears it.
105*3f982cf4SFabien Sanglard   void RunRunnableTasks();
106*3f982cf4SFabien Sanglard 
107*3f982cf4SFabien Sanglard   // Look at all tasks in the delayed task queue, then schedule them if the
108*3f982cf4SFabien Sanglard   // minimum delay time has elapsed.
109*3f982cf4SFabien Sanglard   void ScheduleDelayedTasks();
110*3f982cf4SFabien Sanglard 
111*3f982cf4SFabien Sanglard   // Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If
112*3f982cf4SFabien Sanglard   // there are no ready-to-run tasks, and |is_running_| is true, this method
113*3f982cf4SFabien Sanglard   // will block waiting for new tasks. Returns true if any tasks were
114*3f982cf4SFabien Sanglard   // transferred.
115*3f982cf4SFabien Sanglard   bool GrabMoreRunnableTasks();
116*3f982cf4SFabien Sanglard 
117*3f982cf4SFabien Sanglard   const ClockNowFunctionPtr now_function_;
118*3f982cf4SFabien Sanglard 
119*3f982cf4SFabien Sanglard   // Flag that indicates whether the task runner loop should continue. This is
120*3f982cf4SFabien Sanglard   // only meant to be read/written on the thread executing RunUntilStopped().
121*3f982cf4SFabien Sanglard   bool is_running_;
122*3f982cf4SFabien Sanglard 
123*3f982cf4SFabien Sanglard   // This mutex is used for |tasks_| and |delayed_tasks_|, and also for
124*3f982cf4SFabien Sanglard   // notifying the run loop to wake up when it is waiting for a task to be added
125*3f982cf4SFabien Sanglard   // to the queue in |run_loop_wakeup_|.
126*3f982cf4SFabien Sanglard   std::mutex task_mutex_;
127*3f982cf4SFabien Sanglard   std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_);
128*3f982cf4SFabien Sanglard   std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_
129*3f982cf4SFabien Sanglard       GUARDED_BY(task_mutex_);
130*3f982cf4SFabien Sanglard 
131*3f982cf4SFabien Sanglard   // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the
132*3f982cf4SFabien Sanglard   // task runner.  Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_|
133*3f982cf4SFabien Sanglard   // is used instead (along with |waiter_timeout_|).
134*3f982cf4SFabien Sanglard   std::condition_variable run_loop_wakeup_;
135*3f982cf4SFabien Sanglard   TaskWaiter* const task_waiter_;
136*3f982cf4SFabien Sanglard   Clock::duration waiter_timeout_;
137*3f982cf4SFabien Sanglard 
138*3f982cf4SFabien Sanglard   // To prevent excessive re-allocation of the underlying array of the |tasks_|
139*3f982cf4SFabien Sanglard   // vector, use an A/B vector-swap mechanism. |running_tasks_| starts out
140*3f982cf4SFabien Sanglard   // empty, and is swapped with |tasks_| when it is time to run the Tasks.
141*3f982cf4SFabien Sanglard   std::vector<TaskWithMetadata> running_tasks_;
142*3f982cf4SFabien Sanglard 
143*3f982cf4SFabien Sanglard   std::thread::id task_runner_thread_id_;
144*3f982cf4SFabien Sanglard 
145*3f982cf4SFabien Sanglard   OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl);
146*3f982cf4SFabien Sanglard };
147*3f982cf4SFabien Sanglard }  // namespace openscreen
148*3f982cf4SFabien Sanglard 
149*3f982cf4SFabien Sanglard #endif  // PLATFORM_IMPL_TASK_RUNNER_H_
150