xref: /aosp_15_r20/external/pigweed/pw_hdlc/py/pw_hdlc/decode.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Decoder class for decoding bytes using HDLC protocol"""
15
16import enum
17import logging
18import threading
19import time
20from typing import Iterable, Callable, Any
21import zlib
22
23from pw_hdlc import protocol
24
# Logger used for decode warnings and debug output in this module.
_LOG = logging.getLogger('pw_hdlc')

# Address value a Frame carries when no address was decoded from it.
NO_ADDRESS = -1
_MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
# The HDLC flag value as a one-byte sequence, used for prefix checks on buffers.
_FLAG_BYTE = bytes([protocol.FLAG])
30
31
class FrameStatus(enum.Enum):
    """Result of decoding an HDLC frame: ``OK`` or the kind of failure.

    Each member's value is a human-readable description that is used in log
    messages.
    """

    # The frame passed all checks.
    OK = 'OK'
    # The CRC-32 stored in the frame did not match the CRC-32 of its contents.
    FCS_MISMATCH = 'frame check sequence failure'
    # The frame was too short, or had misplaced flag or escape bytes.
    FRAMING_ERROR = 'invalid flag or escape characters'
    # The address could not be decoded (the decoded address length was zero).
    BAD_ADDRESS = 'address field too long'
39
40
class Frame:
    """A single HDLC frame produced by the decoder, whether valid or not."""

    def __init__(
        self,
        raw_encoded: bytes,
        raw_decoded: bytes,
        status: FrameStatus = FrameStatus.OK,
    ):
        """Stores the raw frame and, for OK frames, splits out its fields.

        Arguments:
            raw_encoded: The frame as it appeared on the wire, HDLC-encoded
                and including flag bytes. For back to back frames the leading
                flag byte may be missing.
            raw_decoded: The unescaped frame contents: address, control,
                information, and FCS.
            status: The result of the checks already performed on the frame.
                Fields are only parsed when this is ``FrameStatus.OK``.
        """
        self.raw_encoded = raw_encoded
        self.raw_decoded = raw_decoded
        self.status = status

        # Defaults, left in place whenever the frame cannot be parsed.
        self.address: int = NO_ADDRESS
        self.control: bytes = b''
        self.data: bytes = b''

        if status != FrameStatus.OK:
            return

        address, address_length = protocol.decode_address(raw_decoded)
        if address_length == 0:
            # The address could not be decoded; downgrade the frame.
            self.status = FrameStatus.BAD_ADDRESS
            return

        control_end = address_length + 1
        self.address = address
        self.control = raw_decoded[address_length:control_end]
        # Everything after the control byte, minus the trailing 4-byte FCS.
        self.data = raw_decoded[control_end:-4]

    def ok(self) -> bool:
        """Returns ``True`` if this frame decoded successfully.

        When this returns ``False``, ``status`` says what went wrong and the
        ``address``, ``control``, and ``data`` fields keep their defaults. The
        bytes that were received remain available in ``raw_encoded`` and
        ``raw_decoded``.
        """
        return self.status is FrameStatus.OK

    def __repr__(self) -> str:
        if not self.ok():
            fields = (
                f'raw_encoded={self.raw_encoded!r}, '
                f'status={self.status!s}'
            )
        else:
            fields = (
                f'address={self.address}, control={self.control!r}, '
                f'data={self.data!r}'
            )

        return f'{type(self).__name__}({fields})'
100
101
class _State(enum.Enum):
    """States of the byte-at-a-time state machine in ``FrameDecoder``."""

    # Outside of a frame; bytes are ignored until a flag byte arrives.
    INTERFRAME = 0
    # Inside a frame; bytes are collected until a flag or escape byte arrives.
    FRAME = 1
    # Inside a frame, immediately after an escape byte.
    FRAME_ESCAPE = 2
106
107
def _check_frame(frame_data: bytes) -> FrameStatus:
    """Checks the size and frame check sequence of a decoded frame.

    Arguments:
        frame_data: The unescaped frame contents, ending with a 4-byte
            little-endian CRC-32 of the bytes before it.

    Returns:
        ``FRAMING_ERROR`` if the frame is shorter than the minimum size,
        ``FCS_MISMATCH`` if the stored CRC-32 does not match the computed one,
        and ``OK`` otherwise.
    """
    if len(frame_data) >= _MIN_FRAME_SIZE:
        contents, fcs = frame_data[:-4], frame_data[-4:]
        if int.from_bytes(fcs, 'little') == zlib.crc32(contents):
            return FrameStatus.OK
        return FrameStatus.FCS_MISMATCH

    return FrameStatus.FRAMING_ERROR
117
118
class FrameDecoder:
    """Incrementally decodes HDLC frames from a stream of bytes."""

    def __init__(self) -> None:
        self._state = _State.INTERFRAME
        # Every byte received since the last emitted frame, still encoded.
        self._raw_data = bytearray()
        # The unescaped contents of the frame currently being received.
        self._decoded_data = bytearray()

    def process(self, data: bytes) -> Iterable[Frame]:
        """Feeds ``data`` to the decoder and yields each completed frame.

        Corrupt frames are yielded too; use ``Frame.ok()`` to tell valid
        frames from frame parsing errors.

        Yields:
          Frames, which may be valid (``frame.ok()``) or corrupt
          (``!frame.ok()``)
        """
        for byte in data:
            if frame := self.process_byte(byte):
                yield frame

    def process_valid_frames(self, data: bytes) -> Iterable[Frame]:
        """Feeds ``data`` to the decoder and yields only the valid frames.

        Corrupt frames are logged and dropped.
        """
        for frame in self.process(data):
            if not frame.ok():
                _LOG.warning(
                    'Failed to decode frame: %s; discarded %d bytes',
                    frame.status.value,
                    len(frame.raw_encoded),
                )
                _LOG.debug('Discarded data: %s', frame.raw_encoded)
                continue

            yield frame

    def _finish_frame(self, status: FrameStatus) -> Frame:
        """Builds a Frame from the buffered bytes and resets the buffers."""
        # A flag byte may be shared by two adjacent frames, in which case the
        # second frame's buffered bytes lack the opening flag. Restore it so
        # that the raw encoding of an OK frame is always flag-delimited.
        needs_flag = status is FrameStatus.OK and not self._raw_data.startswith(
            _FLAG_BYTE
        )
        if needs_flag:
            self._raw_data.insert(0, protocol.FLAG)

        frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
        self._decoded_data.clear()
        self._raw_data.clear()
        return frame

    def process_byte(self, byte: int) -> Frame | None:
        """Advances the decoder by one byte.

        Returns:
          The frame that this byte completed, or ``None`` if it did not
          complete one.
        """
        self._raw_data.append(byte)

        is_flag = byte == protocol.FLAG
        completed: Frame | None = None

        if self._state is _State.INTERFRAME:
            if is_flag:
                # Anything buffered ahead of this flag was not part of a
                # frame; report it as a framing error.
                if len(self._raw_data) != 1:
                    completed = self._finish_frame(FrameStatus.FRAMING_ERROR)

                self._state = _State.FRAME
            return completed

        if self._state is _State.FRAME:
            if is_flag:
                # A lone flag right after a completed frame is just a
                # repeated delimiter between back to back frames.
                if len(self._raw_data) > 1:
                    completed = self._finish_frame(
                        _check_frame(self._decoded_data)
                    )

                self._state = _State.FRAME
            elif byte == protocol.ESCAPE:
                self._state = _State.FRAME_ESCAPE
            else:
                self._decoded_data.append(byte)
            return completed

        if self._state is _State.FRAME_ESCAPE:
            if is_flag:
                # A flag may not directly follow an escape byte.
                completed = self._finish_frame(FrameStatus.FRAMING_ERROR)
                self._state = _State.FRAME
            elif byte in protocol.VALID_ESCAPED_BYTES:
                self._state = _State.FRAME
                self._decoded_data.append(protocol.escape(byte))
            else:
                # Invalid escape sequence: abandon the frame and wait for the
                # next flag byte.
                self._state = _State.INTERFRAME
            return completed

        raise AssertionError(f'Invalid decoder state: {self._state}')
204
205
class FrameAndNonFrameDecoder:
    """Processes both HDLC frames and non-frame data in a stream."""

    def __init__(
        self,
        non_frame_data_handler: Callable[[bytes], Any],
        *,
        mtu: int | None = None,
        timeout_s: float | None = None,
        handle_shared_flags: bool = True,
    ) -> None:
        """Sets up the decoder and, if requested, its timeout thread.

        Valid HDLC frames are yielded from ``process()``; everything else is
        passed to ``non_frame_data_handler``.

        Args:
          non_frame_data_handler: Called with each chunk of bytes that is not
              part of a valid HDLC frame. It is invoked while this object's
              internal lock is held, either from the thread calling
              ``process()`` / ``flush_non_frame_data()`` or from the timeout
              thread.
          mtu: Maximum bytes to receive before flushing raw data. If a valid
              HDLC frame contains more than MTU bytes, the valid frame will be
              emitted, but part of the frame will be included in the raw data.
          timeout_s: How long to wait before automatically flushing raw
              data. If a timeout occurs partway through a valid frame, the frame
              will be emitted, but part of the frame will be included in the raw
              data. When set, a daemon thread is started to perform the flush.
          handle_shared_flags: Whether to permit HDLC frames to share a single
              flag byte between frames. If ``False``, partial HDLC frames may be
              emitted as raw data when HDLC frames share a flag byte, but raw
              data won't have to wait for a timeout or full MTU to be flushed.
        """
        self._non_frame_data_handler = non_frame_data_handler
        self._mtu = mtu
        self._shared_flags = handle_shared_flags
        self._timeout_s = timeout_s

        # Bytes received that have not yet been attributed to a valid frame or
        # handed to the non-frame data handler.
        self._raw_data = bytearray()
        self._hdlc_decoder = FrameDecoder()
        # Monotonic timestamp of the last completed process() call. A
        # monotonic clock keeps the idle timeout immune to wall-clock changes.
        self._last_data_time = time.monotonic()
        self._lock = threading.Lock()

        if self._timeout_s is not None:
            threading.Thread(target=self._timeout_thread, daemon=True).start()

    def flush_non_frame_data(self) -> None:
        """Flushes any data in the buffer as non-frame data.

        If a valid HDLC frame was flushed partway, the data for the first part
        of the frame will be included both in the raw data and in the frame.
        """
        with self._lock:
            self._flush_non_frame()

    def _flush_non_frame(self, to_index: int | None = None) -> None:
        """Passes buffered bytes up to ``to_index`` to the handler.

        The caller must hold ``self._lock``. The handler is not called when
        there is nothing to pass on, which includes the case where only a
        held-back trailing flag byte is buffered.
        """
        chunk = bytes(self._raw_data[:to_index])
        if chunk:
            self._non_frame_data_handler(chunk)
            del self._raw_data[:to_index]

    def _timeout_thread(self) -> None:
        """Periodically flushes buffered data once the stream has gone idle."""
        assert self._timeout_s is not None

        while True:
            time.sleep(self._timeout_s)
            with self._lock:
                idle_s = time.monotonic() - self._last_data_time
                if idle_s > self._timeout_s:
                    self._flush_non_frame()

    def process(self, data: bytes) -> Iterable[Frame]:
        """Processes a stream of mixed HDLC and unstructured data.

        Yields OK frames and calls ``non_frame_data_handler()`` with non-HDLC
        data.

        This is a generator that holds the internal lock from its first
        ``next()`` until it is exhausted, so it should be iterated to
        completion. The lock is not reentrant: calling
        ``flush_non_frame_data()`` from the consuming thread while iterating
        will block.
        """
        with self._lock:
            for byte in data:
                yield from self._process_byte(byte)

            # Flush the data if it is larger than the MTU, or flag bytes are not
            # being shared and no initial flag was seen.
            if (self._mtu is not None and len(self._raw_data) > self._mtu) or (
                not self._shared_flags
                and not self._raw_data.startswith(_FLAG_BYTE)
            ):
                self._flush_non_frame()

            self._last_data_time = time.monotonic()

    def _process_byte(self, byte: int) -> Iterable[Frame]:
        """Feeds one byte to the HDLC decoder and routes the result.

        Yields the frame if this byte completed a valid one. Bytes that turn
        out not to belong to a valid frame are sent to the non-frame handler.
        """
        self._raw_data.append(byte)
        frame = self._hdlc_decoder.process_byte(byte)

        if frame is None:
            return

        if frame.ok():
            # Drop the valid frame from the data. Only drop matching bytes in
            # case the frame was flushed prematurely.
            for suffix_byte in reversed(frame.raw_encoded):
                if not self._raw_data or self._raw_data[-1] != suffix_byte:
                    break
                self._raw_data.pop()

            self._flush_non_frame()  # Flush the raw data before the frame.

            if self._mtu is not None and len(frame.raw_encoded) > self._mtu:
                # Argument order must match the format string: frame size
                # first, then the configured MTU.
                _LOG.warning(
                    'Found a valid %d B HDLC frame, but the MTU is set to %d! '
                    'The MTU setting may be incorrect.',
                    len(frame.raw_encoded),
                    self._mtu,
                )

            yield frame
        else:
            # Don't flush a final flag byte yet because it might be the start of
            # an HDLC frame.
            to_index = -1 if self._raw_data[-1] == protocol.FLAG else None
            self._flush_non_frame(to_index)
319