xref: /aosp_15_r20/external/pytorch/c10/xpu/test/impl/XPUStreamTest.cpp (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1 #include <gtest/gtest.h>
2 
3 #include <c10/util/irange.h>
4 #include <c10/xpu/XPUStream.h>
5 #include <c10/xpu/test/impl/XPUTest.h>
6 #include <optional>
7 
8 #include <thread>
9 #include <unordered_set>
10 
has_xpu()11 bool has_xpu() {
12   return c10::xpu::device_count() > 0;
13 }
14 
// Verifies that copy-assigning and move-assigning an XPUStream preserve its
// device index and underlying SYCL queue, both while the source stream is
// alive and after it has gone out of scope.
TEST(XPUStreamTest, CopyAndMoveTest) {
  if (!has_xpu()) {
    // Report the test as skipped instead of letting it pass vacuously.
    GTEST_SKIP() << "No XPU device available";
  }

  // Tests that copying works as expected and preserves the stream
  c10::xpu::XPUStream copyStream = c10::xpu::getStreamFromPool();
  int32_t device = -1;
  // Seed the expected queue from an existing stream rather than
  // default-constructing a brand new sycl::queue that is overwritten below.
  sycl::queue queue = copyStream.queue();
  {
    auto s = c10::xpu::getStreamFromPool();
    device = s.device_index();
    queue = s.queue();

    copyStream = s;

    EXPECT_EQ(copyStream.device_index(), device);
    EXPECT_EQ(copyStream.queue(), queue);
  }

  // The copy must still describe the same stream once the source is gone.
  EXPECT_EQ(copyStream.device_index(), device);
  EXPECT_EQ(copyStream.queue(), queue);

  // Tests that moving works as expected and preserves the stream
  c10::xpu::XPUStream moveStream = c10::xpu::getStreamFromPool();
  {
    auto s = c10::xpu::getStreamFromPool();
    device = s.device_index();
    queue = s.queue();

    moveStream = std::move(s);

    EXPECT_EQ(moveStream.device_index(), device);
    EXPECT_EQ(moveStream.queue(), queue);
  }

  // The moved-to stream must remain valid after the source is destroyed.
  EXPECT_EQ(moveStream.device_index(), device);
  EXPECT_EQ(moveStream.queue(), queue);
}
53 
// Exercises basic stream properties: device type, setting and reading the
// current stream, the priority range, high-priority pool streams and, when a
// second device exists, requesting a stream on a non-current device.
TEST(XPUStreamTest, StreamBehavior) {
  if (!has_xpu()) {
    // Report the test as skipped instead of letting it pass vacuously.
    GTEST_SKIP() << "No XPU device available";
  }

  // Remember the stream that is current on entry so this test does not leave
  // a different current stream behind for whatever runs next on this thread.
  const c10::xpu::XPUStream original_stream = c10::xpu::getCurrentXPUStream();

  c10::xpu::XPUStream stream = c10::xpu::getStreamFromPool();
  EXPECT_EQ(stream.device_type(), c10::kXPU);
  c10::xpu::setCurrentXPUStream(stream);
  c10::xpu::XPUStream cur_stream = c10::xpu::getCurrentXPUStream();
  c10::xpu::setCurrentXPUStream(original_stream);

  EXPECT_EQ(cur_stream, stream);
  EXPECT_EQ(stream.priority(), 0);

  auto [least_priority, greatest_priority] =
      c10::xpu::XPUStream::priority_range();
  EXPECT_EQ(least_priority, 0);
  // EXPECT_LT prints both operands on failure, unlike EXPECT_TRUE(a < b).
  EXPECT_LT(greatest_priority, 0);

  stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
  EXPECT_LT(stream.priority(), 0);

  // The remaining checks need a second device. A plain return is used here
  // (not GTEST_SKIP) because the checks above have already run.
  if (c10::xpu::device_count() <= 1) {
    return;
  }

  c10::xpu::set_device(0);
  stream = c10::xpu::getStreamFromPool(false, 1);
  EXPECT_EQ(stream.device_index(), 1);
  EXPECT_NE(stream.device_index(), c10::xpu::current_device());
}
84 
thread_fun(std::optional<c10::xpu::XPUStream> & cur_thread_stream)85 void thread_fun(std::optional<c10::xpu::XPUStream>& cur_thread_stream) {
86   auto new_stream = c10::xpu::getStreamFromPool();
87   c10::xpu::setCurrentXPUStream(new_stream);
88   cur_thread_stream = {c10::xpu::getCurrentXPUStream()};
89   EXPECT_EQ(*cur_thread_stream, new_stream);
90 }
91 
92 // Ensures streams are thread local
TEST(XPUStreamTest,MultithreadStreamBehavior)93 TEST(XPUStreamTest, MultithreadStreamBehavior) {
94   if (!has_xpu()) {
95     return;
96   }
97   std::optional<c10::xpu::XPUStream> s0, s1;
98 
99   std::thread t0{thread_fun, std::ref(s0)};
100   std::thread t1{thread_fun, std::ref(s1)};
101   t0.join();
102   t1.join();
103 
104   c10::xpu::XPUStream cur_stream = c10::xpu::getCurrentXPUStream();
105 
106   EXPECT_NE(cur_stream, *s0);
107   EXPECT_NE(cur_stream, *s1);
108   EXPECT_NE(s0, s1);
109 }
110 
111 // Ensure queue pool round-robin fashion
TEST(XPUStreamTest,StreamPoolRoundRobinTest)112 TEST(XPUStreamTest, StreamPoolRoundRobinTest) {
113   if (!has_xpu()) {
114     return;
115   }
116 
117   std::vector<c10::xpu::XPUStream> streams{};
118   for (C10_UNUSED const auto _ : c10::irange(200)) {
119     streams.emplace_back(c10::xpu::getStreamFromPool());
120   }
121 
122   std::unordered_set<sycl::queue> queue_set{};
123   bool hasDuplicates = false;
124   for (const auto i : c10::irange(streams.size())) {
125     auto& queue = streams[i].queue();
126     auto result_pair = queue_set.insert(queue);
127     if (!result_pair.second) { // already existed
128       hasDuplicates = true;
129     } else { // newly inserted
130       EXPECT_TRUE(!hasDuplicates);
131     }
132   }
133   EXPECT_TRUE(hasDuplicates);
134 
135   auto stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
136   auto result_pair = queue_set.insert(stream.queue());
137   EXPECT_TRUE(result_pair.second);
138 }
139 
asyncMemCopy(sycl::queue & queue,int * dst,int * src,size_t numBytes)140 void asyncMemCopy(sycl::queue& queue, int* dst, int* src, size_t numBytes) {
141   queue.memcpy(dst, src, numBytes);
142 }
143 
// Round-trips a host buffer through device memory using pool streams:
// host -> device, then device -> host on the same stream, then device -> host
// again on a second stream, validating the host buffer after each read-back.
TEST(XPUStreamTest, StreamFunction) {
  if (!has_xpu()) {
    // Report the test as skipped instead of letting it pass vacuously.
    GTEST_SKIP() << "No XPU device available";
  }

  constexpr int numel = 1024;
  constexpr size_t numBytes = sizeof(int) * numel;
  int hostData[numel];
  initHostData(hostData, numel);

  auto stream = c10::xpu::getStreamFromPool();
  // Nothing has been submitted to this stream by this test yet.
  EXPECT_TRUE(stream.query());
  int* deviceData = sycl::malloc_device<int>(numel, stream);
  // Abort before any copy touches the buffer if the allocation failed.
  ASSERT_NE(deviceData, nullptr);

  // H2D
  asyncMemCopy(stream, deviceData, hostData, numBytes);
  c10::xpu::syncStreamsOnDevice();
  EXPECT_TRUE(stream.query());

  clearHostData(hostData, numel);

  // D2H
  asyncMemCopy(stream, hostData, deviceData, numBytes);
  c10::xpu::syncStreamsOnDevice();

  validateHostData(hostData, numel);

  // NOTE(review): -1 is passed as an int here, unlike the bool arguments used
  // elsewhere in this file; presumably it requests a negative (higher)
  // priority stream. Confirm against the getStreamFromPool overloads.
  stream = c10::xpu::getStreamFromPool(-1);

  clearHostData(hostData, numel);

  // D2H
  asyncMemCopy(stream, hostData, deviceData, numBytes);
  c10::xpu::syncStreamsOnDevice();

  validateHostData(hostData, numel);
  sycl::free(deviceData, c10::xpu::get_device_context());
}
181