xref: /aosp_15_r20/external/pigweed/pw_tokenizer/py/pw_tokenizer/parse_message.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Parses the arguments in a Base64-encoded tokenized message.
15
16This is useful for attempting to decode tokenized messages with arguments for
17which the token is not recognized.
18"""
19
20from __future__ import annotations
21
22import argparse
23import base64
24from dataclasses import dataclass
25import logging
26import sys
27from typing import Collection, Iterable, Iterator, Sequence
28
29import pw_cli.log
30from pw_tokenizer.decode import FormatString, FormattedString
31
# Module logger, registered under the 'pw_tokenizer' name.
_LOG: logging.Logger = logging.getLogger('pw_tokenizer')

# printf-style specifiers combined by default when guessing the arguments.
DEFAULT_FORMAT_SPECS = ('%s', '%d', '%f')

# Default limit on how many arguments are attempted for a message.
DEFAULT_MAX_ARGS = 8

# Marker expected at the start of a Base64-encoded tokenized message.
PREFIX = '$'
42
43
def attempt_to_decode(
    arg_data: bytes,
    format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
    max_args: int = DEFAULT_MAX_ARGS,
    yield_failures: bool = False,
) -> Iterator[FormattedString]:
    """Attempts to decode arguments using the provided format specifiers.

    Every ordered combination of ``format_specs`` is tried, starting with the
    empty format string and growing by one specifier at a time, so attempts
    are produced in order of increasing argument count.

    Args:
        arg_data: The encoded argument bytes to decode.
        format_specs: printf-style specifiers to combine into format strings.
        max_args: Largest number of arguments to try; capped at
            ``len(arg_data)``.
        yield_failures: If True, also yield attempts whose ``ok()`` is false.

    Yields:
        The result of formatting ``arg_data`` with each candidate format
        string.
    """
    # Each argument requires at least 1 byte.
    max_args = min(max_args, len(arg_data))

    # Build the candidates one argument count at a time instead of draining a
    # list with pop(0), which shifts the whole (exponentially growing) queue on
    # every pop. The order of attempts is the same as a breadth-first queue.
    arg_count = 0
    candidates = ['']

    while candidates:
        for string in candidates:
            decode_attempt = FormatString(string).format(arg_data)

            if yield_failures or decode_attempt.ok():
                yield decode_attempt

        if arg_count >= max_args:
            return

        # Extend every candidate with each specifier to form the next level.
        candidates = [
            string + spec for string in candidates for spec in format_specs
        ]
        arg_count += 1
67
68
@dataclass(frozen=True)
class TokenizedMessage:
    """A tokenized message in both its text and binary forms."""

    # As stored by parse(): the message text, prefix included.
    string: str
    # As stored by parse(): the Base64-decoded bytes that follow the prefix.
    binary: bytes

    @property
    def token(self) -> int:
        """The first four bytes of the message as a little-endian integer."""
        return int.from_bytes(self.binary[:4], 'little')

    @property
    def binary_args(self) -> bytes:
        """The bytes that follow the 4-byte token."""
        return self.binary[4:]

    @classmethod
    def parse(cls, message: str, prefix: str = '$') -> TokenizedMessage:
        """Creates a TokenizedMessage from prefixed Base64 text.

        Args:
            message: The prefix followed by the Base64-encoded message.
            prefix: The text that ``message`` must start with.

        Returns:
            A TokenizedMessage holding ``message`` and its decoded bytes.

        Raises:
            ValueError: ``message`` does not start with ``prefix``, is not
                valid Base64 (``binascii.Error`` is a ``ValueError``), or
                decodes to fewer than 4 bytes.
        """
        if not message.startswith(prefix):
            raise ValueError(
                f'{message} does not start with {prefix!r} as expected'
            )

        # Skip the whole prefix rather than assuming it is one character, so
        # empty and multi-character prefixes decode the right text.
        binary = base64.b64decode(message[len(prefix) :])

        if len(binary) < 4:
            raise ValueError(
                f'{message} is only {len(binary)} bytes; '
                'tokenized messages must be at least 4 bytes'
            )

        return cls(message, binary)
98
99
def _read_stdin() -> Iterator[str]:
    """Yields lines from stdin until input ends or Ctrl-C is pressed."""
    try:
        while True:
            yield input()
    except (EOFError, KeyboardInterrupt):
        # input() raises EOFError once stdin is exhausted (piped input or
        # Ctrl-D); end the stream cleanly, the same as for Ctrl-C.
        return
105        return
106
107
def _text_list(items: Sequence, conjunction: str = 'or') -> str:
    """Joins items into text such as "a, b or c".

    Args:
        items: The items to list; each is converted with ``str()``.
        conjunction: The word placed before the final item.

    Returns:
        An empty string for no items, the item itself for a single item, and
        otherwise the comma-separated items with ``conjunction`` before the
        last one.
    """
    if not items:
        # Nothing to list; avoid indexing into an empty sequence.
        return ''

    *leading, last = (str(item) for item in items)
    if not leading:
        return last

    return f'{", ".join(leading)} {conjunction} {last}'
113
114
def main(
    messages: Iterable[str],
    max_args: int,
    specs: Sequence[str],
    show_failures: bool,
) -> int:
    """Decodes the arguments of each tokenized message and logs the attempts.

    Messages come from ``messages`` or, when that is empty, from stdin. The
    prefix is added to any message that lacks it.

    Returns:
        0 if every message parsed and had at least one successful decoding.
        Otherwise the code set by the most recent problem: 2 for a message
        that could not be parsed, 1 for a message with no successful decoding.
    """
    status = 0
    source = iter(messages) if messages else _read_stdin()

    for message in source:
        if not message:
            continue

        if not message.startswith(PREFIX):
            message = PREFIX + message

        _LOG.info('Decoding arguments for %r', message)
        try:
            parsed = TokenizedMessage.parse(message)
        except ValueError as exc:
            _LOG.error('%s', exc)
            status = 2
            continue

        payload = parsed.binary
        _LOG.info(
            'Binary: %r [%s] (%d bytes)',
            payload,
            payload.hex(' ', 1),
            len(payload),
        )
        _LOG.info('Token:  0x%08x', parsed.token)

        arg_bytes = parsed.binary_args
        _LOG.info(
            'Args:   %r [%s] (%d bytes)',
            arg_bytes,
            arg_bytes.hex(' ', 1),
            len(arg_bytes),
        )

        spec_text = _text_list(specs)
        _LOG.info('Decoding with up to %d %s arguments', max_args, spec_text)

        # Collect every attempt, then order them from highest score down.
        results = list(
            attempt_to_decode(arg_bytes, specs, max_args, show_failures)
        )
        results.sort(key=FormattedString.score, reverse=True)

        if all(not result.ok() for result in results):
            _LOG.warning(
                '  No combinations of up to %d %s arguments decoded '
                'successfully',
                max_args,
                spec_text,
            )
            status = 1

        # Pad the attempt number to the width of the largest index so the
        # logged columns line up.
        attempt_format = f'  Attempt %{len(str(len(results)))}d: [%s] %s'
        for index, result in enumerate(results, 1):
            specifiers = ' '.join(str(arg.specifier) for arg in result.args)
            values = ' '.join(str(arg) for arg in result.args)
            _LOG.info(attempt_format, index, specifiers, values)
        print()

    return status
183
184
def _parse_args() -> argparse.Namespace:
    """Parses the command line arguments.

    Returns:
        A namespace with ``max_args``, ``specs``, ``show_failures`` and
        ``messages`` attributes, matching the parameters of ``main()``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--max-args',
        default=DEFAULT_MAX_ARGS,
        type=int,
        help='Maximum number of printf-style arguments',
    )
    parser.add_argument(
        '--specs',
        nargs='*',
        default=DEFAULT_FORMAT_SPECS,
        help='Which printf-style format specifiers to check',
    )
    parser.add_argument(
        '--show-failures',
        action='store_true',
        help='Show argument combinations that fail to decode',
    )
    parser.add_argument(
        'messages',
        nargs='*',
        help=(
            'Base64-encoded tokenized messages to decode; omit to read from '
            'stdin'
        ),
    )
    return parser.parse_args()
216
217
if __name__ == '__main__':
    # pw_cli.log.install() presumably configures log output for the CLI;
    # it runs before anything is logged.
    pw_cli.log.install()
    cli_args = vars(_parse_args())
    sys.exit(main(**cli_args))
221