xref: /aosp_15_r20/external/webrtc/rtc_base/task_queue_win.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3*d9f75844SAndroid Build Coastguard Worker  *
4*d9f75844SAndroid Build Coastguard Worker  *  Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker  *  that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker  *  tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker  *  in the file PATENTS.  All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker  *  be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker  */
10*d9f75844SAndroid Build Coastguard Worker 
11*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/task_queue_win.h"
12*d9f75844SAndroid Build Coastguard Worker 
13*d9f75844SAndroid Build Coastguard Worker // clang-format off
// clang-format would otherwise change the include order.
15*d9f75844SAndroid Build Coastguard Worker 
16*d9f75844SAndroid Build Coastguard Worker // Include winsock2.h before including <windows.h> to maintain consistency with
17*d9f75844SAndroid Build Coastguard Worker // win32.h. To include win32.h directly, it must be broken out into its own
18*d9f75844SAndroid Build Coastguard Worker // build target.
19*d9f75844SAndroid Build Coastguard Worker #include <winsock2.h>
20*d9f75844SAndroid Build Coastguard Worker #include <windows.h>
21*d9f75844SAndroid Build Coastguard Worker #include <sal.h>       // Must come after windows headers.
22*d9f75844SAndroid Build Coastguard Worker #include <mmsystem.h>  // Must come after windows headers.
23*d9f75844SAndroid Build Coastguard Worker // clang-format on
24*d9f75844SAndroid Build Coastguard Worker #include <string.h>
25*d9f75844SAndroid Build Coastguard Worker 
26*d9f75844SAndroid Build Coastguard Worker #include <algorithm>
27*d9f75844SAndroid Build Coastguard Worker #include <functional>
28*d9f75844SAndroid Build Coastguard Worker #include <memory>
29*d9f75844SAndroid Build Coastguard Worker #include <queue>
30*d9f75844SAndroid Build Coastguard Worker #include <utility>
31*d9f75844SAndroid Build Coastguard Worker 
32*d9f75844SAndroid Build Coastguard Worker #include "absl/functional/any_invocable.h"
33*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
34*d9f75844SAndroid Build Coastguard Worker #include "absl/types/optional.h"
35*d9f75844SAndroid Build Coastguard Worker #include "api/task_queue/task_queue_base.h"
36*d9f75844SAndroid Build Coastguard Worker #include "api/units/time_delta.h"
37*d9f75844SAndroid Build Coastguard Worker #include "api/units/timestamp.h"
38*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/arraysize.h"
39*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
40*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/event.h"
41*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
42*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/numerics/safe_conversions.h"
43*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/platform_thread.h"
44*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/synchronization/mutex.h"
45*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/time_utils.h"
46*d9f75844SAndroid Build Coastguard Worker 
47*d9f75844SAndroid Build Coastguard Worker namespace webrtc {
48*d9f75844SAndroid Build Coastguard Worker namespace {
// Thread message id used to hand a heap-allocated DelayedTaskInfo (passed in
// LPARAM) to the queue thread. The expansion is parenthesized so it behaves as
// a single operand inside any surrounding expression.
#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
50*d9f75844SAndroid Build Coastguard Worker 
InitializeQueueThread(ULONG_PTR param)51*d9f75844SAndroid Build Coastguard Worker void CALLBACK InitializeQueueThread(ULONG_PTR param) {
52*d9f75844SAndroid Build Coastguard Worker   MSG msg;
53*d9f75844SAndroid Build Coastguard Worker   ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
54*d9f75844SAndroid Build Coastguard Worker   rtc::Event* data = reinterpret_cast<rtc::Event*>(param);
55*d9f75844SAndroid Build Coastguard Worker   data->Set();
56*d9f75844SAndroid Build Coastguard Worker }
57*d9f75844SAndroid Build Coastguard Worker 
TaskQueuePriorityToThreadPriority(TaskQueueFactory::Priority priority)58*d9f75844SAndroid Build Coastguard Worker rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
59*d9f75844SAndroid Build Coastguard Worker     TaskQueueFactory::Priority priority) {
60*d9f75844SAndroid Build Coastguard Worker   switch (priority) {
61*d9f75844SAndroid Build Coastguard Worker     case TaskQueueFactory::Priority::HIGH:
62*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kRealtime;
63*d9f75844SAndroid Build Coastguard Worker     case TaskQueueFactory::Priority::LOW:
64*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kLow;
65*d9f75844SAndroid Build Coastguard Worker     case TaskQueueFactory::Priority::NORMAL:
66*d9f75844SAndroid Build Coastguard Worker       return rtc::ThreadPriority::kNormal;
67*d9f75844SAndroid Build Coastguard Worker   }
68*d9f75844SAndroid Build Coastguard Worker }
69*d9f75844SAndroid Build Coastguard Worker 
CurrentTime()70*d9f75844SAndroid Build Coastguard Worker Timestamp CurrentTime() {
71*d9f75844SAndroid Build Coastguard Worker   static const UINT kPeriod = 1;
72*d9f75844SAndroid Build Coastguard Worker   bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
73*d9f75844SAndroid Build Coastguard Worker   Timestamp ret = Timestamp::Micros(rtc::TimeMicros());
74*d9f75844SAndroid Build Coastguard Worker   if (high_res)
75*d9f75844SAndroid Build Coastguard Worker     timeEndPeriod(kPeriod);
76*d9f75844SAndroid Build Coastguard Worker   return ret;
77*d9f75844SAndroid Build Coastguard Worker }
78*d9f75844SAndroid Build Coastguard Worker 
79*d9f75844SAndroid Build Coastguard Worker class DelayedTaskInfo {
80*d9f75844SAndroid Build Coastguard Worker  public:
81*d9f75844SAndroid Build Coastguard Worker   // Default ctor needed to support priority_queue::pop().
DelayedTaskInfo()82*d9f75844SAndroid Build Coastguard Worker   DelayedTaskInfo() {}
DelayedTaskInfo(TimeDelta delay,absl::AnyInvocable<void ()&&> task)83*d9f75844SAndroid Build Coastguard Worker   DelayedTaskInfo(TimeDelta delay, absl::AnyInvocable<void() &&> task)
84*d9f75844SAndroid Build Coastguard Worker       : due_time_(CurrentTime() + delay), task_(std::move(task)) {}
85*d9f75844SAndroid Build Coastguard Worker   DelayedTaskInfo(DelayedTaskInfo&&) = default;
86*d9f75844SAndroid Build Coastguard Worker 
87*d9f75844SAndroid Build Coastguard Worker   // Implement for priority_queue.
operator >(const DelayedTaskInfo & other) const88*d9f75844SAndroid Build Coastguard Worker   bool operator>(const DelayedTaskInfo& other) const {
89*d9f75844SAndroid Build Coastguard Worker     return due_time_ > other.due_time_;
90*d9f75844SAndroid Build Coastguard Worker   }
91*d9f75844SAndroid Build Coastguard Worker 
92*d9f75844SAndroid Build Coastguard Worker   // Required by priority_queue::pop().
93*d9f75844SAndroid Build Coastguard Worker   DelayedTaskInfo& operator=(DelayedTaskInfo&& other) = default;
94*d9f75844SAndroid Build Coastguard Worker 
95*d9f75844SAndroid Build Coastguard Worker   // See below for why this method is const.
Run() const96*d9f75844SAndroid Build Coastguard Worker   void Run() const {
97*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK(task_);
98*d9f75844SAndroid Build Coastguard Worker     std::move(task_)();
99*d9f75844SAndroid Build Coastguard Worker   }
100*d9f75844SAndroid Build Coastguard Worker 
due_time() const101*d9f75844SAndroid Build Coastguard Worker   Timestamp due_time() const { return due_time_; }
102*d9f75844SAndroid Build Coastguard Worker 
103*d9f75844SAndroid Build Coastguard Worker  private:
104*d9f75844SAndroid Build Coastguard Worker   Timestamp due_time_ = Timestamp::Zero();
105*d9f75844SAndroid Build Coastguard Worker 
106*d9f75844SAndroid Build Coastguard Worker   // `task` needs to be mutable because std::priority_queue::top() returns
107*d9f75844SAndroid Build Coastguard Worker   // a const reference and a key in an ordered queue must not be changed.
108*d9f75844SAndroid Build Coastguard Worker   // There are two basic workarounds, one using const_cast, which would also
109*d9f75844SAndroid Build Coastguard Worker   // make the key (`due_time`), non-const and the other is to make the non-key
110*d9f75844SAndroid Build Coastguard Worker   // (`task`), mutable.
111*d9f75844SAndroid Build Coastguard Worker   // Because of this, the `task` variable is made private and can only be
112*d9f75844SAndroid Build Coastguard Worker   // mutated by calling the `Run()` method.
113*d9f75844SAndroid Build Coastguard Worker   mutable absl::AnyInvocable<void() &&> task_;
114*d9f75844SAndroid Build Coastguard Worker };
115*d9f75844SAndroid Build Coastguard Worker 
116*d9f75844SAndroid Build Coastguard Worker class MultimediaTimer {
117*d9f75844SAndroid Build Coastguard Worker  public:
118*d9f75844SAndroid Build Coastguard Worker   // Note: We create an event that requires manual reset.
MultimediaTimer()119*d9f75844SAndroid Build Coastguard Worker   MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {}
120*d9f75844SAndroid Build Coastguard Worker 
~MultimediaTimer()121*d9f75844SAndroid Build Coastguard Worker   ~MultimediaTimer() {
122*d9f75844SAndroid Build Coastguard Worker     Cancel();
123*d9f75844SAndroid Build Coastguard Worker     ::CloseHandle(event_);
124*d9f75844SAndroid Build Coastguard Worker   }
125*d9f75844SAndroid Build Coastguard Worker 
126*d9f75844SAndroid Build Coastguard Worker   MultimediaTimer(const MultimediaTimer&) = delete;
127*d9f75844SAndroid Build Coastguard Worker   MultimediaTimer& operator=(const MultimediaTimer&) = delete;
128*d9f75844SAndroid Build Coastguard Worker 
StartOneShotTimer(UINT delay_ms)129*d9f75844SAndroid Build Coastguard Worker   bool StartOneShotTimer(UINT delay_ms) {
130*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK_EQ(0, timer_id_);
131*d9f75844SAndroid Build Coastguard Worker     RTC_DCHECK(event_ != nullptr);
132*d9f75844SAndroid Build Coastguard Worker     timer_id_ =
133*d9f75844SAndroid Build Coastguard Worker         ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
134*d9f75844SAndroid Build Coastguard Worker                        TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
135*d9f75844SAndroid Build Coastguard Worker     return timer_id_ != 0;
136*d9f75844SAndroid Build Coastguard Worker   }
137*d9f75844SAndroid Build Coastguard Worker 
Cancel()138*d9f75844SAndroid Build Coastguard Worker   void Cancel() {
139*d9f75844SAndroid Build Coastguard Worker     if (timer_id_) {
140*d9f75844SAndroid Build Coastguard Worker       ::timeKillEvent(timer_id_);
141*d9f75844SAndroid Build Coastguard Worker       timer_id_ = 0;
142*d9f75844SAndroid Build Coastguard Worker     }
143*d9f75844SAndroid Build Coastguard Worker     // Now that timer is killed and not able to set the event, reset the event.
144*d9f75844SAndroid Build Coastguard Worker     // Doing it in opposite order is racy because event may be set between
145*d9f75844SAndroid Build Coastguard Worker     // event was reset and timer is killed leaving MultimediaTimer in surprising
146*d9f75844SAndroid Build Coastguard Worker     // state where both event is set and timer is canceled.
147*d9f75844SAndroid Build Coastguard Worker     ::ResetEvent(event_);
148*d9f75844SAndroid Build Coastguard Worker   }
149*d9f75844SAndroid Build Coastguard Worker 
event_for_wait()150*d9f75844SAndroid Build Coastguard Worker   HANDLE* event_for_wait() { return &event_; }
151*d9f75844SAndroid Build Coastguard Worker 
152*d9f75844SAndroid Build Coastguard Worker  private:
153*d9f75844SAndroid Build Coastguard Worker   HANDLE event_ = nullptr;
154*d9f75844SAndroid Build Coastguard Worker   MMRESULT timer_id_ = 0;
155*d9f75844SAndroid Build Coastguard Worker };
156*d9f75844SAndroid Build Coastguard Worker 
157*d9f75844SAndroid Build Coastguard Worker class TaskQueueWin : public TaskQueueBase {
158*d9f75844SAndroid Build Coastguard Worker  public:
159*d9f75844SAndroid Build Coastguard Worker   TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority);
160*d9f75844SAndroid Build Coastguard Worker   ~TaskQueueWin() override = default;
161*d9f75844SAndroid Build Coastguard Worker 
162*d9f75844SAndroid Build Coastguard Worker   void Delete() override;
163*d9f75844SAndroid Build Coastguard Worker   void PostTask(absl::AnyInvocable<void() &&> task) override;
164*d9f75844SAndroid Build Coastguard Worker   void PostDelayedTask(absl::AnyInvocable<void() &&> task,
165*d9f75844SAndroid Build Coastguard Worker                        TimeDelta delay) override;
166*d9f75844SAndroid Build Coastguard Worker   void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
167*d9f75844SAndroid Build Coastguard Worker                                     TimeDelta delay) override;
168*d9f75844SAndroid Build Coastguard Worker   void RunPendingTasks();
169*d9f75844SAndroid Build Coastguard Worker 
170*d9f75844SAndroid Build Coastguard Worker  private:
171*d9f75844SAndroid Build Coastguard Worker   void RunThreadMain();
172*d9f75844SAndroid Build Coastguard Worker   bool ProcessQueuedMessages();
173*d9f75844SAndroid Build Coastguard Worker   void RunDueTasks();
174*d9f75844SAndroid Build Coastguard Worker   void ScheduleNextTimer();
175*d9f75844SAndroid Build Coastguard Worker   void CancelTimers();
176*d9f75844SAndroid Build Coastguard Worker 
177*d9f75844SAndroid Build Coastguard Worker   MultimediaTimer timer_;
178*d9f75844SAndroid Build Coastguard Worker   // Since priority_queue<> by defult orders items in terms of
179*d9f75844SAndroid Build Coastguard Worker   // largest->smallest, using std::less<>, and we want smallest->largest,
180*d9f75844SAndroid Build Coastguard Worker   // we would like to use std::greater<> here.
181*d9f75844SAndroid Build Coastguard Worker   std::priority_queue<DelayedTaskInfo,
182*d9f75844SAndroid Build Coastguard Worker                       std::vector<DelayedTaskInfo>,
183*d9f75844SAndroid Build Coastguard Worker                       std::greater<DelayedTaskInfo>>
184*d9f75844SAndroid Build Coastguard Worker       timer_tasks_;
185*d9f75844SAndroid Build Coastguard Worker   UINT_PTR timer_id_ = 0;
186*d9f75844SAndroid Build Coastguard Worker   rtc::PlatformThread thread_;
187*d9f75844SAndroid Build Coastguard Worker   Mutex pending_lock_;
188*d9f75844SAndroid Build Coastguard Worker   std::queue<absl::AnyInvocable<void() &&>> pending_
189*d9f75844SAndroid Build Coastguard Worker       RTC_GUARDED_BY(pending_lock_);
190*d9f75844SAndroid Build Coastguard Worker   HANDLE in_queue_;
191*d9f75844SAndroid Build Coastguard Worker };
192*d9f75844SAndroid Build Coastguard Worker 
TaskQueueWin(absl::string_view queue_name,rtc::ThreadPriority priority)193*d9f75844SAndroid Build Coastguard Worker TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
194*d9f75844SAndroid Build Coastguard Worker                            rtc::ThreadPriority priority)
195*d9f75844SAndroid Build Coastguard Worker     : in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
196*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(in_queue_);
197*d9f75844SAndroid Build Coastguard Worker   thread_ = rtc::PlatformThread::SpawnJoinable(
198*d9f75844SAndroid Build Coastguard Worker       [this] { RunThreadMain(); }, queue_name,
199*d9f75844SAndroid Build Coastguard Worker       rtc::ThreadAttributes().SetPriority(priority));
200*d9f75844SAndroid Build Coastguard Worker 
201*d9f75844SAndroid Build Coastguard Worker   rtc::Event event(false, false);
202*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
203*d9f75844SAndroid Build Coastguard Worker                              reinterpret_cast<ULONG_PTR>(&event)));
204*d9f75844SAndroid Build Coastguard Worker   event.Wait(rtc::Event::kForever);
205*d9f75844SAndroid Build Coastguard Worker }
206*d9f75844SAndroid Build Coastguard Worker 
Delete()207*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::Delete() {
208*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!IsCurrent());
209*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(thread_.GetHandle() != absl::nullopt);
210*d9f75844SAndroid Build Coastguard Worker   while (
211*d9f75844SAndroid Build Coastguard Worker       !::PostThreadMessage(GetThreadId(*thread_.GetHandle()), WM_QUIT, 0, 0)) {
212*d9f75844SAndroid Build Coastguard Worker     RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
213*d9f75844SAndroid Build Coastguard Worker     Sleep(1);
214*d9f75844SAndroid Build Coastguard Worker   }
215*d9f75844SAndroid Build Coastguard Worker   thread_.Finalize();
216*d9f75844SAndroid Build Coastguard Worker   ::CloseHandle(in_queue_);
217*d9f75844SAndroid Build Coastguard Worker   delete this;
218*d9f75844SAndroid Build Coastguard Worker }
219*d9f75844SAndroid Build Coastguard Worker 
PostTask(absl::AnyInvocable<void ()&&> task)220*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::PostTask(absl::AnyInvocable<void() &&> task) {
221*d9f75844SAndroid Build Coastguard Worker   MutexLock lock(&pending_lock_);
222*d9f75844SAndroid Build Coastguard Worker   pending_.push(std::move(task));
223*d9f75844SAndroid Build Coastguard Worker   ::SetEvent(in_queue_);
224*d9f75844SAndroid Build Coastguard Worker }
225*d9f75844SAndroid Build Coastguard Worker 
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)226*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::PostDelayedTask(absl::AnyInvocable<void() &&> task,
227*d9f75844SAndroid Build Coastguard Worker                                    TimeDelta delay) {
228*d9f75844SAndroid Build Coastguard Worker   if (delay <= TimeDelta::Zero()) {
229*d9f75844SAndroid Build Coastguard Worker     PostTask(std::move(task));
230*d9f75844SAndroid Build Coastguard Worker     return;
231*d9f75844SAndroid Build Coastguard Worker   }
232*d9f75844SAndroid Build Coastguard Worker 
233*d9f75844SAndroid Build Coastguard Worker   auto* task_info = new DelayedTaskInfo(delay, std::move(task));
234*d9f75844SAndroid Build Coastguard Worker   RTC_CHECK(thread_.GetHandle() != absl::nullopt);
235*d9f75844SAndroid Build Coastguard Worker   if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
236*d9f75844SAndroid Build Coastguard Worker                            WM_QUEUE_DELAYED_TASK, 0,
237*d9f75844SAndroid Build Coastguard Worker                            reinterpret_cast<LPARAM>(task_info))) {
238*d9f75844SAndroid Build Coastguard Worker     delete task_info;
239*d9f75844SAndroid Build Coastguard Worker   }
240*d9f75844SAndroid Build Coastguard Worker }
241*d9f75844SAndroid Build Coastguard Worker 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)242*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::PostDelayedHighPrecisionTask(
243*d9f75844SAndroid Build Coastguard Worker     absl::AnyInvocable<void() &&> task,
244*d9f75844SAndroid Build Coastguard Worker     TimeDelta delay) {
245*d9f75844SAndroid Build Coastguard Worker   PostDelayedTask(std::move(task), delay);
246*d9f75844SAndroid Build Coastguard Worker }
247*d9f75844SAndroid Build Coastguard Worker 
RunPendingTasks()248*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::RunPendingTasks() {
249*d9f75844SAndroid Build Coastguard Worker   while (true) {
250*d9f75844SAndroid Build Coastguard Worker     absl::AnyInvocable<void() &&> task;
251*d9f75844SAndroid Build Coastguard Worker     {
252*d9f75844SAndroid Build Coastguard Worker       MutexLock lock(&pending_lock_);
253*d9f75844SAndroid Build Coastguard Worker       if (pending_.empty())
254*d9f75844SAndroid Build Coastguard Worker         break;
255*d9f75844SAndroid Build Coastguard Worker       task = std::move(pending_.front());
256*d9f75844SAndroid Build Coastguard Worker       pending_.pop();
257*d9f75844SAndroid Build Coastguard Worker     }
258*d9f75844SAndroid Build Coastguard Worker 
259*d9f75844SAndroid Build Coastguard Worker     std::move(task)();
260*d9f75844SAndroid Build Coastguard Worker   }
261*d9f75844SAndroid Build Coastguard Worker }
262*d9f75844SAndroid Build Coastguard Worker 
RunThreadMain()263*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::RunThreadMain() {
264*d9f75844SAndroid Build Coastguard Worker   CurrentTaskQueueSetter set_current(this);
265*d9f75844SAndroid Build Coastguard Worker   HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};
266*d9f75844SAndroid Build Coastguard Worker   while (true) {
267*d9f75844SAndroid Build Coastguard Worker     // Make sure we do an alertable wait as that's required to allow APCs to run
268*d9f75844SAndroid Build Coastguard Worker     // (e.g. required for InitializeQueueThread and stopping the thread in
269*d9f75844SAndroid Build Coastguard Worker     // PlatformThread).
270*d9f75844SAndroid Build Coastguard Worker     DWORD result = ::MsgWaitForMultipleObjectsEx(
271*d9f75844SAndroid Build Coastguard Worker         arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
272*d9f75844SAndroid Build Coastguard Worker     RTC_CHECK_NE(WAIT_FAILED, result);
273*d9f75844SAndroid Build Coastguard Worker     if (result == (WAIT_OBJECT_0 + 2)) {
274*d9f75844SAndroid Build Coastguard Worker       // There are messages in the message queue that need to be handled.
275*d9f75844SAndroid Build Coastguard Worker       if (!ProcessQueuedMessages())
276*d9f75844SAndroid Build Coastguard Worker         break;
277*d9f75844SAndroid Build Coastguard Worker     }
278*d9f75844SAndroid Build Coastguard Worker 
279*d9f75844SAndroid Build Coastguard Worker     if (result == WAIT_OBJECT_0 ||
280*d9f75844SAndroid Build Coastguard Worker         (!timer_tasks_.empty() &&
281*d9f75844SAndroid Build Coastguard Worker          ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
282*d9f75844SAndroid Build Coastguard Worker       // The multimedia timer was signaled.
283*d9f75844SAndroid Build Coastguard Worker       timer_.Cancel();
284*d9f75844SAndroid Build Coastguard Worker       RunDueTasks();
285*d9f75844SAndroid Build Coastguard Worker       ScheduleNextTimer();
286*d9f75844SAndroid Build Coastguard Worker     }
287*d9f75844SAndroid Build Coastguard Worker 
288*d9f75844SAndroid Build Coastguard Worker     if (result == (WAIT_OBJECT_0 + 1)) {
289*d9f75844SAndroid Build Coastguard Worker       ::ResetEvent(in_queue_);
290*d9f75844SAndroid Build Coastguard Worker       RunPendingTasks();
291*d9f75844SAndroid Build Coastguard Worker     }
292*d9f75844SAndroid Build Coastguard Worker   }
293*d9f75844SAndroid Build Coastguard Worker }
294*d9f75844SAndroid Build Coastguard Worker 
ProcessQueuedMessages()295*d9f75844SAndroid Build Coastguard Worker bool TaskQueueWin::ProcessQueuedMessages() {
296*d9f75844SAndroid Build Coastguard Worker   MSG msg = {};
297*d9f75844SAndroid Build Coastguard Worker   // To protect against overly busy message queues, we limit the time
298*d9f75844SAndroid Build Coastguard Worker   // we process tasks to a few milliseconds. If we don't do that, there's
299*d9f75844SAndroid Build Coastguard Worker   // a chance that timer tasks won't ever run.
300*d9f75844SAndroid Build Coastguard Worker   static constexpr TimeDelta kMaxTaskProcessingTime = TimeDelta::Millis(500);
301*d9f75844SAndroid Build Coastguard Worker   Timestamp start = CurrentTime();
302*d9f75844SAndroid Build Coastguard Worker   while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
303*d9f75844SAndroid Build Coastguard Worker          msg.message != WM_QUIT) {
304*d9f75844SAndroid Build Coastguard Worker     if (!msg.hwnd) {
305*d9f75844SAndroid Build Coastguard Worker       switch (msg.message) {
306*d9f75844SAndroid Build Coastguard Worker         case WM_QUEUE_DELAYED_TASK: {
307*d9f75844SAndroid Build Coastguard Worker           std::unique_ptr<DelayedTaskInfo> info(
308*d9f75844SAndroid Build Coastguard Worker               reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
309*d9f75844SAndroid Build Coastguard Worker           bool need_to_schedule_timers =
310*d9f75844SAndroid Build Coastguard Worker               timer_tasks_.empty() ||
311*d9f75844SAndroid Build Coastguard Worker               timer_tasks_.top().due_time() > info->due_time();
312*d9f75844SAndroid Build Coastguard Worker           timer_tasks_.push(std::move(*info));
313*d9f75844SAndroid Build Coastguard Worker           if (need_to_schedule_timers) {
314*d9f75844SAndroid Build Coastguard Worker             CancelTimers();
315*d9f75844SAndroid Build Coastguard Worker             ScheduleNextTimer();
316*d9f75844SAndroid Build Coastguard Worker           }
317*d9f75844SAndroid Build Coastguard Worker           break;
318*d9f75844SAndroid Build Coastguard Worker         }
319*d9f75844SAndroid Build Coastguard Worker         case WM_TIMER: {
320*d9f75844SAndroid Build Coastguard Worker           RTC_DCHECK_EQ(timer_id_, msg.wParam);
321*d9f75844SAndroid Build Coastguard Worker           ::KillTimer(nullptr, msg.wParam);
322*d9f75844SAndroid Build Coastguard Worker           timer_id_ = 0;
323*d9f75844SAndroid Build Coastguard Worker           RunDueTasks();
324*d9f75844SAndroid Build Coastguard Worker           ScheduleNextTimer();
325*d9f75844SAndroid Build Coastguard Worker           break;
326*d9f75844SAndroid Build Coastguard Worker         }
327*d9f75844SAndroid Build Coastguard Worker         default:
328*d9f75844SAndroid Build Coastguard Worker           RTC_DCHECK_NOTREACHED();
329*d9f75844SAndroid Build Coastguard Worker           break;
330*d9f75844SAndroid Build Coastguard Worker       }
331*d9f75844SAndroid Build Coastguard Worker     } else {
332*d9f75844SAndroid Build Coastguard Worker       ::TranslateMessage(&msg);
333*d9f75844SAndroid Build Coastguard Worker       ::DispatchMessage(&msg);
334*d9f75844SAndroid Build Coastguard Worker     }
335*d9f75844SAndroid Build Coastguard Worker 
336*d9f75844SAndroid Build Coastguard Worker     if (CurrentTime() > start + kMaxTaskProcessingTime)
337*d9f75844SAndroid Build Coastguard Worker       break;
338*d9f75844SAndroid Build Coastguard Worker   }
339*d9f75844SAndroid Build Coastguard Worker   return msg.message != WM_QUIT;
340*d9f75844SAndroid Build Coastguard Worker }
341*d9f75844SAndroid Build Coastguard Worker 
RunDueTasks()342*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::RunDueTasks() {
343*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(!timer_tasks_.empty());
344*d9f75844SAndroid Build Coastguard Worker   Timestamp now = CurrentTime();
345*d9f75844SAndroid Build Coastguard Worker   do {
346*d9f75844SAndroid Build Coastguard Worker     const auto& top = timer_tasks_.top();
347*d9f75844SAndroid Build Coastguard Worker     if (top.due_time() > now)
348*d9f75844SAndroid Build Coastguard Worker       break;
349*d9f75844SAndroid Build Coastguard Worker     top.Run();
350*d9f75844SAndroid Build Coastguard Worker     timer_tasks_.pop();
351*d9f75844SAndroid Build Coastguard Worker   } while (!timer_tasks_.empty());
352*d9f75844SAndroid Build Coastguard Worker }
353*d9f75844SAndroid Build Coastguard Worker 
ScheduleNextTimer()354*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::ScheduleNextTimer() {
355*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK_EQ(timer_id_, 0);
356*d9f75844SAndroid Build Coastguard Worker   if (timer_tasks_.empty())
357*d9f75844SAndroid Build Coastguard Worker     return;
358*d9f75844SAndroid Build Coastguard Worker 
359*d9f75844SAndroid Build Coastguard Worker   const auto& next_task = timer_tasks_.top();
360*d9f75844SAndroid Build Coastguard Worker   TimeDelta delay =
361*d9f75844SAndroid Build Coastguard Worker       std::max(TimeDelta::Zero(), next_task.due_time() - CurrentTime());
362*d9f75844SAndroid Build Coastguard Worker   uint32_t milliseconds = delay.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>();
363*d9f75844SAndroid Build Coastguard Worker   if (!timer_.StartOneShotTimer(milliseconds))
364*d9f75844SAndroid Build Coastguard Worker     timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
365*d9f75844SAndroid Build Coastguard Worker }
366*d9f75844SAndroid Build Coastguard Worker 
CancelTimers()367*d9f75844SAndroid Build Coastguard Worker void TaskQueueWin::CancelTimers() {
368*d9f75844SAndroid Build Coastguard Worker   timer_.Cancel();
369*d9f75844SAndroid Build Coastguard Worker   if (timer_id_) {
370*d9f75844SAndroid Build Coastguard Worker     ::KillTimer(nullptr, timer_id_);
371*d9f75844SAndroid Build Coastguard Worker     timer_id_ = 0;
372*d9f75844SAndroid Build Coastguard Worker   }
373*d9f75844SAndroid Build Coastguard Worker }
374*d9f75844SAndroid Build Coastguard Worker 
375*d9f75844SAndroid Build Coastguard Worker class TaskQueueWinFactory : public TaskQueueFactory {
376*d9f75844SAndroid Build Coastguard Worker  public:
CreateTaskQueue(absl::string_view name,Priority priority) const377*d9f75844SAndroid Build Coastguard Worker   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
378*d9f75844SAndroid Build Coastguard Worker       absl::string_view name,
379*d9f75844SAndroid Build Coastguard Worker       Priority priority) const override {
380*d9f75844SAndroid Build Coastguard Worker     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
381*d9f75844SAndroid Build Coastguard Worker         new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
382*d9f75844SAndroid Build Coastguard Worker   }
383*d9f75844SAndroid Build Coastguard Worker };
384*d9f75844SAndroid Build Coastguard Worker 
385*d9f75844SAndroid Build Coastguard Worker }  // namespace
386*d9f75844SAndroid Build Coastguard Worker 
CreateTaskQueueWinFactory()387*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
388*d9f75844SAndroid Build Coastguard Worker   return std::make_unique<TaskQueueWinFactory>();
389*d9f75844SAndroid Build Coastguard Worker }
390*d9f75844SAndroid Build Coastguard Worker 
391*d9f75844SAndroid Build Coastguard Worker }  // namespace webrtc
392