#include <c10/util/CallOnce.h>
#include <c10/util/irange.h>
#include <c10/xpu/XPUException.h>
#include <c10/xpu/XPUStream.h>

#include <atomic>
#include <deque>
#include <mutex>
#include <vector>

namespace c10::xpu {
namespace {

// Global stream state and constants
c10::once_flag init_flag;
DeviceIndex num_gpus = -1;
constexpr int kStreamsPerPoolBits = 5;
constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits;
constexpr int kStreamTypeBits = 3;

// The SYCL queue pools are lazily initialized when the first queue is requested
// for a device. The device flags track the initialization of each device. When
// a queue is requested, the next queue in the pool is returned in a
// round-robin fashion; see Note [Stream Management].
std::deque<c10::once_flag> device_flags;
std::vector<std::array<
    std::array<std::unique_ptr<sycl::queue>, kStreamsPerPool>,
    max_compile_time_stream_priorities>>
    streams;
std::deque<
    std::array<std::atomic<uint32_t>, max_compile_time_stream_priorities>>
    priority_counters;

thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;

// Note [StreamId assignment]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// How do we assign stream IDs?
//
// -- 56 bits --  -- 5 bits -----  -- 3 bits --
//     zeros      StreamIdIndex    StreamIdType
//
// Where StreamIdType:
//  000 = normal priority queue
//  001 = high priority queue
//
// StreamId is 64-bit, so we can just rely on regular promotion rules.
// We rely on StreamIdIndex and StreamIdType being non-negative.
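//
// For illustration, a hand-worked example of this encoding (using the helpers
// defined below): makeStreamId(StreamIdType::HIGH, 4) == (4 << 3) | 0x1 ==
// 0x21, and decoding 0x21 yields streamIdType() == HIGH and
// streamIdIndex() == 4.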

using StreamIdIndex = uint8_t;
enum class StreamIdType : uint8_t {
  // The higher the type number, the higher the priority.
  NORMAL = 0x0,
  HIGH = 0x1,
};

inline std::ostream& operator<<(std::ostream& stream, StreamIdType q) {
  switch (q) {
    case StreamIdType::NORMAL:
      return stream << "NORMAL";
    case StreamIdType::HIGH:
      return stream << "HIGH";
    default:
      break;
  }
  return stream << static_cast<int16_t>(q);
}

inline StreamIdType streamIdType(StreamId s) {
  int mask_for_type = (1 << kStreamTypeBits) - 1;
  auto st = static_cast<StreamIdType>(s & mask_for_type);
  TORCH_CHECK(
      st == StreamIdType::NORMAL || st == StreamIdType::HIGH,
      "invalid StreamId: ",
      s);
  return st;
}

inline StreamIdIndex streamIdIndex(StreamId s) {
  return static_cast<StreamIdIndex>(
      (s >> kStreamTypeBits) & ((1 << kStreamsPerPoolBits) - 1));
}

inline StreamId makeStreamId(StreamIdType st, StreamIdIndex si) {
  return (static_cast<StreamId>(si) << kStreamTypeBits) |
      static_cast<StreamId>(st);
}

void initGlobalStreamState() {
  num_gpus = c10::xpu::device_count();
  device_flags.resize(num_gpus);
  streams.resize(num_gpus);
  priority_counters.resize(num_gpus);
}

// Creates the reserved SYCL queue pools for the specified device. It should be
// called only once.
void initDeviceStreamState(DeviceIndex device) {
  using namespace sycl::ext::oneapi::property;
  // Need to align with StreamIdType.
  const std::vector<sycl::property_list> properties = {
      {sycl::property::queue::in_order(), queue::priority_normal()},
      {sycl::property::queue::in_order(), queue::priority_high()}};
  for (const auto p : c10::irange(max_compile_time_stream_priorities)) {
    for (const auto i : c10::irange(kStreamsPerPool)) {
      auto& stream = streams[device][p][i];
      stream = std::make_unique<sycl::queue>(sycl::queue(
          c10::xpu::get_device_context(),
          c10::xpu::get_raw_device(device),
          c10::xpu::asyncHandler,
          properties[p]));
      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
      if (C10_UNLIKELY(interp)) {
        (*interp)->trace_gpu_stream_creation(
            c10::kXPU, reinterpret_cast<uintptr_t>(stream.get()));
      }
    }
    priority_counters[device][p] = 0;
  }
}
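// Illustrative cross-reference (not functional code): properties[p] above is
// indexed the same way as StreamIdType, e.g.
// properties[static_cast<uint8_t>(StreamIdType::HIGH)] carries
// queue::priority_high(), which is what lets XPUStream::queue() below select
// the matching pool via streams[device_index][static_cast<uint8_t>(st)][si].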

void initXPUStreamsOnce() {
  c10::call_once(init_flag, initGlobalStreamState);

  if (current_streams) {
    return;
  }

  // Inits current streams (thread local) to the last queue in the "normal
  // priority" queue pool. Note: the queue pools have not been initialized yet;
  // they will be initialized in initDeviceStreamState for the specified device.
  current_streams = std::make_unique<StreamId[]>(num_gpus);
  for (const auto i : c10::irange(num_gpus)) {
    // Assigning the current stream to the last one in the pool can be
    // beneficial in certain scenarios, particularly when users run their
    // computation on the current stream (the last one) and fetch another
    // stream from the pool (starting at the first one) for communication;
    // this allows computation and communication to overlap on different
    // streams. See the illustrative sketch after this function.
    current_streams[i] =
        makeStreamId(StreamIdType::NORMAL, kStreamsPerPool - 1);
  }
}
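// A hypothetical sketch of the overlap pattern described above; `device`, `t`,
// `compute`, and `communicate` are placeholders, not part of this file:
//
//   XPUStream current = getCurrentXPUStream(device);  // last NORMAL queue
//   XPUStream side = getStreamFromPool(/*isHighPriority=*/false, device);
//   compute(t, current.queue());      // computation on the current stream
//   communicate(t, side.queue());     // communication on a pool stream
//
// Because the two in-order queues are independent, the computation and
// communication can overlap on the device.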

// Creates the reserved SYCL queue pools for the specified device, ensuring the
// initialization occurs only once per device.
inline void initDeviceStreamOnce(DeviceIndex device) {
  c10::call_once(device_flags[device], initDeviceStreamState, device);
}

inline void check_device(DeviceIndex device) {
  TORCH_CHECK(
      device >= 0 && device < num_gpus,
      "device is out of range, device is ",
      static_cast<int16_t>(device),
      ", total number of devices is ",
      static_cast<int16_t>(num_gpus),
      ".");
}

uint32_t get_idx(std::atomic<uint32_t>& counter) {
  auto raw_idx = counter++;
  return raw_idx % kStreamsPerPool;
}

XPUStream XPUStreamForId(DeviceIndex device_index, StreamId stream_id) {
  return XPUStream(
      XPUStream::UNCHECKED,
      Stream(
          Stream::UNSAFE,
          c10::Device(DeviceType::XPU, device_index),
          stream_id));
}

} // anonymous namespace

int XPUStream::priority() const {
  StreamId stream_id = stream_.id();
  StreamIdType st = streamIdType(stream_id);
  // StreamIdType and priority number are inversely related.
  return -static_cast<int>(st);
}

// See Note [StreamId assignment]
sycl::queue& XPUStream::queue() const {
  DeviceIndex device_index = stream_.device_index();
  StreamId stream_id = stream_.id();
  StreamIdType st = streamIdType(stream_id);
  StreamIdIndex si = streamIdIndex(stream_id);
  switch (st) {
    case StreamIdType::NORMAL:
    case StreamIdType::HIGH:
      return *streams[device_index][static_cast<uint8_t>(st)][si];
    default:
      TORCH_CHECK(
          false,
          "Unrecognized stream ",
          stream_,
          " (I didn't recognize the stream type, ",
          st,
          ").",
          " Did you manufacture the StreamId yourself?  Don't do that;");
  }
}

// Returns a stream from the requested pool.
// Note: The stream pools will be initialized if needed, at the first invocation
// of this function.
XPUStream getStreamFromPool(const int priority, DeviceIndex device) {
  initXPUStreamsOnce();
  if (device == -1) {
    device = c10::xpu::current_device();
  }
  check_device(device);
  TORCH_CHECK(
      priority <= 0,
      "Expected XPU stream priority to be less than or equal to 0, got ",
      priority);
  // Initializes the stream pools (once)
  initDeviceStreamOnce(device);
  auto priority_idx =
      std::min(-priority, max_compile_time_stream_priorities - 1);
  const auto idx = get_idx(priority_counters[device][priority_idx]);
  auto id_type = static_cast<StreamIdType>(priority_idx);
  return XPUStreamForId(device, makeStreamId(id_type, idx));
}
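// For example, given the two pools set up in initDeviceStreamState
// (max_compile_time_stream_priorities == 2):
//   getStreamFromPool(0, device)  -> a queue from the NORMAL priority pool
//   getStreamFromPool(-1, device) -> a queue from the HIGH priority pool
//   getStreamFromPool(-5, device) -> clamped to the HIGH priority pool
// where `device` is a valid DeviceIndex; successive calls rotate through the
// kStreamsPerPool queues of the selected pool in round-robin order.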

XPUStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) {
  initXPUStreamsOnce();
  // If isHighPriority is true, return the stream with the highest priority.
  int priority = isHighPriority ? -max_compile_time_stream_priorities + 1 : 0;
  return getStreamFromPool(priority, device);
}

// Note: The stream pools will be initialized if needed, at the first invocation
// of this function.
XPUStream getCurrentXPUStream(DeviceIndex device) {
  initXPUStreamsOnce();
  if (device == -1) {
    device = c10::xpu::current_device();
  }
  check_device(device);
  // Initializes the stream pool (once)
  initDeviceStreamOnce(device);
  return XPUStreamForId(device, current_streams[device]);
}

// Note: The stream pools will be initialized if needed, at the first invocation
// of this function.
void setCurrentXPUStream(XPUStream stream) {
  initXPUStreamsOnce();
  current_streams[stream.device_index()] = stream.id();
}

std::ostream& operator<<(std::ostream& stream, const XPUStream& s) {
  return stream << s.unwrap();
}

/*
 * Note [Synchronize Streams on Device]
 *
 * There are two stream pools per device to manage our reserved SYCL queues.
 * When syncStreamsOnDevice is called, the calling thread waits for all reserved
 * SYCL queues in the pools of the specified device to finish their submitted
 * commands. We realize this by looping over the stream pools of the specified
 * device and synchronizing each command queue sequentially.
 *
 * There is a semantic gap with device synchronization because only the SYCL
 * queues we have reserved (in our pools) will be synchronized, rather than
 * synchronizing all SYCL queues on the specified device.
 */
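// Illustrative example of the gap (assumes the standard SYCL queue
// constructor; `q` is a placeholder): a queue created outside the pools, e.g.
//   sycl::queue q{c10::xpu::get_device_context(), c10::xpu::get_raw_device(device)};
// is not tracked here, so syncStreamsOnDevice(device) will not wait for any
// work submitted to `q`.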

// Note: The stream pools will be initialized if needed, at the first invocation
// of this function.
void syncStreamsOnDevice(DeviceIndex device) {
  initXPUStreamsOnce();
  if (device == -1) {
    device = c10::xpu::current_device();
  }
  check_device(device);
  // Initializes the stream pools (once)
  initDeviceStreamOnce(device);

  // For each device, we have kStreamsPerPool (32) reserved queues per priority.
  for (const auto p : c10::irange(max_compile_time_stream_priorities)) {
    for (const auto i : c10::irange(kStreamsPerPool)) {
      streams[device][p][i]->wait();
    }
  }
  const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
  if (C10_UNLIKELY(interp)) {
    (*interp)->trace_gpu_device_synchronization(c10::kXPU);
  }
}

} // namespace c10::xpu