# Owner(s): ["oncall: distributed"] import re import sys import torch import torch.cuda import torch.cuda.nccl as nccl import torch.distributed as c10d from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU from torch.testing._internal.common_device_type import ( dtypes, instantiate_device_type_tests, ) from torch.testing._internal.common_utils import ( IS_WINDOWS, load_tests, NoTest, run_tests, skip_but_pass_in_sandcastle_if, TEST_WITH_ROCM, TestCase, ) HIP_VERSION = ( 0.0 if torch.version.hip is None else float(re.search(r"^\d+\.\d+", torch.version.hip)[0]) ) # load_tests from common_utils is used to automatically filter tests for # sharding on sandcastle. This line silences flake warnings load_tests = load_tests nGPUs = torch.cuda.device_count() if not TEST_CUDA: print("CUDA not available, skipping tests", file=sys.stderr) TestCase = NoTest # noqa: F811 datatypes = [torch.float] if ( TEST_CUDA and c10d.is_nccl_available() and nccl.version() >= (2, 10) ) or TEST_WITH_ROCM: datatypes.append(torch.bfloat16) class TestNCCL(TestCase): @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") def test_unique_id(self, device): uid = nccl.unique_id() self.assertIsInstance(uid, bytes) self.assertGreater(len(uid), 1) @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" ) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") @dtypes(*datatypes) def test_broadcast(self, device, dtype): expected = torch.zeros(128).uniform_().to(dtype=dtype) tensors = [expected.cuda()] for device in range(1, torch.cuda.device_count()): tensors.append(torch.zeros(128, dtype=dtype, device=device)) nccl.broadcast(tensors) for i in range(torch.cuda.device_count()): self.assertEqual(tensors[i], expected) # Test with tuple tensors = [expected.cuda()] for device in range(1, torch.cuda.device_count()): tensors.append(torch.zeros(128, dtype=dtype, device=device)) nccl.broadcast(tuple(tensors)) for i in range(torch.cuda.device_count()): self.assertEqual(tensors[i], expected) @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" ) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") @dtypes(*datatypes) def test_reduce(self, device, dtype): cpu_tensors = [ torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs) ] expected = torch.zeros(128, dtype=dtype) for t in cpu_tensors: expected.add_(t) tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] nccl.reduce(tensors) self.assertEqual(tensors[0], expected) # Test with tuple tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] nccl.reduce(tuple(tensors)) self.assertEqual(tensors[0], expected) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5 and dtype == torch.bfloat16, # noqa: F821 "Skip bfloat16 test for ROCm < 3.5", ) @dtypes(*datatypes) def test_all_reduce(self, device, dtype): cpu_tensors = [ torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs) ] expected = torch.zeros(128, dtype=dtype) for t in cpu_tensors: expected.add_(t) tensors = [cpu_tensors[i].cuda(i) for i in range(nGPUs)] nccl.all_reduce(tensors) for tensor in tensors: self.assertEqual(tensor, expected) # 
Test with tuple. tensors = tuple(cpu_tensors[i].cuda(i) for i in range(nGPUs)) nccl.all_reduce(tensors) for tensor in tensors: self.assertEqual(tensor, expected) # Test with set. tensors = {cpu_tensors[i].cuda(i) for i in range(nGPUs)} nccl.all_reduce(tensors) for tensor in tensors: self.assertEqual(tensor, expected) @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" ) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") def test_collective_errors(self, device): t = torch.rand(10).cuda(0) with self.assertRaisesRegex( TypeError, "Inputs should be a collection of tensors" ): nccl.all_reduce(t) with self.assertRaisesRegex( TypeError, "Inputs should be a collection of tensors" ): nccl.reduce(t) with self.assertRaisesRegex( TypeError, "Inputs should be a collection of tensors" ): nccl.broadcast(t) with self.assertRaisesRegex( TypeError, "Inputs should be a collection of tensors" ): nccl.all_gather(t, t) with self.assertRaisesRegex( TypeError, "Inputs should be a collection of tensors" ): nccl.reduce_scatter(t, t) @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" ) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") @dtypes(*datatypes) def test_all_gather(self, device, dtype): cpu_inputs = [torch.zeros(128).uniform_().to(dtype=dtype) for i in range(nGPUs)] expected = torch.cat(cpu_inputs, 0) inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] outputs = [ torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs) ] nccl.all_gather(inputs, outputs) for tensor in outputs: self.assertEqual(tensor, expected) # Test with tuple. inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] outputs = [ torch.zeros(128 * nGPUs, device=i, dtype=dtype) for i in range(nGPUs) ] nccl.all_gather(tuple(inputs), tuple(outputs)) for tensor in outputs: self.assertEqual(tensor, expected) @skip_but_pass_in_sandcastle_if( TEST_WITH_ROCM and HIP_VERSION < 3.5, "Skip NCCL tests for ROCm" ) @skip_but_pass_in_sandcastle_if(IS_WINDOWS, "NCCL doesn't support Windows") @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "only one GPU detected") @dtypes(*datatypes) def test_reduce_scatter(self, device, dtype): in_size = 32 * nGPUs out_size = 32 cpu_inputs = [ torch.zeros(in_size).uniform_().to(dtype=dtype) for i in range(nGPUs) ] expected = torch.zeros(in_size, dtype=dtype) for t in cpu_inputs: expected.add_(t) expected = expected.view(nGPUs, 32) inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)] nccl.reduce_scatter(inputs, outputs) for i in range(nGPUs): self.assertEqual(outputs[i], expected[i]) # Test with tuple inputs = [cpu_inputs[i].cuda(i) for i in range(nGPUs)] outputs = [torch.zeros(out_size, device=i, dtype=dtype) for i in range(nGPUs)] nccl.reduce_scatter(tuple(inputs), tuple(outputs)) for i in range(nGPUs): self.assertEqual(outputs[i], expected[i]) instantiate_device_type_tests(TestNCCL, globals(), only_for="cuda") if __name__ == "__main__": run_tests()