xref: /aosp_15_r20/external/libgav1/src/utils/threadpool.cc (revision 095378508e87ed692bf8dfeb34008b65b3735891)
1 // Copyright 2019 The libgav1 Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/utils/threadpool.h"
16 
17 #if defined(_MSC_VER)
18 #include <process.h>
19 #include <windows.h>
20 #else  // defined(_MSC_VER)
21 #include <pthread.h>
22 #endif  // defined(_MSC_VER)
23 #if defined(__ANDROID__) || defined(__GLIBC__)
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
27 #include <algorithm>
28 #include <cassert>
29 #include <cinttypes>
30 #include <cstddef>
31 #include <cstdint>
32 #include <cstdio>
33 #include <cstring>
34 #include <new>
35 #include <utility>
36 
37 #if defined(__ANDROID__)
38 #include <chrono>  // NOLINT (unapproved c++11 header)
39 #endif
40 
// GetTid() returns the kernel thread id of the calling thread. It is only
// defined on Android and on glibc-based systems.
#if defined(__ANDROID__) || defined(__GLIBC__)
#if !defined(__ANDROID__) && \
    !(__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 30))
// glibc older than 2.30 has no gettid() wrapper, so issue the system call
// directly.
#include <sys/syscall.h>

static pid_t GetTid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
#else
// Android, or glibc 2.30 and later: the C library provides gettid().
static pid_t GetTid() { return gettid(); }
#endif  // Older than glibc 2.30.
#endif  // defined(__ANDROID__) || defined(__GLIBC__)
56 
57 namespace libgav1 {
58 
#if defined(__ANDROID__)
namespace {

// Clock used to time the busy wait in ThreadPool::WorkerFunction().
using Clock = std::chrono::steady_clock;
using Duration = Clock::duration;
using FractionalSeconds = std::chrono::duration<double>;

// Upper bound on the busy wait performed before a worker blocks: 2e-3
// seconds, expressed in the clock's native duration type.
constexpr Duration kBusyWaitDuration =
    std::chrono::duration_cast<Duration>(FractionalSeconds(2e-3));

}  // namespace
#endif  // defined(__ANDROID__)
69 
70 // static
Create(int num_threads)71 std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) {
72   return Create(/*name_prefix=*/"", num_threads);
73 }
74 
75 // static
Create(const char name_prefix[],int num_threads)76 std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[],
77                                                int num_threads) {
78   if (name_prefix == nullptr || num_threads <= 0) return nullptr;
79   std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow)
80                                                WorkerThread*[num_threads]);
81   if (threads == nullptr) return nullptr;
82   std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool(
83       name_prefix, std::move(threads), num_threads));
84   if (pool != nullptr && !pool->StartWorkers()) {
85     pool = nullptr;
86   }
87   return pool;
88 }
89 
ThreadPool(const char name_prefix[],std::unique_ptr<WorkerThread * []> threads,int num_threads)90 ThreadPool::ThreadPool(const char name_prefix[],
91                        std::unique_ptr<WorkerThread*[]> threads,
92                        int num_threads)
93     : threads_(std::move(threads)), num_threads_(num_threads) {
94   threads_[0] = nullptr;
95   assert(name_prefix != nullptr);
96   const size_t name_prefix_len =
97       std::min(strlen(name_prefix), sizeof(name_prefix_) - 1);
98   memcpy(name_prefix_, name_prefix, name_prefix_len);
99   name_prefix_[name_prefix_len] = '\0';
100 }
101 
~ThreadPool()102 ThreadPool::~ThreadPool() { Shutdown(); }
103 
Schedule(std::function<void ()> closure)104 void ThreadPool::Schedule(std::function<void()> closure) {
105   LockMutex();
106   if (!queue_.GrowIfNeeded()) {
107     // queue_ is full and we can't grow it. Run |closure| directly.
108     UnlockMutex();
109     closure();
110     return;
111   }
112   queue_.Push(std::move(closure));
113   UnlockMutex();
114   SignalOne();
115 }
116 
num_threads() const117 int ThreadPool::num_threads() const { return num_threads_; }
118 
119 // A simple implementation that mirrors the non-portable Thread.  We may
120 // choose to expand this in the future as a portable implementation of
121 // Thread, or replace it at such a time as one is implemented.
122 class ThreadPool::WorkerThread : public Allocable {
123  public:
124   // Creates and starts a thread that runs pool->WorkerFunction().
125   explicit WorkerThread(ThreadPool* pool);
126 
127   // Not copyable or movable.
128   WorkerThread(const WorkerThread&) = delete;
129   WorkerThread& operator=(const WorkerThread&) = delete;
130 
131   // REQUIRES: Join() must have been called if Start() was called and
132   // succeeded.
133   ~WorkerThread() = default;
134 
135   LIBGAV1_MUST_USE_RESULT bool Start();
136 
137   // Joins with the running thread.
138   void Join();
139 
140  private:
141 #if defined(_MSC_VER)
142   static unsigned int __stdcall ThreadBody(void* arg);
143 #else
144   static void* ThreadBody(void* arg);
145 #endif
146 
147   void SetupName();
148   void Run();
149 
150   ThreadPool* pool_;
151 #if defined(_MSC_VER)
152   HANDLE handle_;
153 #else
154   pthread_t thread_;
155 #endif
156 };
157 
WorkerThread(ThreadPool * pool)158 ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool) : pool_(pool) {}
159 
160 #if defined(_MSC_VER)
161 
Start()162 bool ThreadPool::WorkerThread::Start() {
163   // Since our code calls the C run-time library (CRT), use _beginthreadex
164   // rather than CreateThread. Microsoft documentation says "If a thread
165   // created using CreateThread calls the CRT, the CRT may terminate the
166   // process in low-memory conditions."
167   uintptr_t handle = _beginthreadex(
168       /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this,
169       /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr);
170   if (handle == 0) return false;
171   handle_ = reinterpret_cast<HANDLE>(handle);
172   ResumeThread(handle_);
173   return true;
174 }
175 
Join()176 void ThreadPool::WorkerThread::Join() {
177   WaitForSingleObject(handle_, INFINITE);
178   CloseHandle(handle_);
179 }
180 
ThreadBody(void * arg)181 unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) {
182   auto* thread = static_cast<WorkerThread*>(arg);
183   thread->Run();
184   return 0;
185 }
186 
SetupName()187 void ThreadPool::WorkerThread::SetupName() {
188   // Not currently supported on Windows.
189 }
190 
191 #else  // defined(_MSC_VER)
192 
Start()193 bool ThreadPool::WorkerThread::Start() {
194   return pthread_create(&thread_, nullptr, ThreadBody, this) == 0;
195 }
196 
Join()197 void ThreadPool::WorkerThread::Join() { pthread_join(thread_, nullptr); }
198 
ThreadBody(void * arg)199 void* ThreadPool::WorkerThread::ThreadBody(void* arg) {
200   auto* thread = static_cast<WorkerThread*>(arg);
201   thread->Run();
202   return nullptr;
203 }
204 
SetupName()205 void ThreadPool::WorkerThread::SetupName() {
206   if (pool_->name_prefix_[0] != '\0') {
207 #if defined(__APPLE__)
208     // Apple's version of pthread_setname_np takes one argument and operates on
209     // the current thread only. Also, pthread_mach_thread_np is Apple-specific.
210     // The maximum size of the |name| buffer was noted in the Chromium source
211     // code and was confirmed by experiments.
212     char name[64];
213     mach_port_t id = pthread_mach_thread_np(pthread_self());
214     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
215                       static_cast<int64_t>(id));
216     assert(rv >= 0);
217     rv = pthread_setname_np(name);
218     assert(rv == 0);
219     static_cast<void>(rv);
220 #elif defined(__ANDROID__) || (defined(__GLIBC__) && !defined(__GNU__))
221     // If the |name| buffer is longer than 16 bytes, pthread_setname_np fails
222     // with error 34 (ERANGE) on Android.
223     char name[16];
224     pid_t id = GetTid();
225     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
226                       static_cast<int64_t>(id));
227     assert(rv >= 0);
228     rv = pthread_setname_np(pthread_self(), name);
229     assert(rv == 0);
230     static_cast<void>(rv);
231 #endif
232   }
233 }
234 
235 #endif  // defined(_MSC_VER)
236 
Run()237 void ThreadPool::WorkerThread::Run() {
238   SetupName();
239   pool_->WorkerFunction();
240 }
241 
StartWorkers()242 bool ThreadPool::StartWorkers() {
243   if (!queue_.Init()) return false;
244   for (int i = 0; i < num_threads_; ++i) {
245     threads_[i] = new (std::nothrow) WorkerThread(this);
246     if (threads_[i] == nullptr) return false;
247     if (!threads_[i]->Start()) {
248       delete threads_[i];
249       threads_[i] = nullptr;
250       return false;
251     }
252   }
253   return true;
254 }
255 
WorkerFunction()256 void ThreadPool::WorkerFunction() {
257   LockMutex();
258   while (true) {
259     if (queue_.Empty()) {
260       if (exit_threads_) {
261         break;  // Queue is empty and exit was requested.
262       }
263 #if defined(__ANDROID__)
264       // On android, if we go to a conditional wait right away, the CPU governor
265       // kicks in and starts shutting the cores down. So we do a very small busy
266       // wait to see if we get our next job within that period. This
267       // significantly improves the performance of common cases of tile parallel
268       // decoding. If we don't receive a job in the busy wait time, we then go
269       // to an actual conditional wait as usual.
270       UnlockMutex();
271       bool found_job = false;
272       const auto wait_start = Clock::now();
273       while (Clock::now() - wait_start < kBusyWaitDuration) {
274         LockMutex();
275         if (!queue_.Empty()) {
276           found_job = true;
277           break;
278         }
279         UnlockMutex();
280       }
281       // If |found_job| is true, we simply continue since we already hold the
282       // mutex and we know for sure that the |queue_| is not empty.
283       if (found_job) continue;
284       // Since |found_job_| was false, the mutex is not being held at this
285       // point.
286       LockMutex();
287       // Ensure that the queue is still empty.
288       if (!queue_.Empty()) continue;
289       if (exit_threads_) {
290         break;  // Queue is empty and exit was requested.
291       }
292 #endif  // defined(__ANDROID__)
293       // Queue is still empty, wait for signal or broadcast.
294       Wait();
295     } else {
296       // Take a job from the queue.
297       std::function<void()> job = std::move(queue_.Front());
298       queue_.Pop();
299 
300       UnlockMutex();
301       // Note that it is good practice to surround this with a try/catch so
302       // the thread pool doesn't go to hell if the job throws an exception.
303       // This is omitted here because Google3 doesn't like exceptions.
304       std::move(job)();
305       job = nullptr;
306 
307       LockMutex();
308     }
309   }
310   UnlockMutex();
311 }
312 
Shutdown()313 void ThreadPool::Shutdown() {
314   // Tell worker threads how to exit.
315   LockMutex();
316   exit_threads_ = true;
317   UnlockMutex();
318   SignalAll();
319 
320   // Join all workers. This will block.
321   for (int i = 0; i < num_threads_; ++i) {
322     if (threads_[i] == nullptr) break;
323     threads_[i]->Join();
324     delete threads_[i];
325   }
326 }
327 
328 }  // namespace libgav1
329