xref: /aosp_15_r20/external/pigweed/pw_thread/py/pw_thread/thread_analyzer.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Library to analyze and dump Thread protos and Thread snapshots into text."""
15
16import binascii
17from typing import Callable, Mapping
18import pw_tokenizer
19from pw_symbolizer import LlvmSymbolizer, Symbolizer
20from pw_tokenizer import proto as proto_detokenizer
21from pw_thread_protos import thread_pb2
22
# Display name for each thread_pb2.ThreadState.Enum value. Every key is the
# enum member whose attribute name equals the mapped string, so the table is
# built from the list of names rather than spelled out pair by pair.
THREAD_STATE_TO_STRING: Mapping[int, str] = {
    getattr(thread_pb2.ThreadState.Enum, state_name): state_name
    for state_name in (
        'UNKNOWN',
        'INTERRUPT_HANDLER',
        'RUNNING',
        'READY',
        'SUSPENDED',
        'BLOCKED',
        'INACTIVE',
    )
}
32
33
def process_snapshot(
    serialized_snapshot: bytes,
    tokenizer_db: pw_tokenizer.Detokenizer | None = None,
    symbolizer: Symbolizer | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
) -> str:
    """Renders the threads of a serialized snapshot as a multi-line string.

    Args:
        serialized_snapshot: Bytes parsed as a
            thread_pb2.SnapshotThreadInfo message.
        tokenizer_db: Optional detokenizer, forwarded unchanged to
            ThreadSnapshotAnalyzer.
        symbolizer: Optional symbolizer; an LlvmSymbolizer() is created when
            this is None.
        user_processing_callback: Optional callback, forwarded unchanged to
            ThreadSnapshotAnalyzer.

    Returns:
        The string form of the ThreadSnapshotAnalyzer built from the parsed
        snapshot.
    """
    thread_info = thread_pb2.SnapshotThreadInfo()
    thread_info.ParseFromString(serialized_snapshot)

    active_symbolizer = LlvmSymbolizer() if symbolizer is None else symbolizer
    analyzer = ThreadSnapshotAnalyzer(
        thread_info,
        tokenizer_db,
        active_symbolizer,
        user_processing_callback,
    )
    return str(analyzer)
51
52
class ThreadInfo:
    """Provides CPU and stack information that can be inferred from a Thread.

    The wrapped message is only read. Optional fields are probed with
    ``HasField`` and rendered as "unknown" placeholders when they are absent.
    Sizes are computed as absolute pointer differences, so they do not depend
    on the direction in which the stack grows.
    """

    # Placeholder printed where a pointer field is not set in the proto.
    _UNKNOWN_VALUE_STR = '0x' + '?' * 8

    def __init__(self, thread: thread_pb2.Thread):
        """Stores a reference to ``thread``; the message is not copied."""
        self._thread = thread

    def _pointer_str(self, field_name: str) -> str:
        """Formats the named pointer field as 0x%08x, or the placeholder.

        Centralizing this keeps the field that is checked and the field that
        is printed from drifting apart.
        """
        if not self._thread.HasField(field_name):
            return ThreadInfo._UNKNOWN_VALUE_STR
        return f'0x{getattr(self._thread, field_name):08x}'

    def _percent_of_limit_suffix(self, used_bytes: int) -> str:
        """Returns ', NN.NN%' of the stack size limit, or '' if unavailable.

        The suffix is omitted when the limit is unknown, and also when it is
        zero (start and end pointers equal), because dividing by it would
        raise ZeroDivisionError.
        """
        if not self.has_stack_size_limit():
            return ''
        limit = self.stack_size_limit()
        if limit == 0:
            return ''
        return f', {100 * used_bytes / limit:.2f}%'

    def _cpu_used_str(self) -> str:
        """Returns CPU usage as 'NN.NN%', or 'unknown' if not reported."""
        if not self._thread.HasField('cpu_usage_hundredths'):
            return 'unknown'
        # The field is in hundredths of a percent.
        cpu_last_percent = self._thread.cpu_usage_hundredths / 100
        return f'{cpu_last_percent:.2f}%'

    def _stack_size_limit_limit_str(self) -> str:
        """Returns the stack size limit as 'N bytes', or 'size unknown'."""
        if not self.has_stack_size_limit():
            return 'size unknown'

        return f'{self.stack_size_limit()} bytes'

    def _stack_used_str(self) -> str:
        """Returns current stack usage as 'N bytes[, P%]', or 'size unknown'."""
        if not self.has_stack_used():
            return 'size unknown'

        used = self.stack_used()
        return f'{used} bytes' + self._percent_of_limit_suffix(used)

    def _stack_pointer_est_peak_str(self) -> str:
        """Returns estimated peak usage as 'N bytes[, P%]', or 'size unknown'."""
        if not self.has_stack_pointer_est_peak():
            return 'size unknown'

        peak = self.stack_pointer_est_peak()
        return f'{peak} bytes' + self._percent_of_limit_suffix(peak)

    def _stack_used_range_str(self) -> str:
        """Returns 'start - current stack pointer (usage)'."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_used_str()})'

    def _stack_limit_range_str(self) -> str:
        """Returns 'start - end pointer (size limit)'."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_end_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})'

    def _stack_pointer_str(self) -> str:
        """Returns the current stack pointer as hex, or the placeholder."""
        # Both the presence check and the printed value use stack_pointer;
        # printing stack_end_pointer here would report the wrong address.
        return self._pointer_str('stack_pointer')

    def has_stack_size_limit(self) -> bool:
        """Returns true if there's enough info to calculate stack size."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_end_pointer')

    def stack_size_limit(self) -> int:
        """Returns the stack size limit in bytes.

        Precondition:
            has_stack_size_limit() must be true.
        """
        assert self.has_stack_size_limit(), 'Missing stack size information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_end_pointer
        )

    def has_stack_used(self) -> bool:
        """Returns true if there's enough info to calculate stack usage."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer')

    def stack_used(self) -> int:
        """Returns the stack usage in bytes.

        Precondition:
            has_stack_used() must be true.
        """
        assert self.has_stack_used(), 'Missing stack usage information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_pointer
        )

    def has_stack_pointer_est_peak(self) -> bool:
        """Returns true if there's enough info to calculate estimate
        used stack.
        """
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer_est_peak')

    def stack_pointer_est_peak(self) -> int:
        """Returns the max estimated used stack usage in bytes.

        Precondition:
            has_stack_pointer_est_peak() must be true.
        """
        assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak'
        return abs(
            self._thread.stack_start_pointer
            - self._thread.stack_pointer_est_peak
        )

    def __str__(self) -> str:
        """Returns the CPU and stack summary as a multi-line string."""
        output = [
            f'Est CPU usage: {self._cpu_used_str()}',
            'Stack info',
            f'  Current usage:   {self._stack_used_range_str()}',
            f'  Est peak usage:  {self._stack_pointer_est_peak_str()}',
            f'  Stack limits:    {self._stack_limit_range_str()}',
        ]
        return '\n'.join(output)
196
197
class ThreadSnapshotAnalyzer:
    """Renders the threads of a SnapshotThreadInfo proto as readable text."""

    def __init__(
        self,
        threads: thread_pb2.SnapshotThreadInfo,
        tokenizer_db: pw_tokenizer.Detokenizer | None = None,
        symbolizer: Symbolizer | None = None,
        user_processing_callback: Callable[[bytes], str] | None = None,
    ):
        """Stores the snapshot's threads and detokenizes their fields.

        Args:
            threads: Message whose ``threads`` repeated field is analyzed. The
                field is held by reference, not copied.
            tokenizer_db: Detokenizer to apply to each thread; defaults to
                ``pw_tokenizer.Detokenizer(None)``.
            symbolizer: Used to render raw backtraces; defaults to
                ``LlvmSymbolizer()``.
            user_processing_callback: If set, called with each thread's
                serialized bytes; the returned string is appended to that
                thread's section of the output.
        """
        self._threads = threads.threads
        self._tokenizer_db = (
            tokenizer_db
            if tokenizer_db is not None
            else pw_tokenizer.Detokenizer(None)
        )
        if symbolizer is not None:
            self._symbolizer = symbolizer
        else:
            self._symbolizer = LlvmSymbolizer()
        self._user_processing_callback = user_processing_callback

        # The return value is not used, so the detokenized results are
        # expected to be written back into each thread message.
        for thread in self._threads:
            proto_detokenizer.detokenize_fields(self._tokenizer_db, thread)

    @staticmethod
    def _decode_name(name: bytes) -> str:
        """Decodes a thread name, substituting undecodable bytes.

        A name that is not valid UTF-8 should not abort the whole dump.
        """
        return name.decode(errors='replace')

    def _active_thread_index(self) -> int | None:
        """Returns the position of the active thread, or None if there is none.

        The first thread that is in the INTERRUPT_HANDLER state or has the
        legacy ``active`` flag set wins; otherwise the first RUNNING thread.
        """
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER:
                return index
            if thread.active:  # The deprecated legacy way to report this.
                return index

        # If not, search for a running thread.
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.RUNNING:
                return index

        return None

    def active_thread(self) -> thread_pb2.Thread | None:
        """The thread that requested the snapshot capture."""
        index = self._active_thread_index()
        return None if index is None else self._threads[index]

    def __str__(self) -> str:
        """Outputs the captured threads as a multi-line string.

        Returns an empty string when there are no threads. Otherwise the
        output is a summary header followed by one section per thread, with
        the active thread (if any) listed first. The stored threads are not
        reordered or otherwise modified by this call.
        """
        if not self._threads:
            return ''

        thread_count = len(self._threads)
        # Resolve the active thread once and track it by position, so that
        # threads with identical contents are not all tagged as active.
        active_index = self._active_thread_index()

        output: list[str] = ['Thread State']
        plural = '' if thread_count == 1 else 's'
        thread_state_overview = f'  {thread_count} thread{plural} running'
        if active_index is None:
            output.append(thread_state_overview + '.')
        else:
            thread_state_overview += ', '
            active_name = self._decode_name(self._threads[active_index].name)
            # The tildes line up underneath the active thread's name.
            underline = ' ' * len(thread_state_overview) + '~' * len(
                active_name
            )
            output.append(
                f'{thread_state_overview}{active_name}'
                ' active at the time of capture.'
            )
            output.append(underline)

        output.append('')

        # Put the active thread at the front, using a local ordering rather
        # than rearranging the repeated field owned by the caller's proto.
        display_order = [
            index for index in range(thread_count) if index != active_index
        ]
        if active_index is not None:
            display_order.insert(0, active_index)

        for index in display_order:
            thread = self._threads[index]
            thread_name = self._decode_name(thread.name)
            if not thread_name:
                thread_name = '[unnamed thread]'
            # A state value missing from the table is reported as UNKNOWN
            # instead of raising KeyError.
            state_name = THREAD_STATE_TO_STRING.get(thread.state, 'UNKNOWN')
            thread_headline = f'Thread ({state_name}): {thread_name}'
            if index == active_index:
                thread_headline += ' <-- [ACTIVE]'
            output.append(thread_headline)
            output.append(str(ThreadInfo(thread)))
            if thread.raw_backtrace:
                output.append(
                    self._symbolizer.dump_stack_trace(thread.raw_backtrace)
                )
            if thread.raw_stack:
                output.append('Raw Stack')
                # Hex dump with a newline after every 32 bytes.
                output.append(
                    binascii.hexlify(thread.raw_stack, b'\n', 32).decode(
                        'utf-8'
                    )
                )
            if self._user_processing_callback is not None:
                output.append(
                    self._user_processing_callback(thread.SerializeToString())
                )

            # Blank line between threads for nicer formatting.
            output.append('')

        return '\n'.join(output)
305