#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""Decodes and detokenizes strings from binary or Base64 input.

The main class provided by this module is the Detokenizer class. To use it,
construct it with the path to an ELF or CSV database, a tokens.Database, or a
file object for an ELF file or CSV. Then, call the detokenize method with
encoded messages, one at a time. The detokenize method returns a
DetokenizedString object with the result.

For example::

  from pw_tokenizer import detokenize

  detok = detokenize.Detokenizer('path/to/firmware/image.elf')
  print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))

This module also provides a command line interface for decoding and
detokenizing messages from a file or stdin.
"""

import argparse
import base64
import binascii
from concurrent.futures import Executor, ThreadPoolExecutor
import enum
import io
import logging
import os
from pathlib import Path
import re
import string
import struct
import sys
import threading
import time
from typing import (
    AnyStr,
    BinaryIO,
    Callable,
    Iterable,
    Iterator,
    Match,
    NamedTuple,
    Pattern,
)

try:
    from pw_tokenizer import database, decode, encode, tokens
except ImportError:
    # Append this path to the module search path to allow running this module
    # without installing the pw_tokenizer package.
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from pw_tokenizer import database, decode, encode, tokens

_LOG = logging.getLogger('pw_tokenizer')

ENCODED_TOKEN = struct.Struct('<I')
_BASE64_CHARS = string.ascii_letters + string.digits + '+/-_='
DEFAULT_RECURSION = 9
NESTED_TOKEN_BASE_PREFIX = encode.NESTED_TOKEN_BASE_PREFIX.encode()
NESTED_DOMAIN_START_PREFIX = encode.NESTED_DOMAIN_START_PREFIX.encode()
NESTED_DOMAIN_END_PREFIX = encode.NESTED_DOMAIN_END_PREFIX.encode()

_BASE8_TOKEN_REGEX = rb'(?P<base8>[0-7]{11})'
_BASE10_TOKEN_REGEX = rb'(?P<base10>[0-9]{10})'
_BASE16_TOKEN_REGEX = rb'(?P<base16>[A-Fa-f0-9]{8})'
_BASE64_TOKEN_REGEX = (
    rb'(?P<base64>'
    # Tokenized Base64 contains 0 or more blocks of four Base64 chars.
    rb'(?:[A-Za-z0-9+/\-_]{4})*'
    # The last block of 4 chars may have one or two padding chars (=).
    rb'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'
    rb')'
)
_NESTED_TOKEN_FORMATS = (
    _BASE8_TOKEN_REGEX,
    _BASE10_TOKEN_REGEX,
    _BASE16_TOKEN_REGEX,
    _BASE64_TOKEN_REGEX,
)


def _token_regex(prefix: str) -> Pattern[bytes]:
    """Returns a regular expression for prefixed tokenized strings."""
    return re.compile(
        # Tokenized strings start with the prefix character ($).
        re.escape(prefix.encode())
        # Optional; omitting the domain specifier selects the (empty) default
        # domain. Braces ({}) enclose the domain string.
        + rb'(?P<domainspec>('
        + NESTED_DOMAIN_START_PREFIX
        + rb'(?P<domain>[^'
        + NESTED_DOMAIN_END_PREFIX
        + rb']*)'
        + NESTED_DOMAIN_END_PREFIX
        + rb'))?'
        # Optional; no base specifier defaults to BASE64.
        # Hash (#) with no number specified defaults to Base-16.
        + rb'(?P<basespec>(?P<base>[0-9]*)?'
        + NESTED_TOKEN_BASE_PREFIX
        + rb')?'
        # Match one of the following token formats.
        + rb'('
        + rb'|'.join(_NESTED_TOKEN_FORMATS)
        + rb')'
    )

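# Examples of prefixed tokens the above regex is intended to match, assuming
# the default '$' prefix (the token values and the 'log' domain name are
# purely illustrative):
#
#   $EjRWeA==        Base64 (the default when no base specifier is present)
#   $#deadbeef       base 16 ('#' with no number defaults to base 16)
#   $10#0123456789   base 10
#   ${log}#deadbeef  base-16 token in the 'log' domain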

class DetokenizedString:
    """A detokenized string, with all results if there are collisions."""

    def __init__(
        self,
        token: int | None,
        format_string_entries: Iterable[tuple],
        encoded_message: bytes,
        show_errors: bool = False,
        recursive_detokenize: Callable[[str], str] | None = None,
    ):
        self.token = token
        self.encoded_message = encoded_message
        self._show_errors = show_errors

        self.successes: list[decode.FormattedString] = []
        self.failures: list[decode.FormattedString] = []

        decode_attempts: list[tuple[tuple, decode.FormattedString]] = []

        for entry, fmt in format_string_entries:
            result = fmt.format(
                encoded_message[ENCODED_TOKEN.size :], show_errors
            )
            if recursive_detokenize:
                result = decode.FormattedString(
                    recursive_detokenize(result.value),
                    result.args,
                    result.remaining,
                )
            decode_attempts.append((result.score(entry.date_removed), result))

        # Sort the attempts by the score so the most likely results are first.
        decode_attempts.sort(key=lambda value: value[0], reverse=True)

        # Split out the successful decodes from the failures.
        for score, result in decode_attempts:
            if score[0]:
                self.successes.append(result)
            else:
                self.failures.append(result)

    def ok(self) -> bool:
        """True if exactly one string decoded the arguments successfully."""
        return len(self.successes) == 1

    def matches(self) -> list[decode.FormattedString]:
        """Returns the strings that matched the token, best matches first."""
        return self.successes + self.failures

    def best_result(self) -> decode.FormattedString | None:
        """Returns the string and args for the most likely decoded string."""
        for string_and_args in self.matches():
            return string_and_args

        return None

    def error_message(self) -> str:
        """If detokenization failed, returns a descriptive message."""
        if self.ok():
            return ''

        if not self.matches():
            if self.token is None:
                return 'missing token'

            return 'unknown token {:08x}'.format(self.token)

        if len(self.matches()) == 1:
            return 'decoding failed for {!r}'.format(self.matches()[0].value)

        return '{} matches'.format(len(self.matches()))

    def __str__(self) -> str:
        """Returns the string for the most likely result."""
        result = self.best_result()
        if result:
            return result[0]

        if self._show_errors:
            return '<[ERROR: {}|{!r}]>'.format(
                self.error_message(), self.encoded_message
            )

        # Display the string as prefixed Base64 if it cannot be decoded.
        return encode.prefixed_base64(self.encoded_message)

    def __repr__(self) -> str:
        if self.ok():
            message = repr(str(self))
        else:
            message = 'ERROR: {}|{!r}'.format(
                self.error_message(), self.encoded_message
            )

        return '{}({})'.format(type(self).__name__, message)

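# A rough sketch of how DetokenizedString results are typically inspected
# (the database path and encoded message below are illustrative):
#
#   detok = Detokenizer('firmware.elf')
#   result = detok.detokenize(b'\x12\x34\x56\x78')
#   if result.ok():
#       print(str(result))             # The single successful decode.
#   else:
#       print(result.error_message())  # e.g. 'unknown token 78563412'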

class _TokenizedFormatString(NamedTuple):
    entry: tokens.TokenizedStringEntry
    format: decode.FormatString


class Detokenizer:
    """Main detokenization class; detokenizes strings and caches results."""

    def __init__(
        self,
        *token_database_or_elf,
        show_errors: bool = False,
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
    ):
        """Decodes and detokenizes binary messages.

        Args:
          *token_database_or_elf: a path or file object for an ELF or CSV
              database, a tokens.Database, or an elf_reader.Elf
          prefix: one-character string or bytes that signals the start of a
              message
          show_errors: if True, an error message is used in place of the %
              conversion specifier when an argument fails to decode
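
        For example, a detokenizer may be constructed from one or more token
        sources (the paths below are illustrative)::

          detok = Detokenizer('firmware.elf', 'extra_tokens.csv')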
        """
        self.show_errors = show_errors
        self._prefix = prefix if isinstance(prefix, str) else prefix.decode()
        self._token_regex = _token_regex(self._prefix)
        self._database_lock = threading.Lock()

        # Cache FormatStrings for faster lookup & formatting.
        self._cache: dict[int, list[_TokenizedFormatString]] = {}

        self._initialize_database(token_database_or_elf)

    @property
    def prefix(self) -> str:
        return self._prefix

    def _initialize_database(self, token_sources: Iterable) -> None:
        with self._database_lock:
            self.database = database.load_token_database(*token_sources)
            self._cache.clear()

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        """Returns (TokenizedStringEntry, FormatString) list for matches."""
        with self._database_lock:
            try:
                return self._cache[token]
            except KeyError:
                format_strings = [
                    _TokenizedFormatString(
                        entry, decode.FormatString(str(entry))
                    )
                    for entry in self.database.token_to_entries[token]
                ]
                self._cache[token] = format_strings
                return format_strings

    def detokenize(
        self,
        encoded_message: bytes,
        recursion: int = DEFAULT_RECURSION,
    ) -> DetokenizedString:
        """Decodes and detokenizes a message as a DetokenizedString."""
        if not encoded_message:
            return DetokenizedString(
                None, (), encoded_message, self.show_errors
            )

        # Pad messages smaller than ENCODED_TOKEN.size with zeroes to support
        # tokens smaller than a uint32. Messages with arguments must always use
        # a full 32-bit token.
        missing_token_bytes = ENCODED_TOKEN.size - len(encoded_message)
        if missing_token_bytes > 0:
            encoded_message += b'\0' * missing_token_bytes
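            # For example, a 2-byte message b'\x12\x34' is padded to
            # b'\x12\x34\x00\x00' and decoded as token 0x00003412.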

        (token,) = ENCODED_TOKEN.unpack_from(encoded_message)

        recursive_detokenize = None
        if recursion > 0:
            recursive_detokenize = self._detokenize_nested_callback(recursion)

        return DetokenizedString(
            token,
            self.lookup(token),
            encoded_message,
            self.show_errors,
            recursive_detokenize,
        )

    def detokenize_text(
        self,
        data: AnyStr,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Decodes and replaces prefixed Base64 messages in the provided data.

        Args:
          data: the binary data to decode
          recursion: how many levels to recursively decode

        Returns:
          copy of the data with all recognized tokens decoded
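
        A rough usage sketch (the path and Base64 token value below are
        illustrative; unrecognized tokens are left unchanged)::

          detok = Detokenizer('firmware.elf')
          detok.detokenize_text('RX: $EjRWeA==')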
        """
        return self._detokenize_nested_callback(recursion)(data)

    # TODO(gschen): remove unnecessary function
    def detokenize_base64(
        self,
        data: AnyStr,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Alias of detokenize_text for backwards compatibility."""
        return self.detokenize_text(data, recursion)

    def detokenize_text_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Decodes prefixed Base64 messages in data, writing them to output."""
        output.write(self._detokenize_nested(data, recursion))

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_to_file for backwards compatibility."""
        self.detokenize_text_to_file(data, output, recursion)

    def detokenize_text_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Reads chars one-at-a-time, decoding messages; SLOW for big files."""

        def transform(data: bytes) -> bytes:
            return self._detokenize_nested(data.decode(), recursion)

        for message in NestedMessageParser(
            self._prefix, _BASE64_CHARS
        ).transform_io(input_file, transform):
            output.write(message)

            # Flush each line to prevent delays when piping between processes.
            if b'\n' in message:
                output.flush()

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_live for backwards compatibility."""
        self.detokenize_text_live(input_file, output, recursion)

    def _detokenize_nested_callback(
        self,
        recursion: int,
    ) -> Callable[[AnyStr], AnyStr]:
        """Returns a function that replaces all tokens for a given string."""

        def detokenize(message: AnyStr) -> AnyStr:
            result = self._detokenize_nested(message, recursion)
            return result.decode() if isinstance(message, str) else result

        return detokenize

    def _detokenize_nested(
        self,
        message: str | bytes,
        recursion: int,
    ) -> bytes:
        """Returns the message with recognized tokens replaced.

        Message data is internally handled as bytes regardless of input message
        type and returns the result as bytes.
        """
        # A unified format across the token types is required for regex
        # consistency.
        message = message.encode() if isinstance(message, str) else message

        if not self.database:
            return message

        result = message
        for _ in range(recursion - 1):
            result = self._token_regex.sub(self._detokenize_scan, result)

            if result == message:
                return result
        return result

    def _detokenize_scan(self, match: Match[bytes]) -> bytes:
        """Decodes prefixed tokens for one of multiple formats."""
        basespec = match.group('basespec')
        base = match.group('base')
        domain = match.group('domain')

        if domain is None:
            domain = tokens.DEFAULT_DOMAIN
        else:
            domain = domain.decode()
        if not basespec or (base == b'64'):
            return self._detokenize_once_base64(match)

        if not base:
            base = b'16'

        domain = ''.join(domain.split())
        return self._detokenize_once(match, base, domain)

    def _detokenize_once(
        self, match: Match[bytes], base: bytes, domain: str
    ) -> bytes:
        """Performs lookup on a plain token"""
        original = match.group(0)
        token = match.group('base' + base.decode())
        if not token:
            return original

        token = int(token, int(base))
        entries = self.database.domains[domain][token]

        if len(entries) == 1:
            return str(entries[0]).encode()

        # TODO(gschen): improve token collision reporting

        return original

    def _detokenize_once_base64(
        self,
        match: Match[bytes],
    ) -> bytes:
        """Performs lookup on a Base64 token"""
        original = match.group(0)

        try:
            encoded_token = match.group('base64')
            if not encoded_token:
                return original

            detokenized_string = self.detokenize(
                base64.b64decode(encoded_token, validate=True), recursion=0
            )

            if detokenized_string.matches():
                return str(detokenized_string).encode()

        except binascii.Error:
            pass

        return original


class AutoUpdatingDetokenizer(Detokenizer):
    """Loads and updates a detokenizer from database paths."""

    class _DatabasePath:
        """Tracks the modified time of a path or file object."""

        def __init__(self, path: Path | str) -> None:
            self.path, self.domain = database.parse_domain(path)
            self._modified_time: float | None = self._last_modified_time()

        def updated(self) -> bool:
            """True if the path has been updated since the last call."""
            modified_time = self._last_modified_time()
            if modified_time is None or modified_time == self._modified_time:
                return False

            self._modified_time = modified_time
            return True

        def _last_modified_time(self) -> float | None:
            if self.path.is_dir():
                mtime = -1.0
                for child in self.path.glob(tokens.DIR_DB_GLOB):
                    mtime = max(mtime, os.path.getmtime(child))
                return mtime if mtime >= 0 else None

            try:
                return os.path.getmtime(self.path)
            except FileNotFoundError:
                return None

        def load(self) -> tokens.Database:
            try:
                if self.domain is not None:
                    return database.load_token_database(
                        self.path, domain=self.domain
                    )
                return database.load_token_database(self.path)
            except FileNotFoundError:
                return database.load_token_database()

    def __init__(
        self,
        *paths_or_files: Path | str,
        min_poll_period_s: float = 1.0,
        pool: Executor = ThreadPoolExecutor(max_workers=1),
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
    ) -> None:
        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
        self.min_poll_period_s = min_poll_period_s
        self._last_checked_time: float = time.time()
        # Thread pool to use for loading the databases. Limit to a single
        # worker since this is low volume and not time critical.
        self._pool = pool
        super().__init__(*(path.load() for path in self.paths), prefix=prefix)

    def __del__(self) -> None:
        self._pool.shutdown(wait=False)

    def _reload_paths(self) -> None:
        self._initialize_database([path.load() for path in self.paths])

    def _reload_if_changed(self) -> None:
        if time.time() - self._last_checked_time >= self.min_poll_period_s:
            self._last_checked_time = time.time()

            if any(path.updated() for path in self.paths):
                _LOG.info('Changes detected; reloading token database')
                self._pool.submit(self._reload_paths)

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        self._reload_if_changed()
        return super().lookup(token)

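# A minimal usage sketch (the database path below is illustrative). On each
# lookup, the detokenizer checks the path's modification time at most once per
# min_poll_period_s and reloads the database in a background thread when it
# has changed:
#
#   detok = AutoUpdatingDetokenizer('tokens.csv', min_poll_period_s=5.0)
#   detok.detokenize(b'\x12\x34\x56\x78')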

class NestedMessageParser:
    """Parses nested tokenized messages from a byte stream or string."""

    class _State(enum.Enum):
        MESSAGE = 1
        NON_MESSAGE = 2

    def __init__(
        self,
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
        chars: str | bytes = _BASE64_CHARS,
    ) -> None:
        """Initializes a parser.

        Args:
            prefix: one character that signifies the start of a message (``$``).
            chars: characters allowed in a message
        """
        self._prefix = ord(prefix)

        if isinstance(chars, str):
            chars = chars.encode()

        # Store the valid message bytes as a set of byte values.
        self._message_bytes = frozenset(chars)

        if len(prefix) != 1 or self._prefix in self._message_bytes:
            raise ValueError(
                f'Invalid prefix {prefix!r}: the prefix must be a single '
                'character that is not a valid message character.'
            )

        self._buffer = bytearray()
        self._state: NestedMessageParser._State = self._State.NON_MESSAGE

    def read_messages_io(
        self, binary_io: io.RawIOBase | BinaryIO
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte stream (BinaryIO object).

        Reads until EOF. If the stream is nonblocking (``read(1)`` returns
        ``None``), then this function returns and may be called again with the
        same IO object to continue parsing. Partial messages are preserved
        between calls.

        Yields:
            ``(is_message, contents)`` chunks.
        """
        # The read may block indefinitely, depending on the IO object.
        while (read_byte := binary_io.read(1)) != b'':
            # Handle non-blocking IO by returning when no bytes are available.
            if read_byte is None:
                return

            for byte in read_byte:
                yield from self._handle_byte(byte)

            if self._state is self._State.NON_MESSAGE:  # yield non-message byte
                yield from self._flush()

        yield from self._flush()  # Always flush after EOF
        self._state = self._State.NON_MESSAGE

    def read_messages(
        self, chunk: bytes, *, flush: bool = False
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte string.

        This function may be called repeatedly with chunks of a stream. Partial
        messages are preserved between calls, unless ``flush=True``.

        Args:
            chunk: byte string that may contain nested messages
            flush: whether to flush any incomplete messages after processing
                this chunk

        Yields:
            ``(is_message, contents)`` chunks.
        """
        for byte in chunk:
            yield from self._handle_byte(byte)

        if flush or self._state is self._State.NON_MESSAGE:
            yield from self._flush()

    def _handle_byte(self, byte: int) -> Iterator[tuple[bool, bytes]]:
        if self._state is self._State.MESSAGE:
            if byte not in self._message_bytes:
                yield from self._flush()
                if byte != self._prefix:
                    self._state = self._State.NON_MESSAGE
        elif self._state is self._State.NON_MESSAGE:
            if byte == self._prefix:
                yield from self._flush()
                self._state = self._State.MESSAGE
        else:
            raise NotImplementedError(f'Unsupported state: {self._state}')

        self._buffer.append(byte)

    def _flush(self) -> Iterator[tuple[bool, bytes]]:
        data = bytes(self._buffer)
        self._buffer.clear()
        if data:
            yield self._state is self._State.MESSAGE, data

    def transform_io(
        self,
        binary_io: io.RawIOBase | BinaryIO,
        transform: Callable[[bytes], bytes],
    ) -> Iterator[bytes]:
        """Yields the file with a transformation applied to the messages."""
        for is_message, chunk in self.read_messages_io(binary_io):
            yield transform(chunk) if is_message else chunk

    def transform(
        self,
        chunk: bytes,
        transform: Callable[[bytes], bytes],
        *,
        flush: bool = False,
    ) -> bytes:
        """Returns the chunk with a transformation applied to the messages.

        Partial messages are preserved between calls unless ``flush=True``.
        """
        return b''.join(
            transform(data) if is_message else data
            for is_message, data in self.read_messages(chunk, flush=flush)
        )

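# A rough sketch of chunked parsing with NestedMessageParser (the input bytes
# below are illustrative):
#
#   parser = NestedMessageParser()
#   list(parser.read_messages(b'Log: $EjRWeA== done', flush=True))
#   # -> [(False, b'Log: '), (True, b'$EjRWeA=='), (False, b' done')]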

# TODO(hepler): Remove this unnecessary function.
def detokenize_base64(
    detokenizer: Detokenizer,
    data: bytes,
    recursion: int = DEFAULT_RECURSION,
) -> bytes:
    """Alias for detokenizer.detokenize_base64 for backwards compatibility.

    This function is deprecated; do not call it.
    """
    return detokenizer.detokenize_base64(data, recursion)


def _follow_and_detokenize_file(
    detokenizer: Detokenizer,
    file: BinaryIO,
    output: BinaryIO,
    poll_period_s: float = 0.01,
) -> None:
    """Polls a file to detokenize it and any appended data."""

    try:
        while True:
            data = file.read()
            if data:
                detokenizer.detokenize_base64_to_file(data, output)
                output.flush()
            else:
                time.sleep(poll_period_s)
    except KeyboardInterrupt:
        pass


def _handle_base64(
    databases,
    input_file: BinaryIO,
    output: BinaryIO,
    prefix: str,
    show_errors: bool,
    follow: bool,
) -> None:
    """Handles the base64 command line option."""
    # argparse.FileType doesn't correctly handle - for binary files.
    if input_file is sys.stdin:
        input_file = sys.stdin.buffer

    if output is sys.stdout:
        output = sys.stdout.buffer

    detokenizer = Detokenizer(
        tokens.Database.merged(*databases),
        prefix=prefix,
        show_errors=show_errors,
    )

    if follow:
        _follow_and_detokenize_file(detokenizer, input_file, output)
    elif input_file.seekable():
        # Process seekable files all at once, which is MUCH faster.
        detokenizer.detokenize_base64_to_file(input_file.read(), output)
    else:
        # For non-seekable inputs (e.g. pipes), read one character at a time.
        detokenizer.detokenize_base64_live(input_file, output)


def _parse_args() -> argparse.Namespace:
    """Parses and returns command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.set_defaults(handler=lambda **_: parser.print_help())

    subparsers = parser.add_subparsers(help='Encoding of the input.')

    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
    subparser = subparsers.add_parser(
        'base64',
        description=base64_help,
        parents=[database.token_databases_parser()],
        help=base64_help,
    )
    subparser.set_defaults(handler=_handle_base64)
    subparser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        type=argparse.FileType('rb'),
        default=sys.stdin.buffer,
        help='The file from which to read; provide - or omit for stdin.',
    )
    subparser.add_argument(
        '-f',
        '--follow',
        action='store_true',
        help=(
            'Detokenize data appended to input_file as it grows; similar to '
            'tail -f.'
        ),
    )
    subparser.add_argument(
        '-o',
        '--output',
        type=argparse.FileType('wb'),
        default=sys.stdout.buffer,
        help=(
            'The file to which to write the output; '
            'provide - or omit for stdout.'
        ),
    )
    subparser.add_argument(
        '-p',
        '--prefix',
        default=encode.NESTED_TOKEN_PREFIX,
        help=(
            'The one-character prefix that signals the start of a '
            'nested tokenized message. (default: $)'
        ),
    )
    subparser.add_argument(
        '-s',
        '--show_errors',
        action='store_true',
        help=(
            'Show error messages instead of conversion specifiers when '
            'arguments cannot be decoded.'
        ),
    )

    return parser.parse_args()


def main() -> int:
    args = _parse_args()

    handler = args.handler
    del args.handler

    handler(**vars(args))
    return 0


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The detokenizer command line tools require Python 3.')
    sys.exit(main())