#!/usr/bin/env python3
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for the thread analyzer."""

import unittest
from pw_thread.thread_analyzer import ThreadInfo, ThreadSnapshotAnalyzer
from pw_thread_protos import thread_pb2
import pw_tokenizer
from pw_tokenizer import tokens

# NOTE(review): every `expected` value below is compared byte-for-byte with the
# text produced by ThreadInfo / ThreadSnapshotAnalyzer. Confirm that the
# spacing inside these literals matches the formatter's column alignment.


class ThreadInfoTest(unittest.TestCase):
    """Tests that the ThreadInfo class produces expected results."""

    def test_empty_thread(self) -> None:
        """A default Thread proto reports every field as unknown."""
        thread_info = ThreadInfo(thread_pb2.Thread())
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_cpu_usage(self) -> None:
        """cpu_usage_hundredths=1234 is rendered as '12.34%'."""
        thread = thread_pb2.Thread()
        thread.cpu_usage_hundredths = 1234
        thread_info = ThreadInfo(thread)

        expected = '\n'.join(
            (
                'Est CPU usage: 12.34%',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_stack_pointer(self) -> None:
        """Only the stack pointer is set, so no usage size can be reported."""
        thread = thread_pb2.Thread()
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x5ac6a86c (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertFalse(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_stack_usage(self) -> None:
        """Start and current pointers give a usage size but no limit."""
        thread = thread_pb2.Thread()
        thread.stack_start_pointer = 0x5AC6B86C
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        # 0x5AC6B86C - 0x5AC6A86C == 0x1000 == 4096 bytes in use.
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x5ac6b86c - 0x???????? (size unknown)',
            )
        )
        self.assertFalse(thread_info.has_stack_size_limit())
        self.assertTrue(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))

    def test_thread_with_all_stack_info(self) -> None:
        """Start, end and current pointers give usage, limit and percent."""
        thread = thread_pb2.Thread()
        thread.stack_start_pointer = 0x5AC6B86C
        thread.stack_end_pointer = 0x5AC6986C
        thread.stack_pointer = 0x5AC6A86C
        thread_info = ThreadInfo(thread)

        # 4096 bytes used out of an 8192-byte stack -> 50.00%.
        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes, 50.00%)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x5ac6b86c - 0x5ac6986c (8192 bytes)',
            )
        )
        # pylint: enable=line-too-long
        self.assertTrue(thread_info.has_stack_size_limit())
        self.assertTrue(thread_info.has_stack_used())
        self.assertEqual(expected, str(thread_info))


class ThreadSnapshotAnalyzerTest(unittest.TestCase):
    """Tests that the ThreadSnapshotAnalyzer class produces expected results."""

    def test_no_threads(self) -> None:
        """A snapshot without threads renders as an empty string."""
        analyzer = ThreadSnapshotAnalyzer(thread_pb2.SnapshotThreadInfo())
        self.assertEqual('', str(analyzer))

    def test_one_empty_thread(self) -> None:
        """A single default thread is listed as unnamed with unknown state."""
        snapshot = thread_pb2.SnapshotThreadInfo()
        snapshot.threads.append(thread_pb2.Thread())
        expected = '\n'.join(
            (
                'Thread State',
                ' 1 thread running.',
                '',
                'Thread (UNKNOWN): [unnamed thread]',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        self.assertIsNone(analyzer.active_thread())
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_two_threads(self) -> None:
        """Ensures multiple threads are printed correctly."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Idle'.encode()
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AA00
        snapshot.threads.append(temp_thread)

        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Alice'.encode()
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.state = thread_pb2.ThreadState.Enum.BLOCKED
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                ' 2 threads running.',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                ' Est peak usage: 512 bytes, 100.00%',
                ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
                'Thread (BLOCKED): Alice',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x2001b000 - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        # Neither thread is marked active or is an interrupt handler.
        self.assertIsNone(analyzer.active_thread())
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_interrupts_with_thread(self) -> None:
        """Ensures interrupts are properly reported as active."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Idle'.encode()
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AA00
        snapshot.threads.append(temp_thread)

        # `active` is deliberately left unset: the INTERRUPT_HANDLER state
        # alone is expected to mark this thread as the active one.
        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Main/Handler'.encode()
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                ' 2 threads running, Main/Handler active at the time of capture.',
                ' ~~~~~~~~~~~~',
                '',
                # Ensure the active thread is moved to the top of the list.
                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x2001b000 - 0x???????? (size unknown)',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                ' Est peak usage: 512 bytes, 100.00%',
                ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)
        self.assertEqual(analyzer.active_thread(), temp_thread)
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_active_thread(self) -> None:
        """Ensures the 'active' thread is highlighted."""
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Idle'.encode()
        temp_thread.state = thread_pb2.ThreadState.Enum.READY
        temp_thread.stack_start_pointer = 0x2001AC00
        temp_thread.stack_end_pointer = 0x2001AA00
        temp_thread.stack_pointer = 0x2001AB0C
        temp_thread.stack_pointer_est_peak = 0x2001AC00 + 0x100
        snapshot.threads.append(temp_thread)

        temp_thread = thread_pb2.Thread()
        temp_thread.name = 'Main/Handler'.encode()
        temp_thread.active = True
        temp_thread.stack_start_pointer = 0x2001B000
        temp_thread.stack_pointer = 0x2001AE20
        temp_thread.stack_pointer_est_peak = 0x2001B000 + 0x200
        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                ' 2 threads running, Main/Handler active at the time of capture.',
                ' ~~~~~~~~~~~~',
                '',
                # Ensure the active thread is moved to the top of the list.
                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)',
                ' Est peak usage: 512 bytes',
                ' Stack limits: 0x2001b000 - 0x???????? (size unknown)',
                '',
                'Thread (READY): Idle',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
                ' Est peak usage: 256 bytes, 50.00%',
                ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)

        # Ensure the active thread is found.
        self.assertEqual(analyzer.active_thread(), temp_thread)
        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)

    def test_tokenized_thread_name(self) -> None:
        """Ensures a tokenized thread name is detokenized.

        Only the first name's token is in the database; the second name is
        expected to stay in its '$'-prefixed Base64 form.
        """
        snapshot = thread_pb2.SnapshotThreadInfo()
        detokenizer = pw_tokenizer.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0x46BE7497, 'The thread for Kuzco'
                    ),
                ]
            )
        )

        temp_thread = thread_pb2.Thread()
        # Token 0x46BE7497 written as little-endian bytes.
        temp_thread.name = b'\x97\x74\xBE\x46'
        snapshot.threads.append(temp_thread)
        # The same message object is reused for the second entry; the expected
        # output lists two distinct names, so append() must have stored a copy
        # (protobuf documents append() on repeated message fields as copying).
        temp_thread.name = b'\x5D\xA8\x66\xAE'
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                ' 2 threads running.',
                '',
                'Thread (UNKNOWN): The thread for Kuzco',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
                '',
                'Thread (UNKNOWN): $Xahmrg==',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot, tokenizer_db=detokenizer)

        # Ensure text dump matches expected contents.
        self.assertEqual(str(analyzer), expected)

    def test_no_db_tokenized_thread_name(self) -> None:
        """Ensures tokenized names stay encoded when no database is given.

        Without a tokenizer database, each binary name is expected to appear
        in its '$'-prefixed Base64 form.
        """
        snapshot = thread_pb2.SnapshotThreadInfo()

        temp_thread = thread_pb2.Thread()
        temp_thread.name = b'\x97\x74\xBE\x46'
        snapshot.threads.append(temp_thread)
        # Reusing the message is fine here; the expected output shows both
        # names, so the first append() stored its own copy.
        temp_thread.name = b'\x5D\xA8\x66\xAE'
        snapshot.threads.append(temp_thread)

        # pylint: disable=line-too-long
        expected = '\n'.join(
            (
                'Thread State',
                ' 2 threads running.',
                '',
                'Thread (UNKNOWN): $l3S+Rg==',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
                '',
                'Thread (UNKNOWN): $Xahmrg==',
                'Est CPU usage: unknown',
                'Stack info',
                ' Current usage: 0x???????? - 0x???????? (size unknown)',
                ' Est peak usage: size unknown',
                ' Stack limits: 0x???????? - 0x???????? (size unknown)',
                '',
            )
        )
        # pylint: enable=line-too-long
        analyzer = ThreadSnapshotAnalyzer(snapshot)

        # Ensure text dump matches expected contents.
        self.assertEqual(str(analyzer), expected)


if __name__ == '__main__':
    unittest.main()