# Owner(s): ["oncall: distributed"]

import copy
import os
import sys
import tempfile

import test_c10d_spawn
from test_c10d_spawn import _torch_dist_nn_available, TestDistributedNNFunctions

import torch
import torch.distributed as c10d
import torch.nn as nn
from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU
from torch.testing._internal.common_distributed import (
    create_device,
    requires_gloo,
    skip_if_lt_x_gpu,
)
from torch.testing._internal.common_utils import (
    run_tests,
    skip_but_pass_in_sandcastle_if,
    TEST_WITH_DEV_DBG_ASAN,
    TestCase,
)


# Fails on Python-3.9, see https://github.com/pytorch/pytorch/issues/51619
if sys.version_info < (3, 9):

    class ProcessGroupShareTensorTest(
        test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase
    ):
        @classmethod
        def opts(cls, threads=2):
            opts = c10d.ProcessGroupGloo._Options()
            opts._timeout = 5.0
            opts._devices = [create_device(interface="lo")]
            opts._threads = threads
            return opts

        @classmethod
        def _init_pg_gloo(cls, rank, filename, world_size):
            store = c10d.FileStore(filename, world_size)
            backend = c10d.ProcessGroupGloo(
                store, rank, world_size, ProcessGroupShareTensorTest.opts()
            )
            # set process group backends manually
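            # Register the same Gloo backend for both CPU and CUDA device types so
            # the default group routes tensors on either device through it.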
            c10d.init_process_group(
                backend="gloo", store=store, rank=rank, world_size=world_size
            )
            pg = c10d.distributed_c10d._get_default_group()
            pg._register_backend(
                torch.device("cpu"), c10d.ProcessGroup.BackendType.GLOO, backend
            )
            pg._register_backend(
                torch.device("cuda"), c10d.ProcessGroup.BackendType.GLOO, backend
            )

            return pg

        @skip_but_pass_in_sandcastle_if(
            not TEST_MULTIGPU, "At least 2 CUDA GPUs needed"
        )
        def test_shared_broadcast_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_broadcast_process,
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                1,
            )

        @skip_but_pass_in_sandcastle_if(
            not TEST_MULTIGPU, "At least 2 CUDA GPUs needed"
        )
        def test_shared_allreduce_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allreduce_process,
                [torch.ones(2, 2).to(i) for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                1,
            )

        @skip_but_pass_in_sandcastle_if(
            not TEST_MULTIGPU, "At least 2 CUDA GPUs needed"
        )
        def test_shared_allgather_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allgather_process,
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                self.world_size,
            )

        @classmethod
        def _test_allgather_chunk_process(
            cls, rank, filename, shared_tensor, world_size, init_pg, c2p, p2c
        ):
            pg = init_pg(rank, filename, world_size)
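            # Split the shared tensor row-wise; each rank all-gathers its own chunk
            # and reports the first two gathered chunks back to the parent for checking.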
            chunks = torch.chunk(shared_tensor, world_size, dim=0)
            x = chunks[rank]
            ys = [torch.zeros_like(x) for _ in range(world_size)]
            pg.allgather(ys, x).wait()
            c2p.put((rank, chunks[0].to("cpu"), ys[0].to("cpu")))
            c2p.put((rank, chunks[1].to("cpu"), ys[1].to("cpu")))
            p2c.get()

        @skip_but_pass_in_sandcastle_if(
            not TEST_MULTIGPU, "At least 2 CUDA GPUs needed"
        )
        def test_shared_allgather_chunk_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allgather_chunk_process,
                torch.tensor(range(4)).reshape(2, 2),
                ProcessGroupShareTensorTest._init_pg_gloo,
                self.world_size,
            )


class DistributedDataParallelSingleProcessTest(TestCase):
    def setUp(self):
        self.rank = 0
        self.world_size = 1
        self.file = tempfile.NamedTemporaryFile(delete=False)  # noqa: P201

    def tearDown(self):
        try:
            os.remove(self.file.name)
        except OSError:
            pass

    def _test_base(self, net, inp, check_allclose=True):
        store = c10d.FileStore(self.file.name, self.world_size)
        c10d.init_process_group(
            backend="gloo", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        if inp[0].is_cuda:
            device_ids = [torch.cuda.current_device()]
        else:
            device_ids = None

        ddp = nn.parallel.DistributedDataParallel(
            copy.deepcopy(net), device_ids=device_ids, process_group=process_group
        )
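        # With world_size == 1 there is nothing to synchronize, so the DDP-wrapped
        # copy should track the plain module as both are optimized below.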

        net_opt = torch.optim.Adam(net.parameters(), lr=0.001)
        ddp_opt = torch.optim.Adam(ddp.parameters(), lr=0.001)

        for i, j in zip(ddp.parameters(), net.parameters()):
            self.assertTrue(i.allclose(j))

        for _ in range(10):
            net_out = net(*inp)
            ddp_out = ddp(*inp)

            net_out.sum().backward()
            ddp_out.sum().backward()

            net_opt.step()
            ddp_opt.step()

        if check_allclose:
            for i, j in zip(ddp.parameters(), net.parameters()):
                self.assertTrue(i.allclose(j))

    @requires_gloo()
    def test_cpu(self):
        self._test_base(nn.Linear(2, 2), [torch.randn(30, 2)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPU needed")
    def test_cuda(self):
        self._test_base(nn.Linear(2, 2).to(0), [torch.randn(30, 2).to(0)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPU needed")
    def test_rnn(self):
        # This test is inspired by the bug reported in
        # https://github.com/pytorch/pytorch/issues/36268
        BATCH_SIZE = 12  # Divisible by 2, 3, 4
        INPUT_DIM = 256
        OUTPUT_DIM = 256
        HIDDEN_DIM = 256
        N_LAYERS = 3
        SEQ_LEN = 100

        class Net(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim, hidden_layers):
                super().__init__()
                self.input_dim = input_dim
                self.hidden_dim = hidden_dim
                self.output_dim = output_dim
                self.hidden_layers = hidden_layers

                self.lstm = nn.LSTM(
                    input_dim, hidden_dim, hidden_layers, batch_first=True
                )
                self.h2o = nn.Linear(hidden_dim, output_dim)

            def forward(self, x, y):
                self.lstm.flatten_parameters()
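                # Flattening parameters inside forward exercises the DDP interaction
                # described in the issue linked above.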
                h_t, _ = self.lstm(x)
                output = self.h2o(h_t)
                loss = nn.functional.mse_loss(output, y)
                return loss

        net = Net(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS).to(0)
        inp = [
            torch.randn((BATCH_SIZE, SEQ_LEN, INPUT_DIM)).to(0),
            torch.rand((BATCH_SIZE, SEQ_LEN, OUTPUT_DIM)).to(0),
        ]

        # Not checking that results are allclose, as the parameter inconsistency
        # exists prior to this change. See #37079
        self._test_base(net, inp, check_allclose=False)


# Skip dev-asan as torch + multiprocessing spawn have known issues
if not TEST_WITH_DEV_DBG_ASAN:

    class TestDistributedNNFunctionsGloo(TestDistributedNNFunctions):
        # Test Common Ops First.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_broadcast(self):
            self._test_broadcast("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_reduce(self):
            self._test_reduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_allreduce(self):
            self._test_allreduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_gather(self):
            self._test_all_gather("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_to_all(self):
            self._test_all_to_all("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_all_to_all_single(self):
            self._test_all_to_all_single("gloo")

        # Test Ops only supported in GLOO.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_gather(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call into torch.distributed
            # directly and need the world to be initialized
            c10d.init_process_group(
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
            )
            device = torch.device(f"cuda:{self.rank}")
            x = torch.ones(5, 5, device=device) + self.rank
            x.requires_grad = True
            tensors = torch.distributed.nn.gather(x, 1)
            if self.rank == 1:
                for i, t in enumerate(tensors):
                    self.assertEqual(t, torch.ones(5, 5, device=device) + i)
            elif self.rank == 0:
                for i, t in enumerate(tensors):
                    zeros = torch.zeros(5, 5, device=device)
                    self.assertEqual(t, zeros)
            y = torch.sum(torch.stack(tensors), axis=0)
            z = y.sin().sum()
            z.backward()

            # Test gradient
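            # On the destination rank the gathered inputs sum to 3 elementwise
            # (1 + 2), and gather's backward scatters that gradient back, so every
            # rank's x should see a grad of cos(3).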
            x_s = 3 * torch.ones(5, 5, device=device)
            self.assertEqual(x.grad, x_s.cos())

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
        )
        def test_scatter(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call into torch.distributed
            # directly and need the world to be initialized
            c10d.init_process_group(
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
            )
            device = torch.device(f"cuda:{self.rank}")
            x0 = torch.ones(5, 5, device=device)
            x1 = torch.ones(5, 5, device=device) + 1
            x0.requires_grad = True
            x1.requires_grad = True

            y = torch.distributed.nn.scatter([x0, x1], 1)
            if self.rank == 1:
                self.assertEqual(y, 1 + torch.ones(5, 5, device=device))
            elif self.rank == 0:
                self.assertEqual(y, torch.ones(5, 5, device=device))
            z = y.sin().sum()
            z.backward()

            # Test gradient
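            # Rank 1 is the scatter source: x0 went to rank 0 (y = 1) and x1 stayed
            # on rank 1 (y = 2), so their grads are cos(1) and cos(2); on rank 0 the
            # local inputs are unused and should receive zero gradient.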
            if self.rank == 1:
                x0_s = torch.ones(5, 5, device=device).cos()
                x1_s = (2 * torch.ones(5, 5, device=device)).cos()
                self.assertEqual(x0.grad, x0_s)
                self.assertEqual(x1.grad, x1_s)
            if self.rank == 0:
                self.assertEqual(x0.grad, torch.zeros(5, 5, device=device))


if __name__ == "__main__":
    run_tests()