# Owner(s): ["oncall: profiler"]

import os
import shutil
import subprocess
from unittest import skipIf

import torch
import torch.utils.cpp_extension
from torch.testing._internal.common_utils import IS_WINDOWS, run_tests, TestCase


def remove_build_path():
    default_build_root = torch.utils.cpp_extension.get_default_build_root()
    if os.path.exists(default_build_root):
        if IS_WINDOWS:
            # shutil.rmtree raises a permission error on Windows:
            # [WinError 5] Access is denied. This is a workaround.
            subprocess.run(["rm", "-rf", default_build_root], stdout=subprocess.PIPE)
        else:
            shutil.rmtree(default_build_root)


def is_fbcode():
    return not hasattr(torch.version, "git_version")


if is_fbcode():
    import caffe2.test.profiler_test_cpp_thread_lib as cpp
else:
    # C++ extensions are built from paths relative to this file, so
    # temporarily change the working directory before loading
    old_working_dir = os.getcwd()
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    cpp = torch.utils.cpp_extension.load(
        name="profiler_test_cpp_thread_lib",
        sources=[
            "test_cpp_thread.cpp",
        ],
        verbose=True,
    )

    # restore the working directory (see setUp)
    os.chdir(old_working_dir)


KinetoProfiler = None
IterationCount = 5
ActivateIteration = 2
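# IterationCount is the number of iterations the C++ engine runs per
# start_threads() call. ActivateIteration is the number of "active" (recorded)
# steps in the profiler schedule; the remaining iterations are consumed by the
# schedule's wait and warmup phases and by the final stop().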


def blueprint(text):
    # print in blue (via ANSI escape codes) so test progress stands out
    print(f"\33[34m{text}\33[0m")


# onIterationStart() is called by the C++ training engine in test_cpp_thread.cpp
class PythonProfilerEventHandler(cpp.ProfilerEventHandler):
    def onIterationStart(self, iteration: int) -> None:
        global KinetoProfiler, IterationCount
        # It is important to start the profiler on the same thread on which
        # step() is called, and onIterationStart() is always called on that thread.
        if iteration == 0:
            # this also means step() is first called on iteration 1, not 0
            KinetoProfiler.start()
            blueprint("starting kineto profiler")
        elif iteration == IterationCount - 1:
            KinetoProfiler.stop()
            blueprint("stopping kineto profiler")
        else:
            blueprint("stepping kineto profiler")
            KinetoProfiler.step()

    def emulateTraining(self, iteration: int, thread_id: int) -> None:
        # blueprint(f"training iteration {iteration} in thread {thread_id}")
        device = torch.device("cuda")
        # device = torch.device("cpu")
        with torch.autograd.profiler.record_function("user_function"):
            a = torch.ones(1, device=device)
            b = torch.ones(1, device=device)
            torch.add(a, b).cpu()
            torch.cuda.synchronize()


class CppThreadTest(TestCase):
    ThreadCount = 20  # set to 2 for debugging
    EventHandler = None
    TraceObject = None

    @classmethod
    def setUpClass(cls) -> None:
        super(TestCase, cls).setUpClass()
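        # keep the handler on a class attribute so the Python object stays
        # alive after registration (the C++ side presumably holds only a
        # non-owning reference)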
        CppThreadTest.EventHandler = PythonProfilerEventHandler()
        cpp.ProfilerEventHandler.Register(CppThreadTest.EventHandler)

    @classmethod
    def tearDownClass(cls):
        if not is_fbcode():
            remove_build_path()

    def setUp(self) -> None:
        if not torch.cuda.is_available():
            self.skipTest("Test machine does not have CUDA")

        # warm-up run: this clears out events generated during initialization
        self.start_profiler(False)
        cpp.start_threads(1, IterationCount, False)

    def start_profiler(self, profile_memory):
        global KinetoProfiler
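        # The schedule skips one step (wait=1), warms up for one step
        # (warmup=1), then records ActivateIteration steps. The event handler
        # above advances the schedule by calling start(), step(), and stop()
        # from the training thread.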
        KinetoProfiler = torch.profiler.profile(
            schedule=torch.profiler.schedule(
                wait=1, warmup=1, active=ActivateIteration, repeat=1
            ),
            on_trace_ready=self.set_trace,
            with_stack=True,
            profile_memory=profile_memory,
            record_shapes=True,
        )

    def set_trace(self, trace_obj) -> None:
        CppThreadTest.TraceObject = trace_obj

    def assert_text(self, condition, text, msg):
        if condition:
            print(f"\33[32m{text}\33[0m")  # green
        else:
            print(f"\33[31m{text}\33[0m")  # red
        self.assertTrue(condition, msg)

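    # expected maps an event name to [per-iteration count, device type]. Only
    # count * (ActivateIteration - 1) occurrences are required, presumably to
    # tolerate events that land on the boundary of the active window.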
    def check_trace(self, expected, mem=False) -> None:
        blueprint("verifying trace")
        event_list = CppThreadTest.TraceObject.events()
        for key, values in expected.items():
            count = values[0]
            min_count = count * (ActivateIteration - 1)
            device = values[1]
            filtered = filter(
                lambda ev: ev.name == key
                and str(ev.device_type) == f"DeviceType.{device}",
                event_list,
            )

            if mem:
                # the event's string form contains e.g. "cuda_memory_usage=512";
                # count events where the field is present and non-zero
                actual = 0
                for ev in filtered:
                    sev = str(ev)
                    has_cuda_memory_usage = (
                        sev.find("cuda_memory_usage=0 ") < 0
                        and sev.find("cuda_memory_usage=") > 0
                    )
                    if has_cuda_memory_usage:
                        actual += 1
                self.assert_text(
                    actual >= min_count,
                    f"{key}: {actual} >= {min_count}",
                    "not enough events with cuda_memory_usage set",
                )
            else:
                actual = len(list(filtered))
                if count == 1:  # test_without
                    count *= ActivateIteration
                    self.assert_text(
                        actual == count,
                        f"{key}: {actual} == {count}",
                        "baseline event count incorrect",
                    )
                else:
                    self.assert_text(
                        actual >= min_count,
                        f"{key}: {actual} >= {min_count}",
                        "not enough events recorded",
                    )

    @skipIf(
        IS_WINDOWS,
        "Failing on Windows CUDA, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_with_enable_profiler_in_child_thread(self) -> None:
        self.start_profiler(False)
        cpp.start_threads(self.ThreadCount, IterationCount, True)
        self.check_trace(
            {
                "aten::add": [self.ThreadCount, "CPU"],
                "user_function": [self.ThreadCount, "CUDA"],
            }
        )

    @skipIf(
        IS_WINDOWS,
        "Failing on Windows CUDA, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_without_enable_profiler_in_child_thread(self) -> None:
        self.start_profiler(False)
        cpp.start_threads(self.ThreadCount, IterationCount, False)
        self.check_trace(
            {
                "aten::add": [1, "CPU"],
                "user_function": [1, "CUDA"],
            }
        )

    @skipIf(
        IS_WINDOWS,
        "Failing on Windows CUDA, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_profile_memory(self) -> None:
        self.start_profiler(True)
        cpp.start_threads(self.ThreadCount, IterationCount, True)
        self.check_trace(
            {
                "aten::add": [self.ThreadCount, "CPU"],
            },
            mem=True,
        )


if __name__ == "__main__":
    run_tests()