1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import random 19import os 20from bumble.transport.common import PacketParser 21 22 23# ----------------------------------------------------------------------------- 24class Sink: 25 def __init__(self): 26 self.packets = [] 27 28 def on_packet(self, packet): 29 self.packets.append(packet) 30 31 32# ----------------------------------------------------------------------------- 33def test_parser(): 34 sink1 = Sink() 35 parser1 = PacketParser(sink1) 36 sink2 = Sink() 37 parser2 = PacketParser(sink2) 38 39 for parser in [parser1, parser2]: 40 with open( 41 os.path.join(os.path.dirname(__file__), 'hci_data_001.bin'), 'rb' 42 ) as input: 43 while True: 44 n = random.randint(1, 9) 45 data = input.read(n) 46 if not data: 47 break 48 parser.feed_data(data) 49 50 assert sink1.packets == sink2.packets 51 52 53# ----------------------------------------------------------------------------- 54def test_parser_extensions(): 55 sink = Sink() 56 parser = PacketParser(sink) 57 58 # Check that an exception is thrown for an unknown type 59 try: 60 parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02])) 61 exception_thrown = False 62 except ValueError: 63 exception_thrown = True 64 65 assert exception_thrown 66 67 # Now add a custom info 68 parser.extended_packet_info[0x77] = (1, 1, 'B') 69 parser.reset() 70 parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02])) 71 assert len(sink.packets) == 1 72 73 74# ----------------------------------------------------------------------------- 75if __name__ == '__main__': 76 test_parser() 77 test_parser_extensions() 78