#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for detokenize."""

import base64
import concurrent.futures
import datetime as dt
import functools
import io
import os
from pathlib import Path
import struct
import tempfile
from typing import Any, Callable, NamedTuple
import unittest
from unittest import mock

from pw_tokenizer import database
from pw_tokenizer import detokenize
from pw_tokenizer import elf_reader
from pw_tokenizer import tokens


# This function is not part of this test. It was used to generate the binary
# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
# returns a Python byte string suitable for copying into Python source code.
def path_to_byte_string(path: str) -> str:
    with open(path, 'rb') as fd:
        data = fd.read()

    output: list[str] = []
    indices = iter(range(len(data)))

    while True:
        line = ''

        while len(line) < 70:
            try:
                i = next(indices)
            except StopIteration:
                break

            line += repr(data[i : i + 1])[2:-1].replace("'", r'\'')

        if not line:
            return ''.join(output)

        output.append("    b'{}'\n".format(''.join(line)))
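

# Illustrative usage only (not executed by the tests; 'stripped.elf' is a
# hypothetical path): the EMPTY_ELF literal below could be regenerated with
#
#   print('EMPTY_ELF = (\n{})'.format(path_to_byte_string('stripped.elf')))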

# This is an empty ELF file. It was created from the ELF file for
# tokenize_test.cc with the command:
#
#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python binary string using the
# path_to_byte_string function above.
EMPTY_ELF = (
    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
    b'\x00\x00\x00'
)

# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
ELF_WITH_TOKENIZER_SECTIONS_PATH = Path(__file__).parent.joinpath(
    'example_binary_with_tokenized_strings.elf'
)
ELF_WITH_TOKENIZER_SECTIONS = ELF_WITH_TOKENIZER_SECTIONS_PATH.read_bytes()

DEFAULT_DOMAIN_TOKENS = 22

# 27 total, since token 881436a0="The answer is: %s" is in two domains.
ALL_DOMAIN_TOKENS = 26 + 1

# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
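# The token bytes above are the little-endian encoding of that 32-bit value:
# struct.pack('<I', 0x2E668CD6) == b'\xd6\x8c\x66\x2e'. (Illustrative note;
# the tests below use the raw bytes directly.)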


class DetokenizeTest(unittest.TestCase):
    """Tests the detokenize.Detokenizer."""

    def test_simple(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0xCDAB, '%02d %s %c%%', date_removed=dt.datetime.now()
                    )
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')), '01 Two 3%'
        )

    def test_detokenize_extra_data_is_unsuccessful(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        1, 'no args', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            )
        )

        result = detok.detokenize(b'\x01\0\0\0\x04args')
        self.assertEqual(len(result.failures), 1)
        string, args, remaining = result.failures[0]
        self.assertEqual('no args', string)
        self.assertFalse(args)
        self.assertEqual(b'\x04args', remaining)
        self.assertEqual('no args', str(result))

    def test_detokenize_zero_extend_short_token_with_no_args(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [tokens.TokenizedStringEntry(0xCDAB, 'This token is 16 bits')]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\xab\xcd')), 'This token is 16 bits'
        )

    def test_detokenize_missing_data_is_unsuccessful(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        2, '%s', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            )
        )

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertEqual('%s', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertEqual('%s', str(result))

    def test_detokenize_missing_data_with_errors_is_unsuccessful(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        2, '%s', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            ),
            show_errors=True,
        )

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertIn('%s MISSING', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertIn('%s MISSING', str(result))

    def test_unparsed_data(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        1, 'no args', date_removed=dt.datetime(100, 1, 1)
                    ),
                ]
            )
        )
        result = detok.detokenize(b'\x01\0\0\0o_o')
        self.assertFalse(result.ok())
        self.assertEqual('no args', str(result))
        self.assertIn('o_o', repr(result))
        self.assertIn('decoding failed', result.error_message())

    def test_empty_db(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn(
            'unknown token', detok.detokenize(b'1234').error_message()
        )
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))

        self.assertEqual(
            '$' + base64.b64encode(b'1234').decode(),
            str(detok.detokenize(b'1234')),
        )

        self.assertIsNone(detok.detokenize(b'').token)
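
    # Note on the assertion above: for an unknown token, str() of the result
    # falls back to the Base64 form of the raw bytes, so b'1234' renders as
    # '$' + base64.b64encode(b'1234').decode(), i.e. '$MTIzNA=='.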

    def test_empty_db_show_errors(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn(
            'unknown token', detok.detokenize(b'1234').error_message()
        )
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
        self.assertIn('unknown token', str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_missing_token_show_errors(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'')))

    def test_missing_token(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertEqual('$', str(detok.detokenize(b'')))

    def test_unknown_shorter_token_show_error(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)

        self.assertIn('unknown token', detok.detokenize(b'1').error_message())
        self.assertIn('unknown token', str(detok.detokenize(b'1')))
        self.assertIn('unknown token', repr(detok.detokenize(b'1')))

        self.assertIn('unknown token', detok.detokenize(b'123').error_message())
        self.assertIn('unknown token', str(detok.detokenize(b'123')))
        self.assertIn('unknown token', repr(detok.detokenize(b'123')))

    def test_unknown_shorter_token(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))

        self.assertEqual(
            'unknown token 00000001', detok.detokenize(b'\1').error_message()
        )
        self.assertEqual(
            '$' + base64.b64encode(b'\1\0\0\0').decode(),
            str(detok.detokenize(b'\1')),
        )
        self.assertIn('unknown token 00000001', repr(detok.detokenize(b'\1')))

        self.assertEqual(
            'unknown token 00030201',
            detok.detokenize(b'\1\2\3').error_message(),
        )
        self.assertEqual(
            '$' + base64.b64encode(b'\1\2\3\0').decode(),
            str(detok.detokenize(b'\1\2\3')),
        )
        self.assertIn(
            'unknown token 00030201', repr(detok.detokenize(b'\1\2\3'))
        )

    def test_decode_from_elf_data(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))

        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        self.assertEqual(
            str(detok.detokenize(JELLO_WORLD_TOKEN)), 'Jello, world!'
        )

        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
        self.assertFalse(undecoded_args.ok())
        self.assertEqual(str(undecoded_args), 'Jello, world!')

        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')

    def test_decode_from_elf_file(self) -> None:
        """Tests decoding from an ELF file."""
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        with tempfile.NamedTemporaryFile('wb', delete=False) as elf:
            try:
                elf.write(ELF_WITH_TOKENIZER_SECTIONS)
                elf.close()

                # Open ELF by file object
                with open(elf.name, 'rb') as fd:
                    detok = detokenize.Detokenizer(fd)

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open ELF by path
                detok = detokenize.Detokenizer(elf.name)
                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open ELF by elf_reader.Elf
                with open(elf.name, 'rb') as fd:
                    detok = detokenize.Detokenizer(elf_reader.Elf(fd))

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )
            finally:
                os.unlink(elf.name)

    def test_decode_from_csv_file(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        csv_database = str(detok.database)
        self.assertEqual(len(csv_database.splitlines()), ALL_DOMAIN_TOKENS)

        with tempfile.NamedTemporaryFile('w', delete=False) as csv_file:
            try:
                csv_file.write(csv_database)
                csv_file.close()

                # Open CSV by path
                detok = detokenize.Detokenizer(csv_file.name)
                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open CSV by file object
                with open(csv_file.name) as fd:
                    detok = detokenize.Detokenizer(fd)

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )
            finally:
                os.unlink(csv_file.name)

    def test_create_detokenizer_with_token_database(self) -> None:
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        detok = detokenize.Detokenizer(detok.database)
        self.assertEqual(
            expected_tokens, frozenset(detok.database.token_to_entries.keys())
        )
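

# A minimal sketch (not part of the test suite and never called) of the
# DetokenizedString API exercised above: ok() reports success, token holds the
# looked-up 32-bit token, and str() gives the formatted text. The entry
# 'The answer: %s' is hypothetical.
def _detokenized_string_api_sketch() -> None:
    detok = detokenize.Detokenizer(
        tokens.Database([tokens.TokenizedStringEntry(1, 'The answer: %s')])
    )
    # Token 1, little-endian, followed by the length-prefixed '%s' argument.
    result = detok.detokenize(b'\x01\0\0\0\x0512345')
    assert result.ok()
    assert result.token == 1
    assert str(result) == 'The answer: 12345'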


class DetokenizeWithCollisions(unittest.TestCase):
    """Tests collision resolution."""

    def setUp(self) -> None:
        super().setUp()
        token = 0xBAAD

        # Database with several conflicting tokens.
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)
                    ),
                    tokens.TokenizedStringEntry(token, 'newer'),
                    tokens.TokenizedStringEntry(
                        token, 'A: %d', date_removed=dt.datetime(30, 5, 9)
                    ),
                    tokens.TokenizedStringEntry(
                        token, 'B: %c', date_removed=dt.datetime(30, 5, 10)
                    ),
                    tokens.TokenizedStringEntry(token, 'C: %s'),
                    tokens.TokenizedStringEntry(token, '%d%u'),
                    tokens.TokenizedStringEntry(token, '%s%u %d'),
                    tokens.TokenizedStringEntry(1, '%s'),
                    tokens.TokenizedStringEntry(1, '%d'),
                    tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
                    tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
                ]
            )
        )

    def test_collision_no_args_favors_most_recently_present(self) -> None:
        no_args = self.detok.detokenize(b'\xad\xba\0\0')
        self.assertFalse(no_args.ok())
        self.assertEqual(len(no_args.successes), 2)
        self.assertEqual(len(no_args.failures), 5)
        self.assertEqual(len(no_args.matches()), 7)
        self.assertEqual(str(no_args), 'newer')
        best_result = no_args.best_result()
        assert best_result is not None
        self.assertEqual(len(best_result.args), 0)
        self.assertEqual(best_result[0], 'newer')

    def test_collision_one_integer_arg_favors_most_recently_present(
        self,
    ) -> None:
        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
        self.assertFalse(multiple_correct.ok())
        self.assertIn('ERROR', repr(multiple_correct))
        self.assertEqual(len(multiple_correct.successes), 2)
        self.assertEqual(len(multiple_correct.failures), 5)
        self.assertEqual(len(multiple_correct.matches()), 7)
        self.assertEqual(str(multiple_correct), 'B: =')

    def test_collision_one_integer_arg_favor_successful_decode(self) -> None:
        # One string decodes successfully, since the arg is out of range for
        # %c.
        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
        self.assertTrue(int_arg.ok())
        self.assertEqual(str(int_arg), 'A: 2147483647')

    def test_collision_one_string_arg_favors_successful_decode(self) -> None:
        # One string decodes successfully, since decoding the argument as an
        # integer does not decode all the data.
        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
        self.assertTrue(string_arg.ok())
        self.assertEqual(str(string_arg), 'C: Hi')

    def test_collision_one_string_arg_favors_decoding_all_data(self) -> None:
        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
        self.assertEqual(len(result.failures), 2)
        # Should resolve to the string since %d would leave one byte behind.
        self.assertEqual(str(result), '%s')

    def test_collision_multiple_args_favors_decoding_more_arguments(
        self,
    ) -> None:
        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
        self.assertEqual(len(result.matches()), 2)
        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')

    def test_collision_multiple_args_favors_decoding_all_arguments(
        self,
    ) -> None:
        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
        self.assertTrue(unambiguous.ok())
        self.assertEqual(len(unambiguous.matches()), 7)
        self.assertEqual('#0 -1', str(unambiguous))
        self.assertIn('#0 -1', repr(unambiguous))
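

# A minimal sketch (not executed by the tests) of the collision-resolution
# preference verified above: with b'\x02Hi' as the argument data, '%c' decodes
# one value but leaves b'Hi' unconsumed, while '%s' consumes everything, so
# the 'C: %s' entry should win. The two-entry database here is hypothetical.
def _collision_preference_sketch() -> str:
    detok = detokenize.Detokenizer(
        tokens.Database(
            [
                tokens.TokenizedStringEntry(0xBAAD, 'B: %c'),
                tokens.TokenizedStringEntry(0xBAAD, 'C: %s'),
            ]
        )
    )
    return str(detok.detokenize(b'\xad\xba\0\0\x02Hi'))  # Expected: 'C: Hi'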


class ManualPoolExecutor(concurrent.futures.Executor):
    """A stubbed pool executor that captures the most recent work request
    and holds it until the public process method is manually called."""

    def __init__(self) -> None:
        super().__init__()
        self._func = None

    # pylint: disable=arguments-differ
    def submit(self, func, *args, **kwargs):
        """Submits work to the pool, stashing the partial for later use."""
        self._func = functools.partial(func, *args, **kwargs)

    def process(self) -> None:
        """Processes the latest func submitted to the pool."""
        if self._func is not None:
            self._func()
            self._func = None


class InlinePoolExecutor(concurrent.futures.Executor):
    """A stubbed pool executor that runs work immediately, inline."""

    # pylint: disable=arguments-differ
    def submit(self, func, *args, **kwargs):
        """Submits work to the pool and runs it immediately, inline."""
        func(*args, **kwargs)
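

# Illustrative only: the tests below use these stubs to make the background
# database reloads deterministic. For example, with a ManualPoolExecutor:
#
#   pool = ManualPoolExecutor()
#   pool.submit(print, 'reload')  # Deferred; nothing runs yet.
#   pool.process()                # Now the stashed call runs.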


@mock.patch('os.path.getmtime')
class AutoUpdatingDetokenizerTest(unittest.TestCase):
    """Tests the AutoUpdatingDetokenizer class."""

    def test_update(self, mock_getmtime) -> None:
        """Tests the update command."""

        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        self.assertEqual(len(db), ALL_DOMAIN_TOKENS)

        the_time = [100]

        def move_back_time_if_file_exists(path: str) -> int:
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists

        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
            try:
                file.close()

                pool = ManualPoolExecutor()
                detok = detokenize.AutoUpdatingDetokenizer(
                    file.name, min_poll_period_s=0, pool=pool
                )
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                with open(file.name, 'wb') as fd:
                    tokens.write_binary(db, fd)

                # After the change but before the pool runs in another thread,
                # the token should not exist.
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # After the pool is allowed to process, it should.
                pool.process()
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            finally:
                os.unlink(file.name)

    def test_update_with_directory(self, mock_getmtime) -> None:
        """Tests the update command with a directory format database."""
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        self.assertEqual(len(db), ALL_DOMAIN_TOKENS)

        the_time = [100]

        def move_back_time_if_file_exists(path: str) -> int:
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists

        with tempfile.TemporaryDirectory() as dbdir:
            with tempfile.NamedTemporaryFile(
                'wb', delete=False, suffix='.pw_tokenizer.csv', dir=dbdir
            ) as matching_suffix_file, tempfile.NamedTemporaryFile(
                'wb', delete=False, suffix='.not.right', dir=dbdir
            ) as mismatched_suffix_file:
                try:
                    matching_suffix_file.close()
                    mismatched_suffix_file.close()

                    pool = ManualPoolExecutor()
                    detok = detokenize.AutoUpdatingDetokenizer(
                        dbdir, min_poll_period_s=0, pool=pool
                    )
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                    with open(mismatched_suffix_file.name, 'wb') as fd:
                        tokens.write_csv(db, fd)
                    pool.process()
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                    with open(matching_suffix_file.name, 'wb') as fd:
                        tokens.write_csv(db, fd)

                    # After the change but before the pool runs in another
                    # thread, the token should not exist.
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                    pool.process()

                    # After the pool is allowed to process, it should.
                    self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                finally:
                    os.unlink(mismatched_suffix_file.name)
                    os.unlink(matching_suffix_file.name)
                    os.rmdir(dbdir)

        # The database stays around if the file is deleted.
        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

    def test_no_update_if_time_is_same(self, mock_getmtime) -> None:
        mock_getmtime.return_value = 100

        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
            try:
                tokens.write_csv(
                    database.load_token_database(
                        io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
                    ),
                    file,
                )
                file.close()

                detok = detokenize.AutoUpdatingDetokenizer(
                    file.name, min_poll_period_s=0, pool=InlinePoolExecutor()
                )
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # Empty the database, but keep the mock modified time the same.
                with open(file.name, 'wb'):
                    pass

                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # Move back time so the now-empty file is reloaded.
                mock_getmtime.return_value = 50
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            finally:
                os.unlink(file.name)

    def test_token_domain_in_str(self, _) -> None:
        """Tests a str containing a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#',  # Default domain
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), DEFAULT_DOMAIN_TOKENS)

    def test_token_domain_in_path(self, _) -> None:
        """Tests a Path() containing a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#'),
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), DEFAULT_DOMAIN_TOKENS)

    def test_token_no_domain_in_str(self, _) -> None:
        """Tests a str without a domain, which loads all domains."""
        detok = detokenize.AutoUpdatingDetokenizer(
            str(ELF_WITH_TOKENIZER_SECTIONS_PATH),
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), ALL_DOMAIN_TOKENS)

    def test_token_no_domain_in_path(self, _) -> None:
        """Tests a Path() without a domain, which loads all domains."""
        detok = detokenize.AutoUpdatingDetokenizer(
            ELF_WITH_TOKENIZER_SECTIONS_PATH,
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), ALL_DOMAIN_TOKENS)

    def test_invalid_domain_specification(self, _) -> None:
        with self.assertRaises(ValueError, msg='Too many # delimiters'):
            detokenize.AutoUpdatingDetokenizer(
                f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}##',
                min_poll_period_s=0,
                pool=InlinePoolExecutor(),
            )
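

# Illustrative summary of the database-path syntax exercised above: a '#'
# suffix selects the default (empty) domain, no '#' loads every domain, and
# more than one '#' raises ValueError. (Assumption, per the pw_tokenizer
# docs: a name after the '#', e.g. 'tokens.elf#my_domain', selects that
# specific domain.)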


def _next_char(message: bytes) -> bytes:
    return bytes(b + 1 for b in message)
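

# For example, _next_char(b'$abcd') == b'%bcde', which is how the expected
# outputs in TRANSFORM_TEST_CASES below were derived.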


class NestedMessageParserTest(unittest.TestCase):
    """Tests parsing prefixed messages."""

    class _Case(NamedTuple):
        data: bytes
        expected: bytes
        title: str
        transform: Callable[[bytes], bytes] = _next_char

    TRANSFORM_TEST_CASES = (
        _Case(b'$abcd', b'%bcde', 'single message'),
        _Case(
            b'$$WHAT?$abc$WHY? is this $ok $',
            b'%%WHAT?%bcd%WHY? is this %ok %',
            'message and non-message',
        ),
        _Case(b'$1$', b'%1%', 'empty message'),
        _Case(b'$abc$defgh', b'%bcd%efghh', 'sequential message'),
        _Case(
            b'w$abcx$defygh$$abz',
            b'w$ABCx$DEFygh$$ABz',
            'interspersed start/end non-message',
            bytes.upper,
        ),
        _Case(
            b'$abcx$defygh$$ab',
            b'$ABCx$DEFygh$$AB',
            'interspersed start/end message',
            bytes.upper,
        ),
    )

    def setUp(self) -> None:
        self.decoder = detokenize.NestedMessageParser('$', 'abcdefg')

    def test_transform_io(self) -> None:
        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
            self.assertEqual(
                expected,
                b''.join(
                    self.decoder.transform_io(io.BytesIO(data), transform)
                ),
                f'{title}: {data!r}',
            )

    def test_transform_bytes_with_flush(self) -> None:
        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
            self.assertEqual(
                expected,
                self.decoder.transform(data, transform, flush=True),
                f'{title}: {data!r}',
            )

    def test_transform_bytes_sequential(self) -> None:
        def transform(message):
            return message.upper().replace(b'$', b'*')

        self.assertEqual(self.decoder.transform(b'abc$abcd', transform), b'abc')
        self.assertEqual(self.decoder.transform(b'$', transform), b'*ABCD')
        self.assertEqual(self.decoder.transform(b'$b', transform), b'*')
        self.assertEqual(self.decoder.transform(b'', transform), b'')
        self.assertEqual(self.decoder.transform(b' ', transform), b'*B ')
        self.assertEqual(self.decoder.transform(b'hello', transform), b'hello')
        self.assertEqual(self.decoder.transform(b'?? $ab', transform), b'?? ')
        self.assertEqual(
            self.decoder.transform(b'123$ab4$56$a', transform), b'*AB123*AB4*56'
        )
        self.assertEqual(
            self.decoder.transform(b'bc', transform, flush=True), b'*ABC'
        )

    MESSAGES_TEST: Any = (
        (b'123$abc456$a', (False, b'123'), (True, b'$abc'), (False, b'456')),
        (b'7$abcd', (True, b'$a'), (False, b'7')),
        (b'e',),
        (b'',),
        (b'$', (True, b'$abcde')),
        (b'$', (True, b'$')),
        (b'$a$b$c', (True, b'$'), (True, b'$a'), (True, b'$b')),
        (b'1', (True, b'$c'), (False, b'1')),
        (b'',),
        (b'?', (False, b'?')),
        (b'!@', (False, b'!@')),
        (b'%^&', (False, b'%^&')),
    )

    def test_read_messages(self) -> None:
        for step in self.MESSAGES_TEST:
            data: bytes = step[0]
            pieces: tuple[tuple[bool, bytes], ...] = step[1:]
            self.assertEqual(tuple(self.decoder.read_messages(data)), pieces)

    def test_read_messages_flush(self) -> None:
        self.assertEqual(
            list(self.decoder.read_messages(b'123$a')), [(False, b'123')]
        )
        self.assertEqual(list(self.decoder.read_messages(b'b')), [])
        self.assertEqual(
            list(self.decoder.read_messages(b'', flush=True)), [(True, b'$ab')]
        )

    def test_read_messages_io(self) -> None:
        # Rework the read_messages test data for stream input.
        data = io.BytesIO(b''.join(step[0] for step in self.MESSAGES_TEST))
        expected_pieces = sum((step[1:] for step in self.MESSAGES_TEST), ())

        result = self.decoder.read_messages_io(data)
        for expected_is_message, expected_data in expected_pieces:
            if expected_is_message:
                is_message, piece = next(result)
                self.assertTrue(is_message)
                self.assertEqual(expected_data, piece)
            else:  # the IO version yields non-messages byte by byte
                for byte in expected_data:
                    is_message, piece = next(result)
                    self.assertFalse(is_message)
                    self.assertEqual(bytes([byte]), piece)
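

# A minimal sketch (not part of the test suite and never called) of how the
# parser splits a byte stream into message and non-message chunks; the input
# b'log: $abc!' is hypothetical.
def _nested_parser_sketch() -> list[tuple[bool, bytes]]:
    parser = detokenize.NestedMessageParser('$', 'abcdefg')
    # Expected: [(False, b'log: '), (True, b'$abc'), (False, b'!')].
    return list(parser.read_messages(b'log: $abc!', flush=True))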


class DetokenizeNested(unittest.TestCase):
    """Tests detokenizing nested tokens."""

    def test_nested_hashed_arg(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0xA, 'tokenized argument'),
                    tokens.TokenizedStringEntry(
                        2,
                        'This is a ' + '$#%08x',
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is a tokenized argument',
        )

    def test_nested_base64_arg(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(1, 'base64 argument'),
                    tokens.TokenizedStringEntry(2, 'This is a %s'),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
            'This is a base64 argument',
        )

    def test_deeply_nested_arg(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(1, '$10#0000000005'),
                    tokens.TokenizedStringEntry(2, 'This is a $#%08x'),
                    tokens.TokenizedStringEntry(3, 'deeply nested argument'),
                    tokens.TokenizedStringEntry(4, '$AQAAAA=='),
                    tokens.TokenizedStringEntry(5, '$AwAAAA=='),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x08')),  # token for 4
            'This is a deeply nested argument',
        )
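

# Illustrative note: a nested token may be embedded either in Base64 form
# ('$' + Base64 of the four little-endian token bytes, e.g. '$AQAAAA==' for
# token 1) or in hex form ('$#%08x', e.g. '$#00000003' for token 3); the
# detokenizer expands both recursively.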


class DetokenizeBase64(unittest.TestCase):
    """Tests detokenizing Base64 messages."""

    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)

    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
    RECURSION = b'$' + base64.b64encode(
        struct.pack('I', tokens.c_hash(RECURSION_STRING))
    )

    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
    RECURSION_2 = b'$' + base64.b64encode(
        struct.pack('I', tokens.c_hash(RECURSION_STRING_2))
    )

    TEST_CASES = (
        (b'', b''),
        (b'nothing here', b'nothing here'),
        (JELLO, b'Jello, world!'),
        (JELLO + b'a', b'Jello, world!a'),
        (JELLO + b'abc', b'Jello, world!abc'),
        (JELLO + b'abc=', b'Jello, world!abc='),
        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
        (b'$' + JELLO, b'$Jello, world!'),
        (JELLO + JELLO, b'Jello, world!Jello, world!'),
        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
        (b'$3141', b'$3141'),
        (JELLO + b'$3141', b'Jello, world!$3141'),
        (
            JELLO + b'$a' + JELLO + b'b' + JELLO + b'c',
            b'Jello, world!$aJello, world!bJello, world!c',
        ),
        (RECURSION, b'The secret message is "Jello, world!"'),
        (
            RECURSION_2,
            b'\'The secret message is "Jello, world!"\', said the spy.',
        ),
    )

    def setUp(self) -> None:
        super().setUp()
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        db.add(
            tokens.TokenizedStringEntry(tokens.c_hash(s), s)
            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2]
        )
        self.detok = detokenize.Detokenizer(db)

    def test_detokenize_base64_live(self) -> None:
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            self.detok.detokenize_base64_live(io.BytesIO(data), output)

            self.assertEqual(expected, output.getvalue(), f'Input: {data!r}')

    def test_detokenize_base64_to_file(self) -> None:
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            self.detok.detokenize_base64_to_file(data, output)

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64(self) -> None:
        for data, expected in self.TEST_CASES:
            self.assertEqual(expected, self.detok.detokenize_base64(data))

    def test_detokenize_base64_str(self) -> None:
        for data, expected in self.TEST_CASES:
            self.assertEqual(
                expected.decode(), self.detok.detokenize_base64(data.decode())
            )
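

# Illustrative note: the RECURSION fixtures above are built the same way the
# tokenizer would build them: token = tokens.c_hash(string), packed with
# struct.pack('I', token), and rendered as b'$' + base64.b64encode(bytes).
# Detokenizing RECURSION therefore takes two passes: first to
# RECURSION_STRING, whose embedded Base64 token then resolves to
# 'Jello, world!'.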


class DetokenizeInfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""

    def setUp(self) -> None:
        super().setUp()
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                    tokens.TokenizedStringEntry(2, '$#00000003'),  # token for 3
                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
                ]
            )
        )

    def test_detokenize_self_recursion(self) -> None:
        for depth in range(5):
            self.assertEqual(
                self.detok.detokenize_text(
                    b'This one is deep: $AAAAAA==', recursion=depth
                ),
                b'This one is deep: $AAAAAA==',
            )

    def test_detokenize_self_recursion_default(self) -> None:
        self.assertEqual(
            self.detok.detokenize_text(
                b'This one is deep: $AAAAAA==',
            ),
            b'This one is deep: $AAAAAA==',
        )

    def test_detokenize_cyclic_recursion_even(self) -> None:
        self.assertEqual(
            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=6),
            b'I said "$AgAAAA=="',
        )

    def test_detokenize_cyclic_recursion_odd(self) -> None:
        self.assertEqual(
            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=7),
            b'I said "$#00000003"',
        )


class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""

    def setUp(self) -> None:
        super().setUp()
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                    tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
                ]
            )
        )

    def test_detokenize_self_recursion(self) -> None:
        for depth in range(5):
            self.assertEqual(
                self.detok.detokenize_base64(
                    b'This one is deep: $AAAAAA==', recursion=depth
                ),
                b'This one is deep: $AAAAAA==',
            )

    def test_detokenize_self_recursion_default(self) -> None:
        self.assertEqual(
            self.detok.detokenize_base64(b'This one is deep: $64#AAAAAA=='),
            b'This one is deep: $AAAAAA==',
        )

    def test_detokenize_cyclic_recursion_even(self) -> None:
        self.assertEqual(
            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
            b'I said "$AgAAAA=="',
        )

    def test_detokenize_cyclic_recursion_odd(self) -> None:
        self.assertEqual(
            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
            b'I said "$AwAAAA=="',
        )
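

# Illustrative note: with the cyclic databases above (1 -> 2 -> 3 -> 2 -> ...),
# the recursion limit bounds how many replacement passes run, so an even limit
# stops on the token-2 form and an odd limit on the token-3 form, as the
# cyclic-recursion tests in both classes check.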


class DetokenizeNestedDomains(unittest.TestCase):
    """Tests detokenizing nested tokens with specified domains."""

    def test_nested_hashed_arg_with_one_domain_match(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0xA, 'domain1', 'D1'),
                    tokens.TokenizedStringEntry(
                        2, 'This is all in ' + '${D1}#%08x', 'D1'
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is all in domain1',
        )

    def test_multiple_nested_args_in_one_sentence(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0xA, 'nested token 1', 'D1'),
                    tokens.TokenizedStringEntry(
                        2,
                        'This is '
                        + '${D1}#%08x'
                        + ' and this is '
                        + '${D1}#00000003',
                        'D1',
                    ),
                    tokens.TokenizedStringEntry(3, 'nested token 2', 'D1'),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is nested token 1 and this is nested token 2',
        )

    def test_nested_hashed_arg_with_two_domain_match(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0xA, 'and this is domain1', 'D1'
                    ),
                    tokens.TokenizedStringEntry(
                        2, 'This is domain2 ' + '${D1}#%08x', 'D2'
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is domain2 and this is domain1',
        )

    def test_nested_hashed_arg_with_different_domains(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0xA, 'domain1', 'D1'),
                    tokens.TokenizedStringEntry(
                        2, 'This is all in ' + '${D2}#%08x', 'D1'
                    ),
                ]
            )
        )
        result = detok.detokenize(b'\x02\0\0\0\x14')
        # Token 0xA exists only in domain D1, so the ${D2} reference must not
        # resolve.
        self.assertNotEqual(str(result), 'This is all in domain1')

    def test_nested_base64_arg_multiple_domains(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(1, '${D5}#00000004', 'D1'),
                    tokens.TokenizedStringEntry(2, 'This is a %s', 'D1'),
                    tokens.TokenizedStringEntry(3, 'base64 argument', 'D2'),
                    tokens.TokenizedStringEntry(
                        4, 'nested ' + '${D2}#00000003', 'D5'
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
            'This is a nested base64 argument',
        )

    def test_nested_hashed_arg_with_domain_whitespace(self) -> None:
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0xA, 'and this is domain1', 'Domain1'
                    ),
                    tokens.TokenizedStringEntry(
                        2, 'This is domain2 ' + '${Domain 1}#%08x', 'D2'
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is domain2 and this is domain1',
        )


if __name__ == '__main__':
    unittest.main()