#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Contains the Python decoder tests and generates C++ and TS decoder tests."""

from __future__ import annotations

import queue
from typing import Iterator, NamedTuple
import unittest

from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
from pw_build.generated_tests import parse_test_generation_args
from pw_hdlc.decode import (
    Frame,
    FrameDecoder,
    FrameAndNonFrameDecoder,
    FrameStatus,
    NO_ADDRESS,
)
from pw_hdlc.protocol import frame_check_sequence as fcs
from pw_hdlc.protocol import encode_address


def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds the encoded bytes of a frame for use as decoder input.

    The encoded address, the control byte and the payload are concatenated,
    the frame check sequence of those bytes is appended, the result is
    escaped, and a b'~' flag is placed on each side.
    """
    frame = encode_address(address) + bytes([control]) + data
    frame += fcs(frame)
    # Escape b'}' before b'~': escaping b'~' inserts new b'}' bytes, and those
    # must not be escaped a second time.
    frame = frame.replace(b'}', b'}\x5d')
    frame = frame.replace(b'~', b'}\x5e')
    return b''.join([b'~', frame, b'~'])


class Expected(NamedTuple):
    """Decoded fields a test case expects a frame to have."""

    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus) -> Expected:
        """Creates the expectation for a frame that failed with `status`.

        The address is NO_ADDRESS and the control and data fields are empty.
        """
        assert status is not FrameStatus.OK
        return cls(NO_ADDRESS, b'', b'', status)

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared.

        Returns NotImplemented, rather than raising AttributeError, when
        `other` does not have the compared attributes, so that Python can fall
        back to its default comparison handling.
        """
        try:
            return (
                self.address == other.address
                and self.control == other.control
                and self.data == other.data
                and self.status is other.status
            )
        except AttributeError:
            return NotImplemented


