1# Owner(s): ["oncall: distributed"] 2 3import copy 4import os 5import sys 6import tempfile 7 8import test_c10d_spawn 9from test_c10d_spawn import _torch_dist_nn_available, TestDistributedNNFunctions 10 11import torch 12import torch.distributed as c10d 13import torch.nn as nn 14from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU 15from torch.testing._internal.common_distributed import ( 16 create_device, 17 requires_gloo, 18 skip_if_lt_x_gpu, 19) 20from torch.testing._internal.common_utils import ( 21 run_tests, 22 skip_but_pass_in_sandcastle_if, 23 TEST_WITH_DEV_DBG_ASAN, 24 TestCase, 25) 26 27 28# Fails on Python-3.9, see https://github.com/pytorch/pytorch/issues/51619 29if sys.version_info < (3, 9): 30 31 class ProcessGroupShareTensorTest( 32 test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase 33 ): 34 @classmethod 35 def opts(cls, threads=2): 36 opts = c10d.ProcessGroupGloo._Options() 37 opts._timeout = 5.0 38 opts._devices = [create_device(interface="lo")] 39 opts._threads = threads 40 return opts 41 42 @classmethod 43 def _init_pg_gloo(cls, rank, filename, world_size): 44 store = c10d.FileStore(filename, world_size) 45 backend = c10d.ProcessGroupGloo( 46 store, rank, world_size, ProcessGroupShareTensorTest.opts() 47 ) 48 # set process group backends manually 49 c10d.init_process_group( 50 backend="gloo", store=store, rank=rank, world_size=world_size 51 ) 52 pg = c10d.distributed_c10d._get_default_group() 53 pg._register_backend( 54 torch.device("cpu"), c10d.ProcessGroup.BackendType.GLOO, backend 55 ) 56 pg._register_backend( 57 torch.device("cuda"), c10d.ProcessGroup.BackendType.GLOO, backend 58 ) 59 60 return pg 61 62 @skip_but_pass_in_sandcastle_if( 63 not TEST_MULTIGPU, "At least 2 CUDA GPUS needed" 64 ) 65 def test_shared_broadcast_gloo(self): 66 self._test_multiprocess( 67 ProcessGroupShareTensorTest._test_broadcast_process, 68 [torch.ones(2, 2).to(i) * i for i in range(self.world_size)], 69 ProcessGroupShareTensorTest._init_pg_gloo, 70 1, 71 ) 72 73 @skip_but_pass_in_sandcastle_if( 74 not TEST_MULTIGPU, "At least 2 CUDA GPUS needed" 75 ) 76 def test_shared_allreduce_gloo(self): 77 self._test_multiprocess( 78 ProcessGroupShareTensorTest._test_allreduce_process, 79 [torch.ones(2, 2).to(i) for i in range(self.world_size)], 80 ProcessGroupShareTensorTest._init_pg_gloo, 81 1, 82 ) 83 84 @skip_but_pass_in_sandcastle_if( 85 not TEST_MULTIGPU, "At least 2 CUDA GPUS needed" 86 ) 87 def test_shared_allgather_gloo(self): 88 self._test_multiprocess( 89 ProcessGroupShareTensorTest._test_allgather_process, 90 [torch.ones(2, 2).to(i) * i for i in range(self.world_size)], 91 ProcessGroupShareTensorTest._init_pg_gloo, 92 self.world_size, 93 ) 94 95 @classmethod 96 def _test_allgather_chunk_process( 97 cls, rank, filename, shared_tensor, world_size, init_pg, c2p, p2c 98 ): 99 pg = init_pg(rank, filename, world_size) 100 chunks = torch.chunk(shared_tensor, world_size, dim=0) 101 x = chunks[rank] 102 ys = [torch.zeros_like(x) for _ in range(world_size)] 103 pg.allgather(ys, x).wait() 104 c2p.put((rank, chunks[0].to("cpu"), ys[0].to("cpu"))) 105 c2p.put((rank, chunks[1].to("cpu"), ys[1].to("cpu"))) 106 p2c.get() 107 108 @skip_but_pass_in_sandcastle_if( 109 not TEST_MULTIGPU, "At least 2 CUDA GPUS needed" 110 ) 111 def test_shared_allgather_chunk_gloo(self): 112 self._test_multiprocess( 113 ProcessGroupShareTensorTest._test_allgather_chunk_process, 114 torch.tensor(range(4)).reshape(2, 2), 115 ProcessGroupShareTensorTest._init_pg_gloo, 116 self.world_size, 117 ) 118 


