1import contextlib
2import os
3import sys
4import tracemalloc
5import unittest
6from unittest.mock import patch
7from test.support.script_helper import (assert_python_ok, assert_python_failure,
8                                        interpreter_requires_environment)
9from test import support
10from test.support import os_helper
11
12try:
13    import _testcapi
14except ImportError:
15    _testcapi = None
16
17
18EMPTY_STRING_SIZE = sys.getsizeof(b'')
19INVALID_NFRAME = (-1, 2**30)
20
21
def get_frames(nframe, lineno_delta):
    """Collect up to *nframe* ``(filename, lineno)`` pairs from the call stack.

    The walk starts at the caller of this function and follows ``f_back``
    outwards.  *lineno_delta* is added to the line number of the first
    collected frame only; every other frame is recorded with its own
    current line number.

    Returns a tuple of pairs, starting with the caller's frame.  Fewer than
    *nframe* pairs are returned when the stack is shallower than that.
    """
    collected = []
    # sys._getframe(1) is the frame that called get_frames().
    current = sys._getframe(1)
    for _ in range(nframe):
        if current is None:
            # Ran off the outermost frame before reaching nframe entries.
            break
        collected.append((current.f_code.co_filename,
                          current.f_lineno + lineno_delta))
        # Only the first collected frame gets the adjustment.
        lineno_delta = 0
        current = current.f_back
    return tuple(collected)
34
def allocate_bytes(size):
    """Allocate a bytes object and return it with its expected traceback.

    The payload length is *size* minus EMPTY_STRING_SIZE (the size of an
    empty bytes object), so the whole object is intended to occupy *size*
    bytes.  Returns a ``(data, traceback)`` tuple where *traceback* is the
    tracemalloc.Traceback built from the current call stack.
    """
    nframe = tracemalloc.get_traceback_limit()
    bytes_len = (size - EMPTY_STRING_SIZE)
    # NOTE: the delta of 1 makes the first expected frame point at the line
    # right after this call, i.e. the allocation itself.  The two statements
    # must stay on consecutive lines.
    frames = get_frames(nframe, 1)
    data = b'x' * bytes_len
    return data, tracemalloc.Traceback(frames, min(len(frames), nframe))
41
def create_snapshots():
    """Build the two hand-written snapshots used as fixtures by the tests.

    Returns a ``(snapshot, snapshot2)`` tuple of tracemalloc.Snapshot
    objects, both created with a traceback limit of 2.
    """
    limit = 2

    # Each raw trace is a (domain, size, traceback_frames, total_nframe)
    # tuple. traceback_frames is a tuple of (filename, line_number) tuples.
    frames_a2_b4 = (('a.py', 2), ('b.py', 4))
    frames_a5_b4 = (('a.py', 5), ('b.py', 4))

    first_traces = [
        (0, 10, frames_a2_b4, 3),
        (0, 10, frames_a2_b4, 3),
        (0, 10, frames_a2_b4, 3),

        (1, 2, frames_a5_b4, 3),

        (2, 66, (('b.py', 1),), 1),

        (3, 7, (('<unknown>', 0),), 1),
    ]

    second_traces = [
        (0, 10, frames_a2_b4, 3),
        (0, 10, frames_a2_b4, 3),
        (0, 10, frames_a2_b4, 3),

        (2, 2, frames_a5_b4, 3),
        (2, 5000, frames_a5_b4, 3),

        (4, 400, (('c.py', 578),), 1),
    ]

    first = tracemalloc.Snapshot(first_traces, limit)
    second = tracemalloc.Snapshot(second_traces, limit)
    return (first, second)
74
def frame(filename, lineno):
    """Return a tracemalloc.Frame for the ``(filename, lineno)`` pair."""
    # The frame class is exposed as tracemalloc.Frame; looking up
    # tracemalloc._Frame raised AttributeError as soon as this helper was
    # called.
    return tracemalloc.Frame((filename, lineno))
77
def traceback(*frames):
    """Wrap the given ``(filename, lineno)`` pairs in a tracemalloc.Traceback."""
    tb = tracemalloc.Traceback(frames)
    return tb
80
def traceback_lineno(filename, lineno):
    """Return a one-frame traceback for *filename* at line *lineno*."""
    only_frame = (filename, lineno)
    return traceback(only_frame)
83
def traceback_filename(filename):
    """Return a one-frame traceback for *filename* with line number 0."""
    return traceback_lineno(filename, lineno=0)
86
87
class TestTraceback(unittest.TestCase):
    def test_repr(self):
        # repr(Traceback) lists the frames and mentions total_nframe only
        # when a value was passed to the constructor.
        def tb_repr(*ctor_args) -> str:
            tb = tracemalloc.Traceback(*ctor_args)
            return repr(tb)

        # no frame at all
        self.assertEqual(tb_repr(()), "<Traceback ()>")
        self.assertEqual(tb_repr((), 0), "<Traceback () total_nframe=0>")

        # two frames: they show up in the reverse of the order passed in
        two_frames = (("f1", 1), ("f2", 2))
        frames_part = (
            "(<Frame filename='f2' lineno=2>,"
            " <Frame filename='f1' lineno=1>)"
        )
        without_total = tb_repr(two_frames)
        with_total = tb_repr(two_frames, 2)
        self.assertEqual(without_total, f"<Traceback {frames_part}>")
        self.assertEqual(with_total,
                         f"<Traceback {frames_part} total_nframe=2>")
