#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""Decodes and detokenizes strings from binary or Base64 input.

The main class provided by this module is the Detokenizer class. To use it,
construct it with the path to an ELF or CSV database, a tokens.Database,
or a file object for an ELF file or CSV. Then, call the detokenize method with
encoded messages, one at a time. The detokenize method returns a
DetokenizedString object with the result.

For example::

    from pw_tokenizer import detokenize

    detok = detokenize.Detokenizer('path/to/firmware/image.elf')
    print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))

This module also provides a command line interface for decoding and
detokenizing messages from a file or stdin.
"""

import argparse
import base64
import binascii
from concurrent.futures import Executor, ThreadPoolExecutor
import enum
import io
import logging
import os
from pathlib import Path
import re
import string
import struct
import sys
import threading
import time
from typing import (
    AnyStr,
    BinaryIO,
    Callable,
    Iterable,
    Iterator,
    Match,
    NamedTuple,
    Pattern,
)

try:
    from pw_tokenizer import database, decode, encode, tokens
except ImportError:
    # Append this path to the module search path to allow running this module
    # without installing the pw_tokenizer package.
    sys.path.append(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    )
    from pw_tokenizer import database, decode, encode, tokens

_LOG = logging.getLogger('pw_tokenizer')

ENCODED_TOKEN = struct.Struct('<I')
_BASE64_CHARS = string.ascii_letters + string.digits + '+/-_='
DEFAULT_RECURSION = 9
NESTED_TOKEN_BASE_PREFIX = encode.NESTED_TOKEN_BASE_PREFIX.encode()
NESTED_DOMAIN_START_PREFIX = encode.NESTED_DOMAIN_START_PREFIX.encode()
NESTED_DOMAIN_END_PREFIX = encode.NESTED_DOMAIN_END_PREFIX.encode()

_BASE8_TOKEN_REGEX = rb'(?P<base8>[0-7]{11})'
_BASE10_TOKEN_REGEX = rb'(?P<base10>[0-9]{10})'
_BASE16_TOKEN_REGEX = rb'(?P<base16>[A-Fa-f0-9]{8})'
_BASE64_TOKEN_REGEX = (
    rb'(?P<base64>'
    # Tokenized Base64 contains 0 or more blocks of four Base64 chars.
    rb'(?:[A-Za-z0-9+/\-_]{4})*'
    # The last block of 4 chars may have one or two padding chars (=).
    rb'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'
    rb')'
)
_NESTED_TOKEN_FORMATS = (
    _BASE8_TOKEN_REGEX,
    _BASE10_TOKEN_REGEX,
    _BASE16_TOKEN_REGEX,
    _BASE64_TOKEN_REGEX,
)
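
# Illustrative examples of prefixed tokens recognized by _token_regex(),
# assuming the default prefixes from pw_tokenizer.encode ('$' token prefix,
# '{' / '}' domain delimiters, '#' base prefix):
#
#   $QA19pfEQ          Base64-encoded token and arguments (no base specifier)
#   ${foo}#deadbeef    Base16 token in tokenizer domain 'foo'
#   $#deadbeef         Base16 token; '#' with no number defaults to Base16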


def _token_regex(prefix: str) -> Pattern[bytes]:
    """Returns a regular expression for prefixed tokenized strings."""
    return re.compile(
        # Tokenized strings start with the prefix character ($).
        re.escape(prefix.encode())
        # Optional; no domain specifier defaults to the (empty) default domain.
        # Brackets ({}) enclose the domain string.
        + rb'(?P<domainspec>('
        + NESTED_DOMAIN_START_PREFIX
        + rb'(?P<domain>[^'
        + NESTED_DOMAIN_END_PREFIX
        + rb']*)'
        + NESTED_DOMAIN_END_PREFIX
        + rb'))?'
        # Optional; no base specifier defaults to Base64.
        # Hash (#) with no number specified defaults to Base16.
        + rb'(?P<basespec>(?P<base>[0-9]*)?'
        + NESTED_TOKEN_BASE_PREFIX
        + rb')?'
        # Match one of the following token formats.
        + rb'('
        + rb'|'.join(_NESTED_TOKEN_FORMATS)
        + rb')'
    )


class DetokenizedString:
    """A detokenized string, with all results if there are collisions."""

    def __init__(
        self,
        token: int | None,
        format_string_entries: Iterable[tuple],
        encoded_message: bytes,
        show_errors: bool = False,
        recursive_detokenize: Callable[[str], str] | None = None,
    ):
        self.token = token
        self.encoded_message = encoded_message
        self._show_errors = show_errors

        self.successes: list[decode.FormattedString] = []
        self.failures: list[decode.FormattedString] = []

        decode_attempts: list[tuple[tuple, decode.FormattedString]] = []

        for entry, fmt in format_string_entries:
            result = fmt.format(
                encoded_message[ENCODED_TOKEN.size :], show_errors
            )
            if recursive_detokenize:
                result = decode.FormattedString(
                    recursive_detokenize(result.value),
                    result.args,
                    result.remaining,
                )
            decode_attempts.append((result.score(entry.date_removed), result))

        # Sort the attempts by the score so the most likely results are first.
        decode_attempts.sort(key=lambda value: value[0], reverse=True)

        # Split out the successful decodes from the failures.
        for score, result in decode_attempts:
            if score[0]:
                self.successes.append(result)
            else:
                self.failures.append(result)

    def ok(self) -> bool:
        """True if exactly one string decoded the arguments successfully."""
        return len(self.successes) == 1

    def matches(self) -> list[decode.FormattedString]:
        """Returns the strings that matched the token, best matches first."""
        return self.successes + self.failures

    def best_result(self) -> decode.FormattedString | None:
        """Returns the string and args for the most likely decoded string."""
        for string_and_args in self.matches():
            return string_and_args

        return None

    def error_message(self) -> str:
        """If detokenization failed, returns a descriptive message."""
        if self.ok():
            return ''

        if not self.matches():
            if self.token is None:
                return 'missing token'

            return 'unknown token {:08x}'.format(self.token)

        if len(self.matches()) == 1:
            return 'decoding failed for {!r}'.format(self.matches()[0].value)

        return '{} matches'.format(len(self.matches()))

    def __str__(self) -> str:
        """Returns the string for the most likely result."""
        result = self.best_result()
        if result:
            return result[0]

        if self._show_errors:
            return '<[ERROR: {}|{!r}]>'.format(
                self.error_message(), self.encoded_message
            )

        # Display the string as prefixed Base64 if it cannot be decoded.
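        # (Illustrative: for encoded_message b'\x12\x34\x56\x78' and the
        # default '$' prefix, this returns '$EjRWeA=='.)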
        return encode.prefixed_base64(self.encoded_message)

    def __repr__(self) -> str:
        if self.ok():
            message = repr(str(self))
        else:
            message = 'ERROR: {}|{!r}'.format(
                self.error_message(), self.encoded_message
            )

        return '{}({})'.format(type(self).__name__, message)


class _TokenizedFormatString(NamedTuple):
    entry: tokens.TokenizedStringEntry
    format: decode.FormatString


class Detokenizer:
    """Main detokenization class; detokenizes strings and caches results."""

    def __init__(
        self,
        *token_database_or_elf,
        show_errors: bool = False,
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
    ):
        """Decodes and detokenizes binary messages.

        Args:
            *token_database_or_elf: a path or file object for an ELF or CSV
                database, a tokens.Database, or an elf_reader.Elf
            prefix: one-character str or bytes that signals the start of a
                message
            show_errors: if True, an error message is used in place of the %
                conversion specifier when an argument fails to decode
        """
        self.show_errors = show_errors
        self._prefix = prefix if isinstance(prefix, str) else prefix.decode()
        self._token_regex = _token_regex(self._prefix)
        self._database_lock = threading.Lock()

        # Cache FormatStrings for faster lookup & formatting.
        self._cache: dict[int, list[_TokenizedFormatString]] = {}

        self._initialize_database(token_database_or_elf)

    @property
    def prefix(self) -> str:
        return self._prefix

    def _initialize_database(self, token_sources: Iterable) -> None:
        with self._database_lock:
            self.database = database.load_token_database(*token_sources)
            self._cache.clear()

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        """Returns (TokenizedStringEntry, FormatString) list for matches."""
        with self._database_lock:
            try:
                return self._cache[token]
            except KeyError:
                format_strings = [
                    _TokenizedFormatString(
                        entry, decode.FormatString(str(entry))
                    )
                    for entry in self.database.token_to_entries[token]
                ]
                self._cache[token] = format_strings
                return format_strings

    def detokenize(
        self,
        encoded_message: bytes,
        recursion: int = DEFAULT_RECURSION,
    ) -> DetokenizedString:
        """Decodes and detokenizes a message as a DetokenizedString."""
        if not encoded_message:
            return DetokenizedString(
                None, (), encoded_message, self.show_errors
            )

        # Pad messages smaller than ENCODED_TOKEN.size with zeroes to support
        # tokens smaller than a uint32. Messages with arguments must always use
        # a full 32-bit token.
        missing_token_bytes = ENCODED_TOKEN.size - len(encoded_message)
        if missing_token_bytes > 0:
            encoded_message += b'\0' * missing_token_bytes

        (token,) = ENCODED_TOKEN.unpack_from(encoded_message)

        recursive_detokenize = None
        if recursion > 0:
            recursive_detokenize = self._detokenize_nested_callback(recursion)

        return DetokenizedString(
            token,
            self.lookup(token),
            encoded_message,
            self.show_errors,
            recursive_detokenize,
        )

    def detokenize_text(
        self,
        data: AnyStr,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Decodes and replaces prefixed Base64 messages in the provided data.
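
        For example (illustrative), with the default ``$`` prefix,
        ``detok.detokenize_text('Log: $EjRWeA==')`` replaces the embedded
        Base64 message with its detokenized form, if the token is in the
        database.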

        Args:
            data: the text or binary data to decode
            recursion: how many levels to recursively decode

        Returns:
            copy of the data with all recognized tokens decoded
        """
        return self._detokenize_nested_callback(recursion)(data)

    # TODO(gschen): remove unnecessary function
    def detokenize_base64(
        self,
        data: AnyStr,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Alias of detokenize_text for backwards compatibility."""
        return self.detokenize_text(data, recursion)

    def detokenize_text_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Decodes prefixed Base64 messages in data; writes them to output."""
        output.write(self._detokenize_nested(data, recursion))

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_to_file for backwards compatibility."""
        self.detokenize_text_to_file(data, output, recursion)

    def detokenize_text_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Reads chars one at a time, decoding messages; SLOW for big files."""

        def transform(data: bytes) -> bytes:
            return self._detokenize_nested(data.decode(), recursion)

        for message in NestedMessageParser(
            self._prefix, _BASE64_CHARS
        ).transform_io(input_file, transform):
            output.write(message)

            # Flush each line to prevent delays when piping between processes.
            if b'\n' in message:
                output.flush()

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_live for backwards compatibility."""
        self.detokenize_text_live(input_file, output, recursion)

    def _detokenize_nested_callback(
        self,
        recursion: int,
    ) -> Callable[[AnyStr], AnyStr]:
        """Returns a function that replaces all tokens in a given string."""

        def detokenize(message: AnyStr) -> AnyStr:
            result = self._detokenize_nested(message, recursion)
            return result.decode() if isinstance(message, str) else result

        return detokenize

    def _detokenize_nested(
        self,
        message: str | bytes,
        recursion: int,
    ) -> bytes:
        """Returns the message with recognized tokens replaced.

        Message data is handled internally as bytes regardless of the input
        type, and the result is returned as bytes.
        """
        # A unified format across the token types is required for regex
        # consistency.
        message = message.encode() if isinstance(message, str) else message

        if not self.database:
            return message

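        # A detokenized string may itself contain prefixed tokens, so the
        # substitution below runs for multiple passes (bounded by `recursion`).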
        result = message
        for _ in range(recursion - 1):
            result = self._token_regex.sub(self._detokenize_scan, result)

            if result == message:
                return result

        return result

    def _detokenize_scan(self, match: Match[bytes]) -> bytes:
        """Decodes a prefixed token in one of the supported formats."""
        basespec = match.group('basespec')
        base = match.group('base')
        domain = match.group('domain')

        if domain is None:
            domain = tokens.DEFAULT_DOMAIN
        else:
            domain = domain.decode()

        if not basespec or (base == b'64'):
            return self._detokenize_once_base64(match)

        if not base:
            base = b'16'

        domain = ''.join(domain.split())
        return self._detokenize_once(match, base, domain)

    def _detokenize_once(
        self, match: Match[bytes], base: bytes, domain: str
    ) -> bytes:
        """Performs a database lookup for a plain (non-Base64) token."""
        original = match.group(0)
        token = match.group('base' + base.decode())
        if not token:
            return original

        token = int(token, int(base))
        entries = self.database.domains[domain][token]

        if len(entries) == 1:
            return str(entries[0]).encode()

        # TODO(gschen): improve token collision reporting

        return original

    def _detokenize_once_base64(
        self,
        match: Match[bytes],
    ) -> bytes:
        """Performs a database lookup for a Base64-encoded token."""
        original = match.group(0)

        try:
            encoded_token = match.group('base64')
            if not encoded_token:
                return original

            detokenized_string = self.detokenize(
                base64.b64decode(encoded_token, validate=True), recursion=0
            )

            if detokenized_string.matches():
                return str(detokenized_string).encode()

        except binascii.Error:
            pass

        return original


class AutoUpdatingDetokenizer(Detokenizer):
    """Loads and updates a detokenizer from database paths."""

    class _DatabasePath:
        """Tracks the modified time of a path or file object."""

        def __init__(self, path: Path | str) -> None:
            self.path, self.domain = database.parse_domain(path)
            self._modified_time: float | None = self._last_modified_time()

        def updated(self) -> bool:
            """True if the path has been updated since the last call."""
            modified_time = self._last_modified_time()
            if modified_time is None or modified_time == self._modified_time:
                return False

            self._modified_time = modified_time
            return True

        def _last_modified_time(self) -> float | None:
            if self.path.is_dir():
                mtime = -1.0
                for child in self.path.glob(tokens.DIR_DB_GLOB):
                    mtime = max(mtime, os.path.getmtime(child))
                return mtime if mtime >= 0 else None

            try:
                return os.path.getmtime(self.path)
            except FileNotFoundError:
                return None

        def load(self) -> tokens.Database:
            try:
                if self.domain is not None:
                    return database.load_token_database(
                        self.path, domain=self.domain
                    )
                return database.load_token_database(self.path)
            except FileNotFoundError:
                return database.load_token_database()

    def __init__(
        self,
        *paths_or_files: Path | str,
        min_poll_period_s: float = 1.0,
        pool: Executor = ThreadPoolExecutor(max_workers=1),
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
    ) -> None:
        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
        self.min_poll_period_s = min_poll_period_s
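        # Databases are polled for changes at most once per min_poll_period_s
        # seconds, and only when lookup() is called.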
        self._last_checked_time: float = time.time()
        # Thread pool to use for loading the databases. Limit to a single
        # worker since this is low volume and not time critical.
        self._pool = pool
        super().__init__(*(path.load() for path in self.paths), prefix=prefix)

    def __del__(self) -> None:
        self._pool.shutdown(wait=False)

    def _reload_paths(self) -> None:
        self._initialize_database([path.load() for path in self.paths])

    def _reload_if_changed(self) -> None:
        if time.time() - self._last_checked_time >= self.min_poll_period_s:
            self._last_checked_time = time.time()

            if any(path.updated() for path in self.paths):
                _LOG.info('Changes detected; reloading token database')
                self._pool.submit(self._reload_paths)

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        self._reload_if_changed()
        return super().lookup(token)


class NestedMessageParser:
    """Parses nested tokenized messages from a byte stream or string."""

    class _State(enum.Enum):
        MESSAGE = 1
        NON_MESSAGE = 2

    def __init__(
        self,
        prefix: str | bytes = encode.NESTED_TOKEN_PREFIX,
        chars: str | bytes = _BASE64_CHARS,
    ) -> None:
        """Initializes a parser.

        Args:
            prefix: one character that signifies the start of a message (``$``)
            chars: characters allowed in a message
        """
        self._prefix = ord(prefix)

        if isinstance(chars, str):
            chars = chars.encode()

        # Store the valid message bytes as a set of byte values.
        self._message_bytes = frozenset(chars)

        if len(prefix) != 1 or self._prefix in self._message_bytes:
            raise ValueError(
                f'Invalid prefix {prefix!r}: the prefix must be a single '
                'character that is not a valid message character.'
            )

        self._buffer = bytearray()
        self._state: NestedMessageParser._State = self._State.NON_MESSAGE

    def read_messages_io(
        self, binary_io: io.RawIOBase | BinaryIO
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte stream (BinaryIO object).

        Reads until EOF. If the stream is nonblocking (``read(1)`` returns
        ``None``), then this function returns and may be called again with the
        same IO object to continue parsing. Partial messages are preserved
        between calls.

        Yields:
            ``(is_message, contents)`` chunks.
        """
        # The read may block indefinitely, depending on the IO object.
        while (read_byte := binary_io.read(1)) != b'':
            # Handle non-blocking IO by returning when no bytes are available.
            if read_byte is None:
                return

            for byte in read_byte:
                yield from self._handle_byte(byte)

            if self._state is self._State.NON_MESSAGE:  # yield non-message byte
                yield from self._flush()

        yield from self._flush()  # Always flush after EOF
        self._state = self._State.NON_MESSAGE

    def read_messages(
        self, chunk: bytes, *, flush: bool = False
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte string.

        This function may be called repeatedly with chunks of a stream. Partial
        messages are preserved between calls, unless ``flush=True``.

        Args:
            chunk: byte string that may contain nested messages
            flush: whether to flush any incomplete messages after processing
                this chunk

        Yields:
            ``(is_message, contents)`` chunks.
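
        Example (illustrative)::

            parser = NestedMessageParser()
            list(parser.read_messages(b'before $EjRWeA== after', flush=True))
            # [(False, b'before '), (True, b'$EjRWeA=='), (False, b' after')]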
        """
        for byte in chunk:
            yield from self._handle_byte(byte)

        if flush or self._state is self._State.NON_MESSAGE:
            yield from self._flush()

    def _handle_byte(self, byte: int) -> Iterator[tuple[bool, bytes]]:
        if self._state is self._State.MESSAGE:
            if byte not in self._message_bytes:
                yield from self._flush()
                if byte != self._prefix:
                    self._state = self._State.NON_MESSAGE
        elif self._state is self._State.NON_MESSAGE:
            if byte == self._prefix:
                yield from self._flush()
                self._state = self._State.MESSAGE
        else:
            raise NotImplementedError(f'Unsupported state: {self._state}')

        self._buffer.append(byte)

    def _flush(self) -> Iterator[tuple[bool, bytes]]:
        data = bytes(self._buffer)
        self._buffer.clear()
        if data:
            yield self._state is self._State.MESSAGE, data

    def transform_io(
        self,
        binary_io: io.RawIOBase | BinaryIO,
        transform: Callable[[bytes], bytes],
    ) -> Iterator[bytes]:
        """Yields the file with a transformation applied to the messages."""
        for is_message, chunk in self.read_messages_io(binary_io):
            yield transform(chunk) if is_message else chunk

    def transform(
        self,
        chunk: bytes,
        transform: Callable[[bytes], bytes],
        *,
        flush: bool = False,
    ) -> bytes:
        """Returns the chunk with a transformation applied to the messages.

        Partial messages are preserved between calls unless ``flush=True``.
        """
        return b''.join(
            transform(data) if is_message else data
            for is_message, data in self.read_messages(chunk, flush=flush)
        )


# TODO(hepler): Remove this unnecessary function.
def detokenize_base64(
    detokenizer: Detokenizer,
    data: bytes,
    recursion: int = DEFAULT_RECURSION,
) -> bytes:
    """Alias for detokenizer.detokenize_base64 for backwards compatibility.

    This function is deprecated; do not call it.
    """
    return detokenizer.detokenize_base64(data, recursion)


def _follow_and_detokenize_file(
    detokenizer: Detokenizer,
    file: BinaryIO,
    output: BinaryIO,
    poll_period_s: float = 0.01,
) -> None:
    """Polls a file to detokenize it and any appended data."""

    try:
        while True:
            data = file.read()
            if data:
                detokenizer.detokenize_base64_to_file(data, output)
                output.flush()
            else:
                time.sleep(poll_period_s)
    except KeyboardInterrupt:
        pass


def _handle_base64(
    databases,
    input_file: BinaryIO,
    output: BinaryIO,
    prefix: str,
    show_errors: bool,
    follow: bool,
) -> None:
    """Handles the base64 command line option."""
    # argparse.FileType doesn't correctly handle '-' for binary files.
    if input_file is sys.stdin:
        input_file = sys.stdin.buffer

    if output is sys.stdout:
        output = sys.stdout.buffer

    detokenizer = Detokenizer(
        tokens.Database.merged(*databases),
        prefix=prefix,
        show_errors=show_errors,
    )

    if follow:
        _follow_and_detokenize_file(detokenizer, input_file, output)
    elif input_file.seekable():
        # Process seekable files all at once, which is MUCH faster.
        detokenizer.detokenize_base64_to_file(input_file.read(), output)
    else:
        # For non-seekable inputs (e.g. pipes), read one character at a time.
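        # detokenize_base64_live flushes the output whenever it writes a chunk
        # containing a newline, which keeps piped output responsive.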
        detokenizer.detokenize_base64_live(input_file, output)


def _parse_args() -> argparse.Namespace:
    """Parses and returns command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.set_defaults(handler=lambda **_: parser.print_help())

    subparsers = parser.add_subparsers(help='Encoding of the input.')

    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
    subparser = subparsers.add_parser(
        'base64',
        description=base64_help,
        parents=[database.token_databases_parser()],
        help=base64_help,
    )
    subparser.set_defaults(handler=_handle_base64)
    subparser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        type=argparse.FileType('rb'),
        default=sys.stdin.buffer,
        help='The file from which to read; provide - or omit for stdin.',
    )
    subparser.add_argument(
        '-f',
        '--follow',
        action='store_true',
        help=(
            'Detokenize data appended to input_file as it grows; similar to '
            'tail -f.'
        ),
    )
    subparser.add_argument(
        '-o',
        '--output',
        type=argparse.FileType('wb'),
        default=sys.stdout.buffer,
        help=(
            'The file to which to write the output; '
            'provide - or omit for stdout.'
        ),
    )
    subparser.add_argument(
        '-p',
        '--prefix',
        default=encode.NESTED_TOKEN_PREFIX,
        help=(
            'The one-character prefix that signals the start of a '
            'nested tokenized message. (default: $)'
        ),
    )
    subparser.add_argument(
        '-s',
        '--show_errors',
        action='store_true',
        help=(
            'Show error messages instead of conversion specifiers when '
            'arguments cannot be decoded.'
        ),
    )

    return parser.parse_args()


def main() -> int:
    args = _parse_args()

    handler = args.handler
    del args.handler

    handler(**vars(args))
    return 0


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The detokenizer command line tools require Python 3.')
    sys.exit(main())
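
# Example invocation (illustrative; the token database arguments are
# defined by database.token_databases_parser()):
#
#   python detokenize.py base64 tokens.csv -i device.log -o decoded.txt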