1*3f982cf4SFabien Sanglard // Copyright 2019 The Chromium Authors. All rights reserved. 2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be 3*3f982cf4SFabien Sanglard // found in the LICENSE file. 4*3f982cf4SFabien Sanglard 5*3f982cf4SFabien Sanglard #ifndef PLATFORM_IMPL_TASK_RUNNER_H_ 6*3f982cf4SFabien Sanglard #define PLATFORM_IMPL_TASK_RUNNER_H_ 7*3f982cf4SFabien Sanglard 8*3f982cf4SFabien Sanglard #include <condition_variable> // NOLINT 9*3f982cf4SFabien Sanglard #include <map> 10*3f982cf4SFabien Sanglard #include <memory> 11*3f982cf4SFabien Sanglard #include <mutex> 12*3f982cf4SFabien Sanglard #include <thread> 13*3f982cf4SFabien Sanglard #include <utility> 14*3f982cf4SFabien Sanglard #include <vector> 15*3f982cf4SFabien Sanglard 16*3f982cf4SFabien Sanglard #include "absl/base/thread_annotations.h" 17*3f982cf4SFabien Sanglard #include "absl/types/optional.h" 18*3f982cf4SFabien Sanglard #include "platform/api/task_runner.h" 19*3f982cf4SFabien Sanglard #include "platform/api/time.h" 20*3f982cf4SFabien Sanglard #include "platform/base/error.h" 21*3f982cf4SFabien Sanglard #include "util/trace_logging.h" 22*3f982cf4SFabien Sanglard 23*3f982cf4SFabien Sanglard namespace openscreen { 24*3f982cf4SFabien Sanglard 25*3f982cf4SFabien Sanglard class TaskRunnerImpl final : public TaskRunner { 26*3f982cf4SFabien Sanglard public: 27*3f982cf4SFabien Sanglard using Task = TaskRunner::Task; 28*3f982cf4SFabien Sanglard 29*3f982cf4SFabien Sanglard class TaskWaiter { 30*3f982cf4SFabien Sanglard public: 31*3f982cf4SFabien Sanglard virtual ~TaskWaiter() = default; 32*3f982cf4SFabien Sanglard 33*3f982cf4SFabien Sanglard // These calls should be thread-safe. The absolute minimum is that 34*3f982cf4SFabien Sanglard // OnTaskPosted must be safe to call from another thread while this is 35*3f982cf4SFabien Sanglard // inside WaitForTaskToBePosted. NOTE: There may be spurious wakeups from 36*3f982cf4SFabien Sanglard // WaitForTaskToBePosted depending on whether the specific implementation 37*3f982cf4SFabien Sanglard // chooses to clear queued WakeUps before entering WaitForTaskToBePosted. 38*3f982cf4SFabien Sanglard 39*3f982cf4SFabien Sanglard // Blocks until some event occurs, which means new tasks may have been 40*3f982cf4SFabien Sanglard // posted. Wait may only block up to |timeout| where 0 means don't block at 41*3f982cf4SFabien Sanglard // all (not block forever). 42*3f982cf4SFabien Sanglard virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0; 43*3f982cf4SFabien Sanglard 44*3f982cf4SFabien Sanglard // If a WaitForTaskToBePosted call is currently blocking, unblock it 45*3f982cf4SFabien Sanglard // immediately. 46*3f982cf4SFabien Sanglard virtual void OnTaskPosted() = 0; 47*3f982cf4SFabien Sanglard }; 48*3f982cf4SFabien Sanglard 49*3f982cf4SFabien Sanglard explicit TaskRunnerImpl( 50*3f982cf4SFabien Sanglard ClockNowFunctionPtr now_function, 51*3f982cf4SFabien Sanglard TaskWaiter* event_waiter = nullptr, 52*3f982cf4SFabien Sanglard Clock::duration waiter_timeout = std::chrono::milliseconds(100)); 53*3f982cf4SFabien Sanglard 54*3f982cf4SFabien Sanglard // TaskRunner overrides 55*3f982cf4SFabien Sanglard ~TaskRunnerImpl() final; 56*3f982cf4SFabien Sanglard void PostPackagedTask(Task task) final; 57*3f982cf4SFabien Sanglard void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final; 58*3f982cf4SFabien Sanglard bool IsRunningOnTaskRunner() final; 59*3f982cf4SFabien Sanglard 60*3f982cf4SFabien Sanglard // Blocks the current thread, executing tasks from the queue with the desired 61*3f982cf4SFabien Sanglard // timing; and does not return until some time after RequestStopSoon() is 62*3f982cf4SFabien Sanglard // called. 63*3f982cf4SFabien Sanglard void RunUntilStopped(); 64*3f982cf4SFabien Sanglard 65*3f982cf4SFabien Sanglard // Blocks the current thread, executing tasks from the queue with the desired 66*3f982cf4SFabien Sanglard // timing; and does not return until some time after the current process is 67*3f982cf4SFabien Sanglard // signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called. 68*3f982cf4SFabien Sanglard void RunUntilSignaled(); 69*3f982cf4SFabien Sanglard 70*3f982cf4SFabien Sanglard // Thread-safe method for requesting the TaskRunner to stop running after all 71*3f982cf4SFabien Sanglard // non-delayed tasks in the queue have run. This behavior allows final 72*3f982cf4SFabien Sanglard // clean-up tasks to be executed before the TaskRunner stops. 73*3f982cf4SFabien Sanglard // 74*3f982cf4SFabien Sanglard // If any non-delayed tasks post additional non-delayed tasks, those will be 75*3f982cf4SFabien Sanglard // run as well before returning. 76*3f982cf4SFabien Sanglard void RequestStopSoon(); 77*3f982cf4SFabien Sanglard 78*3f982cf4SFabien Sanglard private: 79*3f982cf4SFabien Sanglard #if defined(ENABLE_TRACE_LOGGING) 80*3f982cf4SFabien Sanglard // Wrapper around a Task used to store the TraceId Metadata along with the 81*3f982cf4SFabien Sanglard // task itself, and to set the current TraceIdHierarchy before executing the 82*3f982cf4SFabien Sanglard // task. 83*3f982cf4SFabien Sanglard class TaskWithMetadata { 84*3f982cf4SFabien Sanglard public: 85*3f982cf4SFabien Sanglard // NOTE: 'explicit' keyword omitted so that conversion construtor can be 86*3f982cf4SFabien Sanglard // used. This simplifies switching between 'Task' and 'TaskWithMetadata' 87*3f982cf4SFabien Sanglard // based on the compilation flag. TaskWithMetadata(Task task)88*3f982cf4SFabien Sanglard TaskWithMetadata(Task task) // NOLINT 89*3f982cf4SFabien Sanglard : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {} 90*3f982cf4SFabien Sanglard operator()91*3f982cf4SFabien Sanglard void operator()() { 92*3f982cf4SFabien Sanglard TRACE_SET_HIERARCHY(trace_ids_); 93*3f982cf4SFabien Sanglard std::move(task_)(); 94*3f982cf4SFabien Sanglard } 95*3f982cf4SFabien Sanglard 96*3f982cf4SFabien Sanglard private: 97*3f982cf4SFabien Sanglard Task task_; 98*3f982cf4SFabien Sanglard TraceIdHierarchy trace_ids_; 99*3f982cf4SFabien Sanglard }; 100*3f982cf4SFabien Sanglard #else // !defined(ENABLE_TRACE_LOGGING) 101*3f982cf4SFabien Sanglard using TaskWithMetadata = Task; 102*3f982cf4SFabien Sanglard #endif // defined(ENABLE_TRACE_LOGGING) 103*3f982cf4SFabien Sanglard 104*3f982cf4SFabien Sanglard // Helper that runs all tasks in |running_tasks_| and then clears it. 105*3f982cf4SFabien Sanglard void RunRunnableTasks(); 106*3f982cf4SFabien Sanglard 107*3f982cf4SFabien Sanglard // Look at all tasks in the delayed task queue, then schedule them if the 108*3f982cf4SFabien Sanglard // minimum delay time has elapsed. 109*3f982cf4SFabien Sanglard void ScheduleDelayedTasks(); 110*3f982cf4SFabien Sanglard 111*3f982cf4SFabien Sanglard // Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If 112*3f982cf4SFabien Sanglard // there are no ready-to-run tasks, and |is_running_| is true, this method 113*3f982cf4SFabien Sanglard // will block waiting for new tasks. Returns true if any tasks were 114*3f982cf4SFabien Sanglard // transferred. 115*3f982cf4SFabien Sanglard bool GrabMoreRunnableTasks(); 116*3f982cf4SFabien Sanglard 117*3f982cf4SFabien Sanglard const ClockNowFunctionPtr now_function_; 118*3f982cf4SFabien Sanglard 119*3f982cf4SFabien Sanglard // Flag that indicates whether the task runner loop should continue. This is 120*3f982cf4SFabien Sanglard // only meant to be read/written on the thread executing RunUntilStopped(). 121*3f982cf4SFabien Sanglard bool is_running_; 122*3f982cf4SFabien Sanglard 123*3f982cf4SFabien Sanglard // This mutex is used for |tasks_| and |delayed_tasks_|, and also for 124*3f982cf4SFabien Sanglard // notifying the run loop to wake up when it is waiting for a task to be added 125*3f982cf4SFabien Sanglard // to the queue in |run_loop_wakeup_|. 126*3f982cf4SFabien Sanglard std::mutex task_mutex_; 127*3f982cf4SFabien Sanglard std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_); 128*3f982cf4SFabien Sanglard std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_ 129*3f982cf4SFabien Sanglard GUARDED_BY(task_mutex_); 130*3f982cf4SFabien Sanglard 131*3f982cf4SFabien Sanglard // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the 132*3f982cf4SFabien Sanglard // task runner. Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_| 133*3f982cf4SFabien Sanglard // is used instead (along with |waiter_timeout_|). 134*3f982cf4SFabien Sanglard std::condition_variable run_loop_wakeup_; 135*3f982cf4SFabien Sanglard TaskWaiter* const task_waiter_; 136*3f982cf4SFabien Sanglard Clock::duration waiter_timeout_; 137*3f982cf4SFabien Sanglard 138*3f982cf4SFabien Sanglard // To prevent excessive re-allocation of the underlying array of the |tasks_| 139*3f982cf4SFabien Sanglard // vector, use an A/B vector-swap mechanism. |running_tasks_| starts out 140*3f982cf4SFabien Sanglard // empty, and is swapped with |tasks_| when it is time to run the Tasks. 141*3f982cf4SFabien Sanglard std::vector<TaskWithMetadata> running_tasks_; 142*3f982cf4SFabien Sanglard 143*3f982cf4SFabien Sanglard std::thread::id task_runner_thread_id_; 144*3f982cf4SFabien Sanglard 145*3f982cf4SFabien Sanglard OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl); 146*3f982cf4SFabien Sanglard }; 147*3f982cf4SFabien Sanglard } // namespace openscreen 148*3f982cf4SFabien Sanglard 149*3f982cf4SFabien Sanglard #endif // PLATFORM_IMPL_TASK_RUNNER_H_ 150