1import contextlib 2import os 3import sys 4import tracemalloc 5import unittest 6from unittest.mock import patch 7from test.support.script_helper import (assert_python_ok, assert_python_failure, 8 interpreter_requires_environment) 9from test import support 10from test.support import os_helper 11 12try: 13 import _testcapi 14except ImportError: 15 _testcapi = None 16 17 18EMPTY_STRING_SIZE = sys.getsizeof(b'') 19INVALID_NFRAME = (-1, 2**30) 20 21 22def get_frames(nframe, lineno_delta): 23 frames = [] 24 frame = sys._getframe(1) 25 for index in range(nframe): 26 code = frame.f_code 27 lineno = frame.f_lineno + lineno_delta 28 frames.append((code.co_filename, lineno)) 29 lineno_delta = 0 30 frame = frame.f_back 31 if frame is None: 32 break 33 return tuple(frames) 34 35def allocate_bytes(size): 36 nframe = tracemalloc.get_traceback_limit() 37 bytes_len = (size - EMPTY_STRING_SIZE) 38 frames = get_frames(nframe, 1) 39 data = b'x' * bytes_len 40 return data, tracemalloc.Traceback(frames, min(len(frames), nframe)) 41 42def create_snapshots(): 43 traceback_limit = 2 44 45 # _tracemalloc._get_traces() returns a list of (domain, size, 46 # traceback_frames) tuples. traceback_frames is a tuple of (filename, 47 # line_number) tuples. 
48 raw_traces = [ 49 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 50 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 51 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 52 53 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 54 55 (2, 66, (('b.py', 1),), 1), 56 57 (3, 7, (('<unknown>', 0),), 1), 58 ] 59 snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) 60 61 raw_traces2 = [ 62 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 63 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 64 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 65 66 (2, 2, (('a.py', 5), ('b.py', 4)), 3), 67 (2, 5000, (('a.py', 5), ('b.py', 4)), 3), 68 69 (4, 400, (('c.py', 578),), 1), 70 ] 71 snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) 72 73 return (snapshot, snapshot2) 74 75def frame(filename, lineno): 76 return tracemalloc._Frame((filename, lineno)) 77 78def traceback(*frames): 79 return tracemalloc.Traceback(frames) 80 81def traceback_lineno(filename, lineno): 82 return traceback((filename, lineno)) 83 84def traceback_filename(filename): 85 return traceback_lineno(filename, 0) 86 87 88class TestTraceback(unittest.TestCase): 89 def test_repr(self): 90 def get_repr(*args) -> str: 91 return repr(tracemalloc.Traceback(*args)) 92 93 self.assertEqual(get_repr(()), "<Traceback ()>") 94 self.assertEqual(get_repr((), 0), "<Traceback () total_nframe=0>") 95 96 frames = (("f1", 1), ("f2", 2)) 97 exp_repr_frames = ( 98 "(<Frame filename='f2' lineno=2>," 99 " <Frame filename='f1' lineno=1>)" 100 ) 101 self.assertEqual(get_repr(frames), 102 f"<Traceback {exp_repr_frames}>") 103 self.assertEqual(get_repr(frames, 2), 104 f"<Traceback {exp_repr_frames} total_nframe=2>") 105 106 107class TestTracemallocEnabled(unittest.TestCase): 108 def setUp(self): 109 if tracemalloc.is_tracing(): 110 self.skipTest("tracemalloc must be stopped before the test") 111 112 tracemalloc.start(1) 113 114 def tearDown(self): 115 tracemalloc.stop() 116 117 def test_get_tracemalloc_memory(self): 118 data = [allocate_bytes(123) for count in range(1000)] 119 size = 
tracemalloc.get_tracemalloc_memory() 120 self.assertGreaterEqual(size, 0) 121 122 tracemalloc.clear_traces() 123 size2 = tracemalloc.get_tracemalloc_memory() 124 self.assertGreaterEqual(size2, 0) 125 self.assertLessEqual(size2, size) 126 127 def test_get_object_traceback(self): 128 tracemalloc.clear_traces() 129 obj_size = 12345 130 obj, obj_traceback = allocate_bytes(obj_size) 131 traceback = tracemalloc.get_object_traceback(obj) 132 self.assertEqual(traceback, obj_traceback) 133 134 def test_new_reference(self): 135 tracemalloc.clear_traces() 136 # gc.collect() indirectly calls PyList_ClearFreeList() 137 support.gc_collect() 138 139 # Create a list and "destroy it": put it in the PyListObject free list 140 obj = [] 141 obj = None 142 143 # Create a list which should reuse the previously created empty list 144 obj = [] 145 146 nframe = tracemalloc.get_traceback_limit() 147 frames = get_frames(nframe, -3) 148 obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe)) 149 150 traceback = tracemalloc.get_object_traceback(obj) 151 self.assertIsNotNone(traceback) 152 self.assertEqual(traceback, obj_traceback) 153 154 def test_set_traceback_limit(self): 155 obj_size = 10 156 157 tracemalloc.stop() 158 self.assertRaises(ValueError, tracemalloc.start, -1) 159 160 tracemalloc.stop() 161 tracemalloc.start(10) 162 obj2, obj2_traceback = allocate_bytes(obj_size) 163 traceback = tracemalloc.get_object_traceback(obj2) 164 self.assertEqual(len(traceback), 10) 165 self.assertEqual(traceback, obj2_traceback) 166 167 tracemalloc.stop() 168 tracemalloc.start(1) 169 obj, obj_traceback = allocate_bytes(obj_size) 170 traceback = tracemalloc.get_object_traceback(obj) 171 self.assertEqual(len(traceback), 1) 172 self.assertEqual(traceback, obj_traceback) 173 174 def find_trace(self, traces, traceback): 175 for trace in traces: 176 if trace[2] == traceback._frames: 177 return trace 178 179 self.fail("trace not found") 180 181 def test_get_traces(self): 182 
tracemalloc.clear_traces() 183 obj_size = 12345 184 obj, obj_traceback = allocate_bytes(obj_size) 185 186 traces = tracemalloc._get_traces() 187 trace = self.find_trace(traces, obj_traceback) 188 189 self.assertIsInstance(trace, tuple) 190 domain, size, traceback, length = trace 191 self.assertEqual(size, obj_size) 192 self.assertEqual(traceback, obj_traceback._frames) 193 194 tracemalloc.stop() 195 self.assertEqual(tracemalloc._get_traces(), []) 196 197 def test_get_traces_intern_traceback(self): 198 # dummy wrappers to get more useful and identical frames in the traceback 199 def allocate_bytes2(size): 200 return allocate_bytes(size) 201 def allocate_bytes3(size): 202 return allocate_bytes2(size) 203 def allocate_bytes4(size): 204 return allocate_bytes3(size) 205 206 # Ensure that two identical tracebacks are not duplicated 207 tracemalloc.stop() 208 tracemalloc.start(4) 209 obj_size = 123 210 obj1, obj1_traceback = allocate_bytes4(obj_size) 211 obj2, obj2_traceback = allocate_bytes4(obj_size) 212 213 traces = tracemalloc._get_traces() 214 215 obj1_traceback._frames = tuple(reversed(obj1_traceback._frames)) 216 obj2_traceback._frames = tuple(reversed(obj2_traceback._frames)) 217 218 trace1 = self.find_trace(traces, obj1_traceback) 219 trace2 = self.find_trace(traces, obj2_traceback) 220 domain1, size1, traceback1, length1 = trace1 221 domain2, size2, traceback2, length2 = trace2 222 self.assertIs(traceback2, traceback1) 223 224 def test_get_traced_memory(self): 225 # Python allocates some internals objects, so the test must tolerate 226 # a small difference between the expected size and the real usage 227 max_error = 2048 228 229 # allocate one object 230 obj_size = 1024 * 1024 231 tracemalloc.clear_traces() 232 obj, obj_traceback = allocate_bytes(obj_size) 233 size, peak_size = tracemalloc.get_traced_memory() 234 self.assertGreaterEqual(size, obj_size) 235 self.assertGreaterEqual(peak_size, size) 236 237 self.assertLessEqual(size - obj_size, max_error) 238 
self.assertLessEqual(peak_size - size, max_error) 239 240 # destroy the object 241 obj = None 242 size2, peak_size2 = tracemalloc.get_traced_memory() 243 self.assertLess(size2, size) 244 self.assertGreaterEqual(size - size2, obj_size - max_error) 245 self.assertGreaterEqual(peak_size2, peak_size) 246 247 # clear_traces() must reset traced memory counters 248 tracemalloc.clear_traces() 249 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 250 251 # allocate another object 252 obj, obj_traceback = allocate_bytes(obj_size) 253 size, peak_size = tracemalloc.get_traced_memory() 254 self.assertGreaterEqual(size, obj_size) 255 256 # stop() also resets traced memory counters 257 tracemalloc.stop() 258 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 259 260 def test_clear_traces(self): 261 obj, obj_traceback = allocate_bytes(123) 262 traceback = tracemalloc.get_object_traceback(obj) 263 self.assertIsNotNone(traceback) 264 265 tracemalloc.clear_traces() 266 traceback2 = tracemalloc.get_object_traceback(obj) 267 self.assertIsNone(traceback2) 268 269 def test_reset_peak(self): 270 # Python allocates some internals objects, so the test must tolerate 271 # a small difference between the expected size and the real usage 272 tracemalloc.clear_traces() 273 274 # Example: allocate a large piece of memory, temporarily 275 large_sum = sum(list(range(100000))) 276 size1, peak1 = tracemalloc.get_traced_memory() 277 278 # reset_peak() resets peak to traced memory: peak2 < peak1 279 tracemalloc.reset_peak() 280 size2, peak2 = tracemalloc.get_traced_memory() 281 self.assertGreaterEqual(peak2, size2) 282 self.assertLess(peak2, peak1) 283 284 # check that peak continue to be updated if new memory is allocated: 285 # peak3 > peak2 286 obj_size = 1024 * 1024 287 obj, obj_traceback = allocate_bytes(obj_size) 288 size3, peak3 = tracemalloc.get_traced_memory() 289 self.assertGreaterEqual(peak3, size3) 290 self.assertGreater(peak3, peak2) 291 self.assertGreaterEqual(peak3 - 
peak2, obj_size) 292 293 def test_is_tracing(self): 294 tracemalloc.stop() 295 self.assertFalse(tracemalloc.is_tracing()) 296 297 tracemalloc.start() 298 self.assertTrue(tracemalloc.is_tracing()) 299 300 def test_snapshot(self): 301 obj, source = allocate_bytes(123) 302 303 # take a snapshot 304 snapshot = tracemalloc.take_snapshot() 305 306 # This can vary 307 self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10) 308 309 # write on disk 310 snapshot.dump(os_helper.TESTFN) 311 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 312 313 # load from disk 314 snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) 315 self.assertEqual(snapshot2.traces, snapshot.traces) 316 317 # tracemalloc must be tracing memory allocations to take a snapshot 318 tracemalloc.stop() 319 with self.assertRaises(RuntimeError) as cm: 320 tracemalloc.take_snapshot() 321 self.assertEqual(str(cm.exception), 322 "the tracemalloc module must be tracing memory " 323 "allocations to take a snapshot") 324 325 def test_snapshot_save_attr(self): 326 # take a snapshot with a new attribute 327 snapshot = tracemalloc.take_snapshot() 328 snapshot.test_attr = "new" 329 snapshot.dump(os_helper.TESTFN) 330 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 331 332 # load() should recreate the attribute 333 snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) 334 self.assertEqual(snapshot2.test_attr, "new") 335 336 def fork_child(self): 337 if not tracemalloc.is_tracing(): 338 return 2 339 340 obj_size = 12345 341 obj, obj_traceback = allocate_bytes(obj_size) 342 traceback = tracemalloc.get_object_traceback(obj) 343 if traceback is None: 344 return 3 345 346 # everything is fine 347 return 0 348 349 @support.requires_fork() 350 def test_fork(self): 351 # check that tracemalloc is still working after fork 352 pid = os.fork() 353 if not pid: 354 # child 355 exitcode = 1 356 try: 357 exitcode = self.fork_child() 358 finally: 359 os._exit(exitcode) 360 else: 361 support.wait_process(pid, 
exitcode=0) 362 363 def test_no_incomplete_frames(self): 364 tracemalloc.stop() 365 tracemalloc.start(8) 366 367 def f(x): 368 def g(): 369 return x 370 return g 371 372 obj = f(0).__closure__[0] 373 traceback = tracemalloc.get_object_traceback(obj) 374 self.assertIn("test_tracemalloc", traceback[-1].filename) 375 self.assertNotIn("test_tracemalloc", traceback[-2].filename) 376 377 378class TestSnapshot(unittest.TestCase): 379 maxDiff = 4000 380 381 def test_create_snapshot(self): 382 raw_traces = [(0, 5, (('a.py', 2),), 10)] 383 384 with contextlib.ExitStack() as stack: 385 stack.enter_context(patch.object(tracemalloc, 'is_tracing', 386 return_value=True)) 387 stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', 388 return_value=5)) 389 stack.enter_context(patch.object(tracemalloc, '_get_traces', 390 return_value=raw_traces)) 391 392 snapshot = tracemalloc.take_snapshot() 393 self.assertEqual(snapshot.traceback_limit, 5) 394 self.assertEqual(len(snapshot.traces), 1) 395 trace = snapshot.traces[0] 396 self.assertEqual(trace.size, 5) 397 self.assertEqual(trace.traceback.total_nframe, 10) 398 self.assertEqual(len(trace.traceback), 1) 399 self.assertEqual(trace.traceback[0].filename, 'a.py') 400 self.assertEqual(trace.traceback[0].lineno, 2) 401 402 def test_filter_traces(self): 403 snapshot, snapshot2 = create_snapshots() 404 filter1 = tracemalloc.Filter(False, "b.py") 405 filter2 = tracemalloc.Filter(True, "a.py", 2) 406 filter3 = tracemalloc.Filter(True, "a.py", 5) 407 408 original_traces = list(snapshot.traces._traces) 409 410 # exclude b.py 411 snapshot3 = snapshot.filter_traces((filter1,)) 412 self.assertEqual(snapshot3.traces._traces, [ 413 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 414 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 415 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 416 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 417 (3, 7, (('<unknown>', 0),), 1), 418 ]) 419 420 # filter_traces() must not touch the original snapshot 421 
self.assertEqual(snapshot.traces._traces, original_traces) 422 423 # only include two lines of a.py 424 snapshot4 = snapshot3.filter_traces((filter2, filter3)) 425 self.assertEqual(snapshot4.traces._traces, [ 426 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 427 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 428 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 429 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 430 ]) 431 432 # No filter: just duplicate the snapshot 433 snapshot5 = snapshot.filter_traces(()) 434 self.assertIsNot(snapshot5, snapshot) 435 self.assertIsNot(snapshot5.traces, snapshot.traces) 436 self.assertEqual(snapshot5.traces, snapshot.traces) 437 438 self.assertRaises(TypeError, snapshot.filter_traces, filter1) 439 440 def test_filter_traces_domain(self): 441 snapshot, snapshot2 = create_snapshots() 442 filter1 = tracemalloc.Filter(False, "a.py", domain=1) 443 filter2 = tracemalloc.Filter(True, "a.py", domain=1) 444 445 original_traces = list(snapshot.traces._traces) 446 447 # exclude a.py of domain 1 448 snapshot3 = snapshot.filter_traces((filter1,)) 449 self.assertEqual(snapshot3.traces._traces, [ 450 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 451 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 452 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 453 (2, 66, (('b.py', 1),), 1), 454 (3, 7, (('<unknown>', 0),), 1), 455 ]) 456 457 # include domain 1 458 snapshot3 = snapshot.filter_traces((filter1,)) 459 self.assertEqual(snapshot3.traces._traces, [ 460 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 461 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 462 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 463 (2, 66, (('b.py', 1),), 1), 464 (3, 7, (('<unknown>', 0),), 1), 465 ]) 466 467 def test_filter_traces_domain_filter(self): 468 snapshot, snapshot2 = create_snapshots() 469 filter1 = tracemalloc.DomainFilter(False, domain=3) 470 filter2 = tracemalloc.DomainFilter(True, domain=3) 471 472 # exclude domain 2 473 snapshot3 = snapshot.filter_traces((filter1,)) 474 self.assertEqual(snapshot3.traces._traces, [ 475 (0, 10, 
(('a.py', 2), ('b.py', 4)), 3), 476 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 477 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 478 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 479 (2, 66, (('b.py', 1),), 1), 480 ]) 481 482 # include domain 2 483 snapshot3 = snapshot.filter_traces((filter2,)) 484 self.assertEqual(snapshot3.traces._traces, [ 485 (3, 7, (('<unknown>', 0),), 1), 486 ]) 487 488 def test_snapshot_group_by_line(self): 489 snapshot, snapshot2 = create_snapshots() 490 tb_0 = traceback_lineno('<unknown>', 0) 491 tb_a_2 = traceback_lineno('a.py', 2) 492 tb_a_5 = traceback_lineno('a.py', 5) 493 tb_b_1 = traceback_lineno('b.py', 1) 494 tb_c_578 = traceback_lineno('c.py', 578) 495 496 # stats per file and line 497 stats1 = snapshot.statistics('lineno') 498 self.assertEqual(stats1, [ 499 tracemalloc.Statistic(tb_b_1, 66, 1), 500 tracemalloc.Statistic(tb_a_2, 30, 3), 501 tracemalloc.Statistic(tb_0, 7, 1), 502 tracemalloc.Statistic(tb_a_5, 2, 1), 503 ]) 504 505 # stats per file and line (2) 506 stats2 = snapshot2.statistics('lineno') 507 self.assertEqual(stats2, [ 508 tracemalloc.Statistic(tb_a_5, 5002, 2), 509 tracemalloc.Statistic(tb_c_578, 400, 1), 510 tracemalloc.Statistic(tb_a_2, 30, 3), 511 ]) 512 513 # stats diff per file and line 514 statistics = snapshot2.compare_to(snapshot, 'lineno') 515 self.assertEqual(statistics, [ 516 tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), 517 tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), 518 tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), 519 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 520 tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), 521 ]) 522 523 def test_snapshot_group_by_file(self): 524 snapshot, snapshot2 = create_snapshots() 525 tb_0 = traceback_filename('<unknown>') 526 tb_a = traceback_filename('a.py') 527 tb_b = traceback_filename('b.py') 528 tb_c = traceback_filename('c.py') 529 530 # stats per file 531 stats1 = snapshot.statistics('filename') 532 self.assertEqual(stats1, [ 533 tracemalloc.Statistic(tb_b, 
66, 1), 534 tracemalloc.Statistic(tb_a, 32, 4), 535 tracemalloc.Statistic(tb_0, 7, 1), 536 ]) 537 538 # stats per file (2) 539 stats2 = snapshot2.statistics('filename') 540 self.assertEqual(stats2, [ 541 tracemalloc.Statistic(tb_a, 5032, 5), 542 tracemalloc.Statistic(tb_c, 400, 1), 543 ]) 544 545 # stats diff per file 546 diff = snapshot2.compare_to(snapshot, 'filename') 547 self.assertEqual(diff, [ 548 tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), 549 tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), 550 tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), 551 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 552 ]) 553 554 def test_snapshot_group_by_traceback(self): 555 snapshot, snapshot2 = create_snapshots() 556 557 # stats per file 558 tb1 = traceback(('a.py', 2), ('b.py', 4)) 559 tb2 = traceback(('a.py', 5), ('b.py', 4)) 560 tb3 = traceback(('b.py', 1)) 561 tb4 = traceback(('<unknown>', 0)) 562 stats1 = snapshot.statistics('traceback') 563 self.assertEqual(stats1, [ 564 tracemalloc.Statistic(tb3, 66, 1), 565 tracemalloc.Statistic(tb1, 30, 3), 566 tracemalloc.Statistic(tb4, 7, 1), 567 tracemalloc.Statistic(tb2, 2, 1), 568 ]) 569 570 # stats per file (2) 571 tb5 = traceback(('c.py', 578)) 572 stats2 = snapshot2.statistics('traceback') 573 self.assertEqual(stats2, [ 574 tracemalloc.Statistic(tb2, 5002, 2), 575 tracemalloc.Statistic(tb5, 400, 1), 576 tracemalloc.Statistic(tb1, 30, 3), 577 ]) 578 579 # stats diff per file 580 diff = snapshot2.compare_to(snapshot, 'traceback') 581 self.assertEqual(diff, [ 582 tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), 583 tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), 584 tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), 585 tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), 586 tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), 587 ]) 588 589 self.assertRaises(ValueError, 590 snapshot.statistics, 'traceback', cumulative=True) 591 592 def test_snapshot_group_by_cumulative(self): 593 snapshot, snapshot2 = create_snapshots() 594 tb_0 = 
traceback_filename('<unknown>') 595 tb_a = traceback_filename('a.py') 596 tb_b = traceback_filename('b.py') 597 tb_a_2 = traceback_lineno('a.py', 2) 598 tb_a_5 = traceback_lineno('a.py', 5) 599 tb_b_1 = traceback_lineno('b.py', 1) 600 tb_b_4 = traceback_lineno('b.py', 4) 601 602 # per file 603 stats = snapshot.statistics('filename', True) 604 self.assertEqual(stats, [ 605 tracemalloc.Statistic(tb_b, 98, 5), 606 tracemalloc.Statistic(tb_a, 32, 4), 607 tracemalloc.Statistic(tb_0, 7, 1), 608 ]) 609 610 # per line 611 stats = snapshot.statistics('lineno', True) 612 self.assertEqual(stats, [ 613 tracemalloc.Statistic(tb_b_1, 66, 1), 614 tracemalloc.Statistic(tb_b_4, 32, 4), 615 tracemalloc.Statistic(tb_a_2, 30, 3), 616 tracemalloc.Statistic(tb_0, 7, 1), 617 tracemalloc.Statistic(tb_a_5, 2, 1), 618 ]) 619 620 def test_trace_format(self): 621 snapshot, snapshot2 = create_snapshots() 622 trace = snapshot.traces[0] 623 self.assertEqual(str(trace), 'b.py:4: 10 B') 624 traceback = trace.traceback 625 self.assertEqual(str(traceback), 'b.py:4') 626 frame = traceback[0] 627 self.assertEqual(str(frame), 'b.py:4') 628 629 def test_statistic_format(self): 630 snapshot, snapshot2 = create_snapshots() 631 stats = snapshot.statistics('lineno') 632 stat = stats[0] 633 self.assertEqual(str(stat), 634 'b.py:1: size=66 B, count=1, average=66 B') 635 636 def test_statistic_diff_format(self): 637 snapshot, snapshot2 = create_snapshots() 638 stats = snapshot2.compare_to(snapshot, 'lineno') 639 stat = stats[0] 640 self.assertEqual(str(stat), 641 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') 642 643 def test_slices(self): 644 snapshot, snapshot2 = create_snapshots() 645 self.assertEqual(snapshot.traces[:2], 646 (snapshot.traces[0], snapshot.traces[1])) 647 648 traceback = snapshot.traces[0].traceback 649 self.assertEqual(traceback[:2], 650 (traceback[0], traceback[1])) 651 652 def test_format_traceback(self): 653 snapshot, snapshot2 = create_snapshots() 654 def 
getline(filename, lineno): 655 return ' <%s, %s>' % (filename, lineno) 656 with unittest.mock.patch('tracemalloc.linecache.getline', 657 side_effect=getline): 658 tb = snapshot.traces[0].traceback 659 self.assertEqual(tb.format(), 660 [' File "b.py", line 4', 661 ' <b.py, 4>', 662 ' File "a.py", line 2', 663 ' <a.py, 2>']) 664 665 self.assertEqual(tb.format(limit=1), 666 [' File "a.py", line 2', 667 ' <a.py, 2>']) 668 669 self.assertEqual(tb.format(limit=-1), 670 [' File "b.py", line 4', 671 ' <b.py, 4>']) 672 673 self.assertEqual(tb.format(most_recent_first=True), 674 [' File "a.py", line 2', 675 ' <a.py, 2>', 676 ' File "b.py", line 4', 677 ' <b.py, 4>']) 678 679 self.assertEqual(tb.format(limit=1, most_recent_first=True), 680 [' File "a.py", line 2', 681 ' <a.py, 2>']) 682 683 self.assertEqual(tb.format(limit=-1, most_recent_first=True), 684 [' File "b.py", line 4', 685 ' <b.py, 4>']) 686 687 688class TestFilters(unittest.TestCase): 689 maxDiff = 2048 690 691 def test_filter_attributes(self): 692 # test default values 693 f = tracemalloc.Filter(True, "abc") 694 self.assertEqual(f.inclusive, True) 695 self.assertEqual(f.filename_pattern, "abc") 696 self.assertIsNone(f.lineno) 697 self.assertEqual(f.all_frames, False) 698 699 # test custom values 700 f = tracemalloc.Filter(False, "test.py", 123, True) 701 self.assertEqual(f.inclusive, False) 702 self.assertEqual(f.filename_pattern, "test.py") 703 self.assertEqual(f.lineno, 123) 704 self.assertEqual(f.all_frames, True) 705 706 # parameters passed by keyword 707 f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) 708 self.assertEqual(f.inclusive, False) 709 self.assertEqual(f.filename_pattern, "test.py") 710 self.assertEqual(f.lineno, 123) 711 self.assertEqual(f.all_frames, True) 712 713 # read-only attribute 714 self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") 715 716 def test_filter_match(self): 717 # filter without line number 718 f = 
tracemalloc.Filter(True, "abc") 719 self.assertTrue(f._match_frame("abc", 0)) 720 self.assertTrue(f._match_frame("abc", 5)) 721 self.assertTrue(f._match_frame("abc", 10)) 722 self.assertFalse(f._match_frame("12356", 0)) 723 self.assertFalse(f._match_frame("12356", 5)) 724 self.assertFalse(f._match_frame("12356", 10)) 725 726 f = tracemalloc.Filter(False, "abc") 727 self.assertFalse(f._match_frame("abc", 0)) 728 self.assertFalse(f._match_frame("abc", 5)) 729 self.assertFalse(f._match_frame("abc", 10)) 730 self.assertTrue(f._match_frame("12356", 0)) 731 self.assertTrue(f._match_frame("12356", 5)) 732 self.assertTrue(f._match_frame("12356", 10)) 733 734 # filter with line number > 0 735 f = tracemalloc.Filter(True, "abc", 5) 736 self.assertFalse(f._match_frame("abc", 0)) 737 self.assertTrue(f._match_frame("abc", 5)) 738 self.assertFalse(f._match_frame("abc", 10)) 739 self.assertFalse(f._match_frame("12356", 0)) 740 self.assertFalse(f._match_frame("12356", 5)) 741 self.assertFalse(f._match_frame("12356", 10)) 742 743 f = tracemalloc.Filter(False, "abc", 5) 744 self.assertTrue(f._match_frame("abc", 0)) 745 self.assertFalse(f._match_frame("abc", 5)) 746 self.assertTrue(f._match_frame("abc", 10)) 747 self.assertTrue(f._match_frame("12356", 0)) 748 self.assertTrue(f._match_frame("12356", 5)) 749 self.assertTrue(f._match_frame("12356", 10)) 750 751 # filter with line number 0 752 f = tracemalloc.Filter(True, "abc", 0) 753 self.assertTrue(f._match_frame("abc", 0)) 754 self.assertFalse(f._match_frame("abc", 5)) 755 self.assertFalse(f._match_frame("abc", 10)) 756 self.assertFalse(f._match_frame("12356", 0)) 757 self.assertFalse(f._match_frame("12356", 5)) 758 self.assertFalse(f._match_frame("12356", 10)) 759 760 f = tracemalloc.Filter(False, "abc", 0) 761 self.assertFalse(f._match_frame("abc", 0)) 762 self.assertTrue(f._match_frame("abc", 5)) 763 self.assertTrue(f._match_frame("abc", 10)) 764 self.assertTrue(f._match_frame("12356", 0)) 765 
self.assertTrue(f._match_frame("12356", 5)) 766 self.assertTrue(f._match_frame("12356", 10)) 767 768 def test_filter_match_filename(self): 769 def fnmatch(inclusive, filename, pattern): 770 f = tracemalloc.Filter(inclusive, pattern) 771 return f._match_frame(filename, 0) 772 773 self.assertTrue(fnmatch(True, "abc", "abc")) 774 self.assertFalse(fnmatch(True, "12356", "abc")) 775 self.assertFalse(fnmatch(True, "<unknown>", "abc")) 776 777 self.assertFalse(fnmatch(False, "abc", "abc")) 778 self.assertTrue(fnmatch(False, "12356", "abc")) 779 self.assertTrue(fnmatch(False, "<unknown>", "abc")) 780 781 def test_filter_match_filename_joker(self): 782 def fnmatch(filename, pattern): 783 filter = tracemalloc.Filter(True, pattern) 784 return filter._match_frame(filename, 0) 785 786 # empty string 787 self.assertFalse(fnmatch('abc', '')) 788 self.assertFalse(fnmatch('', 'abc')) 789 self.assertTrue(fnmatch('', '')) 790 self.assertTrue(fnmatch('', '*')) 791 792 # no * 793 self.assertTrue(fnmatch('abc', 'abc')) 794 self.assertFalse(fnmatch('abc', 'abcd')) 795 self.assertFalse(fnmatch('abc', 'def')) 796 797 # a* 798 self.assertTrue(fnmatch('abc', 'a*')) 799 self.assertTrue(fnmatch('abc', 'abc*')) 800 self.assertFalse(fnmatch('abc', 'b*')) 801 self.assertFalse(fnmatch('abc', 'abcd*')) 802 803 # a*b 804 self.assertTrue(fnmatch('abc', 'a*c')) 805 self.assertTrue(fnmatch('abcdcx', 'a*cx')) 806 self.assertFalse(fnmatch('abb', 'a*c')) 807 self.assertFalse(fnmatch('abcdce', 'a*cx')) 808 809 # a*b*c 810 self.assertTrue(fnmatch('abcde', 'a*c*e')) 811 self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) 812 self.assertFalse(fnmatch('abcdd', 'a*c*e')) 813 self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) 814 815 # replace .pyc suffix with .py 816 self.assertTrue(fnmatch('a.pyc', 'a.py')) 817 self.assertTrue(fnmatch('a.py', 'a.pyc')) 818 819 if os.name == 'nt': 820 # case insensitive 821 self.assertTrue(fnmatch('aBC', 'ABc')) 822 self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) 823 824 
self.assertTrue(fnmatch('a.pyc', 'a.PY')) 825 self.assertTrue(fnmatch('a.py', 'a.PYC')) 826 else: 827 # case sensitive 828 self.assertFalse(fnmatch('aBC', 'ABc')) 829 self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) 830 831 self.assertFalse(fnmatch('a.pyc', 'a.PY')) 832 self.assertFalse(fnmatch('a.py', 'a.PYC')) 833 834 if os.name == 'nt': 835 # normalize alternate separator "/" to the standard separator "\" 836 self.assertTrue(fnmatch(r'a/b', r'a\b')) 837 self.assertTrue(fnmatch(r'a\b', r'a/b')) 838 self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) 839 self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) 840 else: 841 # there is no alternate separator 842 self.assertFalse(fnmatch(r'a/b', r'a\b')) 843 self.assertFalse(fnmatch(r'a\b', r'a/b')) 844 self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) 845 self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) 846 847 # as of 3.5, .pyo is no longer munged to .py 848 self.assertFalse(fnmatch('a.pyo', 'a.py')) 849 850 def test_filter_match_trace(self): 851 t1 = (("a.py", 2), ("b.py", 3)) 852 t2 = (("b.py", 4), ("b.py", 5)) 853 t3 = (("c.py", 5), ('<unknown>', 0)) 854 unknown = (('<unknown>', 0),) 855 856 f = tracemalloc.Filter(True, "b.py", all_frames=True) 857 self.assertTrue(f._match_traceback(t1)) 858 self.assertTrue(f._match_traceback(t2)) 859 self.assertFalse(f._match_traceback(t3)) 860 self.assertFalse(f._match_traceback(unknown)) 861 862 f = tracemalloc.Filter(True, "b.py", all_frames=False) 863 self.assertFalse(f._match_traceback(t1)) 864 self.assertTrue(f._match_traceback(t2)) 865 self.assertFalse(f._match_traceback(t3)) 866 self.assertFalse(f._match_traceback(unknown)) 867 868 f = tracemalloc.Filter(False, "b.py", all_frames=True) 869 self.assertFalse(f._match_traceback(t1)) 870 self.assertFalse(f._match_traceback(t2)) 871 self.assertTrue(f._match_traceback(t3)) 872 self.assertTrue(f._match_traceback(unknown)) 873 874 f = tracemalloc.Filter(False, "b.py", all_frames=False) 875 self.assertTrue(f._match_traceback(t1)) 876 
self.assertFalse(f._match_traceback(t2)) 877 self.assertTrue(f._match_traceback(t3)) 878 self.assertTrue(f._match_traceback(unknown)) 879 880 f = tracemalloc.Filter(False, "<unknown>", all_frames=False) 881 self.assertTrue(f._match_traceback(t1)) 882 self.assertTrue(f._match_traceback(t2)) 883 self.assertTrue(f._match_traceback(t3)) 884 self.assertFalse(f._match_traceback(unknown)) 885 886 f = tracemalloc.Filter(True, "<unknown>", all_frames=True) 887 self.assertFalse(f._match_traceback(t1)) 888 self.assertFalse(f._match_traceback(t2)) 889 self.assertTrue(f._match_traceback(t3)) 890 self.assertTrue(f._match_traceback(unknown)) 891 892 f = tracemalloc.Filter(False, "<unknown>", all_frames=True) 893 self.assertTrue(f._match_traceback(t1)) 894 self.assertTrue(f._match_traceback(t2)) 895 self.assertFalse(f._match_traceback(t3)) 896 self.assertFalse(f._match_traceback(unknown)) 897 898 899class TestCommandLine(unittest.TestCase): 900 def test_env_var_disabled_by_default(self): 901 # not tracing by default 902 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 903 ok, stdout, stderr = assert_python_ok('-c', code) 904 stdout = stdout.rstrip() 905 self.assertEqual(stdout, b'False') 906 907 @unittest.skipIf(interpreter_requires_environment(), 908 'Cannot run -E tests when PYTHON env vars are required.') 909 def test_env_var_ignored_with_E(self): 910 """PYTHON* environment variables must be ignored when -E is present.""" 911 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 912 ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') 913 stdout = stdout.rstrip() 914 self.assertEqual(stdout, b'False') 915 916 def test_env_var_disabled(self): 917 # tracing at startup 918 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 919 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0') 920 stdout = stdout.rstrip() 921 self.assertEqual(stdout, b'False') 922 923 def test_env_var_enabled_at_startup(self): 924 # 
# tracing at startup
        # NOTE(review): the `def` line of this method lies above the visible
        # chunk; only its tail is reproduced here, with the code unchanged.
        # A child interpreter started with PYTHONTRACEMALLOC=1 must report
        # that tracing is already active.
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'True')

    def test_env_limit(self):
        # start and set the number of frames
        # The child interpreter is given PYTHONTRACEMALLOC=10 and must print
        # 10 as its traceback limit.
        code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'10')

    def check_env_var_invalid(self, nframe):
        """Check that PYTHONTRACEMALLOC=<nframe> makes the interpreter fail.

        str(nframe) is passed as the PYTHONTRACEMALLOC environment variable
        of a child interpreter running ``-c pass``.  The check passes when
        the child's stderr contains either of the two messages tested
        below; any other stderr content fails the test.
        """
        # The child is expected to fail; SuppressCrashReport wraps the call
        # so that failure does not trigger crash reporting.
        with support.SuppressCrashReport():
            ok, stdout, stderr = assert_python_failure(
                '-c', 'pass',
                PYTHONTRACEMALLOC=str(nframe))

        # Two different wordings are accepted for the same error.
        if b'ValueError: the number of frames must be in range' in stderr:
            return
        if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
            return
        self.fail(f"unexpected output: {stderr!a}")


    def test_env_var_invalid(self):
        # Every value of the module-level INVALID_NFRAME tuple must be
        # rejected; subTest reports each value separately.
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_env_var_invalid(nframe)

    def test_sys_xoptions(self):
        # Each pair is (value given to -X, traceback limit the child must
        # print).  A bare "tracemalloc" is expected to give a limit of 1.
        for xoptions, nframe in (
            ('tracemalloc', 1),
            ('tracemalloc=1', 1),
            ('tracemalloc=15', 15),
        ):
            with self.subTest(xoptions=xoptions, nframe=nframe):
                code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
                ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
                stdout = stdout.rstrip()
                self.assertEqual(stdout, str(nframe).encode('ascii'))

    def check_sys_xoptions_invalid(self, nframe):
        """Check that ``-X tracemalloc=<nframe>`` makes the interpreter fail.

        Command-line counterpart of check_env_var_invalid(): the child runs
        ``-c pass`` and its stderr must contain either of the two messages
        tested below, otherwise the test fails.
        """
        args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
        with support.SuppressCrashReport():
            ok, stdout, stderr = assert_python_failure(*args)

        # Two different wordings are accepted for the same error.
        if b'ValueError: the number of frames must be in range' in stderr:
            return
        if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr:
            return
        self.fail(f"unexpected output: {stderr!a}")

    def test_sys_xoptions_invalid(self):
        # Every value of the module-level INVALID_NFRAME tuple must be
        # rejected when passed through -X tracemalloc=...
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_sys_xoptions_invalid(nframe)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
        # does not crash.
        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', code)


