1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import random
19import os
20from bumble.transport.common import PacketParser
21
22
23# -----------------------------------------------------------------------------
24class Sink:
25    def __init__(self):
26        self.packets = []
27
28    def on_packet(self, packet):
29        self.packets.append(packet)
30
31
32# -----------------------------------------------------------------------------
33def test_parser():
34    sink1 = Sink()
35    parser1 = PacketParser(sink1)
36    sink2 = Sink()
37    parser2 = PacketParser(sink2)
38
39    for parser in [parser1, parser2]:
40        with open(
41            os.path.join(os.path.dirname(__file__), 'hci_data_001.bin'), 'rb'
42        ) as input:
43            while True:
44                n = random.randint(1, 9)
45                data = input.read(n)
46                if not data:
47                    break
48                parser.feed_data(data)
49
50    assert sink1.packets == sink2.packets
51
52
53# -----------------------------------------------------------------------------
54def test_parser_extensions():
55    sink = Sink()
56    parser = PacketParser(sink)
57
58    # Check that an exception is thrown for an unknown type
59    try:
60        parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02]))
61        exception_thrown = False
62    except ValueError:
63        exception_thrown = True
64
65    assert exception_thrown
66
67    # Now add a custom info
68    parser.extended_packet_info[0x77] = (1, 1, 'B')
69    parser.reset()
70    parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02]))
71    assert len(sink.packets) == 1
72
73
74# -----------------------------------------------------------------------------
75if __name__ == '__main__':
76    test_parser()
77    test_parser_extensions()
78