1*09537850SAkhilesh Sanikop /* 2*09537850SAkhilesh Sanikop * Copyright 2019 The libgav1 Authors 3*09537850SAkhilesh Sanikop * 4*09537850SAkhilesh Sanikop * Licensed under the Apache License, Version 2.0 (the "License"); 5*09537850SAkhilesh Sanikop * you may not use this file except in compliance with the License. 6*09537850SAkhilesh Sanikop * You may obtain a copy of the License at 7*09537850SAkhilesh Sanikop * 8*09537850SAkhilesh Sanikop * http://www.apache.org/licenses/LICENSE-2.0 9*09537850SAkhilesh Sanikop * 10*09537850SAkhilesh Sanikop * Unless required by applicable law or agreed to in writing, software 11*09537850SAkhilesh Sanikop * distributed under the License is distributed on an "AS IS" BASIS, 12*09537850SAkhilesh Sanikop * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*09537850SAkhilesh Sanikop * See the License for the specific language governing permissions and 14*09537850SAkhilesh Sanikop * limitations under the License. 15*09537850SAkhilesh Sanikop */ 16*09537850SAkhilesh Sanikop 17*09537850SAkhilesh Sanikop #ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_ 18*09537850SAkhilesh Sanikop #define LIBGAV1_SRC_UTILS_THREADPOOL_H_ 19*09537850SAkhilesh Sanikop 20*09537850SAkhilesh Sanikop #include <functional> 21*09537850SAkhilesh Sanikop #include <memory> 22*09537850SAkhilesh Sanikop 23*09537850SAkhilesh Sanikop #if defined(__APPLE__) 24*09537850SAkhilesh Sanikop #include <TargetConditionals.h> 25*09537850SAkhilesh Sanikop #endif 26*09537850SAkhilesh Sanikop 27*09537850SAkhilesh Sanikop #if !defined(LIBGAV1_THREADPOOL_USE_STD_MUTEX) 28*09537850SAkhilesh Sanikop #if defined(__ANDROID__) || (defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE) 29*09537850SAkhilesh Sanikop #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1 30*09537850SAkhilesh Sanikop #else 31*09537850SAkhilesh Sanikop #define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0 32*09537850SAkhilesh Sanikop #endif 33*09537850SAkhilesh Sanikop #endif 34*09537850SAkhilesh Sanikop 35*09537850SAkhilesh Sanikop #if LIBGAV1_THREADPOOL_USE_STD_MUTEX 36*09537850SAkhilesh Sanikop #include <condition_variable> // NOLINT (unapproved c++11 header) 37*09537850SAkhilesh Sanikop #include <mutex> // NOLINT (unapproved c++11 header) 38*09537850SAkhilesh Sanikop #else 39*09537850SAkhilesh Sanikop // absl::Mutex & absl::CondVar are significantly faster than the pthread 40*09537850SAkhilesh Sanikop // variants on platforms other than Android. iOS may deadlock on Shutdown() 41*09537850SAkhilesh Sanikop // using absl, see b/142251739. 42*09537850SAkhilesh Sanikop #include "absl/base/thread_annotations.h" 43*09537850SAkhilesh Sanikop #include "absl/synchronization/mutex.h" 44*09537850SAkhilesh Sanikop #endif 45*09537850SAkhilesh Sanikop 46*09537850SAkhilesh Sanikop #include "src/utils/compiler_attributes.h" 47*09537850SAkhilesh Sanikop #include "src/utils/executor.h" 48*09537850SAkhilesh Sanikop #include "src/utils/memory.h" 49*09537850SAkhilesh Sanikop #include "src/utils/unbounded_queue.h" 50*09537850SAkhilesh Sanikop 51*09537850SAkhilesh Sanikop namespace libgav1 { 52*09537850SAkhilesh Sanikop 53*09537850SAkhilesh Sanikop // An implementation of ThreadPool using POSIX threads (pthreads) or Windows 54*09537850SAkhilesh Sanikop // threads. 55*09537850SAkhilesh Sanikop // 56*09537850SAkhilesh Sanikop // - The pool allocates a fixed number of worker threads on instantiation. 57*09537850SAkhilesh Sanikop // - The worker threads will pick up work jobs as they arrive. 58*09537850SAkhilesh Sanikop // - If all workers are busy, work jobs are queued for later execution. 59*09537850SAkhilesh Sanikop // 60*09537850SAkhilesh Sanikop // The thread pool is shut down when the pool is destroyed. 61*09537850SAkhilesh Sanikop // 62*09537850SAkhilesh Sanikop // Example usage of the thread pool: 63*09537850SAkhilesh Sanikop // { 64*09537850SAkhilesh Sanikop // std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4); 65*09537850SAkhilesh Sanikop // for (int i = 0; i < 100; ++i) { // Dispatch 100 jobs. 66*09537850SAkhilesh Sanikop // pool->Schedule([&my_data]() { MyFunction(&my_data); }); 67*09537850SAkhilesh Sanikop // } 68*09537850SAkhilesh Sanikop // } // ThreadPool gets destroyed only when all jobs are done. 69*09537850SAkhilesh Sanikop class ThreadPool : public Executor, public Allocable { 70*09537850SAkhilesh Sanikop public: 71*09537850SAkhilesh Sanikop // Creates the thread pool with the specified number of worker threads. 72*09537850SAkhilesh Sanikop // If num_threads is 1, the closures are run in FIFO order. 73*09537850SAkhilesh Sanikop static std::unique_ptr<ThreadPool> Create(int num_threads); 74*09537850SAkhilesh Sanikop 75*09537850SAkhilesh Sanikop // Like the above factory method, but also sets the name prefix for threads. 76*09537850SAkhilesh Sanikop static std::unique_ptr<ThreadPool> Create(const char name_prefix[], 77*09537850SAkhilesh Sanikop int num_threads); 78*09537850SAkhilesh Sanikop 79*09537850SAkhilesh Sanikop // The destructor will shut down the thread pool and all jobs are executed. 80*09537850SAkhilesh Sanikop // Note that after shutdown, the thread pool does not accept further jobs. 81*09537850SAkhilesh Sanikop ~ThreadPool() override; 82*09537850SAkhilesh Sanikop 83*09537850SAkhilesh Sanikop // Adds the specified "closure" to the queue for processing. If worker threads 84*09537850SAkhilesh Sanikop // are available, "closure" will run immediately. Otherwise "closure" is 85*09537850SAkhilesh Sanikop // queued for later execution. 86*09537850SAkhilesh Sanikop // 87*09537850SAkhilesh Sanikop // NOTE: If the internal queue is full and cannot be resized because of an 88*09537850SAkhilesh Sanikop // out-of-memory error, the current thread runs "closure" before returning 89*09537850SAkhilesh Sanikop // from Schedule(). For our use cases, this seems better than the 90*09537850SAkhilesh Sanikop // alternatives: 91*09537850SAkhilesh Sanikop // 1. Return a failure status. 92*09537850SAkhilesh Sanikop // 2. Have the current thread wait until the queue is not full. 93*09537850SAkhilesh Sanikop void Schedule(std::function<void()> closure) override; 94*09537850SAkhilesh Sanikop 95*09537850SAkhilesh Sanikop int num_threads() const; 96*09537850SAkhilesh Sanikop 97*09537850SAkhilesh Sanikop private: 98*09537850SAkhilesh Sanikop class WorkerThread; 99*09537850SAkhilesh Sanikop 100*09537850SAkhilesh Sanikop // Creates the thread pool with the specified number of worker threads. 101*09537850SAkhilesh Sanikop // If num_threads is 1, the closures are run in FIFO order. 102*09537850SAkhilesh Sanikop ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads, 103*09537850SAkhilesh Sanikop int num_threads); 104*09537850SAkhilesh Sanikop 105*09537850SAkhilesh Sanikop // Starts the worker pool. 106*09537850SAkhilesh Sanikop LIBGAV1_MUST_USE_RESULT bool StartWorkers(); 107*09537850SAkhilesh Sanikop 108*09537850SAkhilesh Sanikop void WorkerFunction(); 109*09537850SAkhilesh Sanikop 110*09537850SAkhilesh Sanikop // Shuts down the thread pool, i.e. worker threads finish their work and 111*09537850SAkhilesh Sanikop // pick up new jobs until the queue is empty. This call will block until 112*09537850SAkhilesh Sanikop // the shutdown is complete. 113*09537850SAkhilesh Sanikop // 114*09537850SAkhilesh Sanikop // Note: If a worker encounters an empty queue after this call, it will exit. 115*09537850SAkhilesh Sanikop // Other workers might still be running, and if the queue fills up again, the 116*09537850SAkhilesh Sanikop // thread pool will continue to operate with a decreased number of workers. 117*09537850SAkhilesh Sanikop // It is up to the caller to prevent adding new jobs. 118*09537850SAkhilesh Sanikop void Shutdown(); 119*09537850SAkhilesh Sanikop 120*09537850SAkhilesh Sanikop #if LIBGAV1_THREADPOOL_USE_STD_MUTEX 121*09537850SAkhilesh Sanikop LockMutex()122*09537850SAkhilesh Sanikop void LockMutex() { queue_mutex_.lock(); } UnlockMutex()123*09537850SAkhilesh Sanikop void UnlockMutex() { queue_mutex_.unlock(); } 124*09537850SAkhilesh Sanikop Wait()125*09537850SAkhilesh Sanikop void Wait() { 126*09537850SAkhilesh Sanikop std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock); 127*09537850SAkhilesh Sanikop condition_.wait(queue_lock); 128*09537850SAkhilesh Sanikop queue_lock.release(); 129*09537850SAkhilesh Sanikop } 130*09537850SAkhilesh Sanikop SignalOne()131*09537850SAkhilesh Sanikop void SignalOne() { condition_.notify_one(); } SignalAll()132*09537850SAkhilesh Sanikop void SignalAll() { condition_.notify_all(); } 133*09537850SAkhilesh Sanikop 134*09537850SAkhilesh Sanikop std::condition_variable condition_; 135*09537850SAkhilesh Sanikop std::mutex queue_mutex_; 136*09537850SAkhilesh Sanikop 137*09537850SAkhilesh Sanikop #else // !LIBGAV1_THREADPOOL_USE_STD_MUTEX 138*09537850SAkhilesh Sanikop LockMutex()139*09537850SAkhilesh Sanikop void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); } UnlockMutex()140*09537850SAkhilesh Sanikop void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); } Wait()141*09537850SAkhilesh Sanikop void Wait() { condition_.Wait(&queue_mutex_); } SignalOne()142*09537850SAkhilesh Sanikop void SignalOne() { condition_.Signal(); } SignalAll()143*09537850SAkhilesh Sanikop void SignalAll() { condition_.SignalAll(); } 144*09537850SAkhilesh Sanikop 145*09537850SAkhilesh Sanikop absl::CondVar condition_; 146*09537850SAkhilesh Sanikop absl::Mutex queue_mutex_; 147*09537850SAkhilesh Sanikop 148*09537850SAkhilesh Sanikop #endif // LIBGAV1_THREADPOOL_USE_STD_MUTEX 149*09537850SAkhilesh Sanikop 150*09537850SAkhilesh Sanikop UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_); 151*09537850SAkhilesh Sanikop // If not all the worker threads are created, the first entry after the 152*09537850SAkhilesh Sanikop // created worker threads is a null pointer. 153*09537850SAkhilesh Sanikop const std::unique_ptr<WorkerThread*[]> threads_; 154*09537850SAkhilesh Sanikop 155*09537850SAkhilesh Sanikop bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false; 156*09537850SAkhilesh Sanikop const int num_threads_ = 0; 157*09537850SAkhilesh Sanikop // name_prefix_ is a C string, whose length is restricted to 16 characters, 158*09537850SAkhilesh Sanikop // including the terminating null byte ('\0'). This restriction comes from 159*09537850SAkhilesh Sanikop // the Linux pthread_setname_np() function. 160*09537850SAkhilesh Sanikop char name_prefix_[16]; 161*09537850SAkhilesh Sanikop }; 162*09537850SAkhilesh Sanikop 163*09537850SAkhilesh Sanikop } // namespace libgav1 164*09537850SAkhilesh Sanikop 165*09537850SAkhilesh Sanikop #undef LIBGAV1_THREADPOOL_USE_STD_MUTEX 166*09537850SAkhilesh Sanikop 167*09537850SAkhilesh Sanikop #endif // LIBGAV1_SRC_UTILS_THREADPOOL_H_ 168