// xref: /aosp_15_r20/external/pytorch/torch/csrc/distributed/rpc/tensorpipe_cuda.cpp
// (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
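//
// Descriptive note (added, summarizing what the code below does): this file
// holds the CUDA-specific pieces of the TensorPipe RPC agent. It registers the
// CUDA channels provided by TensorPipe (cuda_ipc and cuda_gdr when available,
// plus cuda_xth and cuda_basic) and a TensorpipeDeviceTypeConverter that tells
// the agent how to hand CUDA storages to TensorPipe for sending and receiving.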
#include <torch/csrc/distributed/rpc/tensorpipe_agent.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>

#if defined(USE_TENSORPIPE) && !defined(USE_ROCM)

#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>

C10_DIAGNOSTIC_PUSH_AND_IGNORED_IF_DEFINED("-Wdeprecated")
#include <tensorpipe/tensorpipe.h>
#include <tensorpipe/tensorpipe_cuda.h>
C10_DIAGNOSTIC_POP()

namespace torch::distributed::rpc {
namespace {

#if TENSORPIPE_HAS_CUDA_IPC_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaIpcChannel() {
  auto context = tensorpipe::channel::cuda_ipc::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaIpcChannelPriority});
}

// The cuda_ipc channel uses cudaMemcpy to transmit CUDA tensors across processes
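// Note (added): each channel is registered together with a priority constant;
// when several registered channels can service a transfer, the agent is
// expected to prefer the one with the highest priority.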
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_ipc, makeCudaIpcChannel);

#endif

#if TENSORPIPE_HAS_CUDA_GDR_CHANNEL

std::unique_ptr<ChannelRegistration> makeCudaGdrChannel() {
  auto context = tensorpipe::channel::cuda_gdr::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaGdrChannelPriority});
}

// The cuda_gdr channel sends CUDA memory over InfiniBand using GPUDirect RDMA.
// It directly registers the user-provided tensor with libibverbs, an operation
// which is expensive the first time, but it then caches the registration in
// order to amortize the cost and get low latency for subsequent transfers. A
// ready-to-send/ready-to-receive handshake is still needed before the transfer
// in order to ensure readiness and to agree on the device indices and thus the
// queue pair to use. It automatically pairs each GPU to the "closest" NIC if
// there are multiple of them (closest = longest prefix match in PCI tree).
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_gdr, makeCudaGdrChannel);

#endif

std::unique_ptr<ChannelRegistration> makeCudaXthChannel() {
  auto context = tensorpipe::channel::cuda_xth::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaXthChannelPriority});
}

// The cuda_xth channel supports same-process GPU-to-GPU comm
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_xth, makeCudaXthChannel);

std::unique_ptr<ChannelRegistration> makeCudaBasicChannel() {
  auto context = tensorpipe::channel::cuda_basic::create(
      tensorpipe::channel::basic::create());
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaBasicChannelPriority});
}

// The cuda_basic channel is the fallback for GPU-to-GPU comm
C10_REGISTER_CREATOR(
    TensorPipeChannelRegistry,
    cuda_basic,
    makeCudaBasicChannel);

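// Descriptive note (added): the converter below is the CUDA implementation of
// TensorpipeDeviceTypeConverter. It describes outgoing CUDA storages to
// TensorPipe as CudaBuffers tied to a per-device stream, and allocates CUDA
// memory for incoming tensors on the stream chosen for the destination device.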
class TensorpipeCudaConverter : public TensorpipeDeviceTypeConverter {
 public:
  std::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Message& message) const override {
    auto stream =
        at::cuda::CUDAStream(getStreamForDevice(streams, storage.device()));
    // Record tensor data ptrs on TensorPipe streams, so that the tensors
    // won't be destructed before TensorPipe finishes sending them.
    c10::cuda::CUDACachingAllocator::recordStream(storage.data_ptr(), stream);

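    // Hand TensorPipe the raw storage bytes together with the stream on which
    // the send will be ordered; no copy is made here.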
    tensorpipe::CudaBuffer buffer;
    buffer.ptr = static_cast<char*>(storage.mutable_data());
    buffer.stream = stream.stream();

    tensorpipe::Message::Tensor tensor;
    tensor.buffer = buffer;
    tensor.length = storage.nbytes();

    message.tensors.push_back(std::move(tensor));

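    // No staging buffer is returned: the recordStream call above is what keeps
    // the CUDA storage alive until TensorPipe is done sending it.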
    return std::nullopt;
  }

  at::DataPtr allocateTensorForReceiving(
      c10::DeviceIndex deviceIndex,
      size_t length,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Allocation& allocation) const override {
    c10::Device device(c10::kCUDA, deviceIndex);
    at::cuda::CUDAStream stream(getStreamForDevice(streams, device));
    // CUDACachingAllocator will call recordStream accordingly on the current
    // stream.
    at::cuda::CUDAStreamGuard guard(stream);
    at::DataPtr dataPtr =
        c10::cuda::CUDACachingAllocator::get()->allocate(length);

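    // Describe the freshly allocated buffer to TensorPipe; the incoming data
    // will be written into it asynchronously on `stream`.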
    tensorpipe::CudaBuffer buffer;
    buffer.ptr = dataPtr.get();
    buffer.stream = stream.stream();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(tensor);

    return dataPtr;
  }
};

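// Register the converter so that the TensorPipe agent uses it for every tensor
// that lives on a CUDA device.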
C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CUDA, TensorpipeCudaConverter);

} // namespace
} // namespace torch::distributed::rpc

#endif