1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18
19from bumble.hci import (
20    HCI_DISCONNECT_COMMAND,
21    HCI_LE_1M_PHY_BIT,
22    HCI_LE_CODED_PHY_BIT,
23    HCI_LE_READ_BUFFER_SIZE_COMMAND,
24    HCI_RESET_COMMAND,
25    HCI_SUCCESS,
26    HCI_LE_CONNECTION_COMPLETE_EVENT,
27    HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
28    Address,
29    CodingFormat,
30    CodecID,
31    HCI_Command,
32    HCI_Command_Complete_Event,
33    HCI_Command_Status_Event,
34    HCI_CustomPacket,
35    HCI_Disconnect_Command,
36    HCI_Event,
37    HCI_IsoDataPacket,
38    HCI_LE_Add_Device_To_Filter_Accept_List_Command,
39    HCI_LE_Advertising_Report_Event,
40    HCI_LE_Channel_Selection_Algorithm_Event,
41    HCI_LE_Connection_Complete_Event,
42    HCI_LE_Connection_Update_Command,
43    HCI_LE_Connection_Update_Complete_Event,
44    HCI_LE_Create_Connection_Command,
45    HCI_LE_Extended_Create_Connection_Command,
46    HCI_LE_Read_Buffer_Size_Command,
47    HCI_LE_Read_Remote_Features_Command,
48    HCI_LE_Read_Remote_Features_Complete_Event,
49    HCI_LE_Remove_Device_From_Filter_Accept_List_Command,
50    HCI_LE_Set_Advertising_Data_Command,
51    HCI_LE_Set_Advertising_Parameters_Command,
52    HCI_LE_Set_Default_PHY_Command,
53    HCI_LE_Set_Event_Mask_Command,
54    HCI_LE_Set_Extended_Advertising_Enable_Command,
55    HCI_LE_Set_Extended_Scan_Parameters_Command,
56    HCI_LE_Set_Random_Address_Command,
57    HCI_LE_Set_Scan_Enable_Command,
58    HCI_LE_Set_Scan_Parameters_Command,
59    HCI_LE_Setup_ISO_Data_Path_Command,
60    HCI_Number_Of_Completed_Packets_Event,
61    HCI_Packet,
62    HCI_PIN_Code_Request_Reply_Command,
63    HCI_Read_Local_Supported_Codecs_Command,
64    HCI_Read_Local_Supported_Codecs_V2_Command,
65    HCI_Read_Local_Supported_Commands_Command,
66    HCI_Read_Local_Supported_Features_Command,
67    HCI_Read_Local_Version_Information_Command,
68    HCI_Reset_Command,
69    HCI_Set_Event_Mask_Command,
70)
71
72
73# -----------------------------------------------------------------------------
74# pylint: disable=invalid-name
75
76
77def basic_check(x):
78    packet = x.to_bytes()
79    print(packet.hex())
80    parsed = HCI_Packet.from_bytes(packet)
81    x_str = str(x)
82    parsed_str = str(parsed)
83    print(x_str)
84    parsed_bytes = parsed.to_bytes()
85    assert x_str == parsed_str
86    assert packet == parsed_bytes
87
88
89# -----------------------------------------------------------------------------
90def test_HCI_Event():
91    event = HCI_Event(0xF9)
92    basic_check(event)
93
94    event = HCI_Event(0xF8, bytes.fromhex('AABBCC'))
95    basic_check(event)
96
97
98# -----------------------------------------------------------------------------
99def test_HCI_LE_Connection_Complete_Event():
100    address = Address('00:11:22:33:44:55')
101    event = HCI_LE_Connection_Complete_Event(
102        status=HCI_SUCCESS,
103        connection_handle=1,
104        role=1,
105        peer_address_type=1,
106        peer_address=address,
107        connection_interval=3,
108        peripheral_latency=4,
109        supervision_timeout=5,
110        central_clock_accuracy=6,
111    )
112    basic_check(event)
113
114
115# -----------------------------------------------------------------------------
116def test_HCI_LE_Advertising_Report_Event():
117    address = Address('00:11:22:33:44:55/P')
118    report = HCI_LE_Advertising_Report_Event.Report(
119        HCI_LE_Advertising_Report_Event.Report.FIELDS,
120        event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
121        address_type=Address.PUBLIC_DEVICE_ADDRESS,
122        address=address,
123        data=bytes.fromhex(
124            '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
125        ),
126        rssi=100,
127    )
128    event = HCI_LE_Advertising_Report_Event([report])
129    basic_check(event)
130
131
132# -----------------------------------------------------------------------------
133def test_HCI_LE_Read_Remote_Features_Complete_Event():
134    event = HCI_LE_Read_Remote_Features_Complete_Event(
135        status=HCI_SUCCESS,
136        connection_handle=0x007,
137        le_features=bytes.fromhex('0011223344556677'),
138    )
139    basic_check(event)
140
141
142# -----------------------------------------------------------------------------
143def test_HCI_LE_Connection_Update_Complete_Event():
144    event = HCI_LE_Connection_Update_Complete_Event(
145        status=HCI_SUCCESS,
146        connection_handle=0x007,
147        connection_interval=10,
148        peripheral_latency=3,
149        supervision_timeout=5,
150    )
151    basic_check(event)
152
153
154# -----------------------------------------------------------------------------
155def test_HCI_LE_Channel_Selection_Algorithm_Event():
156    event = HCI_LE_Channel_Selection_Algorithm_Event(
157        connection_handle=7, channel_selection_algorithm=1
158    )
159    basic_check(event)
160
161
162# -----------------------------------------------------------------------------
163def test_HCI_Command_Complete_Event():
164    # With a serializable object
165    event = HCI_Command_Complete_Event(
166        num_hci_command_packets=34,
167        command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
168        return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
169            status=0,
170            hc_le_acl_data_packet_length=1234,
171            hc_total_num_le_acl_data_packets=56,
172        ),
173    )
174    basic_check(event)
175
176    # With an arbitrary byte array
177    event = HCI_Command_Complete_Event(
178        num_hci_command_packets=1,
179        command_opcode=HCI_RESET_COMMAND,
180        return_parameters=bytes([1, 2, 3, 4]),
181    )
182    basic_check(event)
183
184    # With a simple status as a 1-byte array
185    event = HCI_Command_Complete_Event(
186        num_hci_command_packets=1,
187        command_opcode=HCI_RESET_COMMAND,
188        return_parameters=bytes([7]),
189    )
190    basic_check(event)
191    event = HCI_Packet.from_bytes(event.to_bytes())
192    assert event.return_parameters == 7
193
194    # With a simple status as an integer status
195    event = HCI_Command_Complete_Event(
196        num_hci_command_packets=1, command_opcode=HCI_RESET_COMMAND, return_parameters=9
197    )
198    basic_check(event)
199    assert event.return_parameters == 9
200
201
202# -----------------------------------------------------------------------------
203def test_HCI_Command_Status_Event():
204    event = HCI_Command_Status_Event(
205        status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND
206    )
207    basic_check(event)
208
209
210# -----------------------------------------------------------------------------
211def test_HCI_Number_Of_Completed_Packets_Event():
212    event = HCI_Number_Of_Completed_Packets_Event([(1, 2), (3, 4)])
213    basic_check(event)
214
215
216# -----------------------------------------------------------------------------
217def test_HCI_Command():
218    command = HCI_Command(0x5566)
219    basic_check(command)
220
221    command = HCI_Command(0x5566, bytes.fromhex('AABBCC'))
222    basic_check(command)
223
224
225# -----------------------------------------------------------------------------
226def test_HCI_PIN_Code_Request_Reply_Command():
227    pin_code = b'1234'
228    pin_code_length = len(pin_code)
229    # here to make the test pass, we need to
230    # pad pin_code, as HCI_Object.format_fields
231    # does not do it for us
232    padded_pin_code = pin_code + bytes(16 - pin_code_length)
233    command = HCI_PIN_Code_Request_Reply_Command(
234        bd_addr=Address(
235            '00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS
236        ),
237        pin_code_length=pin_code_length,
238        pin_code=padded_pin_code,
239    )
240    basic_check(command)
241
242
243def test_HCI_Reset_Command():
244    command = HCI_Reset_Command()
245    basic_check(command)
246
247
248# -----------------------------------------------------------------------------
249def test_HCI_Read_Local_Version_Information_Command():
250    command = HCI_Read_Local_Version_Information_Command()
251    basic_check(command)
252
253
254# -----------------------------------------------------------------------------
255def test_HCI_Read_Local_Supported_Commands_Command():
256    command = HCI_Read_Local_Supported_Commands_Command()
257    basic_check(command)
258
259
260# -----------------------------------------------------------------------------
261def test_HCI_Read_Local_Supported_Features_Command():
262    command = HCI_Read_Local_Supported_Features_Command()
263    basic_check(command)
264
265
266# -----------------------------------------------------------------------------
267def test_HCI_Disconnect_Command():
268    command = HCI_Disconnect_Command(connection_handle=123, reason=0x11)
269    basic_check(command)
270
271
272# -----------------------------------------------------------------------------
273def test_HCI_Set_Event_Mask_Command():
274    command = HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('0011223344556677'))
275    basic_check(command)
276
277
278# -----------------------------------------------------------------------------
279def test_HCI_LE_Set_Event_Mask_Command():
280    command = HCI_LE_Set_Event_Mask_Command(
281        le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
282            [
283                HCI_LE_CONNECTION_COMPLETE_EVENT,
284                HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
285            ]
286        )
287    )
288    assert command.le_event_mask == bytes.fromhex('0100000000010000')
289    basic_check(command)
290
291
292# -----------------------------------------------------------------------------
293def test_HCI_LE_Set_Random_Address_Command():
294    command = HCI_LE_Set_Random_Address_Command(
295        random_address=Address('00:11:22:33:44:55')
296    )
297    basic_check(command)
298
299
300# -----------------------------------------------------------------------------
301def test_HCI_LE_Set_Advertising_Parameters_Command():
302    command = HCI_LE_Set_Advertising_Parameters_Command(
303        advertising_interval_min=20,
304        advertising_interval_max=30,
305        advertising_type=HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND,
306        own_address_type=Address.PUBLIC_DEVICE_ADDRESS,
307        peer_address_type=Address.RANDOM_DEVICE_ADDRESS,
308        peer_address=Address('00:11:22:33:44:55'),
309        advertising_channel_map=0x03,
310        advertising_filter_policy=1,
311    )
312    basic_check(command)
313
314
315# -----------------------------------------------------------------------------
316def test_HCI_LE_Set_Advertising_Data_Command():
317    command = HCI_LE_Set_Advertising_Data_Command(
318        advertising_data=bytes.fromhex('AABBCC')
319    )
320    basic_check(command)
321
322
323# -----------------------------------------------------------------------------
324def test_HCI_LE_Set_Scan_Parameters_Command():
325    command = HCI_LE_Set_Scan_Parameters_Command(
326        le_scan_type=1,
327        le_scan_interval=20,
328        le_scan_window=10,
329        own_address_type=1,
330        scanning_filter_policy=0,
331    )
332    basic_check(command)
333
334
335# -----------------------------------------------------------------------------
336def test_HCI_LE_Set_Scan_Enable_Command():
337    command = HCI_LE_Set_Scan_Enable_Command(le_scan_enable=1, filter_duplicates=0)
338    basic_check(command)
339
340
341# -----------------------------------------------------------------------------
342def test_HCI_LE_Create_Connection_Command():
343    command = HCI_LE_Create_Connection_Command(
344        le_scan_interval=4,
345        le_scan_window=5,
346        initiator_filter_policy=1,
347        peer_address_type=1,
348        peer_address=Address('00:11:22:33:44:55'),
349        own_address_type=2,
350        connection_interval_min=7,
351        connection_interval_max=8,
352        max_latency=9,
353        supervision_timeout=10,
354        min_ce_length=11,
355        max_ce_length=12,
356    )
357    basic_check(command)
358
359
360# -----------------------------------------------------------------------------
361def test_HCI_LE_Extended_Create_Connection_Command():
362    command = HCI_LE_Extended_Create_Connection_Command(
363        initiator_filter_policy=0,
364        own_address_type=0,
365        peer_address_type=1,
366        peer_address=Address('00:11:22:33:44:55'),
367        initiating_phys=3,
368        scan_intervals=(10, 11),
369        scan_windows=(12, 13),
370        connection_interval_mins=(14, 15),
371        connection_interval_maxs=(16, 17),
372        max_latencies=(18, 19),
373        supervision_timeouts=(20, 21),
374        min_ce_lengths=(100, 101),
375        max_ce_lengths=(102, 103),
376    )
377    basic_check(command)
378
379
380# -----------------------------------------------------------------------------
381def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
382    command = HCI_LE_Add_Device_To_Filter_Accept_List_Command(
383        address_type=1, address=Address('00:11:22:33:44:55')
384    )
385    basic_check(command)
386
387
388# -----------------------------------------------------------------------------
389def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
390    command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
391        address_type=1, address=Address('00:11:22:33:44:55')
392    )
393    basic_check(command)
394
395
396# -----------------------------------------------------------------------------
397def test_HCI_LE_Connection_Update_Command():
398    command = HCI_LE_Connection_Update_Command(
399        connection_handle=0x0002,
400        connection_interval_min=10,
401        connection_interval_max=20,
402        max_latency=7,
403        supervision_timeout=3,
404        min_ce_length=100,
405        max_ce_length=200,
406    )
407    basic_check(command)
408
409
410# -----------------------------------------------------------------------------
411def test_HCI_LE_Read_Remote_Features_Command():
412    command = HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002)
413    basic_check(command)
414
415
416# -----------------------------------------------------------------------------
417def test_HCI_LE_Set_Default_PHY_Command():
418    command = HCI_LE_Set_Default_PHY_Command(all_phys=0, tx_phys=1, rx_phys=1)
419    basic_check(command)
420
421
422# -----------------------------------------------------------------------------
423def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
424    command = HCI_LE_Set_Extended_Scan_Parameters_Command(
425        own_address_type=Address.RANDOM_DEVICE_ADDRESS,
426        # pylint: disable-next=line-too-long
427        scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
428        scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
429        scan_types=[
430            HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
431            HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
432            HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING,
433        ],
434        scan_intervals=[1, 2, 3],
435        scan_windows=[4, 5, 6],
436    )
437    basic_check(command)
438
439
440# -----------------------------------------------------------------------------
441def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
442    command = HCI_Packet.from_bytes(
443        bytes.fromhex('0139200e010301050008020600090307000a')
444    )
445    assert command.enable == 1
446    assert command.advertising_handles == [1, 2, 3]
447    assert command.durations == [5, 6, 7]
448    assert command.max_extended_advertising_events == [8, 9, 10]
449
450    command = HCI_LE_Set_Extended_Advertising_Enable_Command(
451        enable=1,
452        advertising_handles=[1, 2, 3],
453        durations=[5, 6, 7],
454        max_extended_advertising_events=[8, 9, 10],
455    )
456    basic_check(command)
457
458
459# -----------------------------------------------------------------------------
460def test_HCI_LE_Setup_ISO_Data_Path_Command():
461    command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000'))
462
463    assert command.connection_handle == 0x0060
464    assert command.data_path_direction == 0x00
465    assert command.data_path_id == 0x01
466    assert command.codec_id == CodingFormat(CodecID.TRANSPARENT)
467    assert command.controller_delay == 0
468    assert command.codec_configuration == b''
469
470    command = HCI_LE_Setup_ISO_Data_Path_Command(
471        connection_handle=0x0060,
472        data_path_direction=0x00,
473        data_path_id=0x01,
474        codec_id=CodingFormat(CodecID.TRANSPARENT),
475        controller_delay=0x00,
476        codec_configuration=b'',
477    )
478    basic_check(command)
479
480
481# -----------------------------------------------------------------------------
482def test_HCI_Read_Local_Supported_Codecs_Command_Complete():
483    returned_parameters = (
484        HCI_Read_Local_Supported_Codecs_Command.parse_return_parameters(
485            bytes([HCI_SUCCESS, 3, CodecID.A_LOG, CodecID.CVSD, CodecID.LINEAR_PCM, 0])
486        )
487    )
488    assert returned_parameters.standard_codec_ids == [
489        CodecID.A_LOG,
490        CodecID.CVSD,
491        CodecID.LINEAR_PCM,
492    ]
493
494
495# -----------------------------------------------------------------------------
496def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
497    returned_parameters = (
498        HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
499            bytes(
500                [
501                    HCI_SUCCESS,
502                    3,
503                    CodecID.A_LOG,
504                    HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
505                    CodecID.CVSD,
506                    HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
507                    CodecID.LINEAR_PCM,
508                    HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
509                    0,
510                ]
511            )
512        )
513    )
514    assert returned_parameters.standard_codec_ids == [
515        CodecID.A_LOG,
516        CodecID.CVSD,
517        CodecID.LINEAR_PCM,
518    ]
519    assert returned_parameters.standard_codec_transports == [
520        HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
521        HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
522        HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
523    ]
524
525
526# -----------------------------------------------------------------------------
527def test_address():
528    a = Address('C4:F2:17:1A:1D:BB')
529    assert not a.is_public
530    assert a.is_random
531    assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
532    assert not a.is_resolvable
533    assert not a.is_resolved
534    assert a.is_static
535
536
537# -----------------------------------------------------------------------------
538def test_custom():
539    data = bytes([0x77, 0x02, 0x01, 0x03])
540    packet = HCI_CustomPacket(data)
541    assert packet.hci_packet_type == 0x77
542    assert packet.payload == data
543
544
545# -----------------------------------------------------------------------------
546def test_iso_data_packet():
547    data = bytes.fromhex(
548        '05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
549        '52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
550    )
551    packet = HCI_IsoDataPacket.from_bytes(data)
552    assert packet.connection_handle == 0x0061
553    assert packet.packet_status_flag == 0
554    assert packet.pb_flag == 0x02
555    assert packet.ts_flag == 0x01
556    assert packet.data_total_length == 68
557    assert packet.time_stamp == 2716911914
558    assert packet.packet_sequence_number == 147
559    assert packet.iso_sdu_length == 60
560    assert packet.iso_sdu_fragment == bytes.fromhex(
561        'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
562        '6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
563    )
564
565    assert packet.to_bytes() == data
566
567
568# -----------------------------------------------------------------------------
569def run_test_events():
570    test_HCI_Event()
571    test_HCI_LE_Connection_Complete_Event()
572    test_HCI_LE_Advertising_Report_Event()
573    test_HCI_LE_Connection_Update_Complete_Event()
574    test_HCI_LE_Read_Remote_Features_Complete_Event()
575    test_HCI_LE_Channel_Selection_Algorithm_Event()
576    test_HCI_Command_Complete_Event()
577    test_HCI_Command_Status_Event()
578    test_HCI_Number_Of_Completed_Packets_Event()
579
580
581# -----------------------------------------------------------------------------
582def run_test_commands():
583    test_HCI_Command()
584    test_HCI_Reset_Command()
585    test_HCI_PIN_Code_Request_Reply_Command()
586    test_HCI_Read_Local_Version_Information_Command()
587    test_HCI_Read_Local_Supported_Commands_Command()
588    test_HCI_Read_Local_Supported_Features_Command()
589    test_HCI_Disconnect_Command()
590    test_HCI_Set_Event_Mask_Command()
591    test_HCI_LE_Set_Event_Mask_Command()
592    test_HCI_LE_Set_Random_Address_Command()
593    test_HCI_LE_Set_Advertising_Parameters_Command()
594    test_HCI_LE_Set_Advertising_Data_Command()
595    test_HCI_LE_Set_Scan_Parameters_Command()
596    test_HCI_LE_Set_Scan_Enable_Command()
597    test_HCI_LE_Create_Connection_Command()
598    test_HCI_LE_Extended_Create_Connection_Command()
599    test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
600    test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
601    test_HCI_LE_Connection_Update_Command()
602    test_HCI_LE_Read_Remote_Features_Command()
603    test_HCI_LE_Set_Default_PHY_Command()
604    test_HCI_LE_Set_Extended_Scan_Parameters_Command()
605    test_HCI_LE_Set_Extended_Advertising_Enable_Command()
606    test_HCI_LE_Setup_ISO_Data_Path_Command()
607
608
609# -----------------------------------------------------------------------------
610if __name__ == '__main__':
611    run_test_events()
612    run_test_commands()
613    test_address()
614    test_custom()
615    test_iso_data_packet()
616