xref: /aosp_15_r20/external/pigweed/pw_tokenizer/py/detokenize_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for detokenize."""
16
17import base64
18import concurrent
19import datetime as dt
20import functools
21import io
22import os
23from pathlib import Path
24import struct
25import tempfile
26from typing import Any, Callable, NamedTuple
27import unittest
28from unittest import mock
29
30from pw_tokenizer import database
31from pw_tokenizer import detokenize
32from pw_tokenizer import elf_reader
33from pw_tokenizer import tokens
34
35
36# This function is not part of this test. It was used to generate the binary
37# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
38# returns a Python byte string suitable for copying into Python source code.
39def path_to_byte_string(path: str) -> str:
40    with open(path, 'rb') as fd:
41        data = fd.read()
42
43    output: list[str] = []
44    indices = iter(range(len(data)))
45
46    while True:
47        line = ''
48
49        while len(line) < 70:
50            try:
51                i = next(indices)
52            except StopIteration:
53                break
54
55            line += repr(data[i : i + 1])[2:-1].replace("'", r'\'')
56
57        if not line:
58            return ''.join(output)
59
60        output.append("    b'{}'\n".format(''.join(line)))
61
62
63# This is an empty ELF file. It was created from the ELF file for
64# tokenize_test.cc with the command:
65#
66#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
67#
68# The resulting ELF was converted to a Python binary string using
69# path_to_byte_string function above.
70EMPTY_ELF = (
71    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
72    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
73    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
74    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
75    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
76    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
77    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
78    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
79    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
80    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
81    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
82    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
83    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
84    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
85    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
86    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
87    b'\x00\x00\x00'
88)
89
90# This is an ELF file with only the pw_tokenizer sections. It was created
91# from a tokenize_test binary built for the STM32F429i Discovery board. The
92# pw_tokenizer sections were extracted with this command:
93#
94#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
95#
96ELF_WITH_TOKENIZER_SECTIONS_PATH = Path(__file__).parent.joinpath(
97    'example_binary_with_tokenized_strings.elf'
98)
99ELF_WITH_TOKENIZER_SECTIONS = ELF_WITH_TOKENIZER_SECTIONS_PATH.read_bytes()
100
101DEFAULT_DOMAIN_TOKENS = 22
102
103# 27 total since token 881436a0="The answer is: %s" is in two domains.
104ALL_DOMAIN_TOKENS = 26 + 1
105
106# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
107JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
108
109
110class DetokenizeTest(unittest.TestCase):
111    """Tests the detokenize.Detokenizer."""
112
113    def test_simple(self) -> None:
114        detok = detokenize.Detokenizer(
115            tokens.Database(
116                [
117                    tokens.TokenizedStringEntry(
118                        0xCDAB, '%02d %s %c%%', date_removed=dt.datetime.now()
119                    )
120                ]
121            )
122        )
123        self.assertEqual(
124            str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')), '01 Two 3%'
125        )
126
127    def test_detokenize_extra_data_is_unsuccessful(self) -> None:
128        detok = detokenize.Detokenizer(
129            tokens.Database(
130                [
131                    tokens.TokenizedStringEntry(
132                        1, 'no args', date_removed=dt.datetime(1, 1, 1)
133                    )
134                ]
135            )
136        )
137
138        result = detok.detokenize(b'\x01\0\0\0\x04args')
139        self.assertEqual(len(result.failures), 1)
140        string, args, remaining = result.failures[0]
141        self.assertEqual('no args', string)
142        self.assertFalse(args)
143        self.assertEqual(b'\x04args', remaining)
144        self.assertEqual('no args', string)
145        self.assertEqual('no args', str(result))
146
147    def test_detokenize_zero_extend_short_token_with_no_args(self) -> None:
148        detok = detokenize.Detokenizer(
149            tokens.Database(
150                [tokens.TokenizedStringEntry(0xCDAB, 'This token is 16 bits')]
151            )
152        )
153        self.assertEqual(
154            str(detok.detokenize(b'\xab\xcd')), 'This token is 16 bits'
155        )
156
157    def test_detokenize_missing_data_is_unsuccessful(self) -> None:
158        detok = detokenize.Detokenizer(
159            tokens.Database(
160                [
161                    tokens.TokenizedStringEntry(
162                        2, '%s', date_removed=dt.datetime(1, 1, 1)
163                    )
164                ]
165            )
166        )
167
168        result = detok.detokenize(b'\x02\0\0\0')
169        string, args, remaining = result.failures[0]
170        self.assertEqual('%s', string)
171        self.assertEqual(len(args), 1)
172        self.assertEqual(b'', remaining)
173        self.assertEqual(len(result.failures), 1)
174        self.assertEqual('%s', str(result))
175
176    def test_detokenize_missing_data_with_errors_is_unsuccessful(self) -> None:
177        detok = detokenize.Detokenizer(
178            tokens.Database(
179                [
180                    tokens.TokenizedStringEntry(
181                        2, '%s', date_removed=dt.datetime(1, 1, 1)
182                    )
183                ]
184            ),
185            show_errors=True,
186        )
187
188        result = detok.detokenize(b'\x02\0\0\0')
189        string, args, remaining = result.failures[0]
190        self.assertIn('%s MISSING', string)
191        self.assertEqual(len(args), 1)
192        self.assertEqual(b'', remaining)
193        self.assertEqual(len(result.failures), 1)
194        self.assertIn('%s MISSING', str(result))
195
196    def test_unparsed_data(self) -> None:
197        detok = detokenize.Detokenizer(
198            tokens.Database(
199                [
200                    tokens.TokenizedStringEntry(
201                        1, 'no args', date_removed=dt.datetime(100, 1, 1)
202                    ),
203                ]
204            )
205        )
206        result = detok.detokenize(b'\x01\0\0\0o_o')
207        self.assertFalse(result.ok())
208        self.assertEqual('no args', str(result))
209        self.assertIn('o_o', repr(result))
210        self.assertIn('decoding failed', result.error_message())
211
212    def test_empty_db(self) -> None:
213        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
214        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
215        self.assertIn(
216            'unknown token', detok.detokenize(b'1234').error_message()
217        )
218        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
219
220        self.assertEqual(
221            '$' + base64.b64encode(b'1234').decode(),
222            str(detok.detokenize(b'1234')),
223        )
224
225        self.assertIsNone(detok.detokenize(b'').token)
226
227    def test_empty_db_show_errors(self) -> None:
228        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
229        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
230        self.assertIn(
231            'unknown token', detok.detokenize(b'1234').error_message()
232        )
233        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
234        self.assertIn('unknown token', str(detok.detokenize(b'1234')))
235
236        self.assertIsNone(detok.detokenize(b'').token)
237
238    def test_missing_token_show_errors(self) -> None:
239        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
240        self.assertIn('missing token', detok.detokenize(b'').error_message())
241        self.assertIn('missing token', str(detok.detokenize(b'')))
242
243    def test_missing_token(self) -> None:
244        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
245        self.assertIn('missing token', detok.detokenize(b'').error_message())
246        self.assertEqual('$', str(detok.detokenize(b'')))
247
248    def test_unknown_shorter_token_show_error(self) -> None:
249        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
250
251        self.assertIn('unknown token', detok.detokenize(b'1').error_message())
252        self.assertIn('unknown token', str(detok.detokenize(b'1')))
253        self.assertIn('unknown token', repr(detok.detokenize(b'1')))
254
255        self.assertIn('unknown token', detok.detokenize(b'123').error_message())
256        self.assertIn('unknown token', str(detok.detokenize(b'123')))
257        self.assertIn('unknown token', repr(detok.detokenize(b'123')))
258
259    def test_unknown_shorter_token(self) -> None:
260        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
261
262        self.assertEqual(
263            'unknown token 00000001', detok.detokenize(b'\1').error_message()
264        )
265        self.assertEqual(
266            '$' + base64.b64encode(b'\1\0\0\0').decode(),
267            str(detok.detokenize(b'\1')),
268        )
269        self.assertIn('unknown token 00000001', repr(detok.detokenize(b'\1')))
270
271        self.assertEqual(
272            'unknown token 00030201',
273            detok.detokenize(b'\1\2\3').error_message(),
274        )
275        self.assertEqual(
276            '$' + base64.b64encode(b'\1\2\3\0').decode(),
277            str(detok.detokenize(b'\1\2\3')),
278        )
279        self.assertIn(
280            'unknown token 00030201', repr(detok.detokenize(b'\1\2\3'))
281        )
282
283    def test_decode_from_elf_data(self) -> None:
284        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
285
286        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
287        self.assertEqual(
288            str(detok.detokenize(JELLO_WORLD_TOKEN)), 'Jello, world!'
289        )
290
291        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
292        self.assertFalse(undecoded_args.ok())
293        self.assertEqual(str(undecoded_args), 'Jello, world!')
294
295        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
296        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')
297
298    def test_decode_from_elf_file(self) -> None:
299        """Test decoding from an elf file."""
300        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
301        expected_tokens = frozenset(detok.database.token_to_entries.keys())
302
303        with tempfile.NamedTemporaryFile('wb', delete=False) as elf:
304            try:
305                elf.write(ELF_WITH_TOKENIZER_SECTIONS)
306                elf.close()
307
308                # Open ELF by file object
309                with open(elf.name, 'rb') as fd:
310                    detok = detokenize.Detokenizer(fd)
311
312                self.assertEqual(
313                    expected_tokens,
314                    frozenset(detok.database.token_to_entries.keys()),
315                )
316
317                # Open ELF by path
318                detok = detokenize.Detokenizer(elf.name)
319                self.assertEqual(
320                    expected_tokens,
321                    frozenset(detok.database.token_to_entries.keys()),
322                )
323
324                # Open ELF by elf_reader.Elf
325                with open(elf.name, 'rb') as fd:
326                    detok = detokenize.Detokenizer(elf_reader.Elf(fd))
327
328                self.assertEqual(
329                    expected_tokens,
330                    frozenset(detok.database.token_to_entries.keys()),
331                )
332            finally:
333                os.unlink(elf.name)
334
335    def test_decode_from_csv_file(self) -> None:
336        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
337        expected_tokens = frozenset(detok.database.token_to_entries.keys())
338
339        csv_database = str(detok.database)
340        self.assertEqual(len(csv_database.splitlines()), ALL_DOMAIN_TOKENS)
341
342        with tempfile.NamedTemporaryFile('w', delete=False) as csv_file:
343            try:
344                csv_file.write(csv_database)
345                csv_file.close()
346
347                # Open CSV by path
348                detok = detokenize.Detokenizer(csv_file.name)
349                self.assertEqual(
350                    expected_tokens,
351                    frozenset(detok.database.token_to_entries.keys()),
352                )
353
354                # Open CSV by file object
355                with open(csv_file.name) as fd:
356                    detok = detokenize.Detokenizer(fd)
357
358                self.assertEqual(
359                    expected_tokens,
360                    frozenset(detok.database.token_to_entries.keys()),
361                )
362            finally:
363                os.unlink(csv_file.name)
364
365    def test_create_detokenizer_with_token_database(self) -> None:
366        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
367        expected_tokens = frozenset(detok.database.token_to_entries.keys())
368
369        detok = detokenize.Detokenizer(detok.database)
370        self.assertEqual(
371            expected_tokens, frozenset(detok.database.token_to_entries.keys())
372        )
373
374
375class DetokenizeWithCollisions(unittest.TestCase):
376    """Tests collision resolution."""
377
378    def setUp(self) -> None:
379        super().setUp()
380        token = 0xBAAD
381
382        # Database with several conflicting tokens.
383        self.detok = detokenize.Detokenizer(
384            tokens.Database(
385                [
386                    tokens.TokenizedStringEntry(
387                        token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)
388                    ),
389                    tokens.TokenizedStringEntry(token, 'newer'),
390                    tokens.TokenizedStringEntry(
391                        token, 'A: %d', date_removed=dt.datetime(30, 5, 9)
392                    ),
393                    tokens.TokenizedStringEntry(
394                        token, 'B: %c', date_removed=dt.datetime(30, 5, 10)
395                    ),
396                    tokens.TokenizedStringEntry(token, 'C: %s'),
397                    tokens.TokenizedStringEntry(token, '%d%u'),
398                    tokens.TokenizedStringEntry(token, '%s%u %d'),
399                    tokens.TokenizedStringEntry(1, '%s'),
400                    tokens.TokenizedStringEntry(1, '%d'),
401                    tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
402                    tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
403                ]
404            )
405        )
406
407    def test_collision_no_args_favors_most_recently_present(self) -> None:
408        no_args = self.detok.detokenize(b'\xad\xba\0\0')
409        self.assertFalse(no_args.ok())
410        self.assertEqual(len(no_args.successes), 2)
411        self.assertEqual(len(no_args.failures), 5)
412        self.assertEqual(len(no_args.matches()), 7)
413        self.assertEqual(str(no_args), 'newer')
414        best_result = no_args.best_result()
415        assert best_result is not None
416        self.assertEqual(len(best_result.args), 0)
417        self.assertEqual(best_result[0], 'newer')
418
419    def test_collision_one_integer_arg_favors_most_recently_present(
420        self,
421    ) -> None:
422        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
423        self.assertFalse(multiple_correct.ok())
424        self.assertIn('ERROR', repr(multiple_correct))
425        self.assertEqual(len(multiple_correct.successes), 2)
426        self.assertEqual(len(multiple_correct.failures), 5)
427        self.assertEqual(len(multiple_correct.matches()), 7)
428        self.assertEqual(str(multiple_correct), 'B: =')
429
430    def test_collision_one_integer_arg_favor_successful_decode(self) -> None:
431        # One string decodes successfully, since the arg is out of range for %c.
432        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
433        self.assertTrue(int_arg.ok())
434        self.assertEqual(str(int_arg), 'A: 2147483647')
435
436    def test_collision_one_string_arg_favors_successful_decode(self) -> None:
437        # One string decodes successfully, since decoding the argument as an
438        # integer does not decode all the data.
439        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
440        self.assertTrue(string_arg.ok())
441        self.assertEqual(str(string_arg), 'C: Hi')
442
443    def test_collision_one_string_arg_favors_decoding_all_data(self) -> None:
444        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
445        self.assertEqual(len(result.failures), 2)
446        # Should resolve to the string since %d would leave one byte behind.
447        self.assertEqual(str(result), '%s')
448
449    def test_collision_multiple_args_favors_decoding_more_arguments(
450        self,
451    ) -> None:
452        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
453        self.assertEqual(len(result.matches()), 2)
454        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
455        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')
456
457    def test_collision_multiple_args_favors_decoding_all_arguments(
458        self,
459    ) -> None:
460        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
461        self.assertTrue(unambiguous.ok())
462        self.assertEqual(len(unambiguous.matches()), 7)
463        self.assertEqual('#0 -1', str(unambiguous))
464        self.assertIn('#0 -1', repr(unambiguous))
465
466
467class ManualPoolExecutor(concurrent.futures.Executor):
468    """A stubbed pool executor that captures the most recent work request
469    and holds it until the public process method is manually called."""
470
471    def __init__(self) -> None:
472        super().__init__()
473        self._func = None
474
475    # pylint: disable=arguments-differ
476    def submit(self, func, *args, **kwargs):
477        """Submits work to the pool, stashing the partial for later use."""
478        self._func = functools.partial(func, *args, **kwargs)
479
480    def process(self) -> None:
481        """Processes the latest func submitted to the pool."""
482        if self._func is not None:
483            self._func()
484            self._func = None
485
486
487class InlinePoolExecutor(concurrent.futures.Executor):
488    """A stubbed pool executor that runs work immediately, inline."""
489
490    # pylint: disable=arguments-differ
491    def submit(self, func, *args, **kwargs):
492        """Submits work to the pool, stashing the partial for later use."""
493        func(*args, **kwargs)
494
495
496@mock.patch('os.path.getmtime')
497class AutoUpdatingDetokenizerTest(unittest.TestCase):
498    """Tests the AutoUpdatingDetokenizer class."""
499
500    def test_update(self, mock_getmtime) -> None:
501        """Tests the update command."""
502
503        db = database.load_token_database(
504            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
505        )
506        self.assertEqual(len(db), ALL_DOMAIN_TOKENS)
507
508        the_time = [100]
509
510        def move_back_time_if_file_exists(path) -> int:
511            if os.path.exists(path):
512                the_time[0] -= 1
513                return the_time[0]
514
515            raise FileNotFoundError
516
517        mock_getmtime.side_effect = move_back_time_if_file_exists
518
519        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
520            try:
521                file.close()
522
523                pool = ManualPoolExecutor()
524                detok = detokenize.AutoUpdatingDetokenizer(
525                    file.name, min_poll_period_s=0, pool=pool
526                )
527                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
528
529                with open(file.name, 'wb') as fd:
530                    tokens.write_binary(db, fd)
531
532                # After the change but before the pool runs in another thread,
533                # the token should not exist.
534                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
535
536                # After the pool is allowed to process, it should.
537                pool.process()
538                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
539            finally:
540                os.unlink(file.name)
541
542    def test_update_with_directory(self, mock_getmtime) -> None:
543        """Tests the update command with a directory format database."""
544        db = database.load_token_database(
545            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
546        )
547        self.assertEqual(len(db), ALL_DOMAIN_TOKENS)
548
549        the_time = [100]
550
551        def move_back_time_if_file_exists(path: str) -> int:
552            if os.path.exists(path):
553                the_time[0] -= 1
554                return the_time[0]
555
556            raise FileNotFoundError
557
558        mock_getmtime.side_effect = move_back_time_if_file_exists
559
560        with tempfile.TemporaryDirectory() as dbdir:
561            with tempfile.NamedTemporaryFile(
562                'wb', delete=False, suffix='.pw_tokenizer.csv', dir=dbdir
563            ) as matching_suffix_file, tempfile.NamedTemporaryFile(
564                'wb', delete=False, suffix='.not.right', dir=dbdir
565            ) as mismatched_suffix_file:
566                try:
567                    matching_suffix_file.close()
568                    mismatched_suffix_file.close()
569
570                    pool = ManualPoolExecutor()
571                    detok = detokenize.AutoUpdatingDetokenizer(
572                        dbdir, min_poll_period_s=0, pool=pool
573                    )
574                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
575
576                    with open(mismatched_suffix_file.name, 'wb') as fd:
577                        tokens.write_csv(db, fd)
578                    pool.process()
579                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
580
581                    with open(matching_suffix_file.name, 'wb') as fd:
582                        tokens.write_csv(db, fd)
583
584                    # After the change but before the pool runs in another
585                    # thread, the token should not exist.
586                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
587                    pool.process()
588
589                    # After the pool is allowed to process, it should.
590                    self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
591                finally:
592                    os.unlink(mismatched_suffix_file.name)
593                    os.unlink(matching_suffix_file.name)
594                    os.rmdir(dbdir)
595
596        # The database stays around if the file is deleted.
597        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
598
599    def test_no_update_if_time_is_same(self, mock_getmtime) -> None:
600        mock_getmtime.return_value = 100
601
602        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
603            try:
604                tokens.write_csv(
605                    database.load_token_database(
606                        io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
607                    ),
608                    file,
609                )
610                file.close()
611
612                detok = detokenize.AutoUpdatingDetokenizer(
613                    file.name, min_poll_period_s=0, pool=InlinePoolExecutor()
614                )
615                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
616
617                # Empty the database, but keep the mock modified time the same.
618                with open(file.name, 'wb'):
619                    pass
620
621                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
622                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
623
624                # Move back time so the now-empty file is reloaded.
625                mock_getmtime.return_value = 50
626                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
627            finally:
628                os.unlink(file.name)
629
630    def test_token_domain_in_str(self, _) -> None:
631        """Tests a str containing a domain."""
632        detok = detokenize.AutoUpdatingDetokenizer(
633            f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#',  # Default domain
634            min_poll_period_s=0,
635            pool=InlinePoolExecutor(),
636        )
637        self.assertEqual(len(detok.database), DEFAULT_DOMAIN_TOKENS)
638
639    def test_token_domain_in_path(self, _) -> None:
640        """Tests a Path() containing a domain."""
641        detok = detokenize.AutoUpdatingDetokenizer(
642            Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#'),
643            min_poll_period_s=0,
644            pool=InlinePoolExecutor(),
645        )
646        self.assertEqual(len(detok.database), DEFAULT_DOMAIN_TOKENS)
647
648    def test_token_no_domain_in_str(self, _) -> None:
649        """Tests a str without a domain, which loads all domains."""
650        detok = detokenize.AutoUpdatingDetokenizer(
651            str(ELF_WITH_TOKENIZER_SECTIONS_PATH),
652            min_poll_period_s=0,
653            pool=InlinePoolExecutor(),
654        )
655        self.assertEqual(len(detok.database), ALL_DOMAIN_TOKENS)
656
657    def test_token_no_domain_in_path(self, _) -> None:
658        """Tests a Path() without a domain, which loads all domains."""
659        detok = detokenize.AutoUpdatingDetokenizer(
660            ELF_WITH_TOKENIZER_SECTIONS_PATH,
661            min_poll_period_s=0,
662            pool=InlinePoolExecutor(),
663        )
664        self.assertEqual(len(detok.database), ALL_DOMAIN_TOKENS)
665
666    def test_invalid_domain_specification(self, _) -> None:
667        with self.assertRaises(ValueError, msg='Too many # delimiters'):
668            detokenize.AutoUpdatingDetokenizer(
669                f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}##',
670                min_poll_period_s=0,
671                pool=InlinePoolExecutor(),
672            )
673
674
675def _next_char(message: bytes) -> bytes:
676    return bytes(b + 1 for b in message)
677
678
679class NestedMessageParserTest(unittest.TestCase):
680    """Tests parsing prefixed messages."""
681
682    class _Case(NamedTuple):
683        data: bytes
684        expected: bytes
685        title: str
686        transform: Callable[[bytes], bytes] = _next_char
687
688    TRANSFORM_TEST_CASES = (
689        _Case(b'$abcd', b'%bcde', 'single message'),
690        _Case(
691            b'$$WHAT?$abc$WHY? is this $ok $',
692            b'%%WHAT?%bcd%WHY? is this %ok %',
693            'message and non-message',
694        ),
695        _Case(b'$1$', b'%1%', 'empty message'),
696        _Case(b'$abc$defgh', b'%bcd%efghh', 'sequential message'),
697        _Case(
698            b'w$abcx$defygh$$abz',
699            b'w$ABCx$DEFygh$$ABz',
700            'interspersed start/end non-message',
701            bytes.upper,
702        ),
703        _Case(
704            b'$abcx$defygh$$ab',
705            b'$ABCx$DEFygh$$AB',
706            'interspersed start/end message ',
707            bytes.upper,
708        ),
709    )
710
711    def setUp(self) -> None:
712        self.decoder = detokenize.NestedMessageParser('$', 'abcdefg')
713
714    def test_transform_io(self) -> None:
715        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
716            self.assertEqual(
717                expected,
718                b''.join(
719                    self.decoder.transform_io(io.BytesIO(data), transform)
720                ),
721                f'{title}: {data!r}',
722            )
723
724    def test_transform_bytes_with_flush(self) -> None:
725        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
726            self.assertEqual(
727                expected,
728                self.decoder.transform(data, transform, flush=True),
729                f'{title}: {data!r}',
730            )
731
732    def test_transform_bytes_sequential(self) -> None:
733        def transform(message):
734            return message.upper().replace(b'$', b'*')
735
736        self.assertEqual(self.decoder.transform(b'abc$abcd', transform), b'abc')
737        self.assertEqual(self.decoder.transform(b'$', transform), b'*ABCD')
738        self.assertEqual(self.decoder.transform(b'$b', transform), b'*')
739        self.assertEqual(self.decoder.transform(b'', transform), b'')
740        self.assertEqual(self.decoder.transform(b' ', transform), b'*B ')
741        self.assertEqual(self.decoder.transform(b'hello', transform), b'hello')
742        self.assertEqual(self.decoder.transform(b'?? $ab', transform), b'?? ')
743        self.assertEqual(
744            self.decoder.transform(b'123$ab4$56$a', transform), b'*AB123*AB4*56'
745        )
746        self.assertEqual(
747            self.decoder.transform(b'bc', transform, flush=True), b'*ABC'
748        )
749
750    MESSAGES_TEST: Any = (
751        (b'123$abc456$a', (False, b'123'), (True, b'$abc'), (False, b'456')),
752        (b'7$abcd', (True, b'$a'), (False, b'7')),
753        (b'e',),
754        (b'',),
755        (b'$', (True, b'$abcde')),
756        (b'$', (True, b'$')),
757        (b'$a$b$c', (True, b'$'), (True, b'$a'), (True, b'$b')),
758        (b'1', (True, b'$c'), (False, b'1')),
759        (b'',),
760        (b'?', (False, b'?')),
761        (b'!@', (False, b'!@')),
762        (b'%^&', (False, b'%^&')),
763    )
764
765    def test_read_messages(self) -> None:
766        for step in self.MESSAGES_TEST:
767            data: bytes = step[0]
768            pieces: tuple[tuple[bool, bytes], ...] = step[1:]
769            self.assertEqual(tuple(self.decoder.read_messages(data)), pieces)
770
771    def test_read_messages_flush(self) -> None:
772        self.assertEqual(
773            list(self.decoder.read_messages(b'123$a')), [(False, b'123')]
774        )
775        self.assertEqual(list(self.decoder.read_messages(b'b')), [])
776        self.assertEqual(
777            list(self.decoder.read_messages(b'', flush=True)), [(True, b'$ab')]
778        )
779
780    def test_read_messages_io(self) -> None:
781        # Rework the read_messages test data for stream input.
782        data = io.BytesIO(b''.join(step[0] for step in self.MESSAGES_TEST))
783        expected_pieces = sum((step[1:] for step in self.MESSAGES_TEST), ())
784
785        result = self.decoder.read_messages_io(data)
786        for expected_is_message, expected_data in expected_pieces:
787            if expected_is_message:
788                is_message, piece = next(result)
789                self.assertTrue(is_message)
790                self.assertEqual(expected_data, piece)
791            else:  # the IO version yields non-messages byte by byte
792                for byte in expected_data:
793                    is_message, piece = next(result)
794                    self.assertFalse(is_message)
795                    self.assertEqual(bytes([byte]), piece)
796
797
798class DetokenizeNested(unittest.TestCase):
799    """Tests detokenizing nested tokens"""
800
801    def test_nested_hashed_arg(self) -> None:
802        detok = detokenize.Detokenizer(
803            tokens.Database(
804                [
805                    tokens.TokenizedStringEntry(0xA, 'tokenized argument'),
806                    tokens.TokenizedStringEntry(
807                        2,
808                        'This is a ' + '$#%08x',
809                    ),
810                ]
811            )
812        )
813        self.assertEqual(
814            str(detok.detokenize(b'\x02\0\0\0\x14')),
815            'This is a tokenized argument',
816        )
817
818    def test_nested_base64_arg(self) -> None:
819        detok = detokenize.Detokenizer(
820            tokens.Database(
821                [
822                    tokens.TokenizedStringEntry(1, 'base64 argument'),
823                    tokens.TokenizedStringEntry(2, 'This is a %s'),
824                ]
825            )
826        )
827        self.assertEqual(
828            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
829            'This is a base64 argument',
830        )
831
832    def test_deeply_nested_arg(self) -> None:
833        detok = detokenize.Detokenizer(
834            tokens.Database(
835                [
836                    tokens.TokenizedStringEntry(1, '$10#0000000005'),
837                    tokens.TokenizedStringEntry(2, 'This is a $#%08x'),
838                    tokens.TokenizedStringEntry(3, 'deeply nested argument'),
839                    tokens.TokenizedStringEntry(4, '$AQAAAA=='),
840                    tokens.TokenizedStringEntry(5, '$AwAAAA=='),
841                ]
842            )
843        )
844        self.assertEqual(
845            str(detok.detokenize(b'\x02\0\0\0\x08')),  # token for 4
846            'This is a deeply nested argument',
847        )
848
849
850class DetokenizeBase64(unittest.TestCase):
851    """Tests detokenizing Base64 messages."""
852
853    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)
854
855    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
856    RECURSION = b'$' + base64.b64encode(
857        struct.pack('I', tokens.c_hash(RECURSION_STRING))
858    )
859
860    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
861    RECURSION_2 = b'$' + base64.b64encode(
862        struct.pack('I', tokens.c_hash(RECURSION_STRING_2))
863    )
864
865    TEST_CASES = (
866        (b'', b''),
867        (b'nothing here', b'nothing here'),
868        (JELLO, b'Jello, world!'),
869        (JELLO + b'a', b'Jello, world!a'),
870        (JELLO + b'abc', b'Jello, world!abc'),
871        (JELLO + b'abc=', b'Jello, world!abc='),
872        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
873        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
874        (b'$' + JELLO, b'$Jello, world!'),
875        (JELLO + JELLO, b'Jello, world!Jello, world!'),
876        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
877        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
878        (b'$3141', b'$3141'),
879        (JELLO + b'$3141', b'Jello, world!$3141'),
880        (
881            JELLO + b'$a' + JELLO + b'b' + JELLO + b'c',
882            b'Jello, world!$aJello, world!bJello, world!c',
883        ),
884        (RECURSION, b'The secret message is "Jello, world!"'),
885        (
886            RECURSION_2,
887            b'\'The secret message is "Jello, world!"\', said the spy.',
888        ),
889    )
890
891    def setUp(self) -> None:
892        super().setUp()
893        db = database.load_token_database(
894            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
895        )
896        db.add(
897            tokens.TokenizedStringEntry(tokens.c_hash(s), s)
898            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2]
899        )
900        self.detok = detokenize.Detokenizer(db)
901
902    def test_detokenize_base64_live(self) -> None:
903        for data, expected in self.TEST_CASES:
904            output = io.BytesIO()
905            self.detok.detokenize_base64_live(io.BytesIO(data), output)
906
907            self.assertEqual(expected, output.getvalue(), f'Input: {data!r}')
908
909    def test_detokenize_base64_to_file(self) -> None:
910        for data, expected in self.TEST_CASES:
911            output = io.BytesIO()
912            self.detok.detokenize_base64_to_file(data, output)
913
914            self.assertEqual(expected, output.getvalue())
915
916    def test_detokenize_base64(self) -> None:
917        for data, expected in self.TEST_CASES:
918            self.assertEqual(expected, self.detok.detokenize_base64(data))
919
920    def test_detokenize_base64_str(self) -> None:
921        for data, expected in self.TEST_CASES:
922            self.assertEqual(
923                expected.decode(), self.detok.detokenize_base64(data.decode())
924            )
925
926
927class DetokenizeInfiniteRecursion(unittest.TestCase):
928    """Tests that infinite Base64 token recursion resolves."""
929
930    def setUp(self) -> None:
931        super().setUp()
932        self.detok = detokenize.Detokenizer(
933            tokens.Database(
934                [
935                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
936                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
937                    tokens.TokenizedStringEntry(2, '$#00000003'),  # token for 3
938                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
939                ]
940            )
941        )
942
943    def test_detokenize_self_recursion(self) -> None:
944        for depth in range(5):
945            self.assertEqual(
946                self.detok.detokenize_text(
947                    b'This one is deep: $AAAAAA==', recursion=depth
948                ),
949                b'This one is deep: $AAAAAA==',
950            )
951
952    def test_detokenize_self_recursion_default(self) -> None:
953        self.assertEqual(
954            self.detok.detokenize_text(
955                b'This one is deep: $AAAAAA==',
956            ),
957            b'This one is deep: $AAAAAA==',
958        )
959
960    def test_detokenize_cyclic_recursion_even(self) -> None:
961        self.assertEqual(
962            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=6),
963            b'I said "$AgAAAA=="',
964        )
965
966    def test_detokenize_cyclic_recursion_odd(self) -> None:
967        self.assertEqual(
968            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=7),
969            b'I said "$#00000003"',
970        )
971
972
973class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
974    """Tests that infinite Bas64 token recursion resolves."""
975
976    def setUp(self) -> None:
977        super().setUp()
978        self.detok = detokenize.Detokenizer(
979            tokens.Database(
980                [
981                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
982                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
983                    tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
984                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
985                ]
986            )
987        )
988
989    def test_detokenize_self_recursion(self) -> None:
990        for depth in range(5):
991            self.assertEqual(
992                self.detok.detokenize_base64(
993                    b'This one is deep: $AAAAAA==', recursion=depth
994                ),
995                b'This one is deep: $AAAAAA==',
996            )
997
998    def test_detokenize_self_recursion_default(self) -> None:
999        self.assertEqual(
1000            self.detok.detokenize_base64(b'This one is deep: $64#AAAAAA=='),
1001            b'This one is deep: $AAAAAA==',
1002        )
1003
1004    def test_detokenize_cyclic_recursion_even(self) -> None:
1005        self.assertEqual(
1006            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
1007            b'I said "$AgAAAA=="',
1008        )
1009
1010    def test_detokenize_cyclic_recursion_odd(self) -> None:
1011        self.assertEqual(
1012            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
1013            b'I said "$AwAAAA=="',
1014        )
1015
1016
1017class DetokenizeNestedDomains(unittest.TestCase):
1018    """Tests detokenizing nested tokens with specified domains"""
1019
1020    def test_nested_hashed_arg_with_one_domain_match(self) -> None:
1021        detok = detokenize.Detokenizer(
1022            tokens.Database(
1023                [
1024                    tokens.TokenizedStringEntry(0xA, 'domain1', 'D1'),
1025                    tokens.TokenizedStringEntry(
1026                        2, 'This is all in ' + '${D1}#%08x', 'D1'
1027                    ),
1028                ]
1029            )
1030        )
1031        self.assertEqual(
1032            str(detok.detokenize(b'\x02\0\0\0\x14')),
1033            'This is all in domain1',
1034        )
1035
1036    def test_multiple_nested_args_in_one_sentence(self) -> None:
1037        detok = detokenize.Detokenizer(
1038            tokens.Database(
1039                [
1040                    tokens.TokenizedStringEntry(0xA, 'nested token 1', 'D1'),
1041                    tokens.TokenizedStringEntry(
1042                        2,
1043                        'This is '
1044                        + '${D1}#%08x'
1045                        + ' and this is '
1046                        + '${D1}#00000003',
1047                        'D1',
1048                    ),
1049                    tokens.TokenizedStringEntry(3, 'nested token 2', 'D1'),
1050                ]
1051            )
1052        )
1053        self.assertEqual(
1054            str(detok.detokenize(b'\x02\0\0\0\x14')),
1055            'This is nested token 1 and this is nested token 2',
1056        )
1057
1058    def test_nested_hashed_arg_with_two_domain_match(self) -> None:
1059        detok = detokenize.Detokenizer(
1060            tokens.Database(
1061                [
1062                    tokens.TokenizedStringEntry(
1063                        0xA, 'and this is domain1', 'D1'
1064                    ),
1065                    tokens.TokenizedStringEntry(
1066                        2, 'This is domain2 ' + '${D1}#%08x', 'D2'
1067                    ),
1068                ]
1069            )
1070        )
1071        self.assertEqual(
1072            str(detok.detokenize(b'\x02\0\0\0\x14')),
1073            'This is domain2 and this is domain1',
1074        )
1075
1076    def test_nested_hashed_arg_with_different_domains(self) -> None:
1077        detok = detokenize.Detokenizer(
1078            tokens.Database(
1079                [
1080                    tokens.TokenizedStringEntry(0xA, 'domain1', 'D1'),
1081                    tokens.TokenizedStringEntry(
1082                        2, 'This is all in ' + '${D2}#%08x', 'D1'
1083                    ),
1084                ]
1085            )
1086        )
1087        result = detok.detokenize(b'\x02\0\0\0\x14')
1088        self.assertFalse(result == 'This is all in domain1')
1089
1090    def test_nested_base64_arg_multiple_domains(self) -> None:
1091        detok = detokenize.Detokenizer(
1092            tokens.Database(
1093                [
1094                    tokens.TokenizedStringEntry(1, '${D5}#00000004', 'D1'),
1095                    tokens.TokenizedStringEntry(2, 'This is a %s', 'D1'),
1096                    tokens.TokenizedStringEntry(3, 'base64 argument', 'D2'),
1097                    tokens.TokenizedStringEntry(
1098                        4, 'nested ' + '${D2}#00000003', 'D5'
1099                    ),
1100                ]
1101            )
1102        )
1103        self.assertEqual(
1104            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
1105            'This is a nested base64 argument',
1106        )
1107
1108    def test_nested_hashed_arg_with_domain_whitespace(self) -> None:
1109        detok = detokenize.Detokenizer(
1110            tokens.Database(
1111                [
1112                    tokens.TokenizedStringEntry(
1113                        0xA, 'and this is domain1', 'Domain1'
1114                    ),
1115                    tokens.TokenizedStringEntry(
1116                        2, 'This is domain2 ' + '${Domain 1}#%08x', 'D2'
1117                    ),
1118                ]
1119            )
1120        )
1121        self.assertEqual(
1122            str(detok.detokenize(b'\x02\0\0\0\x14')),
1123            'This is domain2 and this is domain1',
1124        )
1125
1126
1127if __name__ == '__main__':
1128    unittest.main()
1129