xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/event_engine/thread_pool/thread_count.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
16 
#include <grpc/support/port_platform.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"

#include <grpc/support/cpu.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
33 
34 namespace grpc_event_engine {
35 namespace experimental {
36 
37 // Tracks counts across some fixed number of shards.
38 // It is intended for fast increment/decrement operations, but a slower overall
39 // count operation.
40 class BusyThreadCount {
41  public:
42   // Increments a per-shard counter on construction, decrements on destruction.
43   class AutoThreadCounter {
44    public:
AutoThreadCounter(BusyThreadCount * counter,size_t idx)45     AutoThreadCounter(BusyThreadCount* counter, size_t idx)
46         : counter_(counter), idx_(idx) {
47       counter_->Increment(idx_);
48     }
~AutoThreadCounter()49     ~AutoThreadCounter() {
50       if (counter_ != nullptr) counter_->Decrement(idx_);
51     }
52     // not copyable
53     AutoThreadCounter(const AutoThreadCounter&) = delete;
54     AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
55     // moveable
AutoThreadCounter(AutoThreadCounter && other)56     AutoThreadCounter(AutoThreadCounter&& other) noexcept {
57       counter_ = std::exchange(other.counter_, nullptr);
58       idx_ = other.idx_;
59     }
60     AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
61       counter_ = std::exchange(other.counter_, nullptr);
62       idx_ = other.idx_;
63       return *this;
64     }
65 
66    private:
67     BusyThreadCount* counter_;
68     size_t idx_;
69   };
70 
BusyThreadCount()71   BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {}
MakeAutoThreadCounter(size_t idx)72   AutoThreadCounter MakeAutoThreadCounter(size_t idx) {
73     return AutoThreadCounter(this, idx);
74   };
Increment(size_t idx)75   void Increment(size_t idx) {
76     shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed);
77   }
Decrement(size_t idx)78   void Decrement(size_t idx) {
79     shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed);
80   }
count()81   size_t count() {
82     return std::accumulate(
83         shards_.begin(), shards_.end(), 0, [](size_t running, ShardedData& d) {
84           return running + d.busy_count.load(std::memory_order_relaxed);
85         });
86   }
87   // Returns some valid index into the per-shard data, which is rotated on every
88   // call to distribute load and reduce contention.
NextIndex()89   size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); }
90 
91  private:
92 // We want to ensure that this data structure lands on different cachelines per
93 // cpu. With C++17 we can do so explicitly with an `alignas` specifier. Prior
94 // versions we can at best approximate it by padding the structure. It'll
95 // probably work out ok, but it's not guaranteed across allocators.
96 // TODO(ctiller): When we move to C++17 delete the duplicate definition.
97 #if __cplusplus >= 201703L
98   struct ShardedData {
99     std::atomic<size_t> busy_count{0};
100   } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
101 #else
102   struct ShardedDataHeader {
103     std::atomic<size_t> busy_count{0};
104   };
105   struct ShardedData : public ShardedDataHeader {
106     uint8_t padding[GPR_CACHELINE_SIZE - sizeof(ShardedDataHeader)];
107   };
108 #endif
109 
110   std::vector<ShardedData> shards_;
111   std::atomic<size_t> next_idx_{0};
112 };
113 
114 // Tracks the number of living threads. It is intended for a fast count
115 // operation, with relatively slower increment/decrement operations.
116 class LivingThreadCount {
117  public:
118   // Increments the global counter on construction, decrements on destruction.
119   class AutoThreadCounter {
120    public:
AutoThreadCounter(LivingThreadCount * counter)121     explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) {
122       counter_->Increment();
123     }
~AutoThreadCounter()124     ~AutoThreadCounter() {
125       if (counter_ != nullptr) counter_->Decrement();
126     }
127     // not copyable
128     AutoThreadCounter(const AutoThreadCounter&) = delete;
129     AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
130     // moveable
AutoThreadCounter(AutoThreadCounter && other)131     AutoThreadCounter(AutoThreadCounter&& other) noexcept {
132       counter_ = std::exchange(other.counter_, nullptr);
133     }
134     AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
135       counter_ = std::exchange(other.counter_, nullptr);
136       return *this;
137     }
138 
139    private:
140     LivingThreadCount* counter_;
141   };
142 
MakeAutoThreadCounter()143   AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); };
Increment()144   void Increment() ABSL_LOCKS_EXCLUDED(mu_) {
145     grpc_core::MutexLock lock(&mu_);
146     ++living_count_;
147     cv_.SignalAll();
148   }
Decrement()149   void Decrement() ABSL_LOCKS_EXCLUDED(mu_) {
150     grpc_core::MutexLock lock(&mu_);
151     --living_count_;
152     cv_.SignalAll();
153   }
154   // Blocks the calling thread until the desired number of threads is reached.
155   // If the thread count does not change for some given `stuck_timeout`
156   // duration, this method returns error. If the thread count does change, the
157   // timeout clock is reset.
158   absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why,
159                                      grpc_core::Duration stuck_timeout)
160       ABSL_LOCKS_EXCLUDED(mu_);
count()161   size_t count() ABSL_LOCKS_EXCLUDED(mu_) {
162     grpc_core::MutexLock lock(&mu_);
163     return CountLocked();
164   }
165 
166  private:
CountLocked()167   size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
168     return living_count_;
169   }
170   size_t WaitForCountChange(size_t desired_threads,
171                             grpc_core::Duration timeout);
172 
173   grpc_core::Mutex mu_;
174   grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_);
175   size_t living_count_ ABSL_GUARDED_BY(mu_) = 0;
176 };
177 
178 }  // namespace experimental
179 }  // namespace grpc_event_engine
180 
181 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
182