// xref: /aosp_15_r20/external/libgav1/src/utils/threadpool.h (revision 095378508e87ed692bf8dfeb34008b65b3735891)
/*
 * Copyright 2019 The libgav1 Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17*09537850SAkhilesh Sanikop #ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_
18*09537850SAkhilesh Sanikop #define LIBGAV1_SRC_UTILS_THREADPOOL_H_
19*09537850SAkhilesh Sanikop 
20*09537850SAkhilesh Sanikop #include <functional>
21*09537850SAkhilesh Sanikop #include <memory>
22*09537850SAkhilesh Sanikop 
23*09537850SAkhilesh Sanikop #if defined(__APPLE__)
24*09537850SAkhilesh Sanikop #include <TargetConditionals.h>
25*09537850SAkhilesh Sanikop #endif
26*09537850SAkhilesh Sanikop 
27*09537850SAkhilesh Sanikop #if !defined(LIBGAV1_THREADPOOL_USE_STD_MUTEX)
28*09537850SAkhilesh Sanikop #if defined(__ANDROID__) || (defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE)
29*09537850SAkhilesh Sanikop #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
30*09537850SAkhilesh Sanikop #else
31*09537850SAkhilesh Sanikop #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0
32*09537850SAkhilesh Sanikop #endif
33*09537850SAkhilesh Sanikop #endif
34*09537850SAkhilesh Sanikop 
35*09537850SAkhilesh Sanikop #if LIBGAV1_THREADPOOL_USE_STD_MUTEX
36*09537850SAkhilesh Sanikop #include <condition_variable>  // NOLINT (unapproved c++11 header)
37*09537850SAkhilesh Sanikop #include <mutex>               // NOLINT (unapproved c++11 header)
38*09537850SAkhilesh Sanikop #else
39*09537850SAkhilesh Sanikop // absl::Mutex & absl::CondVar are significantly faster than the pthread
40*09537850SAkhilesh Sanikop // variants on platforms other than Android. iOS may deadlock on Shutdown()
41*09537850SAkhilesh Sanikop // using absl, see b/142251739.
42*09537850SAkhilesh Sanikop #include "absl/base/thread_annotations.h"
43*09537850SAkhilesh Sanikop #include "absl/synchronization/mutex.h"
44*09537850SAkhilesh Sanikop #endif
45*09537850SAkhilesh Sanikop 
46*09537850SAkhilesh Sanikop #include "src/utils/compiler_attributes.h"
47*09537850SAkhilesh Sanikop #include "src/utils/executor.h"
48*09537850SAkhilesh Sanikop #include "src/utils/memory.h"
49*09537850SAkhilesh Sanikop #include "src/utils/unbounded_queue.h"
50*09537850SAkhilesh Sanikop 
51*09537850SAkhilesh Sanikop namespace libgav1 {
52*09537850SAkhilesh Sanikop 
53*09537850SAkhilesh Sanikop // An implementation of ThreadPool using POSIX threads (pthreads) or Windows
54*09537850SAkhilesh Sanikop // threads.
55*09537850SAkhilesh Sanikop //
56*09537850SAkhilesh Sanikop // - The pool allocates a fixed number of worker threads on instantiation.
57*09537850SAkhilesh Sanikop // - The worker threads will pick up work jobs as they arrive.
58*09537850SAkhilesh Sanikop // - If all workers are busy, work jobs are queued for later execution.
59*09537850SAkhilesh Sanikop //
60*09537850SAkhilesh Sanikop // The thread pool is shut down when the pool is destroyed.
61*09537850SAkhilesh Sanikop //
62*09537850SAkhilesh Sanikop // Example usage of the thread pool:
63*09537850SAkhilesh Sanikop //   {
64*09537850SAkhilesh Sanikop //     std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4);
65*09537850SAkhilesh Sanikop //     for (int i = 0; i < 100; ++i) {  // Dispatch 100 jobs.
66*09537850SAkhilesh Sanikop //       pool->Schedule([&my_data]() { MyFunction(&my_data); });
67*09537850SAkhilesh Sanikop //     }
68*09537850SAkhilesh Sanikop //   } // ThreadPool gets destroyed only when all jobs are done.
69*09537850SAkhilesh Sanikop class ThreadPool : public Executor, public Allocable {
70*09537850SAkhilesh Sanikop  public:
71*09537850SAkhilesh Sanikop   // Creates the thread pool with the specified number of worker threads.
72*09537850SAkhilesh Sanikop   // If num_threads is 1, the closures are run in FIFO order.
73*09537850SAkhilesh Sanikop   static std::unique_ptr<ThreadPool> Create(int num_threads);
74*09537850SAkhilesh Sanikop 
75*09537850SAkhilesh Sanikop   // Like the above factory method, but also sets the name prefix for threads.
76*09537850SAkhilesh Sanikop   static std::unique_ptr<ThreadPool> Create(const char name_prefix[],
77*09537850SAkhilesh Sanikop                                             int num_threads);
78*09537850SAkhilesh Sanikop 
79*09537850SAkhilesh Sanikop   // The destructor will shut down the thread pool and all jobs are executed.
80*09537850SAkhilesh Sanikop   // Note that after shutdown, the thread pool does not accept further jobs.
81*09537850SAkhilesh Sanikop   ~ThreadPool() override;
82*09537850SAkhilesh Sanikop 
83*09537850SAkhilesh Sanikop   // Adds the specified "closure" to the queue for processing. If worker threads
84*09537850SAkhilesh Sanikop   // are available, "closure" will run immediately. Otherwise "closure" is
85*09537850SAkhilesh Sanikop   // queued for later execution.
86*09537850SAkhilesh Sanikop   //
87*09537850SAkhilesh Sanikop   // NOTE: If the internal queue is full and cannot be resized because of an
88*09537850SAkhilesh Sanikop   // out-of-memory error, the current thread runs "closure" before returning
89*09537850SAkhilesh Sanikop   // from Schedule(). For our use cases, this seems better than the
90*09537850SAkhilesh Sanikop   // alternatives:
91*09537850SAkhilesh Sanikop   //   1. Return a failure status.
92*09537850SAkhilesh Sanikop   //   2. Have the current thread wait until the queue is not full.
93*09537850SAkhilesh Sanikop   void Schedule(std::function<void()> closure) override;
94*09537850SAkhilesh Sanikop 
95*09537850SAkhilesh Sanikop   int num_threads() const;
96*09537850SAkhilesh Sanikop 
97*09537850SAkhilesh Sanikop  private:
98*09537850SAkhilesh Sanikop   class WorkerThread;
99*09537850SAkhilesh Sanikop 
100*09537850SAkhilesh Sanikop   // Creates the thread pool with the specified number of worker threads.
101*09537850SAkhilesh Sanikop   // If num_threads is 1, the closures are run in FIFO order.
102*09537850SAkhilesh Sanikop   ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads,
103*09537850SAkhilesh Sanikop              int num_threads);
104*09537850SAkhilesh Sanikop 
105*09537850SAkhilesh Sanikop   // Starts the worker pool.
106*09537850SAkhilesh Sanikop   LIBGAV1_MUST_USE_RESULT bool StartWorkers();
107*09537850SAkhilesh Sanikop 
108*09537850SAkhilesh Sanikop   void WorkerFunction();
109*09537850SAkhilesh Sanikop 
110*09537850SAkhilesh Sanikop   // Shuts down the thread pool, i.e. worker threads finish their work and
111*09537850SAkhilesh Sanikop   // pick up new jobs until the queue is empty. This call will block until
112*09537850SAkhilesh Sanikop   // the shutdown is complete.
113*09537850SAkhilesh Sanikop   //
114*09537850SAkhilesh Sanikop   // Note: If a worker encounters an empty queue after this call, it will exit.
115*09537850SAkhilesh Sanikop   // Other workers might still be running, and if the queue fills up again, the
116*09537850SAkhilesh Sanikop   // thread pool will continue to operate with a decreased number of workers.
117*09537850SAkhilesh Sanikop   // It is up to the caller to prevent adding new jobs.
118*09537850SAkhilesh Sanikop   void Shutdown();
119*09537850SAkhilesh Sanikop 
120*09537850SAkhilesh Sanikop #if LIBGAV1_THREADPOOL_USE_STD_MUTEX
121*09537850SAkhilesh Sanikop 
LockMutex()122*09537850SAkhilesh Sanikop   void LockMutex() { queue_mutex_.lock(); }
UnlockMutex()123*09537850SAkhilesh Sanikop   void UnlockMutex() { queue_mutex_.unlock(); }
124*09537850SAkhilesh Sanikop 
Wait()125*09537850SAkhilesh Sanikop   void Wait() {
126*09537850SAkhilesh Sanikop     std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock);
127*09537850SAkhilesh Sanikop     condition_.wait(queue_lock);
128*09537850SAkhilesh Sanikop     queue_lock.release();
129*09537850SAkhilesh Sanikop   }
130*09537850SAkhilesh Sanikop 
SignalOne()131*09537850SAkhilesh Sanikop   void SignalOne() { condition_.notify_one(); }
SignalAll()132*09537850SAkhilesh Sanikop   void SignalAll() { condition_.notify_all(); }
133*09537850SAkhilesh Sanikop 
134*09537850SAkhilesh Sanikop   std::condition_variable condition_;
135*09537850SAkhilesh Sanikop   std::mutex queue_mutex_;
136*09537850SAkhilesh Sanikop 
137*09537850SAkhilesh Sanikop #else  // !LIBGAV1_THREADPOOL_USE_STD_MUTEX
138*09537850SAkhilesh Sanikop 
LockMutex()139*09537850SAkhilesh Sanikop   void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); }
UnlockMutex()140*09537850SAkhilesh Sanikop   void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); }
Wait()141*09537850SAkhilesh Sanikop   void Wait() { condition_.Wait(&queue_mutex_); }
SignalOne()142*09537850SAkhilesh Sanikop   void SignalOne() { condition_.Signal(); }
SignalAll()143*09537850SAkhilesh Sanikop   void SignalAll() { condition_.SignalAll(); }
144*09537850SAkhilesh Sanikop 
145*09537850SAkhilesh Sanikop   absl::CondVar condition_;
146*09537850SAkhilesh Sanikop   absl::Mutex queue_mutex_;
147*09537850SAkhilesh Sanikop 
148*09537850SAkhilesh Sanikop #endif  // LIBGAV1_THREADPOOL_USE_STD_MUTEX
149*09537850SAkhilesh Sanikop 
150*09537850SAkhilesh Sanikop   UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_);
151*09537850SAkhilesh Sanikop   // If not all the worker threads are created, the first entry after the
152*09537850SAkhilesh Sanikop   // created worker threads is a null pointer.
153*09537850SAkhilesh Sanikop   const std::unique_ptr<WorkerThread*[]> threads_;
154*09537850SAkhilesh Sanikop 
155*09537850SAkhilesh Sanikop   bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false;
156*09537850SAkhilesh Sanikop   const int num_threads_ = 0;
157*09537850SAkhilesh Sanikop   // name_prefix_ is a C string, whose length is restricted to 16 characters,
158*09537850SAkhilesh Sanikop   // including the terminating null byte ('\0'). This restriction comes from
159*09537850SAkhilesh Sanikop   // the Linux pthread_setname_np() function.
160*09537850SAkhilesh Sanikop   char name_prefix_[16];
161*09537850SAkhilesh Sanikop };
162*09537850SAkhilesh Sanikop 
163*09537850SAkhilesh Sanikop }  // namespace libgav1
164*09537850SAkhilesh Sanikop 
165*09537850SAkhilesh Sanikop #undef LIBGAV1_THREADPOOL_USE_STD_MUTEX
166*09537850SAkhilesh Sanikop 
167*09537850SAkhilesh Sanikop #endif  // LIBGAV1_SRC_UTILS_THREADPOOL_H_
168