1 // Copyright 2023 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 16 17 #include <grpc/support/port_platform.h> 18 19 #include <atomic> 20 #include <cstddef> 21 #include <cstdint> 22 #include <numeric> 23 #include <utility> 24 #include <vector> 25 26 #include "absl/base/thread_annotations.h" 27 28 #include <grpc/support/cpu.h> 29 30 #include "src/core/lib/gpr/useful.h" 31 #include "src/core/lib/gprpp/sync.h" 32 #include "src/core/lib/gprpp/time.h" 33 34 namespace grpc_event_engine { 35 namespace experimental { 36 37 // Tracks counts across some fixed number of shards. 38 // It is intended for fast increment/decrement operations, but a slower overall 39 // count operation. 40 class BusyThreadCount { 41 public: 42 // Increments a per-shard counter on construction, decrements on destruction. 43 class AutoThreadCounter { 44 public: AutoThreadCounter(BusyThreadCount * counter,size_t idx)45 AutoThreadCounter(BusyThreadCount* counter, size_t idx) 46 : counter_(counter), idx_(idx) { 47 counter_->Increment(idx_); 48 } ~AutoThreadCounter()49 ~AutoThreadCounter() { 50 if (counter_ != nullptr) counter_->Decrement(idx_); 51 } 52 // not copyable 53 AutoThreadCounter(const AutoThreadCounter&) = delete; 54 AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; 55 // moveable AutoThreadCounter(AutoThreadCounter && other)56 AutoThreadCounter(AutoThreadCounter&& other) noexcept { 57 counter_ = std::exchange(other.counter_, nullptr); 58 idx_ = other.idx_; 59 } 60 AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { 61 counter_ = std::exchange(other.counter_, nullptr); 62 idx_ = other.idx_; 63 return *this; 64 } 65 66 private: 67 BusyThreadCount* counter_; 68 size_t idx_; 69 }; 70 BusyThreadCount()71 BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {} MakeAutoThreadCounter(size_t idx)72 AutoThreadCounter MakeAutoThreadCounter(size_t idx) { 73 return AutoThreadCounter(this, idx); 74 }; Increment(size_t idx)75 void Increment(size_t idx) { 76 shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed); 77 } Decrement(size_t idx)78 void Decrement(size_t idx) { 79 shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed); 80 } count()81 size_t count() { 82 return std::accumulate( 83 shards_.begin(), shards_.end(), 0, [](size_t running, ShardedData& d) { 84 return running + d.busy_count.load(std::memory_order_relaxed); 85 }); 86 } 87 // Returns some valid index into the per-shard data, which is rotated on every 88 // call to distribute load and reduce contention. NextIndex()89 size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); } 90 91 private: 92 // We want to ensure that this data structure lands on different cachelines per 93 // cpu. With C++17 we can do so explicitly with an `alignas` specifier. Prior 94 // versions we can at best approximate it by padding the structure. It'll 95 // probably work out ok, but it's not guaranteed across allocators. 96 // TODO(ctiller): When we move to C++17 delete the duplicate definition. 97 #if __cplusplus >= 201703L 98 struct ShardedData { 99 std::atomic<size_t> busy_count{0}; 100 } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); 101 #else 102 struct ShardedDataHeader { 103 std::atomic<size_t> busy_count{0}; 104 }; 105 struct ShardedData : public ShardedDataHeader { 106 uint8_t padding[GPR_CACHELINE_SIZE - sizeof(ShardedDataHeader)]; 107 }; 108 #endif 109 110 std::vector<ShardedData> shards_; 111 std::atomic<size_t> next_idx_{0}; 112 }; 113 114 // Tracks the number of living threads. It is intended for a fast count 115 // operation, with relatively slower increment/decrement operations. 116 class LivingThreadCount { 117 public: 118 // Increments the global counter on construction, decrements on destruction. 119 class AutoThreadCounter { 120 public: AutoThreadCounter(LivingThreadCount * counter)121 explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) { 122 counter_->Increment(); 123 } ~AutoThreadCounter()124 ~AutoThreadCounter() { 125 if (counter_ != nullptr) counter_->Decrement(); 126 } 127 // not copyable 128 AutoThreadCounter(const AutoThreadCounter&) = delete; 129 AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; 130 // moveable AutoThreadCounter(AutoThreadCounter && other)131 AutoThreadCounter(AutoThreadCounter&& other) noexcept { 132 counter_ = std::exchange(other.counter_, nullptr); 133 } 134 AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { 135 counter_ = std::exchange(other.counter_, nullptr); 136 return *this; 137 } 138 139 private: 140 LivingThreadCount* counter_; 141 }; 142 MakeAutoThreadCounter()143 AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); }; Increment()144 void Increment() ABSL_LOCKS_EXCLUDED(mu_) { 145 grpc_core::MutexLock lock(&mu_); 146 ++living_count_; 147 cv_.SignalAll(); 148 } Decrement()149 void Decrement() ABSL_LOCKS_EXCLUDED(mu_) { 150 grpc_core::MutexLock lock(&mu_); 151 --living_count_; 152 cv_.SignalAll(); 153 } 154 // Blocks the calling thread until the desired number of threads is reached. 155 // If the thread count does not change for some given `stuck_timeout` 156 // duration, this method returns error. If the thread count does change, the 157 // timeout clock is reset. 158 absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why, 159 grpc_core::Duration stuck_timeout) 160 ABSL_LOCKS_EXCLUDED(mu_); count()161 size_t count() ABSL_LOCKS_EXCLUDED(mu_) { 162 grpc_core::MutexLock lock(&mu_); 163 return CountLocked(); 164 } 165 166 private: CountLocked()167 size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 168 return living_count_; 169 } 170 size_t WaitForCountChange(size_t desired_threads, 171 grpc_core::Duration timeout); 172 173 grpc_core::Mutex mu_; 174 grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); 175 size_t living_count_ ABSL_GUARDED_BY(mu_) = 0; 176 }; 177 178 } // namespace experimental 179 } // namespace grpc_event_engine 180 181 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 182