xref: /aosp_15_r20/external/pigweed/pw_thread/py/thread_analyzer_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
#!/usr/bin/env python3
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for the thread analyzer."""
16
17import unittest
18from pw_thread.thread_analyzer import ThreadInfo, ThreadSnapshotAnalyzer
19from pw_thread_protos import thread_pb2
20import pw_tokenizer
21from pw_tokenizer import tokens
22
23
class ThreadInfoTest(unittest.TestCase):
    """Tests that the ThreadInfo class produces expected results.

    Each test builds a ``thread_pb2.Thread`` proto with a different subset of
    fields populated and checks both the text rendering of ``ThreadInfo`` and
    its ``has_stack_size_limit()`` / ``has_stack_used()`` predicates.
    """

    def test_empty_thread(self):
        """A default-constructed Thread renders every field as unknown."""
        thread_info = ThreadInfo(thread_pb2.Thread())
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_cpu_usage(self):
        """cpu_usage_hundredths of 1234 is rendered as '12.34%'."""
        thread = thread_pb2.Thread()
        thread.cpu_usage_hundredths = 1234
        thread_info = ThreadInfo(thread)

        expected = '\n'.join(
            (
                'Est CPU usage: 12.34%',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_stack_pointer(self):
        """A lone stack pointer is printed, but no usage size is reported."""
        thread = thread_pb2.Thread()
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x5ac6a86c (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_stack_usage(self):
        """Start pointer plus stack pointer yields a usage size, no limit."""
        thread = thread_pb2.Thread()
        thread.stack_start_pointer = 0x5AC6B86C
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        # 0x5AC6B86C - 0x5AC6A86C == 0x1000 == 4096 bytes in use.
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x5ac6b86c - 0x5ac6a86c (4096 bytes)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x5ac6b86c - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertTrue(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_all_stack_info(self):
        """Start, end and current pointers yield size, limit and percent."""
        thread = thread_pb2.Thread()
        thread.stack_start_pointer = 0x5AC6B86C
        thread.stack_end_pointer = 0x5AC6986C
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        # 4096 bytes used out of an 8192 byte stack, hence 50.00%.
        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x5ac6b86c - 0x5ac6a86c (4096 bytes, 50.00%)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x5ac6b86c - 0x5ac6986c (8192 bytes)',
            )
        )
        # pylint: enable=line-too-long
        self.assertTrue(thread_info.has_stack_size_limit())
        self.assertTrue(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))
118
119
class ThreadSnapshotAnalyzerTest(unittest.TestCase):
    """Tests that the ThreadSnapshotAnalyzer class produces expected results.

    Each test fills a ``thread_pb2.SnapshotThreadInfo`` with zero or more
    threads and compares the analyzer's text dump (and, where relevant, its
    ``active_thread()`` result) against a hand-written expectation.
    """

    def test_no_threads(self):
        """An empty snapshot renders as an empty string."""
        analyzer = ThreadSnapshotAnalyzer(thread_pb2.SnapshotThreadInfo())
        self.assertEqual('', str(analyzer))

    def test_one_empty_thread(self):
        """A single default Thread is listed as unnamed with unknown state."""
        snapshot = thread_pb2.SnapshotThreadInfo()
        snapshot.threads.append(thread_pb2.Thread())
        expected = '\n'.join(
            (
                'Thread State',
                '  1 thread running.',
                '',
                'Thread (UNKNOWN): [unnamed thread]',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        self.assertIsNone(analyzer.active_thread())
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_two_threads(self):
        """Ensures multiple threads are printed correctly."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Idle'
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AA00
        snapshot.threads.append(temp_thread)

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Alice'
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.state = thread_pb2.ThreadState.Enum.BLOCKED
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                '  2 threads running.',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                '  Est peak usage:  512 bytes, 100.00%',
                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
                'Thread (BLOCKED): Alice',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        # Neither thread is an interrupt handler or flagged active.
        self.assertIsNone(analyzer.active_thread())
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_interrupts_with_thread(self):
        """Ensures interrupts are properly reported as active."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Idle'
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AA00
        snapshot.threads.append(temp_thread)

        # Note: `active` is intentionally left unset here; only the
        # INTERRUPT_HANDLER state marks this thread.
        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Main/Handler'
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                '  2 threads running, Main/Handler active at the time of capture.',
                '                     ~~~~~~~~~~~~',
                '',
                # Ensure the active thread is moved to the top of the list.
                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                '  Est peak usage:  512 bytes, 100.00%',
                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        self.assertEqual(analyzer.active_thread(), temp_thread)
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_active_thread(self):
        """Ensures the 'active' thread is highlighted."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Idle'
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AC00 + 0x100
        snapshot.threads.append(temp_thread)

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'Main/Handler'
        temp_thread.active = True
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.stack_pointer_est_peak = 0x2001B000 + 0x200
        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                '  2 threads running, Main/Handler active at the time of capture.',
                '                     ~~~~~~~~~~~~',
                '',
                # Ensure the active thread is moved to the top of the list.
                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
                '  Est peak usage:  512 bytes',
                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                '  Est peak usage:  256 bytes, 50.00%',
                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)

        # Ensure the active thread is found.
        self.assertEqual(analyzer.active_thread(), temp_thread)
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_tokenized_thread_name(self):
        """Ensures a tokenized thread name is detokenized.

        A name whose token is in the database is replaced by its string; a
        name whose token is missing is shown in '$'-prefixed Base64 form.
        """
        snapshot = thread_pb2.SnapshotThreadInfo()
        detokenizer = pw_tokenizer.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0x46BE7497, 'The thread for Kuzco'
                    ),
                ]
            )
        )

        temp_thread = thread_pb2.Thread()
        # Byte-reversed form of the 0x46BE7497 token registered above.
        temp_thread.name = b'\x97\x74\xBE\x46'
        snapshot.threads.append(temp_thread)
        # The same message object is renamed and appended again; the expected
        # output below shows both names, so the first append must have stored
        # its own copy. This token is not in the database.
        temp_thread.name = b'\x5D\xA8\x66\xAE'
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                '  2 threads running.',
                '',
                'Thread (UNKNOWN): The thread for Kuzco',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
                '',
                'Thread (UNKNOWN): $Xahmrg==',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot, tokenizer_db=detokenizer)

        # Ensure text dump matches expected contents.
        self.assertEqual(str(analyzer), expected)

    def test_no_db_tokenized_thread_name(self):
        """Ensures tokenized names stay in '$'+Base64 form without a database.

        No tokenizer database is passed to the analyzer, so neither name can
        be detokenized and both are expected in their encoded form.
        """
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'\x97\x74\xBE\x46'
        snapshot.threads.append(temp_thread)
        # Same message object renamed and appended a second time; the expected
        # output below lists both names.
        temp_thread.name = b'\x5D\xA8\x66\xAE'
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                '  2 threads running.',
                '',
                'Thread (UNKNOWN): $l3S+Rg==',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
                '',
                'Thread (UNKNOWN): $Xahmrg==',
                'Est CPU usage: unknown',
                'Stack info',
                '  Current usage:   0x???????? - 0x???????? (size unknown)',
                '  Est peak usage:  size unknown',
                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)

        # Ensure text dump matches expected contents.
        self.assertEqual(str(analyzer), expected)
381
382
# Run every test case in this module when executed directly as a script.
if __name__ == '__main__':
    unittest.main()
385