xref: /aosp_15_r20/external/pigweed/pw_hdlc/py/decode_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Contains the Python decoder tests and generates C++ decoder tests."""
16
17from __future__ import annotations
18
19import queue
20from typing import Iterator, NamedTuple
21import unittest
22
23from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
24from pw_build.generated_tests import parse_test_generation_args
25from pw_hdlc.decode import (
26    Frame,
27    FrameDecoder,
28    FrameAndNonFrameDecoder,
29    FrameStatus,
30    NO_ADDRESS,
31)
32from pw_hdlc.protocol import frame_check_sequence as fcs
33from pw_hdlc.protocol import encode_address
34
35
36def _encode(address: int, control: int, data: bytes) -> bytes:
37    frame = encode_address(address) + bytes([control]) + data
38    frame += fcs(frame)
39    frame = frame.replace(b'}', b'}\x5d')
40    frame = frame.replace(b'~', b'}\x5e')
41    return b''.join([b'~', frame, b'~'])
42
43
44class Expected(NamedTuple):
45    address: int
46    control: bytes
47    data: bytes
48    status: FrameStatus = FrameStatus.OK
49
50    @classmethod
51    def error(cls, status: FrameStatus):
52        assert status is not FrameStatus.OK
53        return cls(NO_ADDRESS, b'', b'', status)
54
55    def __eq__(self, other) -> bool:
56        """Define == so an Expected and a Frame can be compared."""
57        return (
58            self.address == other.address
59            and self.control == other.control
60            and self.data == other.data
61            and self.status is other.status
62        )
63
64
65class ExpectedRaw(NamedTuple):
66    raw_encoded: bytes
67    status: FrameStatus
68
69    def __eq__(self, other) -> bool:
70        """Define == so an ExpectedRaw and a Frame can be compared."""
71        return (
72            self.raw_encoded == other.raw_encoded
73            and self.status is other.status
74        )
75
76
77class TestCase(NamedTuple):
78    data: bytes
79    frames: list[Expected | ExpectedRaw]
80    raw_data: bytes
81
82
83def case(data: bytes, frames: list, raw: bytes | None = None) -> TestCase:
84    """Creates a TestCase, filling in the default value for the raw bytes."""
85    if raw is not None:
86        return TestCase(data, frames, raw)
87    if not frames or all(f.status is not FrameStatus.OK for f in frames):
88        return TestCase(data, frames, data)
89    if all(f.status is FrameStatus.OK for f in frames):
90        return TestCase(data, frames, b'')
91    raise AssertionError(
92        f'Must specify expected non-frame data for this test case ({data=})!'
93    )
94
95
96_PARTIAL = fcs(b'\x0ACmsg\x5e')
97_ESCAPED_FLAG_TEST_CASE = case(
98    b'~\x0ACmsg}~' + _PARTIAL + b'~',
99    [
100        Expected.error(FrameStatus.FRAMING_ERROR),
101        Expected.error(FrameStatus.FRAMING_ERROR),
102    ],
103)
104
105# Test cases are a tuple with the following elements:
106#
107#   - raw data stream
108#   - expected valid & invalid frames
109#   - [optional] expected raw, non-HDLC data; defaults to the full raw data
110#     stream if no valid frames are expected, or b'' if only valid frames are
111#     expected
112#
113# These tests are executed twice: once for the standard HDLC decoder, and a
114# second time for the FrameAndNonFrameDecoder. The FrameAndNonFrameDecoder tests
115# flush the non-frame data to simulate a timeout or MTU overflow, so the
116# expected raw data includes all bytes not in an HDLC frame.
117TEST_CASES: tuple[GroupOrTest[TestCase], ...] = (
118    'Empty payload',
119    case(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
120    case(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
121    case(_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
122    'Simple one-byte payload',
123    case(_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
124    case(_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
125    'Simple multi-byte payload',
126    case(
127        _encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]
128    ),
129    case(_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
130    'Escaped one-byte payload',
131    case(_encode(1, 2, b'~'), [Expected(1, b'\2', b'~')]),
132    case(_encode(1, 2, b'}'), [Expected(1, b'\2', b'}')]),
133    case(
134        _encode(1, 2, b'~') + _encode(1, 2, b'}'),
135        [Expected(1, b'\2', b'~'), Expected(1, b'\2', b'}')],
136    ),
137    'Escaped address',
138    case(_encode(0x7E, 0, b'A'), [Expected(0x7E, b'\0', b'A')]),
139    case(_encode(0x7D, 0, b'B'), [Expected(0x7D, b'\0', b'B')]),
140    'Escaped control',
141    case(_encode(0, 0x7E, b'C'), [Expected(0, b'~', b'C')]),
142    case(_encode(0, 0x7D, b'D'), [Expected(0, b'}', b'D')]),
143    'Escaped address and control',
144    case(_encode(0x7E, 0x7D, b'E'), [Expected(0x7E, b'}', b'E')]),
145    case(_encode(0x7D, 0x7E, b'F'), [Expected(0x7D, b'~', b'F')]),
146    case(_encode(0x7E, 0x7E, b'~'), [Expected(0x7E, b'~', b'~')]),
147    'Multibyte address',
148    case(
149        _encode(128, 0, b'big address'), [Expected(128, b'\0', b'big address')]
150    ),
151    case(
152        _encode(0xFFFFFFFF, 0, b'\0\0\1\0\0'),
153        [Expected(0xFFFFFFFF, b'\0', b'\0\0\1\0\0')],
154    ),
155    'Multiple frames separated by single flag',
156    case(
157        _encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
158        [Expected(0, b'\0', b'A'), Expected(1, b'\2', b'123')],
159    ),
160    case(
161        _encode(0xFF, 0, b'Yo')[:-1] * 3 + b'~',
162        [Expected(0xFF, b'\0', b'Yo')] * 3,
163    ),
164    'Empty frames produce framing errors with raw data',
165    case(b'~~', [ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR)], b'~~'),
166    case(
167        b'~' * 10,
168        [
169            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
170            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
171            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
172            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
173            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
174        ],
175    ),
176    case(
177        b'~~' + _encode(1, 2, b'3') + b'~' * 5,
178        [
179            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
180            Expected(1, b'\2', b'3'),
181            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
182            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
183            # One flag byte remains in the decoding state machine.
184        ],
185        b'~~~~~~~',
186    ),
187    case(
188        b'~' * 10 + _encode(1, 2, b':O') + b'~' * 3 + _encode(3, 4, b':P'),
189        [
190            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
191            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
192            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
193            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
194            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
195            Expected(1, b'\2', b':O'),
196            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
197            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
198            Expected(3, b'\4', b':P'),
199        ],
200        b'~' * 13,
201    ),
202    'Cannot escape flag',
203    case(
204        b'~\xAA}~\xab\x00Hello' + fcs(b'\xab\0Hello') + b'~',
205        [
206            Expected.error(FrameStatus.FRAMING_ERROR),
207            Expected(0x55, b'\0', b'Hello'),
208        ],
209        b'~\xAA}',
210    ),
211    _ESCAPED_FLAG_TEST_CASE,
212    'Frame too short',
213    case(b'~1~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
214    case(b'~12~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
215    case(b'~12345~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
216    'Multibyte address too long',
217    case(
218        _encode(2**100, 0, b'too long'),
219        [Expected.error(FrameStatus.BAD_ADDRESS)],
220    ),
221    'Incorrect frame check sequence',
222    case(b'~123456~', [Expected.error(FrameStatus.FCS_MISMATCH)]),
223    case(
224        b'~\1\2msg\xff\xff\xff\xff~', [Expected.error(FrameStatus.FCS_MISMATCH)]
225    ),
226    case(
227        _encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'),
228        [
229            Expected.error(FrameStatus.FCS_MISMATCH),
230            Expected(1, b'\2', b'def'),
231        ],
232        _encode(0xA, 0xB, b'???')[:-2],
233    ),
234    'Invalid escape in address',
235    case(
236        b'~}}\0' + fcs(b'\x5d\0') + b'~',
237        [Expected.error(FrameStatus.FRAMING_ERROR)],
238    ),
239    'Invalid escape in control',
240    case(
241        b'~\0}}' + fcs(b'\0\x5d') + b'~',
242        [Expected.error(FrameStatus.FRAMING_ERROR)],
243    ),
244    'Invalid escape in data',
245    case(
246        b'~\0\1}}' + fcs(b'\0\1\x5d') + b'~',
247        [Expected.error(FrameStatus.FRAMING_ERROR)],
248    ),
249    'Frame ends with escape',
250    case(b'~}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
251    case(b'~\1}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
252    case(b'~\1\2abc}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
253    case(b'~\1\2abcd}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
254    case(b'~\1\2abcd1234}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
255    'Inter-frame data is only escapes',
256    case(
257        b'~}~}~',
258        [
259            Expected.error(FrameStatus.FRAMING_ERROR),
260            Expected.error(FrameStatus.FRAMING_ERROR),
261        ],
262    ),
263    case(
264        b'~}}~}}~',
265        [
266            Expected.error(FrameStatus.FRAMING_ERROR),
267            Expected.error(FrameStatus.FRAMING_ERROR),
268        ],
269    ),
270    'Data before first flag',
271    case(b'\0\1' + fcs(b'\0\1'), []),
272    case(
273        b'\0\1' + fcs(b'\0\1') + b'~',
274        [Expected.error(FrameStatus.FRAMING_ERROR)],
275    ),
276    'No frames emitted until flag',
277    case(_encode(1, 2, b'3')[:-1], []),
278    case(b'~' + _encode(1, 2, b'3')[1:-1] * 2, []),
279    'Only flag and escape characters can be escaped',
280    case(
281        b'~}\0' + _encode(1, 2, b'3'),
282        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
283        b'~}\0',
284    ),
285    case(
286        b'~1234}a' + _encode(1, 2, b'3'),
287        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'3')],
288        b'~1234}a',
289    ),
290    'Invalid frame records raw data',
291    case(b'Hello?~', [ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR)]),
292    case(
293        b'~~Hel}}lo~',
294        [
295            Expected.error(FrameStatus.FRAMING_ERROR),
296            ExpectedRaw(b'Hel}}lo~', FrameStatus.FRAMING_ERROR),
297        ],
298    ),
299    case(
300        b'Hello?~~~~~',
301        [
302            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
303            Expected.error(FrameStatus.FRAMING_ERROR),
304            Expected.error(FrameStatus.FRAMING_ERROR),
305        ],
306    ),
307    case(
308        b'~~~~Hello?~~~~~',
309        [
310            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
311            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
312            ExpectedRaw(b'Hello?~', FrameStatus.FCS_MISMATCH),
313            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
314            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
315        ],
316    ),
317    case(
318        b'Hello?~~Goodbye~',
319        [
320            ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
321            ExpectedRaw(b'~Goodbye~', FrameStatus.FCS_MISMATCH),
322        ],
323    ),
324    'Valid data followed by frame followed by invalid',
325    case(
326        b'Hi~ this is a log message\r\n'
327        + _encode(0, 0, b'')
328        + b'More log messages!\r\n',
329        [
330            Expected.error(FrameStatus.FRAMING_ERROR),
331            Expected.error(FrameStatus.FCS_MISMATCH),
332            Expected(0, b'\0', b''),
333        ],
334        b'Hi~ this is a log message\r\nMore log messages!\r\n',
335    ),
336    case(
337        b'Hi~ this is a log message\r\n',
338        [Expected.error(FrameStatus.FRAMING_ERROR)],
339    ),
340    case(
341        b'~Hi~' + _encode(1, 2, b'def') + b' How are you?',
342        [Expected.error(FrameStatus.FRAMING_ERROR), Expected(1, b'\2', b'def')],
343        b'~Hi~ How are you?',
344    ),
345)
346# Formatting for the above tuple is very slow, so disable yapf. Manually enable
347# it as needed to format the test cases.
348
349_TESTS = TestGenerator(TEST_CASES)
350
351
352def _expected(frames: list[Frame]) -> Iterator[str]:
353    for i, frame in enumerate(frames, 1):
354        if frame.ok():
355            yield f'      Frame::Parse(kDecodedFrame{i:02}).value(),'
356        elif frame.status is FrameStatus.BAD_ADDRESS:
357            yield f'      Frame::Parse(kDecodedFrame{i:02}).status(),'
358        else:
359            yield f'      Status::DataLoss(),  // Frame {i}'
360
361
362_CPP_HEADER = """\
363#include "pw_hdlc/decoder.h"
364
365#include <array>
366#include <cstddef>
367#include <variant>
368
369#include "pw_unit_test/framework.h"
370#include "pw_bytes/array.h"
371
372namespace pw::hdlc {
373namespace {
374"""
375
376_CPP_FOOTER = """\
377}  // namespace
378}  // namespace pw::hdlc"""
379
380_TS_HEADER = """\
381import 'jasmine';
382
383import {Buffer} from 'buffer';
384
385import {Decoder, FrameStatus} from './decoder'
386import * as protocol from './protocol'
387import * as util from './util'
388
389class Expected {
390  address: number
391  control: Uint8Array
392  data: Uint8Array
393  status: FrameStatus
394
395  constructor(
396      address: number,
397      control: Uint8Array,
398      data: Uint8Array,
399      status: FrameStatus) {
400    this.address = address;
401    this.control = control;
402    this.data = data;
403    this.status = status;
404  }
405}
406
407class ExpectedRaw {
408  raw: Uint8Array
409  status: FrameStatus
410
411  constructor(raw: Uint8Array, status: FrameStatus) {
412    this.status = status;
413    this.raw = raw;
414  }
415}
416
417describe('Decoder', () => {
418  let decoder: Decoder;
419  let textEncoder: TextEncoder;
420
421  beforeEach(() => {
422    decoder = new Decoder();
423    textEncoder = new TextEncoder();
424  });
425
426"""
427_TS_FOOTER = """\
428});
429"""
430
431
432def _py_only_frame(frame: Frame) -> bool:
433    """Returns true for frames only returned by the Python library"""
434    return (
435        frame.status is FrameStatus.FRAMING_ERROR and frame.raw_encoded == b'~~'
436    )
437
438
439def _cpp_test(ctx: Context) -> Iterator[str]:
440    """Generates a C++ test for the provided test data."""
441    data, _, _ = ctx.test_case
442    frames = [
443        f for f in list(FrameDecoder().process(data)) if not _py_only_frame(f)
444    ]
445    data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)
446
447    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
448    yield f'  static constexpr auto kData = bytes::String("{data_bytes}");\n'
449
450    for i, frame in enumerate(frames, 1):
451        if frame.ok() or frame.status is FrameStatus.BAD_ADDRESS:
452            frame_bytes = ''.join(
453                rf'\x{byte:02x}' for byte in frame.raw_decoded
454            )
455            yield (
456                f'  static constexpr auto kDecodedFrame{i:02} = '
457                f'bytes::String("{frame_bytes}");'
458            )
459        else:
460            yield f'  // Frame {i}: {frame.status.value}'
461
462    yield ''
463
464    expected = '\n'.join(_expected(frames)) or '      // No frames'
465    decoder_size = max(len(data), 8)  # Make sure large enough for a frame
466
467    yield f"""\
468  DecoderBuffer<{decoder_size}> decoder;
469
470  static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
471{expected}
472  }};
473
474  size_t decoded_frames = 0;
475
476  decoder.Process(kData, [&](const Result<Frame>& result) {{
477    ASSERT_LT(decoded_frames++, kExpected.size());
478    auto& expected = kExpected[decoded_frames - 1];
479
480    if (std::holds_alternative<Status>(expected)) {{
481      EXPECT_EQ(Status::DataLoss(), result.status());
482    }} else {{
483      ASSERT_EQ(OkStatus(), result.status());
484
485      const Frame& decoded_frame = result.value();
486      const Frame& expected_frame = std::get<Frame>(expected);
487      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
488      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
489      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
490      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
491                            decoded_frame.data().data(),
492                            expected_frame.data().size()),
493                0);
494    }}
495  }});
496
497  EXPECT_EQ(decoded_frames, kExpected.size());
498}}"""
499
500
501def _define_py_decoder_test(ctx: Context) -> PyTest:
502    data, expected_frames, _ = ctx.test_case
503
504    def test(self) -> None:
505        self.maxDiff = None
506        # Decode in one call
507        self.assertEqual(
508            expected_frames,
509            list(FrameDecoder().process(data)),
510            msg=f'{ctx.group}: {data!r}',
511        )
512        # Decode byte-by-byte
513        decoder = FrameDecoder()
514        decoded_frames: list[Frame] = []
515        for i in range(len(data)):
516            decoded_frames += decoder.process(data[i : i + 1])
517
518        self.assertEqual(
519            expected_frames,
520            decoded_frames,
521            msg=f'{ctx.group} (byte-by-byte): {data!r}',
522        )
523
524    return test
525
526
527def _define_raw_decoder_py_test(ctx: Context) -> PyTest:
528    raw_data, expected_frames, expected_non_frame_data = ctx.test_case
529
530    # The non-frame data decoder only yields valid frames.
531    expected_frames = [f for f in expected_frames if f.status is FrameStatus.OK]
532
533    def test(self) -> None:
534        self.maxDiff = None
535
536        non_frame_data = bytearray()
537
538        # Decode in one call
539        decoder = FrameAndNonFrameDecoder(
540            non_frame_data_handler=non_frame_data.extend
541        )
542
543        self.assertEqual(
544            expected_frames,
545            list(decoder.process(raw_data)),
546            msg=f'{ctx.group}: {raw_data!r}',
547        )
548
549        decoder.flush_non_frame_data()
550        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))
551
552        # Decode byte-by-byte
553        non_frame_data.clear()
554        decoder = FrameAndNonFrameDecoder(
555            non_frame_data_handler=non_frame_data.extend
556        )
557        decoded_frames: list[Frame] = []
558        for i in range(len(raw_data)):
559            decoded_frames += decoder.process(raw_data[i : i + 1])
560
561        self.assertEqual(
562            expected_frames,
563            decoded_frames,
564            msg=f'{ctx.group} (byte-by-byte): {raw_data!r}',
565        )
566        decoder.flush_non_frame_data()
567        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))
568
569    return test
570
571
572def _ts_byte_array(data: bytes) -> str:
573    return '[' + ', '.join(rf'0x{byte:02x}' for byte in data) + ']'
574
575
576def _ts_test(ctx: Context) -> Iterator[str]:
577    """Generates a TS test for the provided test data."""
578    data, _, _ = ctx.test_case
579    frames = [
580        f for f in list(FrameDecoder().process(data)) if not _py_only_frame(f)
581    ]
582    data_bytes = _ts_byte_array(data)
583
584    yield f'  it(\'{ctx.ts_name()}\', () => {{'
585    yield f'    const data = new Uint8Array({data_bytes});'
586
587    yield '    const expectedFrames = ['
588    for frame in frames:
589        control_bytes = _ts_byte_array(frame.control)
590        frame_bytes = _ts_byte_array(frame.data)
591
592        if frame is Expected:
593            yield (
594                f'      new Expected({frame.address}, '
595                f'new Uint8Array({control_bytes}), '
596                f'new Uint8Array({frame_bytes}), {frame.status}),'
597            )
598        else:
599            raw = _ts_byte_array(frame.raw_encoded)
600            yield (
601                f'      new ExpectedRaw(new Uint8Array({raw}), {frame.status}),'
602            )
603
604    yield '    ].values();\n'
605
606    yield """\
607    const result = decoder.process(data);
608
609    while (true) {
610      const expectedFrame = expectedFrames.next();
611      const actualFrame = result.next();
612      if (expectedFrame.done && actualFrame.done) {
613        break;
614      }
615      expect(expectedFrame.done).toBeFalse();
616      expect(actualFrame.done).toBeFalse();
617
618      const expected = expectedFrame.value;
619      const actual = actualFrame.value;
620      if (expected instanceof Expected) {
621        expect(actual.address).toEqual(expected.address);
622        expect(actual.control).toEqual(expected.control);
623        expect(actual.data).toEqual(expected.data);
624        expect(actual.status).toEqual(expected.status);
625      } else {
626        // Expected Raw
627        expect(actual.rawEncoded).toEqual(expected.raw);
628        expect(actual.status).toEqual(expected.status);
629      }
630    }
631  });
632"""
633
634
635# Class that tests all cases in TEST_CASES.
636DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_decoder_test)
637NonFrameDecoderTest = _TESTS.python_tests(
638    'NonFrameDecoderTest', _define_raw_decoder_py_test
639)
640
641
642class AdditionalNonFrameDecoderTests(unittest.TestCase):
643    """Additional tests for the non-frame decoder."""
644
645    def test_shared_flags_waits_for_tilde_to_emit_data(self) -> None:
646        non_frame_data = bytearray()
647        decoder = FrameAndNonFrameDecoder(non_frame_data.extend)
648
649        self.assertEqual(
650            [Expected(0, b'\0', b'')], list(decoder.process(_encode(0, 0, b'')))
651        )
652        self.assertEqual(non_frame_data, b'')
653
654        self.assertEqual([], list(decoder.process(b'uh oh, no tilde!')))
655        self.assertEqual(non_frame_data, b'')
656
657        self.assertEqual([], list(decoder.process(b'~')))
658        self.assertEqual(non_frame_data, b'uh oh, no tilde!')
659
660    def test_no_shared_flags_immediately_emits_data(self) -> None:
661        non_frame_data = bytearray()
662        decoder = FrameAndNonFrameDecoder(
663            non_frame_data.extend, handle_shared_flags=False
664        )
665
666        self.assertEqual(
667            [Expected(0, b'\0', b'')], list(decoder.process(_encode(0, 0, b'')))
668        )
669        self.assertEqual(non_frame_data, b'')
670
671        self.assertEqual([], list(decoder.process(b'uh oh, no tilde!')))
672        self.assertEqual(non_frame_data, b'uh oh, no tilde!')
673
674    def test_emits_data_if_mtu_is_exceeded(self) -> None:
675        frame_start = b'~this looks like a real frame'
676
677        non_frame_data = bytearray()
678        decoder = FrameAndNonFrameDecoder(
679            non_frame_data.extend, mtu=len(frame_start)
680        )
681
682        self.assertEqual([], list(decoder.process(frame_start)))
683        self.assertEqual(non_frame_data, b'')
684
685        self.assertEqual([], list(decoder.process(b'!')))
686        self.assertEqual(non_frame_data, frame_start + b'!')
687
688    def test_emits_data_if_timeout_expires(self) -> None:
689        frame_start = b'~this looks like a real frame'
690
691        non_frame_data: queue.Queue[bytes] = queue.Queue()
692        decoder = FrameAndNonFrameDecoder(non_frame_data.put, timeout_s=0.001)
693
694        self.assertEqual([], list(decoder.process(frame_start)))
695        self.assertEqual(non_frame_data.get(timeout=2), frame_start)
696
697    def test_emits_raw_data_and_valid_frame_if_flushed_partway(self) -> None:
698        payload = b'Do you wanna ride in my blimp?'
699        frame = _encode(1, 2, payload)
700
701        non_frame_data = bytearray()
702        decoder = FrameAndNonFrameDecoder(non_frame_data.extend)
703
704        self.assertEqual([], list(decoder.process(frame[:5])))
705        decoder.flush_non_frame_data()
706
707        self.assertEqual(
708            [Expected(1, b'\2', payload)], list(decoder.process(frame[5:]))
709        )
710
711
712if __name__ == '__main__':
713    args = parse_test_generation_args()
714    if args.generate_cc_test:
715        _TESTS.cc_tests(
716            args.generate_cc_test, _cpp_test, _CPP_HEADER, _CPP_FOOTER
717        )
718    elif args.generate_ts_test:
719        _TESTS.ts_tests(args.generate_ts_test, _ts_test, _TS_HEADER, _TS_FOOTER)
720    else:
721        unittest.main()
722