1 #include <gtest/gtest.h>
2
3 #include <c10/util/irange.h>
4 #include <c10/xpu/XPUStream.h>
5 #include <c10/xpu/test/impl/XPUTest.h>
6 #include <optional>
7
8 #include <thread>
9 #include <unordered_set>
10
has_xpu()11 bool has_xpu() {
12 return c10::xpu::device_count() > 0;
13 }
14
// Verifies that copy-assigning and move-assigning an XPUStream leaves the
// destination reporting the source's device index and SYCL queue, both while
// the source is still in scope and after it has been destroyed.
TEST(XPUStreamTest, CopyAndMoveTest) {
  if (!has_xpu()) {
    return;
  }

  // Identity of the source stream, recorded inside each inner scope so it can
  // still be compared against once the source no longer exists.
  int32_t expected_device = -1;
  sycl::queue expected_queue;

  // Copy assignment: the target is first initialized from the pool and then
  // overwritten with a copy of `source`.
  c10::xpu::XPUStream copy_target = c10::xpu::getStreamFromPool();
  {
    auto source = c10::xpu::getStreamFromPool();
    expected_device = source.device_index();
    expected_queue = source.queue();

    copy_target = source;

    EXPECT_EQ(copy_target.device_index(), expected_device);
    EXPECT_EQ(copy_target.queue(), expected_queue);
  }

  // `source` has gone out of scope; the copy must be unaffected.
  EXPECT_EQ(copy_target.device_index(), expected_device);
  EXPECT_EQ(copy_target.queue(), expected_queue);

  // Move assignment: same procedure, but the source is moved from.
  c10::xpu::XPUStream move_target = c10::xpu::getStreamFromPool();
  {
    auto source = c10::xpu::getStreamFromPool();
    expected_device = source.device_index();
    expected_queue = source.queue();

    move_target = std::move(source);

    EXPECT_EQ(move_target.device_index(), expected_device);
    EXPECT_EQ(move_target.queue(), expected_queue);
  }

  // The moved-from `source` has been destroyed; the target must still match.
  EXPECT_EQ(move_target.device_index(), expected_device);
  EXPECT_EQ(move_target.queue(), expected_queue);
}
53
// Exercises basic stream properties: device type, setting/getting the current
// stream, priority values, and (when more than one device is present)
// requesting a stream on a device other than the current one.
TEST(XPUStreamTest, StreamBehavior) {
  if (!has_xpu()) {
    return;
  }

  c10::xpu::XPUStream stream = c10::xpu::getStreamFromPool();
  EXPECT_EQ(stream.device_type(), c10::kXPU);

  // Installing a stream as current must make it the one reported back.
  c10::xpu::setCurrentXPUStream(stream);
  const c10::xpu::XPUStream current = c10::xpu::getCurrentXPUStream();
  EXPECT_EQ(current, stream);

  // A stream requested with default arguments has priority 0.
  EXPECT_EQ(stream.priority(), 0);

  // The range is expected to run from 0 (least) down to a negative value
  // (greatest).
  const auto [lowest_priority, highest_priority] =
      c10::xpu::XPUStream::priority_range();
  EXPECT_EQ(lowest_priority, 0);
  EXPECT_TRUE(highest_priority < 0);

  stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
  EXPECT_TRUE(stream.priority() < 0);

  // The remaining checks need a second device.
  if (c10::xpu::device_count() > 1) {
    c10::xpu::set_device(0);
    stream = c10::xpu::getStreamFromPool(
        /* isHighPriority */ false, /* device index */ 1);
    EXPECT_EQ(stream.device_index(), 1);
    EXPECT_NE(stream.device_index(), c10::xpu::current_device());
  }
}
84
thread_fun(std::optional<c10::xpu::XPUStream> & cur_thread_stream)85 void thread_fun(std::optional<c10::xpu::XPUStream>& cur_thread_stream) {
86 auto new_stream = c10::xpu::getStreamFromPool();
87 c10::xpu::setCurrentXPUStream(new_stream);
88 cur_thread_stream = {c10::xpu::getCurrentXPUStream()};
89 EXPECT_EQ(*cur_thread_stream, new_stream);
90 }
91
92 // Ensures streams are thread local
TEST(XPUStreamTest,MultithreadStreamBehavior)93 TEST(XPUStreamTest, MultithreadStreamBehavior) {
94 if (!has_xpu()) {
95 return;
96 }
97 std::optional<c10::xpu::XPUStream> s0, s1;
98
99 std::thread t0{thread_fun, std::ref(s0)};
100 std::thread t1{thread_fun, std::ref(s1)};
101 t0.join();
102 t1.join();
103
104 c10::xpu::XPUStream cur_stream = c10::xpu::getCurrentXPUStream();
105
106 EXPECT_NE(cur_stream, *s0);
107 EXPECT_NE(cur_stream, *s1);
108 EXPECT_NE(s0, s1);
109 }
110
111 // Ensure queue pool round-robin fashion
TEST(XPUStreamTest,StreamPoolRoundRobinTest)112 TEST(XPUStreamTest, StreamPoolRoundRobinTest) {
113 if (!has_xpu()) {
114 return;
115 }
116
117 std::vector<c10::xpu::XPUStream> streams{};
118 for (C10_UNUSED const auto _ : c10::irange(200)) {
119 streams.emplace_back(c10::xpu::getStreamFromPool());
120 }
121
122 std::unordered_set<sycl::queue> queue_set{};
123 bool hasDuplicates = false;
124 for (const auto i : c10::irange(streams.size())) {
125 auto& queue = streams[i].queue();
126 auto result_pair = queue_set.insert(queue);
127 if (!result_pair.second) { // already existed
128 hasDuplicates = true;
129 } else { // newly inserted
130 EXPECT_TRUE(!hasDuplicates);
131 }
132 }
133 EXPECT_TRUE(hasDuplicates);
134
135 auto stream = c10::xpu::getStreamFromPool(/* isHighPriority */ true);
136 auto result_pair = queue_set.insert(stream.queue());
137 EXPECT_TRUE(result_pair.second);
138 }
139
asyncMemCopy(sycl::queue & queue,int * dst,int * src,size_t numBytes)140 void asyncMemCopy(sycl::queue& queue, int* dst, int* src, size_t numBytes) {
141 queue.memcpy(dst, src, numBytes);
142 }
143
// Round-trips a host buffer through device memory using pooled streams:
// host-to-device on one stream, device-to-host on the same stream, then
// device-to-host again on a second stream, validating the host data after each
// read-back. Also checks stream.query() before any work is submitted and after
// synchronization.
TEST(XPUStreamTest, StreamFunction) {
  if (!has_xpu()) {
    return;
  }

  constexpr int numel = 1024;
  constexpr size_t numBytes = sizeof(int) * numel;
  int hostData[numel];
  initHostData(hostData, numel);

  auto stream = c10::xpu::getStreamFromPool();
  // Nothing has been submitted to this stream by the test yet.
  EXPECT_TRUE(stream.query());

  int* deviceData = sycl::malloc_device<int>(numel, stream);
  // Abort the test rather than hand a null pointer to memcpy if the device
  // allocation failed.
  ASSERT_TRUE(deviceData != nullptr);

  // H2D
  asyncMemCopy(stream, deviceData, hostData, numBytes);
  c10::xpu::syncStreamsOnDevice();
  EXPECT_TRUE(stream.query());

  // Wipe the host buffer so validation below proves the data came back from
  // the device.
  clearHostData(hostData, numel);

  // D2H
  asyncMemCopy(stream, hostData, deviceData, numBytes);
  c10::xpu::syncStreamsOnDevice();

  validateHostData(hostData, numel);

  // Repeat the read-back on a different stream.
  // NOTE(review): the literal -1 is presumably a priority value (negative
  // priorities are high priority per StreamBehavior) rather than a device
  // index -- confirm against the getStreamFromPool overloads.
  stream = c10::xpu::getStreamFromPool(-1);

  clearHostData(hostData, numel);

  // D2H
  asyncMemCopy(stream, hostData, deviceData, numBytes);
  c10::xpu::syncStreamsOnDevice();

  validateHostData(hostData, numel);
  sycl::free(deviceData, c10::xpu::get_device_context());
}
181