xref: /aosp_15_r20/external/pytorch/torch/csrc/Stream.cpp (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1 #include <pybind11/pybind11.h>
2 #include <torch/csrc/Device.h>
3 #include <torch/csrc/Event.h>
4 #include <torch/csrc/Stream.h>
5 #include <torch/csrc/THP.h>
6 #include <torch/csrc/utils/pybind.h>
7 #include <torch/csrc/utils/pycfunction_helpers.h>
8 #include <torch/csrc/utils/python_arg_parser.h>
9 
10 #include <c10/core/DeviceGuard.h>
11 #include <c10/core/Stream.h>
12 #include <c10/core/impl/DeviceGuardImplInterface.h>
13 #include <c10/util/Exception.h>
14 #include <c10/util/hash.h>
15 #include <structmember.h>
16 #include <cstdint>
17 
// Python type object backing torch.Stream. It starts out null and is pointed
// at THPStreamType by THPStream_init(); THPStream_Wrap() allocates new
// objects through it.
PyTypeObject* THPStreamClass = nullptr;
19 
THPStream_pynew(PyTypeObject * type,PyObject * args,PyObject * kwargs)20 static PyObject* THPStream_pynew(
21     PyTypeObject* type,
22     PyObject* args,
23     PyObject* kwargs) {
24   HANDLE_TH_ERRORS
25 
26   int64_t stream_id = -1;
27   int64_t device_type = 0;
28   int64_t device_index = 0;
29   int64_t priority = 0;
30 
31   static torch::PythonArgParser parser({
32       "Stream(Device device=None, *, int64_t priority=0)",
33       "Stream(int64_t stream_id, int64_t device_index, int64_t device_type, *, int64_t priority=0)",
34   });
35 
36   torch::ParsedArgs<4> parsed_args;
37   auto r = parser.parse(args, kwargs, parsed_args);
38 
39   std::unique_ptr<c10::DeviceGuard> device_guard_ptr;
40 
41   if (r.idx == 0) {
42     auto default_accelerator = at::getAccelerator(false);
43     auto device = r.deviceOptional(0);
44     if (device.has_value()) {
45       device_type = static_cast<int64_t>(device->type());
46       device_index = static_cast<int64_t>(device->index());
47       // Initialize device guard if device is not None.
48       device_guard_ptr = std::make_unique<c10::DeviceGuard>(device.value());
49     } else {
50       // If device is None, we will use the current accelerator and index.
51       // If the current accelerator is not set, we will use the CPU as device
52       // type.
53       device_type = static_cast<int64_t>(
54           default_accelerator.value_or(c10::DeviceType::CPU));
55       c10::impl::VirtualGuardImpl impl{
56           static_cast<c10::DeviceType>(device_type)};
57       const auto current_device = impl.getDevice();
58       device_index = current_device.index();
59     }
60     priority = r.toInt64WithDefault(1, 0);
61   } else if (r.idx == 1) {
62     stream_id = r.toInt64WithDefault(0, -1);
63     device_index = r.toInt64WithDefault(1, 0);
64     device_type =
65         r.toInt64WithDefault(2, static_cast<int64_t>(c10::DeviceType::CPU));
66     priority = r.toInt64WithDefault(3, 0);
67   } else {
68     TORCH_CHECK(
69         false,
70         "parse stream arg fails please check the usage: ",
71         parser.get_signatures());
72   }
73 
74   THPObjectPtr ptr(type->tp_alloc(type, 0));
75   if (!ptr) {
76     return nullptr;
77   }
78 
79   THPStream* self = (THPStream*)ptr.get();
80 
81   // If torch.Stream is not created from existing Stream, then create a new one.
82   // It requires other device backends override getNewStream method. How the new
83   // stream is created is backend specific. Backend should be able to correctly
84   // manage the lifetime of streams.
85   std::optional<c10::Stream> stream_opt;
86   if (r.idx == 0) {
87     c10::impl::VirtualGuardImpl impl{static_cast<c10::DeviceType>(device_type)};
88     stream_opt = impl.getNewStream(
89         c10::Device(static_cast<c10::DeviceType>(device_type), device_index),
90         static_cast<int>(priority));
91   } else {
92     stream_opt = c10::Stream::unpack3(
93         stream_id,
94         static_cast<c10::DeviceIndex>(device_index),
95         static_cast<c10::DeviceType>(device_type));
96   }
97 
98   TORCH_CHECK(stream_opt.has_value(), "Failed to create stream");
99   self->stream_id = static_cast<int64_t>(stream_opt->id());
100   self->device_index = static_cast<int64_t>(stream_opt->device_index());
101   self->device_type = static_cast<int64_t>(stream_opt->device_type());
102 
103   return (PyObject*)ptr.release();
104   END_HANDLE_TH_ERRORS
105 }
106 
THPStream_Wrap(const c10::Stream & stream)107 PyObject* THPStream_Wrap(const c10::Stream& stream) {
108   HANDLE_TH_ERRORS
109   auto type = (PyTypeObject*)THPStreamClass;
110   THPObjectPtr ptr(type->tp_alloc(type, 0));
111   if (!ptr) {
112     throw python_error();
113   }
114 
115   THPStream* self = (THPStream*)ptr.get();
116   self->stream_id = stream.id();
117   // NOLINTNEXTLINE(bugprone-signed-char-misuse)
118   self->device_index = static_cast<int64_t>(stream.device_index());
119   self->device_type = static_cast<int64_t>(stream.device_type());
120   return ptr.release();
121   END_HANDLE_TH_ERRORS
122 }
123 
THPStream_dealloc(THPStream * self)124 static void THPStream_dealloc(THPStream* self) {
125   Py_TYPE(self)->tp_free((PyObject*)self);
126 }
127 
THPStream_get_device(THPStream * self,void * unused)128 static PyObject* THPStream_get_device(THPStream* self, void* unused) {
129   HANDLE_TH_ERRORS
130   return THPDevice_New(c10::Device(
131       static_cast<c10::DeviceType>(self->device_type),
132       static_cast<c10::DeviceIndex>(self->device_index)));
133   END_HANDLE_TH_ERRORS
134 }
135 
THPStream_query(PyObject * _self,PyObject * noargs)136 static PyObject* THPStream_query(PyObject* _self, PyObject* noargs) {
137   HANDLE_TH_ERRORS
138   auto self = (THPStream*)_self;
139 
140   return PyBool_FromLong(c10::Stream::unpack3(
141                              self->stream_id,
142                              self->device_index,
143                              static_cast<c10::DeviceType>(self->device_type))
144                              .query());
145 
146   END_HANDLE_TH_ERRORS
147 }
148 
THPStream_synchronize(PyObject * _self,PyObject * noargs)149 static PyObject* THPStream_synchronize(PyObject* _self, PyObject* noargs) {
150   HANDLE_TH_ERRORS {
151     pybind11::gil_scoped_release no_gil;
152     auto self = (THPStream*)_self;
153 
154     c10::Stream::unpack3(
155         self->stream_id,
156         self->device_index,
157         static_cast<c10::DeviceType>(self->device_type))
158         .synchronize();
159   }
160   Py_RETURN_NONE;
161   END_HANDLE_TH_ERRORS
162 }
163 
THPStream_wait_event(PyObject * _self,PyObject * _event)164 static PyObject* THPStream_wait_event(PyObject* _self, PyObject* _event) {
165   HANDLE_TH_ERRORS {
166     auto self = (THPStream*)_self;
167     auto event = (THPEvent*)_event;
168     c10::Stream::unpack3(
169         self->stream_id,
170         self->device_index,
171         static_cast<c10::DeviceType>(self->device_type))
172         .wait(event->event);
173   }
174   Py_RETURN_NONE;
175   END_HANDLE_TH_ERRORS
176 }
177 
THPStream_wait_stream(PyObject * _self,PyObject * _other)178 static PyObject* THPStream_wait_stream(PyObject* _self, PyObject* _other) {
179   HANDLE_TH_ERRORS {
180     auto self = (THPStream*)_self;
181     auto other_stream = (THPStream*)_other;
182     c10::Event new_event(
183         static_cast<c10::DeviceType>(other_stream->device_type),
184         c10::EventFlag::PYTORCH_DEFAULT);
185     new_event.record(c10::Stream::unpack3(
186         other_stream->stream_id,
187         other_stream->device_index,
188         static_cast<c10::DeviceType>(other_stream->device_type)));
189     c10::Stream::unpack3(
190         self->stream_id,
191         self->device_index,
192         static_cast<c10::DeviceType>(self->device_type))
193         .wait(new_event);
194   }
195   Py_RETURN_NONE;
196   END_HANDLE_TH_ERRORS
197 }
198 
THPStream_record_event(PyObject * _self,PyObject * args,PyObject * kwargs)199 static PyObject* THPStream_record_event(
200     PyObject* _self,
201     PyObject* args,
202     PyObject* kwargs) {
203   HANDLE_TH_ERRORS
204   auto self = (THPStream*)_self;
205   PyObject* _new_event;
206   PyObject* _event = Py_None;
207 
208   // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
209   constexpr const char* accepted_args[] = {"event", nullptr};
210   if (!PyArg_ParseTupleAndKeywords(
211           args,
212           kwargs,
213           "|O",
214           // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
215           const_cast<char**>(accepted_args),
216           &_event)) {
217     TORCH_CHECK(false, "parse record_event arg fails");
218   }
219   if (_event != Py_None) {
220     // Increase the refcount of the event to avoid it being destroyed.
221     Py_INCREF(_event);
222     _new_event = _event;
223   } else {
224     _new_event = THPEvent_new(
225         static_cast<c10::DeviceType>(self->device_type),
226         c10::EventFlag::PYTORCH_DEFAULT);
227   }
228   auto new_event = (THPEvent*)_new_event;
229   TORCH_CHECK(new_event, "event must not be null");
230   new_event->event.record(c10::Stream::unpack3(
231       self->stream_id,
232       self->device_index,
233       static_cast<c10::DeviceType>(self->device_type)));
234   return (PyObject*)new_event;
235   END_HANDLE_TH_ERRORS
236 }
237 
THPStream_repr(THPStream * self)238 static PyObject* THPStream_repr(THPStream* self) {
239   HANDLE_TH_ERRORS
240   return THPUtils_packString(
241       "torch.Stream device_type=" +
242       c10::DeviceTypeName(
243           static_cast<c10::DeviceType>(self->device_type), true) +
244       ", device_index=" + std::to_string(self->device_index) +
245       ", stream_id=" + std::to_string(self->stream_id));
246   END_HANDLE_TH_ERRORS
247 }
248 
THPStream_hash(THPStream * self)249 static Py_hash_t THPStream_hash(THPStream* self) {
250   return static_cast<long>(at::hash_combine(
251       self->device_type,
252       (at::hash_combine(self->stream_id, self->device_index))));
253 }
254 
THPStream_eq(THPStream * self,THPStream * other)255 static PyObject* THPStream_eq(THPStream* self, THPStream* other) {
256   HANDLE_TH_ERRORS
257   return PyBool_FromLong(
258       (self->stream_id == other->stream_id) &&
259       (self->device_index == other->device_index) &&
260       (self->device_type == other->device_type));
261   END_HANDLE_TH_ERRORS
262 }
263 
THPStream_ne(THPStream * self,THPStream * other)264 static PyObject* THPStream_ne(THPStream* self, THPStream* other) {
265   HANDLE_TH_ERRORS
266   return PyBool_FromLong(
267       (self->stream_id != other->stream_id) ||
268       (self->device_index != other->device_index) ||
269       (self->device_type != other->device_type));
270   END_HANDLE_TH_ERRORS
271 }
272 
THPStream_richcompare(PyObject * self,PyObject * other,int op)273 static PyObject* THPStream_richcompare(
274     PyObject* self,
275     PyObject* other,
276     int op) {
277   PyObject* result = NULL;
278   if (other == Py_None) {
279     result = Py_False;
280   } else {
281     switch (op) {
282       case Py_EQ:
283         result = THPStream_eq((THPStream*)self, (THPStream*)other);
284         break;
285       case Py_NE:
286         result = THPStream_ne((THPStream*)self, (THPStream*)other);
287         break;
288       default:
289         result = Py_False;
290         break;
291     }
292   }
293   Py_XINCREF(result);
294   return result;
295 }
296 
297 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
298 static struct PyMemberDef THPStream_members[] = {
299     {"stream_id",
300      T_LONGLONG,
301      offsetof(THPStream, stream_id),
302      READONLY,
303      nullptr},
304     {"device_index",
305      T_LONGLONG,
306      offsetof(THPStream, device_index),
307      READONLY,
308      nullptr},
309     {"device_type",
310      T_LONGLONG,
311      offsetof(THPStream, device_type),
312      READONLY,
313      nullptr},
314     {nullptr}};
315 
316 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
317 static struct PyGetSetDef THPStream_properties[] = {
318     {"device", (getter)THPStream_get_device, nullptr, nullptr, nullptr},
319     {nullptr}};
320 
321 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
322 static PyMethodDef THPStream_methods[] = {
323     {"query", THPStream_query, METH_NOARGS, nullptr},
324     {"synchronize", THPStream_synchronize, METH_NOARGS, nullptr},
325     {"wait_event", THPStream_wait_event, METH_O, nullptr},
326     {"wait_stream", THPStream_wait_stream, METH_O, nullptr},
327     {"record_event",
328      castPyCFunctionWithKeywords(THPStream_record_event),
329      METH_VARARGS | METH_KEYWORDS,
330      nullptr},
331     {"__eq__", (PyCFunction)THPStream_eq, METH_O, nullptr},
332     {nullptr}};
333 
334 PyTypeObject THPStreamType = {
335     PyVarObject_HEAD_INIT(nullptr, 0) "torch.Stream", /* tp_name */
336     sizeof(THPStream), /* tp_basicsize */
337     0, /* tp_itemsize */
338     (destructor)THPStream_dealloc, /* tp_dealloc */
339     0, /* tp_vectorcall_offset */
340     nullptr, /* tp_getattr */
341     nullptr, /* tp_setattr */
342     nullptr, /* tp_reserved */
343     (reprfunc)THPStream_repr, /* tp_repr */
344     nullptr, /* tp_as_number */
345     nullptr, /* tp_as_sequence */
346     nullptr, /* tp_as_mapping */
347     (hashfunc)THPStream_hash, /* tp_hash  */
348     nullptr, /* tp_call */
349     nullptr, /* tp_str */
350     nullptr, /* tp_getattro */
351     nullptr, /* tp_setattro */
352     nullptr, /* tp_as_buffer */
353     // NOLINTNEXTLINE(misc-redundant-expression)
354     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
355     nullptr, /* tp_doc */
356     nullptr, /* tp_traverse */
357     nullptr, /* tp_clear */
358     THPStream_richcompare, /* tp_richcompare */
359     0, /* tp_weaklistoffset */
360     nullptr, /* tp_iter */
361     nullptr, /* tp_iternext */
362     THPStream_methods, /* tp_methods */
363     THPStream_members, /* tp_members */
364     THPStream_properties, /* tp_getset */
365     nullptr, /* tp_base */
366     nullptr, /* tp_dict */
367     nullptr, /* tp_descr_get */
368     nullptr, /* tp_descr_set */
369     0, /* tp_dictoffset */
370     nullptr, /* tp_init */
371     nullptr, /* tp_alloc */
372     THPStream_pynew, /* tp_new */
373 };
374 
THPStream_init(PyObject * module)375 void THPStream_init(PyObject* module) {
376   THPStreamClass = &THPStreamType;
377   Py_SET_TYPE(&THPStreamType, &PyType_Type);
378   if (PyType_Ready(&THPStreamType) < 0) {
379     throw python_error();
380   }
381   Py_INCREF(&THPStreamType);
382   if (PyModule_AddObject(module, "Stream", (PyObject*)&THPStreamType) < 0) {
383     throw python_error();
384   }
385 }
386