class ExpectedRaw(NamedTuple):
    """Raw encoded bytes and status a test case expects a frame to have."""

    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Define == so an ExpectedRaw and a Frame can be compared.

        Returns NotImplemented, rather than raising AttributeError, when
        `other` does not have the compared attributes, so that Python can fall
        back to its default comparison handling.
        """
        try:
            return (
                self.raw_encoded == other.raw_encoded
                and self.status is other.status
            )
        except AttributeError:
            return NotImplemented


class TestCase(NamedTuple):
    """One decoder test: input bytes, expected frames, expected raw bytes."""

    data: bytes
    frames: list[Expected | ExpectedRaw]
    raw_data: bytes


def case(data: bytes, frames: list, raw: bytes | None = None) -> TestCase:
    """Creates a TestCase, filling in the default value for the raw bytes.

    Args:
      data: the raw data stream to decode
      frames: the expected valid and invalid frames
      raw: the expected non-frame data; when omitted it defaults to `data` if
        no frame in `frames` has status OK, or to b'' if every frame does

    Raises:
      AssertionError: `raw` was omitted and `frames` mixes OK and non-OK
        frames, so no default can be chosen.
    """
    if raw is not None:
        return TestCase(data, frames, raw)
    if not frames or all(f.status is not FrameStatus.OK for f in frames):
        return TestCase(data, frames, data)
    if all(f.status is FrameStatus.OK for f in frames):
        return TestCase(data, frames, b'')
    raise AssertionError(
        f'Must specify expected non-frame data for this test case ({data=})!'
    )


_PARTIAL = fcs(b'\x0ACmsg\x5e')
_ESCAPED_FLAG_TEST_CASE = case(
    b'~\x0ACmsg}~' + _PARTIAL + b'~',
    [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ],
)

# Test cases are a tuple with the following elements:
#
#   - raw data stream
#   - expected valid & invalid frames
#   - [optional] expected raw, non-HDLC data; defaults to the full raw data
#     stream if no valid frames are expected, or b'' if only valid frames are
#     expected
#
# These tests are executed twice: once for the standard HDLC decoder, and a
# second time for the FrameAndNonFrameDecoder. The FrameAndNonFrameDecoder tests
# flush the non-frame data to simulate a timeout or MTU overflow, so the
# expected raw data includes all bytes not in an HDLC frame.
TEST_CASES: tuple[GroupOrTest[TestCase], ...] = (
    'Empty payload',
    case(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    case(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    case(_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    case(_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    case(_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    case(
        _encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]
    ),
    case(_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    case(_encode(1, 2, b'~'), [Expected(1, b'\2', b'~')]),
    case(_encode(1, 2, b'}'), [Expected(1, b'\2', b'}')]),
    case(
        _encode(1, 2, b'~') + _encode(1, 2, b'}'),
        [Expected(1, b'\2', b'~'), Expected(1, b'\2', b'}')],
    ),
    'Escaped address',
    case(_encode(0x7E, 0, b'A'), [Expected(0x7E, b'\0', b'A')]),
    case(_encode(0x7D, 0, b'B'), [Expected(0x7D, b'\0', b'B')]),
    'Escaped control',
    case(_encode(0, 0x7E, b'C'), [Expected(0, b'~', b'C')]),
    case(_encode(0, 0x7D, b'D'), [Expected(0, b'}', b'D')]),
    'Escaped address and control',
    case(_encode(0x7E, 0x7D, b'E'), [Expected(0x7E, b'}', b'E')]),
    case(_encode(0x7D, 0x7E, b'F'), [Expected(0x7D, b'~', b'F')]),
    case(_encode(0x7E, 0x7E, b'~'), [Expected(0x7E, b'~', b'~')]),
    'Multibyte address',
    case(
        _encode(128, 0, b'big address'), [Expected(128, b'\0', b'big address')]
    ),
    case(
        _encode(0xFFFFFFFF, 0, b'\0\0\1\0\0'),
        [Expected(0xFFFFFFFF, b'\0', b'\0\0\1\0\0')],
    ),
    'Multiple frames separated by single flag',
    case(
        _encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
        [Expected(0, b'\0', b'A'), Expected(1, b'\2', b'123')],
    ),
    case(
        _encode(0xFF, 0, b'Yo')[:-1] * 3 + b'~',
        [Expected(0xFF, b'\0', b'Yo')] * 3,
    ),
    'Empty frames produce framing errors with raw data',
    case(b'~~', [ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR)], b'~~'),
    case(
        b'~' * 10,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~~' + _encode(1, 2, b'3') + b'~' * 5,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b'3'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            # One flag byte remains in the decoding state machine.
        ],
        b'~~~~~~~',
    ),
    case(
        b'~' * 10 + _encode(1, 2, b':O') + b'~' * 3 + _encode(3, 4, b':P'),
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b':O'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(3, b'\4', b':P'),
        ],
        b'~' * 13,
    ),
    'Cannot escape flag',
    case(
        b'~\xAA}~\xab\x00Hello' + fcs(b'\xab\0Hello') + b'~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected(0x55, b'\0', b'Hello'),
        ],
        b'~\xAA}',
    ),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    case(b'~1~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12345~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    case(
        _encode(2**100, 0, b'too long'),
        [Expected.error(FrameStatus.BAD_ADDRESS)],
    ),
    'Incorrect frame check sequence',
    case(b'~123456~', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    case(
        b'~\1\2msg\xff\xff\xff\xff~', [Expected.error(FrameStatus.FCS_MISMATCH)]
    ),
    case(
        _encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'),
        [
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(1, b'\2', b'def'),
        ],
        _encode(0xA, 0xB, b'???')[:-2],
    ),
    'Invalid escape in address',
    case(
        b'~}}\0' + fcs(b'\x5d\0') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Invalid escape in control',
    case(
        b'~\0}}' + fcs(b'\0\x5d') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Invalid escape in data',
    case(
        b'~\0\1}}' + fcs(b'\0\1\x5d') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'Frame ends with escape',
    case(b'~}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abc}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd1234}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    case(
        b'~}~}~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~}}~}}~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    'Data before first flag',
    case(b'\0\1' + fcs(b'\0\1'), []),
    case(
        b'\0\1' + fcs(b'\0\1') + b'~',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    'No frames emitted until flag',
    case(_encode(1, 2, b'3')[:-1], []),
    case(b'~' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    case(
        b'~}\0' + _encode(1, 2, b'3'),
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
        b'~}\0',
    ),
    case(
        b'~1234}a' + _encode(1, 2, b'3'),
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
        b'~1234}a',
    ),
    'Invalid frame records raw data',
    case(b'Hello?~', [ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR)]),
    case(
        b'~~Hel}}lo~',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'Hel}}lo~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'Hello?~~~~~',
        [
            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'~~~~Hello?~~~~~',
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'Hello?~', FrameStatus.FCS_MISMATCH),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ],
    ),
    case(
        b'Hello?~~Goodbye~',
        [
            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~Goodbye~', FrameStatus.FCS_MISMATCH),
        ],
    ),
    'Valid data followed by frame followed by invalid',
    case(
        b'Hi~ this is a log message\r\n'
        + _encode(0, 0, b'')
        + b'More log messages!\r\n',
        [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(0, b'\0', b''),
        ],
        b'Hi~ this is a log message\r\nMore log messages!\r\n',
    ),
    case(
        b'Hi~ this is a log message\r\n',
        [Expected.error(FrameStatus.FRAMING_ERROR)],
    ),
    case(
        b'~Hi~' + _encode(1, 2, b'def') + b' How are you?',
        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'def')],
        b'~Hi~ How are you?',
    ),
)
# Formatting for the above tuple is very slow, so disable yapf. Manually enable
# it as needed to format the test cases.
# NOTE(review): no yapf disable/enable directive is visible around the tuple,
# so the comment above may be stale -- confirm which formatter applies.

_TESTS = TestGenerator(TEST_CASES)


def _expected(frames: list[Frame]) -> Iterator[str]:
    """Yields the C++ initializer line for each frame's kExpected entry.

    Frame numbers start at 1 and match the kDecodedFrameNN constants emitted
    by _cpp_test().
    """
    for i, frame in enumerate(frames, 1):
        if frame.ok():
            yield f' Frame::Parse(kDecodedFrame{i:02}).value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            yield f' Frame::Parse(kDecodedFrame{i:02}).status(),'
        else:
            yield f' Status::DataLoss(), // Frame {i}'


# <cstring> is included because the generated test bodies call std::memcmp.
_CPP_HEADER = """\
#include "pw_hdlc/decoder.h"

