# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decoder class for decoding bytes using HDLC protocol"""

import enum
import logging
import threading
import time
from typing import Iterable, Callable, Any
import zlib

from pw_hdlc import protocol

_LOG = logging.getLogger('pw_hdlc')

NO_ADDRESS = -1
_MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
_FLAG_BYTE = bytes([protocol.FLAG])


class FrameStatus(enum.Enum):
    """Result of decoding a frame: ``OK`` or the kind of error that occurred."""

    OK = 'OK'
    FCS_MISMATCH = 'frame check sequence failure'
    FRAMING_ERROR = 'invalid flag or escape characters'
    BAD_ADDRESS = 'address field too long'


class Frame:
    """Represents an HDLC frame."""

    def __init__(
        self,
        raw_encoded: bytes,
        raw_decoded: bytes,
        status: FrameStatus = FrameStatus.OK,
    ):
        """Parses fields from an HDLC frame.

        The address, control and data fields are only parsed when ``status``
        is ``FrameStatus.OK``; otherwise they keep their defaults
        (``NO_ADDRESS`` and empty bytes).

        Arguments:
            raw_encoded: The complete HDLC-encoded frame, including any HDLC
                flag bytes. In the case of back to back frames, the
                beginning flag byte may be omitted.
            raw_decoded: The complete decoded frame (address, control,
                information, FCS).
            status: Whether parsing the frame succeeded.
        """
        self.raw_encoded = raw_encoded
        self.raw_decoded = raw_decoded
        self.status = status

        self.address: int = NO_ADDRESS
        self.control: bytes = b''
        self.data: bytes = b''

        if status is FrameStatus.OK:
            address, address_length = protocol.decode_address(raw_decoded)
            if address_length == 0:
                # The address could not be decoded; downgrade the frame to an
                # error and leave the parsed fields at their defaults.
                self.status = FrameStatus.BAD_ADDRESS
                return

            self.address = address
            # The control field is the single byte following the address.
            self.control = raw_decoded[address_length : address_length + 1]
            # The payload is everything between the control byte and the
            # trailing 4-byte frame check sequence.
            self.data = raw_decoded[address_length + 1 : -4]

    def ok(self) -> bool:
        """``True`` if this represents a valid frame.

        If ``False``, then parsing failed. The status is set to indicate what
        type of error occurred; ``address``, ``control`` and ``data`` are left
        at their defaults, and the bytes that were received remain available
        in ``raw_encoded`` and ``raw_decoded``.
        """
        return self.status is FrameStatus.OK

    def __repr__(self) -> str:
        if self.ok():
            body = (
                f'address={self.address}, control={self.control!r}, '
                f'data={self.data!r}'
            )
        else:
            body = (
                f'raw_encoded={self.raw_encoded!r}, '
                f'status={str(self.status)}'
            )

        return f'{type(self).__name__}({body})'


class _State(enum.Enum):
    """States of the ``FrameDecoder`` byte-level state machine."""

    INTERFRAME = 0  # Outside a frame; waiting for a flag byte.
    FRAME = 1  # Inside a frame; collecting decoded bytes.
    FRAME_ESCAPE = 2  # Inside a frame; previous byte was the escape byte.


def _check_frame(frame_data: bytes) -> FrameStatus:
    """Validates the length and CRC-32 of a decoded (unescaped) frame.

    The last four bytes are read as a little-endian CRC-32 and compared to
    ``zlib.crc32`` of all preceding bytes.

    Returns:
        ``FRAMING_ERROR`` if the frame is shorter than ``_MIN_FRAME_SIZE``,
        ``FCS_MISMATCH`` if the CRC does not match, otherwise ``OK``.
    """
    if len(frame_data) < _MIN_FRAME_SIZE:
        return FrameStatus.FRAMING_ERROR

    frame_crc = int.from_bytes(frame_data[-4:], 'little')
    if zlib.crc32(frame_data[:-4]) != frame_crc:
        return FrameStatus.FCS_MISMATCH

    return FrameStatus.OK


