xref: /aosp_15_r20/external/webrtc/third_party/abseil-cpp/absl/synchronization/internal/waiter.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 
52 namespace absl {
53 ABSL_NAMESPACE_BEGIN
54 namespace synchronization_internal {
55 
MaybeBecomeIdle()56 static void MaybeBecomeIdle() {
57   base_internal::ThreadIdentity *identity =
58       base_internal::CurrentThreadIdentityIfPresent();
59   assert(identity != nullptr);
60   const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61   const int ticker = identity->ticker.load(std::memory_order_relaxed);
62   const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63   if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64     identity->is_idle.store(true, std::memory_order_relaxed);
65   }
66 }
67 
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69 
Waiter()70 Waiter::Waiter() {
71   futex_.store(0, std::memory_order_relaxed);
72 }
73 
Wait(KernelTimeout t)74 bool Waiter::Wait(KernelTimeout t) {
75   // Loop until we can atomically decrement futex from a positive
76   // value, waiting on a futex while we believe it is zero.
77   // Note that, since the thread ticker is just reset, we don't need to check
78   // whether the thread is idle on the very first pass of the loop.
79   bool first_pass = true;
80 
81   while (true) {
82     int32_t x = futex_.load(std::memory_order_relaxed);
83     while (x != 0) {
84       if (!futex_.compare_exchange_weak(x, x - 1,
85                                         std::memory_order_acquire,
86                                         std::memory_order_relaxed)) {
87         continue;  // Raced with someone, retry.
88       }
89       return true;  // Consumed a wakeup, we are done.
90     }
91 
92     if (!first_pass) MaybeBecomeIdle();
93     const int err = Futex::WaitUntil(&futex_, 0, t);
94     if (err != 0) {
95       if (err == -EINTR || err == -EWOULDBLOCK) {
96         // Do nothing, the loop will retry.
97       } else if (err == -ETIMEDOUT) {
98         return false;
99       } else {
100         ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
101       }
102     }
103     first_pass = false;
104   }
105 }
106 
Post()107 void Waiter::Post() {
108   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
109     // We incremented from 0, need to wake a potential waiter.
110     Poke();
111   }
112 }
113 
Poke()114 void Waiter::Poke() {
115   // Wake one thread waiting on the futex.
116   const int err = Futex::Wake(&futex_, 1);
117   if (ABSL_PREDICT_FALSE(err < 0)) {
118     ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
119   }
120 }
121 
122 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
123 
124 class PthreadMutexHolder {
125  public:
PthreadMutexHolder(pthread_mutex_t * mu)126   explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
127     const int err = pthread_mutex_lock(mu_);
128     if (err != 0) {
129       ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
130     }
131   }
132 
133   PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
134   PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
135 
~PthreadMutexHolder()136   ~PthreadMutexHolder() {
137     const int err = pthread_mutex_unlock(mu_);
138     if (err != 0) {
139       ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
140     }
141   }
142 
143  private:
144   pthread_mutex_t *mu_;
145 };
146 
Waiter()147 Waiter::Waiter() {
148   const int err = pthread_mutex_init(&mu_, 0);
149   if (err != 0) {
150     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
151   }
152 
153   const int err2 = pthread_cond_init(&cv_, 0);
154   if (err2 != 0) {
155     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
156   }
157 
158   waiter_count_ = 0;
159   wakeup_count_ = 0;
160 }
161 
Wait(KernelTimeout t)162 bool Waiter::Wait(KernelTimeout t) {
163   struct timespec abs_timeout;
164   if (t.has_timeout()) {
165     abs_timeout = t.MakeAbsTimespec();
166   }
167 
168   PthreadMutexHolder h(&mu_);
169   ++waiter_count_;
170   // Loop until we find a wakeup to consume or timeout.
171   // Note that, since the thread ticker is just reset, we don't need to check
172   // whether the thread is idle on the very first pass of the loop.
173   bool first_pass = true;
174   while (wakeup_count_ == 0) {
175     if (!first_pass) MaybeBecomeIdle();
176     // No wakeups available, time to wait.
177     if (!t.has_timeout()) {
178       const int err = pthread_cond_wait(&cv_, &mu_);
179       if (err != 0) {
180         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
181       }
182     } else {
183       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
184       if (err == ETIMEDOUT) {
185         --waiter_count_;
186         return false;
187       }
188       if (err != 0) {
189         ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
190       }
191     }
192     first_pass = false;
193   }
194   // Consume a wakeup and we're done.
195   --wakeup_count_;
196   --waiter_count_;
197   return true;
198 }
199 
Post()200 void Waiter::Post() {
201   PthreadMutexHolder h(&mu_);
202   ++wakeup_count_;
203   InternalCondVarPoke();
204 }
205 
Poke()206 void Waiter::Poke() {
207   PthreadMutexHolder h(&mu_);
208   InternalCondVarPoke();
209 }
210 
InternalCondVarPoke()211 void Waiter::InternalCondVarPoke() {
212   if (waiter_count_ != 0) {
213     const int err = pthread_cond_signal(&cv_);
214     if (ABSL_PREDICT_FALSE(err != 0)) {
215       ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
216     }
217   }
218 }
219 
220 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
221 
Waiter()222 Waiter::Waiter() {
223   if (sem_init(&sem_, 0, 0) != 0) {
224     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
225   }
226   wakeups_.store(0, std::memory_order_relaxed);
227 }
228 
Wait(KernelTimeout t)229 bool Waiter::Wait(KernelTimeout t) {
230   struct timespec abs_timeout;
231   if (t.has_timeout()) {
232     abs_timeout = t.MakeAbsTimespec();
233   }
234 
235   // Loop until we timeout or consume a wakeup.
236   // Note that, since the thread ticker is just reset, we don't need to check
237   // whether the thread is idle on the very first pass of the loop.
238   bool first_pass = true;
239   while (true) {
240     int x = wakeups_.load(std::memory_order_relaxed);
241     while (x != 0) {
242       if (!wakeups_.compare_exchange_weak(x, x - 1,
243                                           std::memory_order_acquire,
244                                           std::memory_order_relaxed)) {
245         continue;  // Raced with someone, retry.
246       }
247       // Successfully consumed a wakeup, we're done.
248       return true;
249     }
250 
251     if (!first_pass) MaybeBecomeIdle();
252     // Nothing to consume, wait (looping on EINTR).
253     while (true) {
254       if (!t.has_timeout()) {
255         if (sem_wait(&sem_) == 0) break;
256         if (errno == EINTR) continue;
257         ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
258       } else {
259         if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
260         if (errno == EINTR) continue;
261         if (errno == ETIMEDOUT) return false;
262         ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
263       }
264     }
265     first_pass = false;
266   }
267 }
268 
Post()269 void Waiter::Post() {
270   // Post a wakeup.
271   if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
272     // We incremented from 0, need to wake a potential waiter.
273     Poke();
274   }
275 }
276 
Poke()277 void Waiter::Poke() {
278   if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
279     ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
280   }
281 }
282 
283 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
284 
285 class Waiter::WinHelper {
286  public:
GetLock(Waiter * w)287   static SRWLOCK *GetLock(Waiter *w) {
288     return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
289   }
290 
GetCond(Waiter * w)291   static CONDITION_VARIABLE *GetCond(Waiter *w) {
292     return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
293   }
294 
295   static_assert(sizeof(SRWLOCK) == sizeof(void *),
296                 "`mu_storage_` does not have the same size as SRWLOCK");
297   static_assert(alignof(SRWLOCK) == alignof(void *),
298                 "`mu_storage_` does not have the same alignment as SRWLOCK");
299 
300   static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
301                 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
302                 "as `CONDITION_VARIABLE`");
303   static_assert(
304       alignof(CONDITION_VARIABLE) == alignof(void *),
305       "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
306 
307   // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
308   // and destructible because we never call their constructors or destructors.
309   static_assert(std::is_trivially_constructible<SRWLOCK>::value,
310                 "The `SRWLOCK` type must be trivially constructible");
311   static_assert(
312       std::is_trivially_constructible<CONDITION_VARIABLE>::value,
313       "The `CONDITION_VARIABLE` type must be trivially constructible");
314   static_assert(std::is_trivially_destructible<SRWLOCK>::value,
315                 "The `SRWLOCK` type must be trivially destructible");
316   static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
317                 "The `CONDITION_VARIABLE` type must be trivially destructible");
318 };
319 
320 class LockHolder {
321  public:
LockHolder(SRWLOCK * mu)322   explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
323     AcquireSRWLockExclusive(mu_);
324   }
325 
326   LockHolder(const LockHolder&) = delete;
327   LockHolder& operator=(const LockHolder&) = delete;
328 
~LockHolder()329   ~LockHolder() {
330     ReleaseSRWLockExclusive(mu_);
331   }
332 
333  private:
334   SRWLOCK* mu_;
335 };
336 
Waiter()337 Waiter::Waiter() {
338   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
339   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
340   InitializeSRWLock(mu);
341   InitializeConditionVariable(cv);
342   waiter_count_ = 0;
343   wakeup_count_ = 0;
344 }
345 
Wait(KernelTimeout t)346 bool Waiter::Wait(KernelTimeout t) {
347   SRWLOCK *mu = WinHelper::GetLock(this);
348   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
349 
350   LockHolder h(mu);
351   ++waiter_count_;
352 
353   // Loop until we find a wakeup to consume or timeout.
354   // Note that, since the thread ticker is just reset, we don't need to check
355   // whether the thread is idle on the very first pass of the loop.
356   bool first_pass = true;
357   while (wakeup_count_ == 0) {
358     if (!first_pass) MaybeBecomeIdle();
359     // No wakeups available, time to wait.
360     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
361       // GetLastError() returns a Win32 DWORD, but we assign to
362       // unsigned long to simplify the ABSL_RAW_LOG case below.  The uniform
363       // initialization guarantees this is not a narrowing conversion.
364       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
365       if (err == ERROR_TIMEOUT) {
366         --waiter_count_;
367         return false;
368       } else {
369         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
370       }
371     }
372     first_pass = false;
373   }
374   // Consume a wakeup and we're done.
375   --wakeup_count_;
376   --waiter_count_;
377   return true;
378 }
379 
Post()380 void Waiter::Post() {
381   LockHolder h(WinHelper::GetLock(this));
382   ++wakeup_count_;
383   InternalCondVarPoke();
384 }
385 
Poke()386 void Waiter::Poke() {
387   LockHolder h(WinHelper::GetLock(this));
388   InternalCondVarPoke();
389 }
390 
InternalCondVarPoke()391 void Waiter::InternalCondVarPoke() {
392   if (waiter_count_ != 0) {
393     WakeConditionVariable(WinHelper::GetCond(this));
394   }
395 }
396 
397 #else
398 #error Unknown ABSL_WAITER_MODE
399 #endif
400 
401 }  // namespace synchronization_internal
402 ABSL_NAMESPACE_END
403 }  // namespace absl
404