@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    """Tests for the track/untrack functions exposed through _testcapi.

    Each test registers the memory block of ``self.obj`` by hand in the
    custom domain ``self.domain`` and then inspects the trace recorded by
    tracemalloc.  The whole class is skipped when _testcapi is missing.
    """
    # unittest setting: maximum length of the diffs shown on failure.
    maxDiff = 80 * 20

    def setUp(self):
        # These tests start and stop tracing themselves, so tracing must not
        # already be active.
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        # allocate_bytes() returns (data, traceback); only the data is kept.
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        # Always leave tracing stopped, whatever the test did.
        tracemalloc.stop()

    def get_traceback(self):
        """Return the traceback recorded for (self.domain, self.ptr).

        Returns a tracemalloc.Traceback built from the frames reported by
        _testcapi.tracemalloc_get_traceback(), or None when that call
        returns None.
        """
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is not None:
            return tracemalloc.Traceback(frames)
        else:
            return None

    def track(self, release_gil=False, nframe=1):
        """Track self.ptr in self.domain and return the expected frames.

        release_gil is forwarded to _testcapi.tracemalloc_track(); nframe is
        the number of frames collected by get_frames() for the expected
        traceback.
        """
        # get_frames(nframe, 1) adds 1 to the line number of the innermost
        # frame, so the first expected frame points at the statement directly
        # below this one.  Keep the tracemalloc_track() call on the very next
        # line (no blank line or comment in between); otherwise the expected
        # traceback presumably no longer matches the recorded one.
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        """Remove the trace of self.ptr from self.domain."""
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        """Return the summed size of the traces recorded in self.domain."""
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        # inclusive=True: keep only the traces that belong to self.domain.
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        """Track the block once and verify its traceback and traced size.

        release_gil is forwarded to track().
        """
        nframe = 5
        tracemalloc.start(nframe)

        # NOTE(review): `size` is never read below -- candidate for removal.
        size = tracemalloc.get_traced_memory()[0]

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        # The block is the only thing tracked in self.domain.
        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        # Tracking with release_gil=False.
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        # Once tracked, the block has a traceback and accounts for the whole
        # traced size of the domain.
        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        tracemalloc.start()
        tracemalloc.stop()

        # With tracing stopped, tracking must raise RuntimeError and must
        # not leave a trace behind.
        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        tracemalloc.start()
        self.track()

        # With tracing stopped, untracking must raise RuntimeError as well.
        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()


# Run the whole test module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()