#include <array>
#include <cstddef>
#include <cstring>
#include <variant>

#include "pw_unit_test/framework.h"
#include "pw_bytes/array.h"

namespace pw::hdlc {
namespace {
"""

_CPP_FOOTER = """\
} // namespace
} // namespace pw::hdlc"""

_TS_HEADER = """\
import 'jasmine';

import {Buffer} from 'buffer';

import {Decoder, FrameStatus} from './decoder'
import * as protocol from './protocol'
import * as util from './util'

class Expected {
  address: number
  control: Uint8Array
  data: Uint8Array
  status: FrameStatus

  constructor(
      address: number,
      control: Uint8Array,
      data: Uint8Array,
      status: FrameStatus) {
    this.address = address;
    this.control = control;
    this.data = data;
    this.status = status;
  }
}

class ExpectedRaw {
  raw: Uint8Array
  status: FrameStatus

  constructor(raw: Uint8Array, status: FrameStatus) {
    this.status = status;
    this.raw = raw;
  }
}

describe('Decoder', () => {
  let decoder: Decoder;
  let textEncoder: TextEncoder;

  beforeEach(() => {
    decoder = new Decoder();
    textEncoder = new TextEncoder();
  });

"""
_TS_FOOTER = """\
});
"""


def _py_only_frame(frame: Frame) -> bool:
    """Returns true for frames only returned by the Python library"""
    return (
        frame.status is FrameStatus.FRAMING_ERROR and frame.raw_encoded == b'~~'
    )


def _cpp_test(ctx: Context) -> Iterator[str]:
    """Generates a C++ test for the provided test data.

    The expectations are produced by running the Python FrameDecoder over the
    test case's data; frames matched by _py_only_frame() are left out.
    """
    data, _, _ = ctx.test_case
    frames = [f for f in FrameDecoder().process(data) if not _py_only_frame(f)]
    data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield f' static constexpr auto kData = bytes::String("{data_bytes}");\n'

    # Only OK and BAD_ADDRESS frames get a kDecodedFrameNN constant; those are
    # the frames for which _expected() emits a Frame::Parse() call.
    for i, frame in enumerate(frames, 1):
        if frame.ok() or frame.status is FrameStatus.BAD_ADDRESS:
            frame_bytes = ''.join(
                rf'\x{byte:02x}' for byte in frame.raw_decoded
            )
            yield (
                f' static constexpr auto kDecodedFrame{i:02} = '
                f'bytes::String("{frame_bytes}");'
            )
        else:
            yield f' // Frame {i}: {frame.status.value}'

    yield ''

    expected = '\n'.join(_expected(frames)) or ' // No frames'
    decoder_size = max(len(data), 8)  # Make sure large enough for a frame

    yield f"""\
  DecoderBuffer<{decoder_size}> decoder;

  static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
{expected}
  }};

  size_t decoded_frames = 0;

  decoder.Process(kData, [&](const Result<Frame>& result) {{
    ASSERT_LT(decoded_frames++, kExpected.size());
    auto& expected = kExpected[decoded_frames - 1];

    if (std::holds_alternative<Status>(expected)) {{
      EXPECT_EQ(Status::DataLoss(), result.status());
    }} else {{
      ASSERT_EQ(OkStatus(), result.status());

      const Frame& decoded_frame = result.value();
      const Frame& expected_frame = std::get<Frame>(expected);
      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
                            decoded_frame.data().data(),
                            expected_frame.data().size()),
                0);
    }}
  }});

  EXPECT_EQ(decoded_frames, kExpected.size());
}}"""