class FrameDecoder:
    """Decodes one or more HDLC frames from a stream of data."""

    def __init__(self) -> None:
        # Unescaped frame contents accumulated since the last flag byte.
        self._decoded_data = bytearray()
        # Every byte received since the last emitted frame, exactly as seen.
        self._raw_data = bytearray()
        self._state = _State.INTERFRAME

    def process(self, data: bytes) -> Iterable[Frame]:
        """Decodes and yields HDLC frames, including corrupt frames.

        The ``ok()`` method on ``Frame`` indicates whether it is valid or
        represents a frame parsing error.

        Yields:
          Frames, which may be valid (``frame.ok()``) or corrupt
          (``not frame.ok()``)
        """
        for byte in data:
            frame = self.process_byte(byte)
            if frame:
                yield frame

    def process_valid_frames(self, data: bytes) -> Iterable[Frame]:
        """Decodes and yields valid HDLC frames, logging any errors."""
        for frame in self.process(data):
            if frame.ok():
                yield frame
            else:
                _LOG.warning(
                    'Failed to decode frame: %s; discarded %d bytes',
                    frame.status.value,
                    len(frame.raw_encoded),
                )
                _LOG.debug('Discarded data: %s', frame.raw_encoded)

    def _finish_frame(self, status: FrameStatus) -> Frame:
        """Builds a ``Frame`` from the buffered bytes and resets the buffers."""
        # HDLC frames always start and end with a flag character, though the
        # character may be shared with other frames. Ensure the raw encoding of
        # OK frames always includes the start and end flags for consistency.
        if status is FrameStatus.OK:
            if not self._raw_data.startswith(_FLAG_BYTE):
                self._raw_data.insert(0, protocol.FLAG)

        frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
        self._raw_data.clear()
        self._decoded_data.clear()
        return frame

    def process_byte(self, byte: int) -> Frame | None:
        """Processes a single byte and returns a frame if one was completed.

        Returns:
            A ``Frame`` (valid or corrupt) when ``byte`` terminates one,
            otherwise ``None``.

        Raises:
            AssertionError: If the decoder is in an unknown state.
        """
        frame: Frame | None = None

        self._raw_data.append(byte)

        if self._state is _State.INTERFRAME:
            if byte == protocol.FLAG:
                # Anything buffered before this flag was not part of a frame;
                # report it as a framing error.
                if len(self._raw_data) != 1:
                    frame = self._finish_frame(FrameStatus.FRAMING_ERROR)

                self._state = _State.FRAME
        elif self._state is _State.FRAME:
            if byte == protocol.FLAG:
                # On back to back frames, we may see a repeated FLAG byte.
                if len(self._raw_data) > 1:
                    frame = self._finish_frame(_check_frame(self._decoded_data))

                self._state = _State.FRAME
            elif byte == protocol.ESCAPE:
                self._state = _State.FRAME_ESCAPE
            else:
                self._decoded_data.append(byte)
        elif self._state is _State.FRAME_ESCAPE:
            if byte == protocol.FLAG:
                # A flag directly after an escape byte aborts the frame.
                frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
                self._state = _State.FRAME
            elif byte in protocol.VALID_ESCAPED_BYTES:
                self._state = _State.FRAME
                self._decoded_data.append(protocol.escape(byte))
            else:
                # Invalid escape sequence: leave the frame. The buffered bytes
                # are reported as a framing error when the next flag arrives.
                self._state = _State.INTERFRAME
        else:
            raise AssertionError(f'Invalid decoder state: {self._state}')

        return frame


