#include <c10/core/impl/GPUTrace.h>
#include <c10/cuda/CUDAFunctions.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/CallOnce.h>
#include <c10/util/Exception.h>
#include <c10/util/irange.h>

#include <array>
#include <atomic>
#include <cstdint>

namespace c10::cuda {

namespace {

// Global stream state and constants
static c10::once_flag init_flag;
static DeviceIndex num_gpus = -1;
static constexpr int kStreamsPerPoolBits = 5;
static constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits;
static constexpr unsigned int kDefaultFlags = cudaStreamNonBlocking;
static constexpr int kStreamTypeBits = 4;

static int max_stream_priorities;

// Non-default streams
// Note: the number of CUDA devices is determined at run time,
// and the low and high priority pools are lazily initialized
// when the first stream is requested for a device.
// The device flags track the initialization of each device, while
// the low and high priority counters track, for each device, the next stream
// in the pool to be returned when a stream is requested (round-robin fashion;
// see the note in CUDAStream.h).
// The streams are "leaked": they are created but never destroyed because the
// destruction of global variables could happen after the CUDA runtime has
// already been destroyed, and thus invoking cudaStreamDestroy could lead to a
// crash. It's likely an issue in CUDA, but to be safe - let's just "forget"
// the destruction.
#if !defined(USE_ROCM)
// CUDA-only: used to initialize the stream pools (once)
static std::array<c10::once_flag, C10_COMPILE_TIME_MAX_GPUS> device_flags;
#endif
static std::array<
    std::array<std::atomic<uint32_t>, C10_COMPILE_TIME_MAX_GPUS>,
    c10::cuda::max_compile_time_stream_priorities>
    priority_counters;

static std::array<
    std::array<
        std::array<cudaStream_t, kStreamsPerPool>,
        C10_COMPILE_TIME_MAX_GPUS>,
    c10::cuda::max_compile_time_stream_priorities>
    streams;
#ifdef USE_ROCM
static c10::once_flag
    stream_flags[c10::cuda::max_compile_time_stream_priorities]
                [C10_COMPILE_TIME_MAX_GPUS][kStreamsPerPool];
#endif

// Note [HIP Lazy Streams]
// ~~~~~~~~~~~~~~~~~~~~~~~
// For ROCm/HIP, each stream is lazily initialized rather than all streams
// being created when the first stream is requested. HIP streams are not as
// lightweight as CUDA streams; the pooling strategy can affect performance.
// Rather than changing the pooling implementation, ROCm/HIP will lazily init
// each stream when it is first requested.

// Note [StreamId assignment]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// How do we assign stream IDs?
//
// -- 54 bits --  -- 5 bits -----  -- 4 bits --     -- 1 bit --
// zeros          stream id index  StreamIdType     Ext/native stream
//                ignored for ext  ignored for ext
//
// For an external stream, the StreamId is the cudaStream_t pointer itself,
// which means the last bit will always be 0. So when constructing a StreamId
// for a native stream we set the last bit to 1 to distinguish between native
// and external streams.
//
// We are obligated to treat the stream ID 0 as the default stream, per the
// invariant specified in c10::Stream, so this is one exception to
// "last bit = 1 for native streams". However, all other numbers are entirely
// an internal implementation detail; we reserve the right to renumber streams
// however we like.
//
// Note that it is really important that the MSB is zero; StreamId is a
// *signed* integer, and unsigned to signed conversion outside of the
// bounds of signed integer representation is undefined behavior.  You
// could work around this with something like
// https://stackoverflow.com/questions/13150449/efficient-unsigned-to-signed-cast-avoiding-implementation-defined-behavior
// but it seems a bit overkill for this.
//
// Also, externally managed stream pointers (cudaStream_t) can be stored
// directly in the Id field, so in this case we need to check the stream
// alignment.
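//
// Worked example (illustrative only; the values are hypothetical): a native
// stream with priority value 1 at pool index 3 is encoded by makeStreamId()
// below as
//   (3 << 5) | (1 << 1) | 1 = 0b110'0011 = 99
// and decoded by the helpers further down as
//   streamIdType(99)  == PRIORITY 1   ((99 >> 1) & 0xF  == 1)
//   streamIdIndex(99) == 3            ((99 >> 5) & 0x1F == 3)
// while stream ID 0 stays reserved for the default stream, and any even,
// non-zero ID is interpreted as an external cudaStream_t pointer.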

class StreamIdType {
  // StreamIdType encodes whether this stream is DEFAULT, EXTernal, or, for
  // all other native streams, the stream priority (a higher value is a
  // higher priority).
 private:
  uint8_t stream_type;

 public:
  static const uint8_t DEFAULT = 0x0;
  static const uint8_t EXT = 0xF;

 public:
  StreamIdType(const uint8_t _stream_type) : stream_type(_stream_type) {}

  bool isExt() const {
    return EXT == stream_type;
  }

  bool isDefault() const {
    return DEFAULT == stream_type;
  }

  uint8_t getStreamType() const {
    return stream_type;
  }
};

std::ostream& operator<<(std::ostream& stream, StreamIdType s) {
  if (s.isDefault()) {
    stream << "DEFAULT";
  } else if (s.isExt()) {
    stream << "EXT";
  } else {
    stream << "PRIORITY " << int(s.getStreamType());
  }
  return stream;
}

// StreamId is 64-bit, so we can just rely on regular promotion rules.
// We rely on streamIdIndex and streamIdType being non-negative;
// see Note [Hazard when concatenating signed integers]

static inline StreamIdType streamIdType(StreamId s) {
  // Externally allocated streams have the cudaStream_t pointer as their id,
  // so the last bit will be 0
  if ((!(s & 1)) && s) {
    return StreamIdType(StreamIdType::EXT);
  }
  // The last bit is the external/internal flag, so the mask for the type
  // should start from the second rightmost bit
  int mask_for_type = (1 << kStreamTypeBits) - 1;
  auto val = (s >> 1) & mask_for_type;
  TORCH_INTERNAL_ASSERT(val || !(s & 1), "invalid StreamId", s);
  return StreamIdType(val);
}

static inline size_t streamIdIndex(StreamId s) {
  return static_cast<size_t>(
      (s >> (kStreamTypeBits + 1)) & ((1 << kStreamsPerPoolBits) - 1));
}

StreamId makeStreamId(StreamIdType st, size_t si) {
  if (st.isDefault()) {
    return static_cast<StreamId>(0);
  }
  return (static_cast<StreamId>(si) << (kStreamTypeBits + 1)) |
      static_cast<StreamId>(st.getStreamType() << 1) | 1;
}

// Thread-local current streams
// NOLINTNEXTLINE(*-arrays)
static thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;

// Populates global values.
// Warning: this function must only be called once!
static void initGlobalStreamState() {
  num_gpus = device_count();
  // Check if the number of GPUs matches the expected compile-time max number
  // of GPUs.
  TORCH_CHECK(
      num_gpus <= C10_COMPILE_TIME_MAX_GPUS,
      "Number of CUDA devices on the machine is larger than the compiled "
      "max number of gpus expected (",
      C10_COMPILE_TIME_MAX_GPUS,
      "). Increase that and recompile.");
  int leastPriority = -1, greatestPriority = -1;
  C10_CUDA_CHECK(
      cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority));
  // Note [HIP stream priorities]
  // HIP stream priorities are 1=low, 0=default, -1=high, which differs from
  // CUDA, where 0=default, -1=high, -2=higher, etc.
  // Clamp leastPriority to 0 for HIP.
#ifdef USE_ROCM
  leastPriority = 0;
#endif
  // greatestPriority is negative
  auto range = leastPriority - greatestPriority + 1;
  max_stream_priorities = range >= c10::cuda::max_compile_time_stream_priorities
      ? c10::cuda::max_compile_time_stream_priorities
      : range;
}
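
// For example (hypothetical values, purely to illustrate the arithmetic
// above): if the runtime reported leastPriority = 0 and greatestPriority = -2,
// then range = 0 - (-2) + 1 = 3 and max_stream_priorities would be
// min(3, c10::cuda::max_compile_time_stream_priorities).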

// Init a single CUDA or HIP stream
// See Note [HIP Lazy Streams]
static void initSingleStream(int p, DeviceIndex device_index, int i) {
  auto& stream = streams[p][device_index][i];
  auto pri = -p; // lower number is higher priority

  C10_CUDA_CHECK(cudaStreamCreateWithPriority(&stream, kDefaultFlags, pri));
  const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
  if (C10_UNLIKELY(interp)) {
    (*interp)->trace_gpu_stream_creation(
        c10::kCUDA, reinterpret_cast<uintptr_t>(stream));
    priority_counters[p][device_index] = 0;
  }
}

// Creates the low and high priority stream pools for the specified device
// Warning: only call once per device!
static void initDeviceStreamState(DeviceIndex device_index) {
  // Switches to the requested device so streams are properly associated
  // with it.
  CUDAGuard device_guard{device_index};
  for (const auto i : c10::irange(kStreamsPerPool)) {
    for (const auto p : c10::irange(max_stream_priorities)) {
      initSingleStream(p, device_index, i);
    }
  }
}

// Init front-end to ensure initialization only occurs once
static void initCUDAStreamsOnce() {
  // Inits default streams (once, globally)
  c10::call_once(init_flag, initGlobalStreamState);

  if (current_streams) {
    return;
  }

  // Inits current streams (thread local) to default streams
  // NOLINTNEXTLINE(*-arrays)
  current_streams = std::make_unique<StreamId[]>(num_gpus);
  for (const auto i : c10::irange(num_gpus)) {
    current_streams[i] = makeStreamId(StreamIdType::DEFAULT, 0);
  }
}

// Helper to verify the GPU index is valid
static inline void check_gpu(DeviceIndex device_index) {
  TORCH_INTERNAL_ASSERT(device_index >= 0 && device_index < num_gpus);
}

// Helper to determine the index of the stream to return
// Note: Streams are returned round-robin (see note in CUDAStream.h)
static uint32_t get_idx(std::atomic<uint32_t>& counter) {
  auto raw_idx = counter++;
  return raw_idx % kStreamsPerPool;
}
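
// For example: with kStreamsPerPool == 32, successive requests against one
// counter return indices 0, 1, ..., 31, and the 33rd request wraps back to
// index 0, so the same pooled stream is handed out again.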

CUDAStream CUDAStreamForId(DeviceIndex device_index, StreamId stream_id) {
  return CUDAStream(
      CUDAStream::UNCHECKED,
      Stream(
          Stream::UNSAFE,
          c10::Device(DeviceType::CUDA, device_index),
          stream_id));
}

} // anonymous namespace

// See Note [StreamId assignment]
cudaStream_t CUDAStream::stream() const {
  c10::DeviceIndex device_index = stream_.device_index();
  StreamId stream_id = stream_.id();
  StreamIdType st = streamIdType(stream_id);
  size_t si = streamIdIndex(stream_id);
  if (st.isDefault()) {
    TORCH_INTERNAL_ASSERT(
        si == 0,
        "Unrecognized stream ",
        stream_,
        " (I think this should be the default stream, but I got a non-zero index ",
        si,
        ").",
        " Did you manufacture the StreamId yourself?  Don't do that; use the",
        " official API like c10::cuda::getStreamFromPool() to get a new stream.");
    return nullptr;
  } else if (st.isExt()) {
    // NOLINTNEXTLINE(performance-no-int-to-ptr)
    return reinterpret_cast<cudaStream_t>(stream_id);
  } else {
    auto streamType = st.getStreamType();
    TORCH_INTERNAL_ASSERT(
        streamType >= 1 && streamType <= max_stream_priorities,
        "Unrecognized stream ",
        stream_,
        " (I didn't recognize the stream type, ",
        st,
        " with the value ",
        streamType,
        ")");
#ifdef USE_ROCM
    // See Note [HIP Lazy Streams]
    c10::call_once(
        stream_flags[st.getStreamType() - 1][device_index][si],
        initSingleStream,
        st.getStreamType() - 1,
        device_index,
        si);
#endif
    return streams[st.getStreamType() - 1][device_index][si];
  }
}

// Returns a stream from the requested pool
// Note: when called the first time on a device, this will create the
// stream pools for that device.
CUDAStream getStreamFromPool(const int priority, DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  TORCH_CHECK(
      priority <= 0,
      "Expected cuda stream priority to be less than or equal to 0, got ",
      priority);
  check_gpu(device_index);
#if !defined(USE_ROCM)
  // See Note [HIP Lazy Streams]
  // CUDA-only: Initializes the stream pools (once)
  c10::call_once(
      device_flags[device_index], initDeviceStreamState, device_index);
#endif
  auto pri_idx = -priority;
  pri_idx =
      std::min(pri_idx, max_stream_priorities - 1); // pri_idx is zero-based
  const auto idx = get_idx(priority_counters[pri_idx][device_index]);
  StreamIdType id_type = StreamIdType(pri_idx + 1);
  return CUDAStreamForId(device_index, makeStreamId(id_type, idx));
}

CUDAStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) {
  initCUDAStreamsOnce();
  int priority = isHighPriority ? -max_stream_priorities + 1 : 0;
  return getStreamFromPool(priority, device);
}

CUDAStream getStreamFromExternal(
    cudaStream_t ext_stream,
    DeviceIndex device_index) {
  // The stream pointer will be the actual id
  return CUDAStreamForId(device_index, reinterpret_cast<int64_t>(ext_stream));
}

CUDAStream getDefaultCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  check_gpu(device_index);
  return CUDAStreamForId(device_index, makeStreamId(StreamIdType::DEFAULT, 0));
}

CUDAStream getCurrentCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
    c10::cuda::SetTargetDevice();
  }
  check_gpu(device_index);
  return CUDAStreamForId(device_index, current_streams[device_index]);
}

void setCurrentCUDAStream(CUDAStream stream) {
  initCUDAStreamsOnce();
  current_streams[stream.device_index()] = stream.id();
}
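
// Illustrative usage sketch (not compiled here; only the functions defined in
// this file and the headers included above are assumed): a caller typically
// pulls a stream from the pool and makes it current for its device, e.g.
//   auto s = c10::cuda::getStreamFromPool(/*isHighPriority=*/true, -1);
//   c10::cuda::setCurrentCUDAStream(s);
//   cudaStream_t raw =
//       c10::cuda::getCurrentCUDAStream(s.device_index()).stream();
// RAII guards such as CUDAStreamGuard (CUDAGuard.h) are usually preferred over
// calling setCurrentCUDAStream() directly, since they restore the previous
// current stream automatically.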

std::ostream& operator<<(std::ostream& stream, const CUDAStream& s) {
  return stream << s.unwrap();
}

} // namespace c10::cuda