105
106
107class TestTracemallocEnabled(unittest.TestCase):
108    def setUp(self):
109        if tracemalloc.is_tracing():
110            self.skipTest("tracemalloc must be stopped before the test")
111
112        tracemalloc.start(1)
113
    def tearDown(self):
        # Leave tracing disabled, whatever state the test left it in.
        tracemalloc.stop()
116
117    def test_get_tracemalloc_memory(self):
118        data = [allocate_bytes(123) for count in range(1000)]
119        size = tracemalloc.get_tracemalloc_memory()
120        self.assertGreaterEqual(size, 0)
121
122        tracemalloc.clear_traces()
123        size2 = tracemalloc.get_tracemalloc_memory()
124        self.assertGreaterEqual(size2, 0)
125        self.assertLessEqual(size2, size)
126
127    def test_get_object_traceback(self):
128        tracemalloc.clear_traces()
129        obj_size = 12345
130        obj, obj_traceback = allocate_bytes(obj_size)
131        traceback = tracemalloc.get_object_traceback(obj)
132        self.assertEqual(traceback, obj_traceback)
133
    def test_new_reference(self):
        # An object handed out again from a free list must still have a
        # traceback pointing at the line that created it.
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list.
        # NOTE: get_frames() below is called with a delta of -3 so that the
        # expected line is this assignment: do not add or remove lines
        # between the assignment and the get_frames() call.
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))

        # The traceback recorded for the list must be the expected one.
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)
153
154    def test_set_traceback_limit(self):
155        obj_size = 10
156
157        tracemalloc.stop()
158        self.assertRaises(ValueError, tracemalloc.start, -1)
159
160        tracemalloc.stop()
161        tracemalloc.start(10)
162        obj2, obj2_traceback = allocate_bytes(obj_size)
163        traceback = tracemalloc.get_object_traceback(obj2)
164        self.assertEqual(len(traceback), 10)
165        self.assertEqual(traceback, obj2_traceback)
166
167        tracemalloc.stop()
168        tracemalloc.start(1)
169        obj, obj_traceback = allocate_bytes(obj_size)
170        traceback = tracemalloc.get_object_traceback(obj)
171        self.assertEqual(len(traceback), 1)
172        self.assertEqual(traceback, obj_traceback)
173
174    def find_trace(self, traces, traceback):
175        for trace in traces:
176            if trace[2] == traceback._frames:
177                return trace
178
179        self.fail("trace not found")
180
181    def test_get_traces(self):
182        tracemalloc.clear_traces()
183        obj_size = 12345
184        obj, obj_traceback = allocate_bytes(obj_size)
185
186        traces = tracemalloc._get_traces()
187        trace = self.find_trace(traces, obj_traceback)
188
189        self.assertIsInstance(trace, tuple)
190        domain, size, traceback, length = trace
191        self.assertEqual(size, obj_size)
192        self.assertEqual(traceback, obj_traceback._frames)
193
194        tracemalloc.stop()
195        self.assertEqual(tracemalloc._get_traces(), [])
196
197    def test_get_traces_intern_traceback(self):
198        # dummy wrappers to get more useful and identical frames in the traceback
199        def allocate_bytes2(size):
200            return allocate_bytes(size)
201        def allocate_bytes3(size):
202            return allocate_bytes2(size)
203        def allocate_bytes4(size):
204            return allocate_bytes3(size)
205
206        # Ensure that two identical tracebacks are not duplicated
207        tracemalloc.stop()
208        tracemalloc.start(4)
209        obj_size = 123
210        obj1, obj1_traceback = allocate_bytes4(obj_size)
211        obj2, obj2_traceback = allocate_bytes4(obj_size)
212
213        traces = tracemalloc._get_traces()
214
215        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
216        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))
217
218        trace1 = self.find_trace(traces, obj1_traceback)
219        trace2 = self.find_trace(traces, obj2_traceback)
220        domain1, size1, traceback1, length1 = trace1
221        domain2, size2, traceback2, length2 = trace2
222        self.assertIs(traceback2, traceback1)
223
224    def test_get_traced_memory(self):
225        # Python allocates some internals objects, so the test must tolerate
226        # a small difference between the expected size and the real usage
227        max_error = 2048
228
229        # allocate one object
230        obj_size = 1024 * 1024
231        tracemalloc.clear_traces()
232        obj, obj_traceback = allocate_bytes(obj_size)
233        size, peak_size = tracemalloc.get_traced_memory()
234        self.assertGreaterEqual(size, obj_size)
235        self.assertGreaterEqual(peak_size, size)
236
237        self.assertLessEqual(size - obj_size, max_error)
238        self.assertLessEqual(peak_size - size, max_error)
239
240        # destroy the object
241        obj = None
242        size2, peak_size2 = tracemalloc.get_traced_memory()
243        self.assertLess(size2, size)
244        self.assertGreaterEqual(size - size2, obj_size - max_error)
245        self.assertGreaterEqual(peak_size2, peak_size)
246
247        # clear_traces() must reset traced memory counters
248        tracemalloc.clear_traces()
249        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
250
251        # allocate another object
252        obj, obj_traceback = allocate_bytes(obj_size)
253        size, peak_size = tracemalloc.get_traced_memory()
254        self.assertGreaterEqual(size, obj_size)
255
256        # stop() also resets traced memory counters
257        tracemalloc.stop()
258        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
259
260    def test_clear_traces(self):
261        obj, obj_traceback = allocate_bytes(123)
262        traceback = tracemalloc.get_object_traceback(obj)
263        self.assertIsNotNone(traceback)
264
265        tracemalloc.clear_traces()
266        traceback2 = tracemalloc.get_object_traceback(obj)
267        self.assertIsNone(traceback2)
268
269    def test_reset_peak(self):
270        # Python allocates some internals objects, so the test must tolerate
271        # a small difference between the expected size and the real usage
272        tracemalloc.clear_traces()
273
274        # Example: allocate a large piece of memory, temporarily
275        large_sum = sum(list(range(100000)))
276        size1, peak1 = tracemalloc.get_traced_memory()
277
278        # reset_peak() resets peak to traced memory: peak2 < peak1
279        tracemalloc.reset_peak()
280        size2, peak2 = tracemalloc.get_traced_memory()
281        self.assertGreaterEqual(peak2, size2)
282        self.assertLess(peak2, peak1)
283
284        # check that peak continue to be updated if new memory is allocated:
285        # peak3 > peak2
286        obj_size = 1024 * 1024
287        obj, obj_traceback = allocate_bytes(obj_size)
288        size3, peak3 = tracemalloc.get_traced_memory()
289        self.assertGreaterEqual(peak3, size3)
290        self.assertGreater(peak3, peak2)
291        self.assertGreaterEqual(peak3 - peak2, obj_size)
292
293    def test_is_tracing(self):
294        tracemalloc.stop()
295        self.assertFalse(tracemalloc.is_tracing())
296
297        tracemalloc.start()
298        self.assertTrue(tracemalloc.is_tracing())
299
300    def test_snapshot(self):
301        obj, source = allocate_bytes(123)
302
303        # take a snapshot
304        snapshot = tracemalloc.take_snapshot()
305
306        # This can vary
307        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
308
309        # write on disk
310        snapshot.dump(os_helper.TESTFN)
311        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
312
313        # load from disk
314        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
315        self.assertEqual(snapshot2.traces, snapshot.traces)
316
317        # tracemalloc must be tracing memory allocations to take a snapshot
318        tracemalloc.stop()
319        with self.assertRaises(RuntimeError) as cm:
320            tracemalloc.take_snapshot()
321        self.assertEqual(str(cm.exception),
322                         "the tracemalloc module must be tracing memory "
323                         "allocations to take a snapshot")
324
325    def test_snapshot_save_attr(self):
326        # take a snapshot with a new attribute
327        snapshot = tracemalloc.take_snapshot()
328        snapshot.test_attr = "new"
329        snapshot.dump(os_helper.TESTFN)
330        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
331
332        # load() should recreate the attribute
333        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
334        self.assertEqual(snapshot2.test_attr, "new")
335
336    def fork_child(self):
337        if not tracemalloc.is_tracing():
338            return 2
339
340        obj_size = 12345
341        obj, obj_traceback = allocate_bytes(obj_size)
342        traceback = tracemalloc.get_object_traceback(obj)
343        if traceback is None:
344            return 3
345
346        # everything is fine
347        return 0
348
349    @support.requires_fork()
350    def test_fork(self):
351        # check that tracemalloc is still working after fork
352        pid = os.fork()
353        if not pid:
354            # child
355            exitcode = 1
356            try:
357                exitcode = self.fork_child()
358            finally:
359                os._exit(exitcode)
360        else:
361            support.wait_process(pid, exitcode=0)
362
363    def test_no_incomplete_frames(self):
364        tracemalloc.stop()
365        tracemalloc.start(8)
366
367        def f(x):
368            def g():
369                return x
370            return g
371
372        obj = f(0).__closure__[0]
373        traceback = tracemalloc.get_object_traceback(obj)
374        self.assertIn("test_tracemalloc", traceback[-1].filename)
375        self.assertNotIn("test_tracemalloc", traceback[-2].filename)
376
377
378class TestSnapshot(unittest.TestCase):
379    maxDiff = 4000
380
381    def test_create_snapshot(self):
382        raw_traces = [(0, 5, (('a.py', 2),), 10)]
383
384        with contextlib.ExitStack() as stack:
385            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
386                                             return_value=True))
387            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
388                                             return_value=5))
389            stack.enter_context(patch.object(tracemalloc, '_get_traces',
390                                             return_value=raw_traces))
391
392            snapshot = tracemalloc.take_snapshot()
393            self.assertEqual(snapshot.traceback_limit, 5)
394            self.assertEqual(len(snapshot.traces), 1)
395            trace = snapshot.traces[0]
396            self.assertEqual(trace.size, 5)
397            self.assertEqual(trace.traceback.total_nframe, 10)
398            self.assertEqual(len(trace.traceback), 1)
399            self.assertEqual(trace.traceback[0].filename, 'a.py')
400            self.assertEqual(trace.traceback[0].lineno, 2)
401
402    def test_filter_traces(self):
403        snapshot, snapshot2 = create_snapshots()
404        filter1 = tracemalloc.Filter(False, "b.py")
405        filter2 = tracemalloc.Filter(True, "a.py", 2)
406        filter3 = tracemalloc.Filter(True, "a.py", 5)
407
408        original_traces = list(snapshot.traces._traces)
409
410        # exclude b.py
411        snapshot3 = snapshot.filter_traces((filter1,))
412        self.assertEqual(snapshot3.traces._traces, [
413            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
414            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
415            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
416            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
417            (3, 7, (('<unknown>', 0),), 1),
418        ])
419
420        # filter_traces() must not touch the original snapshot
421        self.assertEqual(snapshot.traces._traces, original_traces)
422
423        # only include two lines of a.py
424        snapshot4 = snapshot3.filter_traces((filter2, filter3))
425        self.assertEqual(snapshot4.traces._traces, [
426            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
427            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
428            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
429            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
430        ])
431
432        # No filter: just duplicate the snapshot
433        snapshot5 = snapshot.filter_traces(())
434        self.assertIsNot(snapshot5, snapshot)
435        self.assertIsNot(snapshot5.traces, snapshot.traces)
436        self.assertEqual(snapshot5.traces, snapshot.traces)
437
438        self.assertRaises(TypeError, snapshot.filter_traces, filter1)
439
440    def test_filter_traces_domain(self):
441        snapshot, snapshot2 = create_snapshots()
442        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
443        filter2 = tracemalloc.Filter(True, "a.py", domain=1)
444
445        original_traces = list(snapshot.traces._traces)
446
447        # exclude a.py of domain 1
448        snapshot3 = snapshot.filter_traces((filter1,))
449        self.assertEqual(snapshot3.traces._traces, [
450            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
451            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
452            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
453            (2, 66, (('b.py', 1),), 1),
454            (3, 7, (('<unknown>', 0),), 1),
455        ])
456
457        # include domain 1
458        snapshot3 = snapshot.filter_traces((filter1,))
459        self.assertEqual(snapshot3.traces._traces, [
460            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
461            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
462            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
463            (2, 66, (('b.py', 1),), 1),
464            (3, 7, (('<unknown>', 0),), 1),
465        ])
466
467    def test_filter_traces_domain_filter(self):
468        snapshot, snapshot2 = create_snapshots()
469        filter1 = tracemalloc.DomainFilter(False, domain=3)
470        filter2 = tracemalloc.DomainFilter(True, domain=3)
471
472        # exclude domain 2
473        snapshot3 = snapshot.filter_traces((filter1,))
474        self.assertEqual(snapshot3.traces._traces, [
475            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
476            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
477            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
478            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
479            (2, 66, (('b.py', 1),), 1),
480        ])
481
482        # include domain 2
483        snapshot3 = snapshot.filter_traces((filter2,))
484        self.assertEqual(snapshot3.traces._traces, [
485            (3, 7, (('<unknown>', 0),), 1),
486        ])
487
488    def test_snapshot_group_by_line(self):
489        snapshot, snapshot2 = create_snapshots()
490        tb_0 = traceback_lineno('<unknown>', 0)
491        tb_a_2 = traceback_lineno('a.py', 2)
492        tb_a_5 = traceback_lineno('a.py', 5)
493        tb_b_1 = traceback_lineno('b.py', 1)
494        tb_c_578 = traceback_lineno('c.py', 578)
495
496        # stats per file and line
497        stats1 = snapshot.statistics('lineno')
498        self.assertEqual(stats1, [
499            tracemalloc.Statistic(tb_b_1, 66, 1),
500            tracemalloc.Statistic(tb_a_2, 30, 3),
501            tracemalloc.Statistic(tb_0, 7, 1),
502            tracemalloc.Statistic(tb_a_5, 2, 1),
503        ])
504
505        # stats per file and line (2)
506        stats2 = snapshot2.statistics('lineno')
507        self.assertEqual(stats2, [
508            tracemalloc.Statistic(tb_a_5, 5002, 2),
509            tracemalloc.Statistic(tb_c_578, 400, 1),
510            tracemalloc.Statistic(tb_a_2, 30, 3),
511        ])
512
513        # stats diff per file and line
514        statistics = snapshot2.compare_to(snapshot, 'lineno')
515        self.assertEqual(statistics, [
516            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
517            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
518            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
519            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
520            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
521        ])
522
523    def test_snapshot_group_by_file(self):
524        snapshot, snapshot2 = create_snapshots()
525        tb_0 = traceback_filename('<unknown>')
526        tb_a = traceback_filename('a.py')
527        tb_b = traceback_filename('b.py')
528        tb_c = traceback_filename('c.py')
529
530        # stats per file
531        stats1 = snapshot.statistics('filename')
532        self.assertEqual(stats1, [
533            tracemalloc.Statistic(tb_b, 66, 1),
534            tracemalloc.Statistic(tb_a, 32, 4),
535            tracemalloc.Statistic(tb_0, 7, 1),
536        ])
537
538        # stats per file (2)
539        stats2 = snapshot2.statistics('filename')
540        self.assertEqual(stats2, [
541            tracemalloc.Statistic(tb_a, 5032, 5),
542            tracemalloc.Statistic(tb_c, 400, 1),
543        ])
544
545        # stats diff per file
546        diff = snapshot2.compare_to(snapshot, 'filename')
547        self.assertEqual(diff, [
548            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
549            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
550            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
551            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
552        ])
553
554    def test_snapshot_group_by_traceback(self):
555        snapshot, snapshot2 = create_snapshots()
556
557        # stats per file
558        tb1 = traceback(('a.py', 2), ('b.py', 4))
559        tb2 = traceback(('a.py', 5), ('b.py', 4))
560        tb3 = traceback(('b.py', 1))
561        tb4 = traceback(('<unknown>', 0))
562        stats1 = snapshot.statistics('traceback')
563        self.assertEqual(stats1, [
564            tracemalloc.Statistic(tb3, 66, 1),
565            tracemalloc.Statistic(tb1, 30, 3),
566            tracemalloc.Statistic(tb4, 7, 1),
567            tracemalloc.Statistic(tb2, 2, 1),
568        ])
569
570        # stats per file (2)
571        tb5 = traceback(('c.py', 578))
572        stats2 = snapshot2.statistics('traceback')
573        self.assertEqual(stats2, [
574            tracemalloc.Statistic(tb2, 5002, 2),
575            tracemalloc.Statistic(tb5, 400, 1),
576            tracemalloc.Statistic(tb1, 30, 3),
577        ])
578
579        # stats diff per file
580        diff = snapshot2.compare_to(snapshot, 'traceback')
581        self.assertEqual(diff, [
582            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
583            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
584            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
585            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
586            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
587        ])
588
589        self.assertRaises(ValueError,
590                          snapshot.statistics, 'traceback', cumulative=True)
591
592    def test_snapshot_group_by_cumulative(self):
593        snapshot, snapshot2 = create_snapshots()
594        tb_0 = traceback_filename('<unknown>')
595        tb_a = traceback_filename('a.py')
596        tb_b = traceback_filename('b.py')
597        tb_a_2 = traceback_lineno('a.py', 2)
598        tb_a_5 = traceback_lineno('a.py', 5)
599        tb_b_1 = traceback_lineno('b.py', 1)
600        tb_b_4 = traceback_lineno('b.py', 4)
601
602        # per file
603        stats = snapshot.statistics('filename', True)
604        self.assertEqual(stats, [
605            tracemalloc.Statistic(tb_b, 98, 5),
606            tracemalloc.Statistic(tb_a, 32, 4),
607            tracemalloc.Statistic(tb_0, 7, 1),
608        ])
609
610        # per line
611        stats = snapshot.statistics('lineno', True)
612        self.assertEqual(stats, [
613            tracemalloc.Statistic(tb_b_1, 66, 1),
614            tracemalloc.Statistic(tb_b_4, 32, 4),
615            tracemalloc.Statistic(tb_a_2, 30, 3),
616            tracemalloc.Statistic(tb_0, 7, 1),
617            tracemalloc.Statistic(tb_a_5, 2, 1),
618        ])
619
620    def test_trace_format(self):
621        snapshot, snapshot2 = create_snapshots()
622        trace = snapshot.traces[0]
623        self.assertEqual(str(trace), 'b.py:4: 10 B')
624        traceback = trace.traceback
625        self.assertEqual(str(traceback), 'b.py:4')
626        frame = traceback[0]
627        self.assertEqual(str(frame), 'b.py:4')
628
629    def test_statistic_format(self):
630        snapshot, snapshot2 = create_snapshots()
631        stats = snapshot.statistics('lineno')
632        stat = stats[0]
633        self.assertEqual(str(stat),
634                         'b.py:1: size=66 B, count=1, average=66 B')
635
636    def test_statistic_diff_format(self):
637        snapshot, snapshot2 = create_snapshots()
638        stats = snapshot2.compare_to(snapshot, 'lineno')
639        stat = stats[0]
640        self.assertEqual(str(stat),
641                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
642
643    def test_slices(self):
644        snapshot, snapshot2 = create_snapshots()
645        self.assertEqual(snapshot.traces[:2],
646                         (snapshot.traces[0], snapshot.traces[1]))
647
648        traceback = snapshot.traces[0].traceback
649        self.assertEqual(traceback[:2],
650                         (traceback[0], traceback[1]))
651
652    def test_format_traceback(self):
653        snapshot, snapshot2 = create_snapshots()
654        def getline(filename, lineno):
655            return '  <%s, %s>' % (filename, lineno)
656        with unittest.mock.patch('tracemalloc.linecache.getline',
657                                 side_effect=getline):
658            tb = snapshot.traces[0].traceback
659            self.assertEqual(tb.format(),
660                             ['  File "b.py", line 4',
661                              '    <b.py, 4>',
662                              '  File "a.py", line 2',
663                              '    <a.py, 2>'])
664
665            self.assertEqual(tb.format(limit=1),
666                             ['  File "a.py", line 2',
667                              '    <a.py, 2>'])
668
669            self.assertEqual(tb.format(limit=-1),
670                             ['  File "b.py", line 4',
671                              '    <b.py, 4>'])
672
673            self.assertEqual(tb.format(most_recent_first=True),
674                             ['  File "a.py", line 2',
675                              '    <a.py, 2>',
676                              '  File "b.py", line 4',
677                              '    <b.py, 4>'])
678
679            self.assertEqual(tb.format(limit=1, most_recent_first=True),
680                             ['  File "a.py", line 2',
681                              '    <a.py, 2>'])
682
683            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
684                             ['  File "b.py", line 4',
685                              '    <b.py, 4>'])
686
687
688class TestFilters(unittest.TestCase):
689    maxDiff = 2048
690
691    def test_filter_attributes(self):
692        # test default values
693        f = tracemalloc.Filter(True, "abc")
694        self.assertEqual(f.inclusive, True)
695        self.assertEqual(f.filename_pattern, "abc")
696        self.assertIsNone(f.lineno)
697        self.assertEqual(f.all_frames, False)
698
699        # test custom values
700        f = tracemalloc.Filter(False, "test.py", 123, True)
701        self.assertEqual(f.inclusive, False)
702        self.assertEqual(f.filename_pattern, "test.py")
703        self.assertEqual(f.lineno, 123)
704        self.assertEqual(f.all_frames, True)
705
706        # parameters passed by keyword
707        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
708        self.assertEqual(f.inclusive, False)
709        self.assertEqual(f.filename_pattern, "test.py")
710        self.assertEqual(f.lineno, 123)
711        self.assertEqual(f.all_frames, True)
712
713        # read-only attribute
714        self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
715
716    def test_filter_match(self):
717        # filter without line number
718        f = tracemalloc.Filter(True, "abc")
719        self.assertTrue(f._match_frame("abc", 0))
720        self.assertTrue(f._match_frame("abc", 5))
721        self.assertTrue(f._match_frame("abc", 10))
722        self.assertFalse(f._match_frame("12356", 0))
723        self.assertFalse(f._match_frame("12356", 5))
724        self.assertFalse(f._match_frame("12356", 10))
725
726        f = tracemalloc.Filter(False, "abc")
727        self.assertFalse(f._match_frame("abc", 0))
728        self.assertFalse(f._match_frame("abc", 5))
729        self.assertFalse(f._match_frame("abc", 10))
730        self.assertTrue(f._match_frame("12356", 0))
731        self.assertTrue(f._match_frame("12356", 5))
732        self.assertTrue(f._match_frame("12356", 10))
733
734        # filter with line number > 0
735        f = tracemalloc.Filter(True, "abc", 5)
736        self.assertFalse(f._match_frame("abc", 0))
737        self.assertTrue(f._match_frame("abc", 5))
738        self.assertFalse(f._match_frame("abc", 10))
739        self.assertFalse(f._match_frame("12356", 0))
740        self.assertFalse(f._match_frame("12356", 5))
741        self.assertFalse(f._match_frame("12356", 10))
742
743        f = tracemalloc.Filter(False, "abc", 5)
744        self.assertTrue(f._match_frame("abc", 0))
745        self.assertFalse(f._match_frame("abc", 5))
746        self.assertTrue(f._match_frame("abc", 10))
747        self.assertTrue(f._match_frame("12356", 0))
748        self.assertTrue(f._match_frame("12356", 5))
749        self.assertTrue(f._match_frame("12356", 10))
750
751        # filter with line number 0
752        f = tracemalloc.Filter(True, "abc", 0)
753        self.assertTrue(f._match_frame("abc", 0))
754        self.assertFalse(f._match_frame("abc", 5))
755        self.assertFalse(f._match_frame("abc", 10))
756        self.assertFalse(f._match_frame("12356", 0))
757        self.assertFalse(f._match_frame("12356", 5))
758        self.assertFalse(f._match_frame("12356", 10))
759
760        f = tracemalloc.Filter(False, "abc", 0)
761        self.assertFalse(f._match_frame("abc", 0))
762        self.assertTrue(f._match_frame("abc", 5))
763        self.assertTrue(f._match_frame("abc", 10))
764        self.assertTrue(f._match_frame("12356", 0))
765        self.assertTrue(f._match_frame("12356", 5))
766        self.assertTrue(f._match_frame("12356", 10))
767
768    def test_filter_match_filename(self):
769        def fnmatch(inclusive, filename, pattern):
770            f = tracemalloc.Filter(inclusive, pattern)
771            return f._match_frame(filename, 0)
772
773        self.assertTrue(fnmatch(True, "abc", "abc"))
774        self.assertFalse(fnmatch(True, "12356", "abc"))
775        self.assertFalse(fnmatch(True, "<unknown>", "abc"))
776
777        self.assertFalse(fnmatch(False, "abc", "abc"))
778        self.assertTrue(fnmatch(False, "12356", "abc"))
779        self.assertTrue(fnmatch(False, "<unknown>", "abc"))
780
781    def test_filter_match_filename_joker(self):
782        def fnmatch(filename, pattern):
783            filter = tracemalloc.Filter(True, pattern)
784            return filter._match_frame(filename, 0)
785
786        # empty string
787        self.assertFalse(fnmatch('abc', ''))
788        self.assertFalse(fnmatch('', 'abc'))
789        self.assertTrue(fnmatch('', ''))
790        self.assertTrue(fnmatch('', '*'))
791
792        # no *
793        self.assertTrue(fnmatch('abc', 'abc'))
794        self.assertFalse(fnmatch('abc', 'abcd'))
795        self.assertFalse(fnmatch('abc', 'def'))
796
797        # a*
798        self.assertTrue(fnmatch('abc', 'a*'))
799        self.assertTrue(fnmatch('abc', 'abc*'))
800        self.assertFalse(fnmatch('abc', 'b*'))
801        self.assertFalse(fnmatch('abc', 'abcd*'))
802
803        # a*b
804        self.assertTrue(fnmatch('abc', 'a*c'))
805        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
806        self.assertFalse(fnmatch('abb', 'a*c'))
807        self.assertFalse(fnmatch('abcdce', 'a*cx'))
808
809        # a*b*c
810        self.assertTrue(fnmatch('abcde', 'a*c*e'))
811        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
812        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
813        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
814
815        # replace .pyc suffix with .py
816        self.assertTrue(fnmatch('a.pyc', 'a.py'))
817        self.assertTrue(fnmatch('a.py', 'a.pyc'))
818
819        if os.name == 'nt':
820            # case insensitive
821            self.assertTrue(fnmatch('aBC', 'ABc'))
822            self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
823
824            self.assertTrue(fnmatch('a.pyc', 'a.PY'))
825            self.assertTrue(fnmatch('a.py', 'a.PYC'))
826        else:
827            # case sensitive
828            self.assertFalse(fnmatch('aBC', 'ABc'))
829            self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
830
831            self.assertFalse(fnmatch('a.pyc', 'a.PY'))
832            self.assertFalse(fnmatch('a.py', 'a.PYC'))
833
834        if os.name == 'nt':
835            # normalize alternate separator "/" to the standard separator "\"
836            self.assertTrue(fnmatch(r'a/b', r'a\b'))
837            self.assertTrue(fnmatch(r'a\b', r'a/b'))
838            self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
839            self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
840        else:
841            # there is no alternate separator
842            self.assertFalse(fnmatch(r'a/b', r'a\b'))
843            self.assertFalse(fnmatch(r'a\b', r'a/b'))
844            self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
845            self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
846
847        # as of 3.5, .pyo is no longer munged to .py
848        self.assertFalse(fnmatch('a.pyo', 'a.py'))
849
850    def test_filter_match_trace(self):
851        t1 = (("a.py", 2), ("b.py", 3))
852        t2 = (("b.py", 4), ("b.py", 5))
853        t3 = (("c.py", 5), ('<unknown>', 0))
854        unknown = (('<unknown>', 0),)
855
856        f = tracemalloc.Filter(True, "b.py", all_frames=True)
857        self.assertTrue(f._match_traceback(t1))
858        self.assertTrue(f._match_traceback(t2))
859        self.assertFalse(f._match_traceback(t3))
860        self.assertFalse(f._match_traceback(unknown))
861
862        f = tracemalloc.Filter(True, "b.py", all_frames=False)
863        self.assertFalse(f._match_traceback(t1))
864        self.assertTrue(f._match_traceback(t2))
865        self.assertFalse(f._match_traceback(t3))
866        self.assertFalse(f._match_traceback(unknown))
867
868        f = tracemalloc.Filter(False, "b.py", all_frames=True)
869        self.assertFalse(f._match_traceback(t1))
870        self.assertFalse(f._match_traceback(t2))
871        self.assertTrue(f._match_traceback(t3))
872        self.assertTrue(f._match_traceback(unknown))
873
874        f = tracemalloc.Filter(False, "b.py", all_frames=False)
875        self.assertTrue(f._match_traceback(t1))
876        self.assertFalse(f._match_traceback(t2))
877        self.assertTrue(f._match_traceback(t3))
878        self.assertTrue(f._match_traceback(unknown))
879
880        f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
881        self.assertTrue(f._match_traceback(t1))
882        self.assertTrue(f._match_traceback(t2))
883        self.assertTrue(f._match_traceback(t3))
884        self.assertFalse(f._match_traceback(unknown))
885
886        f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
887        self.assertFalse(f._match_traceback(t1))
888        self.assertFalse(f._match_traceback(t2))
889        self.assertTrue(f._match_traceback(t3))
890        self.assertTrue(f._match_traceback(unknown))
891
892        f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
893        self.assertTrue(f._match_traceback(t1))
894        self.assertTrue(f._match_traceback(t2))
895        self.assertFalse(f._match_traceback(t3))
896        self.assertFalse(f._match_traceback(unknown))
897
898
class TestCommandLine(unittest.TestCase):
    """Run child interpreters to check how the PYTHONTRACEMALLOC environment
    variable and the -X tracemalloc option configure tracemalloc."""

    def test_env_var_disabled_by_default(self):
        # without any option, nothing is traced
        script = 'import tracemalloc; print(tracemalloc.is_tracing())'
        _, out, _ = assert_python_ok('-c', script)
        self.assertEqual(out.rstrip(), b'False')

    @unittest.skipIf(interpreter_requires_environment(),
                     'Cannot run -E tests when PYTHON env vars are required.')
    def test_env_var_ignored_with_E(self):
        """PYTHON* environment variables must be ignored when -E is present."""
        script = 'import tracemalloc; print(tracemalloc.is_tracing())'
        _, out, _ = assert_python_ok('-E', '-c', script,
                                     PYTHONTRACEMALLOC='1')
        self.assertEqual(out.rstrip(), b'False')

    def test_env_var_disabled(self):
        # PYTHONTRACEMALLOC=0 must leave tracing off
        script = 'import tracemalloc; print(tracemalloc.is_tracing())'
        _, out, _ = assert_python_ok('-c', script, PYTHONTRACEMALLOC='0')
        self.assertEqual(out.rstrip(), b'False')

    def test_env_var_enabled_at_startup(self):
        # PYTHONTRACEMALLOC=1 must start tracing at startup
        script = 'import tracemalloc; print(tracemalloc.is_tracing())'
        _, out, _ = assert_python_ok('-c', script, PYTHONTRACEMALLOC='1')
        self.assertEqual(out.rstrip(), b'True')

    def test_env_limit(self):
        # the value of the variable is the number of frames to store
        script = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
        _, out, _ = assert_python_ok('-c', script, PYTHONTRACEMALLOC='10')
        self.assertEqual(out.rstrip(), b'10')

    def check_env_var_invalid(self, nframe):
        """Check that PYTHONTRACEMALLOC=nframe makes the interpreter fail
        with one of the expected error messages on stderr."""
        with support.SuppressCrashReport():
            _, _, err = assert_python_failure(
                '-c', 'pass',
                PYTHONTRACEMALLOC=str(nframe))

        expected_errors = (
            b'ValueError: the number of frames must be in range',
            b'PYTHONTRACEMALLOC: invalid number of frames',
        )
        if not any(message in err for message in expected_errors):
            self.fail(f"unexpected output: {err!a}")

    def test_env_var_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_env_var_invalid(nframe)

    def test_sys_xoptions(self):
        # -X option value -> expected traceback limit
        cases = (
            ('tracemalloc', 1),
            ('tracemalloc=1', 1),
            ('tracemalloc=15', 15),
        )
        script = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
        for xoptions, nframe in cases:
            with self.subTest(xoptions=xoptions, nframe=nframe):
                _, out, _ = assert_python_ok('-X', xoptions, '-c', script)
                self.assertEqual(out.rstrip(), str(nframe).encode('ascii'))

    def check_sys_xoptions_invalid(self, nframe):
        """Check that -X tracemalloc=nframe makes the interpreter fail with
        one of the expected error messages on stderr."""
        with support.SuppressCrashReport():
            _, _, err = assert_python_failure(
                '-X', f'tracemalloc={nframe}', '-c', 'pass')

        expected_errors = (
            b'ValueError: the number of frames must be in range',
            b'-X tracemalloc=NFRAME: invalid number of frames',
        )
        if not any(message in err for message in expected_errors):
            self.fail(f"unexpected output: {err!a}")

    def test_sys_xoptions_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_sys_xoptions_invalid(nframe)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: PyMem_Malloc(0) must not crash when tracemalloc
        # is enabled.
        script = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', script)