class FrameAndNonFrameDecoder:
    """Processes both HDLC frames and non-frame data in a stream."""

    def __init__(
        self,
        non_frame_data_handler: Callable[[bytes], Any],
        *,
        mtu: int | None = None,
        timeout_s: float | None = None,
        handle_shared_flags: bool = True,
    ) -> None:
        """Yields valid HDLC frames and passes non-frame data to callback.

        If ``timeout_s`` is set, a daemon thread is started that periodically
        flushes buffered data to ``non_frame_data_handler``.

        Args:
          non_frame_data_handler: Called with each chunk of bytes that is not
            part of a valid HDLC frame.
          mtu: Maximum bytes to receive before flushing raw data. If a valid
            HDLC frame contains more than MTU bytes, the valid frame will be
            emitted, but part of the frame will be included in the raw data.
          timeout_s: How long to wait before automatically flushing raw
            data. If a timeout occurs partway through a valid frame, the frame
            will be emitted, but part of the frame will be included in the raw
            data.
          handle_shared_flags: Whether to permit HDLC frames to share a single
            flag byte between frames. If ``False``, partial HDLC frames may be
            emitted as raw data when HDLC frames share a flag byte, but raw
            data won't have to wait for a timeout or full MTU to be flushed.
        """
        self._non_frame_data_handler = non_frame_data_handler
        self._mtu = mtu
        self._shared_flags = handle_shared_flags
        self._timeout_s = timeout_s

        self._raw_data = bytearray()
        self._hdlc_decoder = FrameDecoder()
        self._last_data_time = time.time()
        # Guards the buffer and timestamp, which are shared with the timeout
        # thread.
        self._lock = threading.Lock()

        if self._timeout_s is not None:
            threading.Thread(target=self._timeout_thread, daemon=True).start()

    def flush_non_frame_data(self) -> None:
        """Flushes any data in the buffer as non-frame data.

        If a valid HDLC frame was flushed partway, the data for the first part
        of the frame will be included both in the raw data and in the frame.
        """
        with self._lock:
            self._flush_non_frame()

    def _flush_non_frame(self, to_index: int | None = None) -> None:
        """Passes buffered bytes up to ``to_index`` to the non-frame handler.

        The caller must hold ``self._lock``. With ``to_index=None`` the whole
        buffer is flushed. Does nothing if the buffer is empty.
        """
        if self._raw_data:
            self._non_frame_data_handler(bytes(self._raw_data[:to_index]))
            del self._raw_data[:to_index]

    def _timeout_thread(self) -> None:
        """Flushes buffered data once no data has arrived for ``timeout_s``.

        Runs forever; intended to be the target of a daemon thread.
        """
        assert self._timeout_s is not None

        while True:
            time.sleep(self._timeout_s)
            with self._lock:
                if time.time() - self._last_data_time > self._timeout_s:
                    self._flush_non_frame()

    def process(self, data: bytes) -> Iterable[Frame]:
        """Processes a stream of mixed HDLC and unstructured data.

        Yields OK frames and calls ``non_frame_data_handler()`` with non-HDLC
        data.

        The internal (non-reentrant) lock is held for as long as the returned
        generator is being iterated, including while frames are yielded, so do
        not call ``flush_non_frame_data()`` from the consuming loop.
        """
        with self._lock:
            for byte in data:
                yield from self._process_byte(byte)

            # Flush the data if it is larger than the MTU, or flag bytes are not
            # being shared and no initial flag was seen.
            if (self._mtu is not None and len(self._raw_data) > self._mtu) or (
                not self._shared_flags
                and not self._raw_data.startswith(_FLAG_BYTE)
            ):
                self._flush_non_frame()

            self._last_data_time = time.time()

    def _process_byte(self, byte: int) -> Iterable[Frame]:
        """Feeds one byte to the HDLC decoder and routes the result.

        Valid frames are yielded after any preceding non-frame bytes have been
        flushed to the handler; bytes of corrupt frames are flushed to the
        handler as non-frame data.
        """
        self._raw_data.append(byte)
        frame = self._hdlc_decoder.process_byte(byte)

        if frame is None:
            return

        if frame.ok():
            # Drop the valid frame from the data. Only drop matching bytes in
            # case the frame was flushed prematurely.
            for suffix_byte in reversed(frame.raw_encoded):
                if not self._raw_data or self._raw_data[-1] != suffix_byte:
                    break
                self._raw_data.pop()

            self._flush_non_frame()  # Flush the raw data before the frame.

            if self._mtu is not None and len(frame.raw_encoded) > self._mtu:
                # Arguments follow the format string: frame size, then MTU.
                _LOG.warning(
                    'Found a valid %d B HDLC frame, but the MTU is set to %d! '
                    'The MTU setting may be incorrect.',
                    len(frame.raw_encoded),
                    self._mtu,
                )

            yield frame
        else:
            # Don't flush a final flag byte yet because it might be the start of
            # an HDLC frame.
            to_index = -1 if self._raw_data[-1] == protocol.FLAG else None
            self._flush_non_frame(to_index)