class DistributedDataParallelSingleProcessTest(TestCase):
    def setUp(self):
        self.rank = 0
        self.world_size = 1
        self.file = tempfile.NamedTemporaryFile(delete=False)  # noqa: P201

    def tearDown(self):
        try:
            os.remove(self.file.name)
        except OSError:
            pass

    def _test_base(self, net, inp, check_allclose=True):
        # Train a plain copy of the model and a DDP-wrapped copy side by side
        # and verify that their parameters stay in sync.
        store = c10d.FileStore(self.file.name, self.world_size)
        c10d.init_process_group(
            backend="gloo", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        if inp[0].is_cuda:
            device_ids = [torch.cuda.current_device()]
        else:
            device_ids = None

        ddp = nn.parallel.DistributedDataParallel(
            copy.deepcopy(net), device_ids=device_ids, process_group=process_group
        )

        net_opt = torch.optim.Adam(net.parameters(), lr=0.001)
        ddp_opt = torch.optim.Adam(ddp.parameters(), lr=0.001)

        for i, j in zip(ddp.parameters(), net.parameters()):
            self.assertTrue(i.allclose(j))

        for _ in range(10):
            net_out = net(*inp)
            ddp_out = ddp(*inp)

            net_out.sum().backward()
            ddp_out.sum().backward()

            net_opt.step()
            ddp_opt.step()

        if check_allclose:
            for i, j in zip(ddp.parameters(), net.parameters()):
                self.assertTrue(i.allclose(j))

    @requires_gloo()
    def test_cpu(self):
        self._test_base(nn.Linear(2, 2), [torch.randn(30, 2)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPU needed")
    def test_cuda(self):
        self._test_base(nn.Linear(2, 2).to(0), [torch.randn(30, 2).to(0)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPU needed")
    def test_rnn(self):
        # This test is inspired by the bug reported in
        # https://github.com/pytorch/pytorch/issues/36268
        BATCH_SIZE = 12  # Divisible by 2, 3, 4
        INPUT_DIM = 256
        OUTPUT_DIM = 256
        HIDDEN_DIM = 256
        N_LAYERS = 3
        SEQ_LEN = 100

        class Net(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim, hidden_layers):
                super().__init__()
                self.input_dim = input_dim
                self.hidden_dim = hidden_dim
                self.output_dim = output_dim
                self.hidden_layers = hidden_layers

                self.lstm = nn.LSTM(
                    input_dim, hidden_dim, hidden_layers, batch_first=True
                )
                self.h2o = nn.Linear(hidden_dim, output_dim)

            def forward(self, x, y):
                self.lstm.flatten_parameters()
                h_t, _ = self.lstm(x)
                output = self.h2o(h_t)
                loss = nn.functional.mse_loss(output, y)
                return loss

        net = Net(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS).to(0)
        inp = [
            torch.randn((BATCH_SIZE, SEQ_LEN, INPUT_DIM)).to(0),
            torch.rand((BATCH_SIZE, SEQ_LEN, OUTPUT_DIM)).to(0),
        ]

        # Not checking that the results are allclose, as the parameter
        # inconsistency exists prior to this change. See #37079.
        self._test_base(net, inp, check_allclose=False)


# Skip dev-asan as torch + multiprocessing spawn have known issues
if not TEST_WITH_DEV_DBG_ASAN:

    class TestDistributedNNFunctionsGloo(TestDistributedNNFunctions):
        # Test common ops first.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_broadcast(self):
            self._test_broadcast("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_reduce(self):
            self._test_reduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_allreduce(self):
            self._test_allreduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_gather(self):
            self._test_all_gather("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_to_all(self):
            self._test_all_to_all("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_to_all_single(self):
            self._test_all_to_all_single("gloo")

        # Test ops only supported by Gloo.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_gather(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call torch.distributed
            # directly and need the default process group to be initialized.
            c10d.init_process_group(
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
            )
            device = torch.device(f"cuda:{self.rank}")
            x = torch.ones(5, 5, device=device) + self.rank
            x.requires_grad = True
            tensors = torch.distributed.nn.gather(x, 1)
            if self.rank == 1:
                for i, t in enumerate(tensors):
                    self.assertEqual(t, torch.ones(5, 5, device=device) + i)
            elif self.rank == 0:
                for i, t in enumerate(tensors):
                    zeros = torch.zeros(5, 5, device=device)
                    self.assertEqual(t, zeros)
            y = torch.sum(torch.stack(tensors), axis=0)
            z = y.sin().sum()
            z.backward()

            # Test gradient
            x_s = 3 * torch.ones(5, 5, device=device)
            self.assertEqual(x.grad, x_s.cos())

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_scatter(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call torch.distributed
            # directly and need the default process group to be initialized.
            c10d.init_process_group(
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
            )
            device = torch.device(f"cuda:{self.rank}")
            x0 = torch.ones(5, 5, device=device)
            x1 = torch.ones(5, 5, device=device) + 1
            x0.requires_grad = True
            x1.requires_grad = True

            y = torch.distributed.nn.scatter([x0, x1], 1)
            if self.rank == 1:
                self.assertEqual(y, 1 + torch.ones(5, 5, device=device))
            elif self.rank == 0:
                self.assertEqual(y, torch.ones(5, 5, device=device))
            z = y.sin().sum()
            z.backward()

            # Test gradient
            if self.rank == 1:
                x0_s = torch.ones(5, 5, device=device).cos()
                x1_s = (2 * torch.ones(5, 5, device=device)).cos()
                self.assertEqual(x0.grad, x0_s)
                self.assertEqual(x1.grad, x1_s)
            if self.rank == 0:
                self.assertEqual(x0.grad, torch.zeros(5, 5, device=device))


if __name__ == "__main__":
    run_tests()