1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Library to analyze and dump Thread protos and Thread snapshots into text.""" 15 16import binascii 17from typing import Callable, Mapping 18import pw_tokenizer 19from pw_symbolizer import LlvmSymbolizer, Symbolizer 20from pw_tokenizer import proto as proto_detokenizer 21from pw_thread_protos import thread_pb2 22 23THREAD_STATE_TO_STRING: Mapping[int, str] = { 24 thread_pb2.ThreadState.Enum.UNKNOWN: 'UNKNOWN', 25 thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER: 'INTERRUPT_HANDLER', 26 thread_pb2.ThreadState.Enum.RUNNING: 'RUNNING', 27 thread_pb2.ThreadState.Enum.READY: 'READY', 28 thread_pb2.ThreadState.Enum.SUSPENDED: 'SUSPENDED', 29 thread_pb2.ThreadState.Enum.BLOCKED: 'BLOCKED', 30 thread_pb2.ThreadState.Enum.INACTIVE: 'INACTIVE', 31} 32 33 34def process_snapshot( 35 serialized_snapshot: bytes, 36 tokenizer_db: pw_tokenizer.Detokenizer | None = None, 37 symbolizer: Symbolizer | None = None, 38 user_processing_callback: Callable[[bytes], str] | None = None, 39) -> str: 40 """Processes snapshot threads, producing a multi-line string.""" 41 captured_threads = thread_pb2.SnapshotThreadInfo() 42 captured_threads.ParseFromString(serialized_snapshot) 43 if symbolizer is None: 44 symbolizer = LlvmSymbolizer() 45 46 return str( 47 ThreadSnapshotAnalyzer( 48 captured_threads, tokenizer_db, symbolizer, user_processing_callback 49 ) 50 ) 51 52 53class ThreadInfo: 54 """Provides CPU and stack information that can be inferred from a Thread.""" 55 56 _UNKNOWN_VALUE_STR = '0x' + '?' * 8 57 58 def __init__(self, thread: thread_pb2.Thread): 59 self._thread = thread 60 61 def _cpu_used_str(self) -> str: 62 if not self._thread.HasField('cpu_usage_hundredths'): 63 return 'unknown' 64 cpu_last_percent = self._thread.cpu_usage_hundredths / 100 65 return f'{cpu_last_percent:.2f}%' 66 67 def _stack_size_limit_limit_str(self) -> str: 68 if not self.has_stack_size_limit(): 69 return 'size unknown' 70 71 return f'{self.stack_size_limit()} bytes' 72 73 def _stack_used_str(self) -> str: 74 if not self.has_stack_used(): 75 return 'size unknown' 76 77 used_str = f'{self.stack_used()} bytes' 78 if not self.has_stack_size_limit(): 79 return used_str 80 used_str += f', {100*self.stack_used()/self.stack_size_limit():.2f}%' 81 return used_str 82 83 def _stack_pointer_est_peak_str(self) -> str: 84 if not self.has_stack_pointer_est_peak(): 85 return 'size unknown' 86 87 high_used_str = f'{self.stack_pointer_est_peak()} bytes' 88 if not self.has_stack_size_limit(): 89 return high_used_str 90 high_water_mark_percent = ( 91 100 * self.stack_pointer_est_peak() / self.stack_size_limit() 92 ) 93 high_used_str += f', {high_water_mark_percent:.2f}%' 94 return high_used_str 95 96 def _stack_used_range_str(self) -> str: 97 start_str = ( 98 f'0x{self._thread.stack_start_pointer:08x}' 99 if self._thread.HasField('stack_start_pointer') 100 else ThreadInfo._UNKNOWN_VALUE_STR 101 ) 102 end_str = ( 103 f'0x{self._thread.stack_pointer:08x}' 104 if self._thread.HasField('stack_pointer') 105 else ThreadInfo._UNKNOWN_VALUE_STR 106 ) 107 108 # TODO(amontanez): Would be nice to represent stack growth direction. 109 return f'{start_str} - {end_str} ({self._stack_used_str()})' 110 111 def _stack_limit_range_str(self) -> str: 112 start_str = ( 113 f'0x{self._thread.stack_start_pointer:08x}' 114 if self._thread.HasField('stack_start_pointer') 115 else ThreadInfo._UNKNOWN_VALUE_STR 116 ) 117 end_str = ( 118 f'0x{self._thread.stack_end_pointer:08x}' 119 if self._thread.HasField('stack_end_pointer') 120 else ThreadInfo._UNKNOWN_VALUE_STR 121 ) 122 123 # TODO(amontanez): Would be nice to represent stack growth direction. 124 return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})' 125 126 def _stack_pointer_str(self) -> str: 127 return ( 128 f'0x{self._thread.stack_end_pointer:08x}' 129 if self._thread.HasField('stack_pointer') 130 else ThreadInfo._UNKNOWN_VALUE_STR 131 ) 132 133 def has_stack_size_limit(self) -> bool: 134 """Returns true if there's enough info to calculate stack size.""" 135 return self._thread.HasField( 136 'stack_start_pointer' 137 ) and self._thread.HasField('stack_end_pointer') 138 139 def stack_size_limit(self) -> int: 140 """Returns the stack size limit in bytes. 141 142 Precondition: 143 has_stack_size_limit() must be true. 144 """ 145 assert self.has_stack_size_limit(), 'Missing stack size information' 146 return abs( 147 self._thread.stack_start_pointer - self._thread.stack_end_pointer 148 ) 149 150 def has_stack_used(self) -> bool: 151 """Returns true if there's enough info to calculate stack usage.""" 152 return self._thread.HasField( 153 'stack_start_pointer' 154 ) and self._thread.HasField('stack_pointer') 155 156 def stack_used(self) -> int: 157 """Returns the stack usage in bytes. 158 159 Precondition: 160 has_stack_used() must be true. 161 """ 162 assert self.has_stack_used(), 'Missing stack usage information' 163 return abs( 164 self._thread.stack_start_pointer - self._thread.stack_pointer 165 ) 166 167 def has_stack_pointer_est_peak(self) -> bool: 168 """Returns true if there's enough info to calculate estimate 169 used stack. 170 """ 171 return self._thread.HasField( 172 'stack_start_pointer' 173 ) and self._thread.HasField('stack_pointer_est_peak') 174 175 def stack_pointer_est_peak(self) -> int: 176 """Returns the max estimated used stack usage in bytes. 177 178 Precondition: 179 has_stack_estimated_used_bytes() must be true. 180 """ 181 assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak' 182 return abs( 183 self._thread.stack_start_pointer 184 - self._thread.stack_pointer_est_peak 185 ) 186 187 def __str__(self) -> str: 188 output = [ 189 f'Est CPU usage: {self._cpu_used_str()}', 190 'Stack info', 191 f' Current usage: {self._stack_used_range_str()}', 192 f' Est peak usage: {self._stack_pointer_est_peak_str()}', 193 f' Stack limits: {self._stack_limit_range_str()}', 194 ] 195 return '\n'.join(output) 196 197 198class ThreadSnapshotAnalyzer: 199 """This class simplifies dumping contents of a snapshot Metadata message.""" 200 201 def __init__( 202 self, 203 threads: thread_pb2.SnapshotThreadInfo, 204 tokenizer_db: pw_tokenizer.Detokenizer | None = None, 205 symbolizer: Symbolizer | None = None, 206 user_processing_callback: Callable[[bytes], str] | None = None, 207 ): 208 self._threads = threads.threads 209 self._tokenizer_db = ( 210 tokenizer_db 211 if tokenizer_db is not None 212 else pw_tokenizer.Detokenizer(None) 213 ) 214 if symbolizer is not None: 215 self._symbolizer = symbolizer 216 else: 217 self._symbolizer = LlvmSymbolizer() 218 self._user_processing_callback = user_processing_callback 219 220 for thread in self._threads: 221 proto_detokenizer.detokenize_fields(self._tokenizer_db, thread) 222 223 def active_thread(self) -> thread_pb2.Thread | None: 224 """The thread that requested the snapshot capture.""" 225 # First check if an interrupt handler was active. 226 for thread in self._threads: 227 if thread.state == thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER: 228 return thread 229 if thread.active: # The deprecated legacy way to report this. 230 return thread 231 232 # If not, search for a running thread. 233 for thread in self._threads: 234 if thread.state == thread_pb2.ThreadState.Enum.RUNNING: 235 return thread 236 237 return None 238 239 def __str__(self) -> str: 240 """outputs a pw.snapshot.Metadata proto as a multi-line string.""" 241 output: list[str] = [] 242 if not self._threads: 243 return '' 244 245 output.append('Thread State') 246 plural = '' if len(self._threads) == 1 else 's' 247 thread_state_overview = f' {len(self._threads)} thread{plural} running' 248 requesting_thread = self.active_thread() 249 if not requesting_thread: 250 thread_state_overview += '.' 251 output.append(thread_state_overview) 252 else: 253 thread_state_overview += ', ' 254 underline = ' ' * len(thread_state_overview) + '~' * len( 255 requesting_thread.name.decode() 256 ) 257 thread_state_overview += ( 258 f'{requesting_thread.name.decode()}' 259 ' active at the time of capture.' 260 ) 261 output.append(thread_state_overview) 262 output.append(underline) 263 264 output.append('') 265 266 # Put the active thread at the front. 267 requesting_thread = self.active_thread() 268 if requesting_thread is not None: 269 self._threads.remove(requesting_thread) 270 self._threads.insert(0, requesting_thread) 271 272 for thread in self._threads: 273 thread_name = thread.name.decode() 274 if not thread_name: 275 thread_name = '[unnamed thread]' 276 thread_headline = ( 277 'Thread ' 278 f'({THREAD_STATE_TO_STRING[thread.state]}): ' 279 f'{thread_name}' 280 ) 281 if self.active_thread() == thread: 282 thread_headline += ' <-- [ACTIVE]' 283 output.append(thread_headline) 284 output.append(str(ThreadInfo(thread))) 285 if thread.raw_backtrace: 286 output.append( 287 self._symbolizer.dump_stack_trace(thread.raw_backtrace) 288 ) 289 if thread.raw_stack: 290 output.append('Raw Stack') 291 output.append( 292 binascii.hexlify(thread.raw_stack, b'\n', 32).decode( 293 'utf-8' 294 ) 295 ) 296 if self._user_processing_callback is not None: 297 output.append( 298 self._user_processing_callback(thread.SerializeToString()) 299 ) 300 301 # Blank line between threads for nicer formatting. 302 output.append('') 303 304 return '\n'.join(output) 305