#include <c10/core/impl/GPUTrace.h>
#include <c10/cuda/CUDAFunctions.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/CallOnce.h>
#include <c10/util/Exception.h>
#include <c10/util/irange.h>

#include <array>
#include <atomic>
#include <cstdint>

namespace c10::cuda {

namespace {

// Global stream state and constants
static c10::once_flag init_flag;
static DeviceIndex num_gpus = -1;
static constexpr int kStreamsPerPoolBits = 5;
static constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits;
static constexpr unsigned int kDefaultFlags = cudaStreamNonBlocking;
static constexpr int kStreamTypeBits = 4;

static int max_stream_priorities;

// Non-default streams
// Note: the number of CUDA devices is determined at run time,
// and the low and high priority pools are lazily initialized
// when the first stream is requested for a device.
// The device flags track the initialization of each device, while
// the low and high priority counters track, for each device, the next stream
// in the pool to be returned when a stream is requested (round-robin fashion,
// see the note in CUDAStream.h).
// The streams are "leaked": they are created but never destroyed because the
// destruction of global variables could happen after the CUDA runtime has
// already been destroyed and thus invoking cudaStreamDestroy could lead to a
// crash. It's likely an issue in CUDA, but to be safe - let's just "forget"
// the destruction.
#if !defined(USE_ROCM)
// CUDA-only: used to initialize the stream pools (once)
static std::array<c10::once_flag, C10_COMPILE_TIME_MAX_GPUS> device_flags;
#endif
static std::array<
    std::array<std::atomic<uint32_t>, C10_COMPILE_TIME_MAX_GPUS>,
    c10::cuda::max_compile_time_stream_priorities>
    priority_counters;

static std::array<
    std::array<
        std::array<cudaStream_t, kStreamsPerPool>,
        C10_COMPILE_TIME_MAX_GPUS>,
    c10::cuda::max_compile_time_stream_priorities>
    streams;
#ifdef USE_ROCM
static c10::once_flag
    stream_flags[c10::cuda::max_compile_time_stream_priorities]
                [C10_COMPILE_TIME_MAX_GPUS][kStreamsPerPool];
#endif

// Note [HIP Lazy Streams]
// ~~~~~~~~~~~~~~~~~~~~~~~
// For ROCm/HIP, each stream is lazily initialized the first time it is
// requested, rather than creating all of a device's streams when its first
// stream is requested. HIP streams are not as lightweight as CUDA streams,
// so the pooling strategy can affect performance. Rather than changing the
// pooling implementation, ROCm/HIP lazily initializes each stream when it is
// first requested.

// Note [StreamId assignment]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// How do we assign stream IDs?
//
// -- 54 bits --    -- 5 bits --       -- 4 bits --      -- 1 bit --
//     zeros        stream id index    StreamIdType      Ext/native stream
//                  ignored for ext    ignored for ext
//
// For an external stream, the StreamId is the cudaStream_t pointer itself,
// which means the last bit will always be 0. So when constructing the
// StreamId for a native stream we set the last bit to 1 to distinguish
// between native and external streams.
//
// We are obligated to treat the stream ID 0 as the default stream, per the
// invariant specified in c10::Stream, so this is one exception to
// "last bit = 1 for native streams". However, all other numbers are entirely
// an internal implementation detail, and we reserve the right to renumber
// streams however we like.
//
// Note that it is really important that the MSB is zero; StreamId is a
// *signed* integer, and unsigned-to-signed conversion outside of the
// bounds of signed integer representation is undefined behavior. You
// could work around this with something like
// https://stackoverflow.com/questions/13150449/efficient-unsigned-to-signed-cast-avoiding-implementation-defined-behavior
// but it seems a bit overkill for this.
//
// Also, externally managed stream pointers (cudaStream_t) can be stored
// directly in the Id field, so in this case we need to check the stream
// alignment.
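//
// As an illustrative worked example of the layout above (the values are made
// up, not normative): a native stream in the priority-1 pool with pool index
// 3 would be encoded as
//   (3 << 5) | (1 << 1) | 1 == 0x63,
// and decoding recovers StreamIdType 1 and pool index 3 (see makeStreamId,
// streamIdType and streamIdIndex below).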

class StreamIdType {
  // StreamIdType encodes whether this stream is DEFAULT, EXTernal or,
  // for all other native streams, the stream priority (a higher value is a
  // higher priority).
 private:
  uint8_t stream_type;

 public:
  static const uint8_t DEFAULT = 0x0;
  static const uint8_t EXT = 0xF;

 public:
  StreamIdType(const uint8_t _stream_type) : stream_type(_stream_type) {}

  bool isExt() const {
    return EXT == stream_type;
  }

  bool isDefault() const {
    return DEFAULT == stream_type;
  }

  uint8_t getStreamType() const {
    return stream_type;
  }
};

std::ostream& operator<<(std::ostream& stream, StreamIdType s) {
  if (s.isDefault()) {
    stream << "DEFAULT";
  } else if (s.isExt()) {
    stream << "EXT";
  } else {
    stream << "PRIORITY " << int(s.getStreamType());
  }
  return stream;
}

// StreamId is 64-bit, so we can just rely on regular promotion rules.
// We rely on streamIdIndex and streamIdType being non-negative;
// see Note [Hazard when concatenating signed integers]

static inline StreamIdType streamIdType(StreamId s) {
  // Externally allocated streams have their id set to the cudaStream_t
  // pointer itself, so the last bit will be 0
  if ((!(s & 1)) && s) {
    return StreamIdType(StreamIdType::EXT);
  }
  // The last bit is the external/internal stream flag, so the mask should
  // start from the second rightmost bit
  int mask_for_type = (1 << kStreamTypeBits) - 1;
  auto val = (s >> 1) & mask_for_type;
  TORCH_INTERNAL_ASSERT(val || !(s & 1), "invalid StreamId", s);
  return StreamIdType(val);
}

static inline size_t streamIdIndex(StreamId s) {
  return static_cast<size_t>(
      (s >> (kStreamTypeBits + 1)) & ((1 << kStreamsPerPoolBits) - 1));
}

StreamId makeStreamId(StreamIdType st, size_t si) {
  if (st.isDefault()) {
    return static_cast<StreamId>(0);
  }
  return (static_cast<StreamId>(si) << (kStreamTypeBits + 1)) |
      static_cast<StreamId>(st.getStreamType() << 1) | 1;
}

// Thread-local current streams
// NOLINTNEXTLINE(*-arrays)
static thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;

// Populates global values.
// Warning: this function must only be called once!
static void initGlobalStreamState() {
  num_gpus = device_count();
  // Check if the number of GPUs matches the expected compile-time max number
  // of GPUs.
  TORCH_CHECK(
      num_gpus <= C10_COMPILE_TIME_MAX_GPUS,
      "Number of CUDA devices on the machine is larger than the compiled "
      "max number of gpus expected (",
      C10_COMPILE_TIME_MAX_GPUS,
      "). Increase that and recompile.");
  int leastPriority = -1, greatestPriority = -1;
  C10_CUDA_CHECK(
      cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority));
  // Note [HIP stream priorities]
  // HIP stream priorities are 1=low, 0=default, -1=high, which differs from
  // CUDA, where 0=default, -1=high, -2=higher, etc.
  // Clamp leastPriority to 0 for HIP.
#ifdef USE_ROCM
  leastPriority = 0;
#endif
  // greatestPriority is negative
  auto range = leastPriority - greatestPriority + 1;
  max_stream_priorities = range >= c10::cuda::max_compile_time_stream_priorities
      ? c10::cuda::max_compile_time_stream_priorities
      : range;
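  // Illustrative arithmetic (the runtime values here are assumed, not
  // guaranteed): if the runtime reported leastPriority == 0 and
  // greatestPriority == -4, then range == 0 - (-4) + 1 == 5, which is then
  // capped at c10::cuda::max_compile_time_stream_priorities.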
}

// Init a single CUDA or HIP stream
// See Note [HIP Lazy Streams]
static void initSingleStream(int p, DeviceIndex device_index, int i) {
  auto& stream = streams[p][device_index][i];
  auto pri = -p; // lower number is higher priority

  C10_CUDA_CHECK(cudaStreamCreateWithPriority(&stream, kDefaultFlags, pri));
  const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
  if (C10_UNLIKELY(interp)) {
    (*interp)->trace_gpu_stream_creation(
        c10::kCUDA, reinterpret_cast<uintptr_t>(stream));
    priority_counters[p][device_index] = 0;
  }
}

// Creates the low and high priority stream pools for the specified device
// Warning: only call once per device!
static void initDeviceStreamState(DeviceIndex device_index) {
  // Switches to the requested device so streams are properly associated
  // with it.
  CUDAGuard device_guard{device_index};
  for (const auto i : c10::irange(kStreamsPerPool)) {
    for (const auto p : c10::irange(max_stream_priorities)) {
      initSingleStream(p, device_index, i);
    }
  }
}

// Init front-end to ensure initialization only occurs once
static void initCUDAStreamsOnce() {
  // Inits default streams (once, globally)
  c10::call_once(init_flag, initGlobalStreamState);

  if (current_streams) {
    return;
  }

  // Inits current streams (thread local) to default streams
  // NOLINTNEXTLINE(*-arrays)
  current_streams = std::make_unique<StreamId[]>(num_gpus);
  for (const auto i : c10::irange(num_gpus)) {
    current_streams[i] = makeStreamId(StreamIdType::DEFAULT, 0);
  }
}

// Helper to verify the GPU index is valid
static inline void check_gpu(DeviceIndex device_index) {
  TORCH_INTERNAL_ASSERT(device_index >= 0 && device_index < num_gpus);
}

// Helper to determine the index of the stream to return
// Note: Streams are returned round-robin (see note in CUDAStream.h)
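// For example, with kStreamsPerPool == 32 (from the constants above),
// successive calls on the same counter yield indices 0, 1, ..., 31, 0, 1, ...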
static uint32_t get_idx(std::atomic<uint32_t>& counter) {
  auto raw_idx = counter++;
  return raw_idx % kStreamsPerPool;
}

CUDAStream CUDAStreamForId(DeviceIndex device_index, StreamId stream_id) {
  return CUDAStream(
      CUDAStream::UNCHECKED,
      Stream(
          Stream::UNSAFE,
          c10::Device(DeviceType::CUDA, device_index),
          stream_id));
}

} // anonymous namespace

// See Note [StreamId assignment]
cudaStream_t CUDAStream::stream() const {
  c10::DeviceIndex device_index = stream_.device_index();
  StreamId stream_id = stream_.id();
  StreamIdType st = streamIdType(stream_id);
  size_t si = streamIdIndex(stream_id);
  if (st.isDefault()) {
    TORCH_INTERNAL_ASSERT(
        si == 0,
        "Unrecognized stream ",
        stream_,
        " (I think this should be the default stream, but I got a non-zero index ",
        si,
        ").",
        " Did you manufacture the StreamId yourself? Don't do that; use the",
        " official API like c10::cuda::getStreamFromPool() to get a new stream.");
    return nullptr;
  } else if (st.isExt()) {
    // NOLINTNEXTLINE(performance-no-int-to-ptr)
    return reinterpret_cast<cudaStream_t>(stream_id);
  } else {
    auto streamType = st.getStreamType();
    TORCH_INTERNAL_ASSERT(
        streamType >= 1 && streamType <= max_stream_priorities,
        "Unrecognized stream ",
        stream_,
        " (I didn't recognize the stream type, ",
        st,
        " with the value ",
        streamType,
        ")");
#ifdef USE_ROCM
    // See Note [HIP Lazy Streams]
    c10::call_once(
        stream_flags[st.getStreamType() - 1][device_index][si],
        initSingleStream,
        st.getStreamType() - 1,
        device_index,
        si);
#endif
    return streams[st.getStreamType() - 1][device_index][si];
  }
}

// Returns a stream from the requested pool
// Note: when called the first time on a device, this will create the
// stream pools for that device.
CUDAStream getStreamFromPool(const int priority, DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  TORCH_CHECK(
      priority <= 0,
      "Expected cuda stream priority to be less than or equal to 0, got ",
      priority);
  check_gpu(device_index);
#if !defined(USE_ROCM)
  // See Note [HIP Lazy Streams]
  // CUDA-only: Initializes the stream pools (once)
  c10::call_once(
      device_flags[device_index], initDeviceStreamState, device_index);
#endif
  auto pri_idx = -priority;
  pri_idx =
      std::min(pri_idx, max_stream_priorities - 1); // pri_idx is zero-based
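  // For instance, assuming max_stream_priorities == 4: priority -1 maps to
  // pri_idx 1, while any priority of -4 or lower is clamped to pri_idx 3.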
  const auto idx = get_idx(priority_counters[pri_idx][device_index]);
  StreamIdType id_type = StreamIdType(pri_idx + 1);
  return CUDAStreamForId(device_index, makeStreamId(id_type, idx));
}

CUDAStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) {
  initCUDAStreamsOnce();
  int priority = isHighPriority ? -max_stream_priorities + 1 : 0;
  return getStreamFromPool(priority, device);
}

CUDAStream getStreamFromExternal(
    cudaStream_t ext_stream,
    DeviceIndex device_index) {
  // The stream pointer will be the actual id
  return CUDAStreamForId(device_index, reinterpret_cast<int64_t>(ext_stream));
}

CUDAStream getDefaultCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  check_gpu(device_index);
  return CUDAStreamForId(device_index, makeStreamId(StreamIdType::DEFAULT, 0));
}

CUDAStream getCurrentCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  check_gpu(device_index);
  return CUDAStreamForId(device_index, current_streams[device_index]);
}

void setCurrentCUDAStream(CUDAStream stream) {
  initCUDAStreamsOnce();
  current_streams[stream.device_index()] = stream.id();
}

std::ostream& operator<<(std::ostream& stream, const CUDAStream& s) {
  return stream << s.unwrap();
}

} // namespace c10::cuda