def _define_py_decoder_test(ctx: Context) -> PyTest:
    """Returns a test method that runs FrameDecoder on one test case.

    The data is decoded twice, once in a single process() call and once a byte
    at a time; each result must equal the case's expected frames.
    """
    data, expected_frames, _ = ctx.test_case

    def test(self) -> None:
        self.maxDiff = None
        # Decode in one call
        self.assertEqual(
            expected_frames,
            list(FrameDecoder().process(data)),
            msg=f'{ctx.group}: {data!r}',
        )
        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: list[Frame] = []
        # Slice rather than index so process() receives a one-byte bytes
        # object instead of an int.
        for i in range(len(data)):
            decoded_frames += decoder.process(data[i : i + 1])

        self.assertEqual(
            expected_frames,
            decoded_frames,
            msg=f'{ctx.group} (byte-by-byte): {data!r}',
        )

    return test


def _define_raw_decoder_py_test(ctx: Context) -> PyTest:
    """Returns a test method that runs FrameAndNonFrameDecoder on one case.

    The data is decoded in a single call and then a byte at a time. Each pass
    checks the frames that are yielded and, after flushing, the bytes that
    were passed to the non-frame data handler.
    """
    raw_data, expected_frames, expected_non_frame_data = ctx.test_case

    # The non-frame data decoder only yields valid frames.
    expected_frames = [f for f in expected_frames if f.status is FrameStatus.OK]

    def test(self) -> None:
        self.maxDiff = None

        non_frame_data = bytearray()

        # Decode in one call
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend
        )

        self.assertEqual(
            expected_frames,
            list(decoder.process(raw_data)),
            msg=f'{ctx.group}: {raw_data!r}',
        )

        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

        # Decode byte-by-byte
        non_frame_data.clear()
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend
        )
        decoded_frames: list[Frame] = []
        for i in range(len(raw_data)):
            decoded_frames += decoder.process(raw_data[i : i + 1])

        self.assertEqual(
            expected_frames,
            decoded_frames,
            msg=f'{ctx.group} (byte-by-byte): {raw_data!r}',
        )
        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

    return test


def _ts_byte_array(data: bytes) -> str:
    """Formats bytes as a bracketed list of two-digit hex literals."""
    return '[' + ', '.join(rf'0x{byte:02x}' for byte in data) + ']'


def _ts_test(ctx: Context) -> Iterator[str]:
    """Generates a TS test for the provided test data.

    The expectations are produced by running the Python FrameDecoder over the
    test case's data; frames matched by _py_only_frame() are left out.
    """
    data, _, _ = ctx.test_case
    frames = [f for f in FrameDecoder().process(data) if not _py_only_frame(f)]
    data_bytes = _ts_byte_array(data)

    yield f' it(\'{ctx.ts_name()}\', () => {{'
    yield f' const data = new Uint8Array({data_bytes});'

    yield ' const expectedFrames = ['
    for frame in frames:
        control_bytes = _ts_byte_array(frame.control)
        frame_bytes = _ts_byte_array(frame.data)

        # NOTE(review): `frame` is an element produced by
        # FrameDecoder.process(), so this identity check against the Expected
        # class itself looks like it can never be true, which would send every
        # frame down the ExpectedRaw branch. It is left unchanged because
        # taking the other branch would change the generated tests; confirm
        # the intended condition.
        if frame is Expected:
            yield (
                f' new Expected({frame.address}, '
                f'new Uint8Array({control_bytes}), '
                f'new Uint8Array({frame_bytes}), {frame.status}),'
            )
        else:
            raw = _ts_byte_array(frame.raw_encoded)
            yield (
                f' new ExpectedRaw(new Uint8Array({raw}), {frame.status}),'
            )

    yield ' ].values();\n'

    yield """\
    const result = decoder.process(data);

    while (true) {
      const expectedFrame = expectedFrames.next();
      const actualFrame = result.next();
      if (expectedFrame.done && actualFrame.done) {
        break;
      }
      expect(expectedFrame.done).toBeFalse();
      expect(actualFrame.done).toBeFalse();

      const expected = expectedFrame.value;
      const actual = actualFrame.value;
      if (expected instanceof Expected) {
        expect(actual.address).toEqual(expected.address);
        expect(actual.control).toEqual(expected.control);
        expect(actual.data).toEqual(expected.data);
        expect(actual.status).toEqual(expected.status);
      } else {
        // Expected Raw
        expect(actual.rawEncoded).toEqual(expected.raw);
        expect(actual.status).toEqual(expected.status);
      }
    }
  });
"""