989
990
@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    """Tests of the tracemalloc track/untrack helpers exposed by _testcapi.

    Each test tracks, untracks or looks up a single memory block: the
    address of a bytes object (self.ptr) registered in the domain
    self.domain with the size self.size.
    """
    maxDiff = 80 * 20

    def setUp(self):
        """Skip if tracing is already active, then create the test block."""
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        """Stop tracing, whatever the test did."""
        tracemalloc.stop()

    def get_traceback(self):
        """Return the Traceback stored for self.ptr in self.domain.

        Return None if _testcapi reports no traceback for the block.
        """
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is None:
            return None
        return tracemalloc.Traceback(frames)

    def track(self, release_gil=False, nframe=1):
        """Track self.ptr and return the frames expected in its traceback.

        The expected frames come from get_frames() called with a line delta
        of 1, so the first frame points at the line following the
        get_frames() call. The tracemalloc_track() call must stay on the
        line right after it.
        """
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        """Remove the trace of self.ptr from self.domain."""
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        """Return the total size of the traces stored in self.domain."""
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        """Track the block, then check its traceback and its traced size.

        release_gil is passed through to _testcapi.tracemalloc_track().
        """
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        tracemalloc.start()
        tracemalloc.stop()

        # tracking a block while tracemalloc is stopped must fail and must
        # not record any trace
        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        tracemalloc.start()
        self.track()

        # untracking a block after tracemalloc was stopped must fail
        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()
1097
1098
# Run the tests of this file when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
1101