1# Copyright 2022 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Parses the arguments in a Base64-encoded tokenized message. 15 16This is useful for attempting to decode tokenized messages with arguments for 17which the token is not recognized. 18""" 19 20from __future__ import annotations 21 22import argparse 23import base64 24from dataclasses import dataclass 25import logging 26import sys 27from typing import Collection, Iterable, Iterator, Sequence 28 29import pw_cli.log 30from pw_tokenizer.decode import FormatString, FormattedString 31 32_LOG: logging.Logger = logging.getLogger('pw_tokenizer') 33 34DEFAULT_FORMAT_SPECS = ( 35 '%s', 36 '%d', 37 '%f', 38) 39 40DEFAULT_MAX_ARGS = 8 41PREFIX = '$' 42 43 44def attempt_to_decode( 45 arg_data: bytes, 46 format_specs: Collection[str] = DEFAULT_FORMAT_SPECS, 47 max_args: int = DEFAULT_MAX_ARGS, 48 yield_failures: bool = False, 49) -> Iterator[FormattedString]: 50 """Attempts to decode arguments using the provided format specifiers.""" 51 format_strings = [(0, '')] # (argument count, format string) 52 53 # Each argument requires at least 1 byte. 54 max_args = min(max_args, len(arg_data)) 55 56 while format_strings: 57 arg_count, string = format_strings.pop(0) 58 decode_attempt = FormatString(string).format(arg_data) 59 60 if yield_failures or decode_attempt.ok(): 61 yield decode_attempt 62 63 if arg_count < max_args: 64 format_strings.extend( 65 (arg_count + 1, string + spec) for spec in format_specs 66 ) 67 68 69@dataclass(frozen=True) 70class TokenizedMessage: 71 string: str 72 binary: bytes 73 74 @property 75 def token(self) -> int: 76 return int.from_bytes(self.binary[:4], 'little') 77 78 @property 79 def binary_args(self) -> bytes: 80 return self.binary[4:] 81 82 @classmethod 83 def parse(cls, message: str, prefix: str = '$') -> TokenizedMessage: 84 if not message.startswith(prefix): 85 raise ValueError( 86 f'{message} does not start with {prefix!r} as expected' 87 ) 88 89 binary = base64.b64decode(message[1:]) 90 91 if len(binary) < 4: 92 raise ValueError( 93 f'{message} is only {len(binary)} bytes; ' 94 'tokenized messages must be at least 4 bytes' 95 ) 96 97 return cls(message, binary) 98 99 100def _read_stdin(): 101 try: 102 while True: 103 yield input() 104 except KeyboardInterrupt: 105 return 106 107 108def _text_list(items: Sequence, conjunction: str = 'or') -> str: 109 if len(items) == 1: 110 return str(items[0]) 111 112 return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}' 113 114 115def main( 116 messages: Iterable[str], 117 max_args: int, 118 specs: Sequence[str], 119 show_failures: bool, 120) -> int: 121 """Parses the arguments for a series of tokenized messages.""" 122 exit_code = 0 123 124 for message in iter(messages) if messages else _read_stdin(): 125 if not message: 126 continue 127 128 if not message.startswith(PREFIX): 129 message = PREFIX + message 130 131 _LOG.info('Decoding arguments for %r', message) 132 try: 133 parsed = TokenizedMessage.parse(message) 134 except ValueError as exc: 135 _LOG.error('%s', exc) 136 exit_code = 2 137 continue 138 139 _LOG.info( 140 'Binary: %r [%s] (%d bytes)', 141 parsed.binary, 142 parsed.binary.hex(' ', 1), 143 len(parsed.binary), 144 ) 145 _LOG.info('Token: 0x%08x', parsed.token) 146 _LOG.info( 147 'Args: %r [%s] (%d bytes)', 148 parsed.binary_args, 149 parsed.binary_args.hex(' ', 1), 150 len(parsed.binary_args), 151 ) 152 _LOG.info( 153 'Decoding with up to %d %s arguments', max_args, _text_list(specs) 154 ) 155 156 results = sorted( 157 attempt_to_decode( 158 parsed.binary_args, specs, max_args, show_failures 159 ), 160 key=FormattedString.score, 161 reverse=True, 162 ) 163 164 if not any(result.ok() for result in results): 165 _LOG.warning( 166 ' No combinations of up to %d %s arguments decoded ' 167 'successfully', 168 max_args, 169 _text_list(specs), 170 ) 171 exit_code = 1 172 173 for i, result in enumerate(results, 1): 174 _LOG.info( # pylint: disable=logging-fstring-interpolation 175 f' Attempt %{len(str(len(results)))}d: [%s] %s', 176 i, 177 ' '.join(str(a.specifier) for a in result.args), 178 ' '.join(str(a) for a in result.args), 179 ) 180 print() 181 182 return exit_code 183 184 185def _parse_args() -> argparse.Namespace: 186 parser = argparse.ArgumentParser( 187 description=__doc__, 188 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 189 ) 190 parser.add_argument( 191 '--max-args', 192 default=DEFAULT_MAX_ARGS, 193 type=int, 194 help='Maximum number of printf-style arguments', 195 ) 196 parser.add_argument( 197 '--specs', 198 nargs='*', 199 default=DEFAULT_FORMAT_SPECS, 200 help='Which printf-style format specifiers to check', 201 ) 202 parser.add_argument( 203 '--show-failures', 204 action='store_true', 205 help='Show argument combintations that fail to decode', 206 ) 207 parser.add_argument( 208 'messages', 209 nargs='*', 210 help=( 211 'Base64-encoded tokenized messages to decode; omit to read from ' 212 'stdin' 213 ), 214 ) 215 return parser.parse_args() 216 217 218if __name__ == '__main__': 219 pw_cli.log.install() 220 sys.exit(main(**vars(_parse_args()))) 221