# Class that tests all cases in TEST_CASES.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_decoder_test)
NonFrameDecoderTest = _TESTS.python_tests(
    'NonFrameDecoderTest', _define_raw_decoder_py_test
)


class AdditionalNonFrameDecoderTests(unittest.TestCase):
    """Hand-written FrameAndNonFrameDecoder tests beyond TEST_CASES."""

    def test_shared_flags_waits_for_tilde_to_emit_data(self) -> None:
        emitted = bytearray()
        decoder = FrameAndNonFrameDecoder(emitted.extend)

        # A complete frame is decoded without emitting any non-frame data.
        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(emitted, b'')

        # Bytes after the frame are held back ...
        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(emitted, b'')

        # ... until a flag byte arrives.
        frames = list(decoder.process(b'~'))
        self.assertEqual([], frames)
        self.assertEqual(emitted, b'uh oh, no tilde!')

    def test_no_shared_flags_immediately_emits_data(self) -> None:
        emitted = bytearray()
        decoder = FrameAndNonFrameDecoder(
            emitted.extend, handle_shared_flags=False
        )

        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(emitted, b'')

        # No trailing flag byte is needed for the data to be emitted.
        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(emitted, b'uh oh, no tilde!')

    def test_emits_data_if_mtu_is_exceeded(self) -> None:
        frame_start = b'~this looks like a real frame'

        emitted = bytearray()
        decoder = FrameAndNonFrameDecoder(emitted.extend, mtu=len(frame_start))

        # Exactly mtu bytes: nothing is emitted yet.
        frames = list(decoder.process(frame_start))
        self.assertEqual([], frames)
        self.assertEqual(emitted, b'')

        # One byte past the mtu: everything received so far is emitted.
        frames = list(decoder.process(b'!'))
        self.assertEqual([], frames)
        self.assertEqual(emitted, frame_start + b'!')

    def test_emits_data_if_timeout_expires(self) -> None:
        frame_start = b'~this looks like a real frame'

        # The handler is a queue's put() so the test can block on get() until
        # the data arrives.
        emitted: queue.Queue[bytes] = queue.Queue()
        decoder = FrameAndNonFrameDecoder(emitted.put, timeout_s=0.001)

        frames = list(decoder.process(frame_start))
        self.assertEqual([], frames)
        self.assertEqual(emitted.get(timeout=2), frame_start)

    def test_emits_raw_data_and_valid_frame_if_flushed_partway(self) -> None:
        payload = b'Do you wanna ride in my blimp?'
        frame = _encode(1, 2, payload)

        emitted = bytearray()
        decoder = FrameAndNonFrameDecoder(emitted.extend)

        # Flush after only the first five bytes of the frame have been seen.
        head, tail = frame[:5], frame[5:]
        self.assertEqual([], list(decoder.process(head)))
        decoder.flush_non_frame_data()

        # The rest of the frame must still decode to the complete frame.
        frames = list(decoder.process(tail))
        self.assertEqual([Expected(1, b'\2', payload)], frames)


if __name__ == '__main__':
    args = parse_test_generation_args()
    if args.generate_cc_test:
        # Write the generated C++ tests instead of running the Python tests.
        _TESTS.cc_tests(
            args.generate_cc_test,
            _cpp_test,
            _CPP_HEADER,
            _CPP_FOOTER,
        )
    elif args.generate_ts_test:
        # Write the generated TS tests instead of running the Python tests.
        _TESTS.ts_tests(
            args.generate_ts_test,
            _ts_test,
            _TS_HEADER,
            _TS_FOOTER,
        )
    else:
        unittest.main()