# Owner(s): ["oncall: profiler"]

import os
import shutil
import subprocess
from unittest import skipIf

import torch
import torch.utils.cpp_extension
from torch.testing._internal.common_utils import IS_WINDOWS, run_tests, TestCase


def remove_build_path():
    """Delete the default cpp-extension build root left behind by load()."""
    default_build_root = torch.utils.cpp_extension.get_default_build_root()
    if os.path.exists(default_build_root):
        if IS_WINDOWS:
            # rmtree returns permission error: [WinError 5] Access is denied
            # on Windows, this is a work-around
            subprocess.run(["rm", "-rf", default_build_root], stdout=subprocess.PIPE)
        else:
            shutil.rmtree(default_build_root)


def is_fbcode():
    """True when running inside fbcode (OSS builds expose torch.version.git_version)."""
    return not hasattr(torch.version, "git_version")


if is_fbcode():
    import caffe2.test.profiler_test_cpp_thread_lib as cpp
else:
    # cpp extensions use relative paths. Those paths are relative to
    # this file, so we'll change the working directory temporarily
    old_working_dir = os.getcwd()
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    cpp = torch.utils.cpp_extension.load(
        name="profiler_test_cpp_thread_lib",
        sources=[
            "test_cpp_thread.cpp",
        ],
        verbose=True,
    )

    # return the working directory (see setUp)
    os.chdir(old_working_dir)


# Module-level profiler handle shared between the C++ callback and the tests.
KinetoProfiler = None
IterationCount = 5
ActivateIteration = 2


def blueprint(text):
    """Print *text* in blue (ANSI escape) to make test progress easy to spot."""
    print(f"\33[34m{text}\33[0m")


# onIterationStart() will be called by C++ training engine in cpp_thread_test_lib.cpp
class PythonProfilerEventHandler(cpp.ProfilerEventHandler):
    def onIterationStart(self, iteration: int) -> None:
        """Drive the kineto profiler lifecycle from the C++ training loop."""
        global KinetoProfiler, IterationCount
        # it is important to start the profiler on the same thread that step() is called
        # and yes, onIterationStart() will always be called on the same thread
        if iteration == 0:
            # this also means step() starts on iteration 1, not 0
            KinetoProfiler.start()
            blueprint("starting kineto profiler")
        elif iteration == IterationCount - 1:
            KinetoProfiler.stop()
            blueprint("stopping kineto profiler")
        else:
            blueprint("stepping kineto profiler")
            KinetoProfiler.step()

    def emulateTraining(self, iteration: int, thread_id: int) -> None:
        """Run one tiny CUDA workload inside a record_function region."""
        # blueprint(f"training iteration {iteration} in thread {thread_id}")
        device = torch.device("cuda")
        # device = torch.device("cpu")
        with torch.autograd.profiler.record_function("user_function"):
            a = torch.ones(1, device=device)
            b = torch.ones(1, device=device)
            torch.add(a, b).cpu()
            torch.cuda.synchronize()


class CppThreadTest(TestCase):
    """Verify the profiler captures events from worker threads spawned in C++."""

    ThreadCount = 20  # set to 2 for debugging
    EventHandler = None
    TraceObject = None

    @classmethod
    def setUpClass(cls) -> None:
        # NOTE: super(TestCase, cls) deliberately skips TestCase.setUpClass
        super(TestCase, cls).setUpClass()
        CppThreadTest.EventHandler = PythonProfilerEventHandler()
        cpp.ProfilerEventHandler.Register(CppThreadTest.EventHandler)

    @classmethod
    def tearDownClass(cls):
        if not is_fbcode():
            remove_build_path()

    def setUp(self) -> None:
        if not torch.cuda.is_available():
            self.skipTest("Test machine does not have cuda")

        # this clears off events from initialization
        self.start_profiler(False)
        cpp.start_threads(1, IterationCount, False)

    def start_profiler(self, profile_memory):
        """(Re)create the shared kineto profiler; started later by the C++ callback."""
        global KinetoProfiler
        KinetoProfiler = torch.profiler.profile(
            schedule=torch.profiler.schedule(
                wait=1, warmup=1, active=ActivateIteration, repeat=1
            ),
            on_trace_ready=self.set_trace,
            with_stack=True,
            profile_memory=profile_memory,
            record_shapes=True,
        )

    def set_trace(self, trace_obj) -> None:
        # on_trace_ready callback: stash the finished trace for check_trace()
        CppThreadTest.TraceObject = trace_obj

    def assert_text(self, condition, text, msg):
        """Print *text* green on pass / red on fail, then assert *condition*."""
        if condition:
            print(f"\33[32m{text}\33[0m")
        else:
            print(f"\33[31m{text}\33[0m")
        self.assertTrue(condition, msg)

    def check_trace(self, expected, mem=False) -> None:
        """Check the captured trace against *expected*.

        *expected* maps event name -> [per-iteration count, device string].
        With mem=True, count only events whose string form shows a nonzero
        cuda_memory_usage field.
        """
        blueprint("verifying trace")
        event_list = CppThreadTest.TraceObject.events()
        for key, values in expected.items():
            count = values[0]
            min_count = count * (ActivateIteration - 1)
            device = values[1]
            filtered = filter(
                lambda ev: ev.name == key
                and str(ev.device_type) == f"DeviceType.{device}",
                event_list,
            )

            if mem:
                actual = 0
                for ev in filtered:
                    sev = str(ev)
                    has_cuda_memory_usage = (
                        sev.find("cuda_memory_usage=0 ") < 0
                        and sev.find("cuda_memory_usage=") > 0
                    )
                    if has_cuda_memory_usage:
                        actual += 1
                self.assert_text(
                    actual >= min_count,
                    f"{key}: {actual} >= {min_count}",
                    "not enough event with cuda_memory_usage set",
                )
            else:
                actual = len(list(filtered))
                if count == 1:  # test_without
                    count *= ActivateIteration
                    self.assert_text(
                        actual == count,
                        f"{key}: {actual} == {count}",
                        "baseline event count incorrect",
                    )
                else:
                    self.assert_text(
                        actual >= min_count,
                        f"{key}: {actual} >= {min_count}",
                        "not enough event recorded",
                    )

    @skipIf(
        IS_WINDOWS,
        "Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_with_enable_profiler_in_child_thread(self) -> None:
        self.start_profiler(False)
        cpp.start_threads(self.ThreadCount, IterationCount, True)
        self.check_trace(
            {
                "aten::add": [self.ThreadCount, "CPU"],
                "user_function": [self.ThreadCount, "CUDA"],
            }
        )

    @skipIf(
        IS_WINDOWS,
        "Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_without_enable_profiler_in_child_thread(self) -> None:
        self.start_profiler(False)
        cpp.start_threads(self.ThreadCount, IterationCount, False)
        self.check_trace(
            {
                "aten::add": [1, "CPU"],
                "user_function": [1, "CUDA"],
            }
        )

    @skipIf(
        IS_WINDOWS,
        "Failing on windows cuda, see https://github.com/pytorch/pytorch/pull/130037 for slightly more context",
    )
    def test_profile_memory(self) -> None:
        self.start_profiler(True)
        cpp.start_threads(self.ThreadCount, IterationCount, True)
        self.check_trace(
            {
                "aten::add": [self.ThreadCount, "CPU"],
            },
            mem=True,
        )


if __name__ == "__main__":
    run_tests()