1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18 19from bumble.hci import ( 20 HCI_DISCONNECT_COMMAND, 21 HCI_LE_1M_PHY_BIT, 22 HCI_LE_CODED_PHY_BIT, 23 HCI_LE_READ_BUFFER_SIZE_COMMAND, 24 HCI_RESET_COMMAND, 25 HCI_SUCCESS, 26 HCI_LE_CONNECTION_COMPLETE_EVENT, 27 HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, 28 Address, 29 CodingFormat, 30 CodecID, 31 HCI_Command, 32 HCI_Command_Complete_Event, 33 HCI_Command_Status_Event, 34 HCI_CustomPacket, 35 HCI_Disconnect_Command, 36 HCI_Event, 37 HCI_IsoDataPacket, 38 HCI_LE_Add_Device_To_Filter_Accept_List_Command, 39 HCI_LE_Advertising_Report_Event, 40 HCI_LE_Channel_Selection_Algorithm_Event, 41 HCI_LE_Connection_Complete_Event, 42 HCI_LE_Connection_Update_Command, 43 HCI_LE_Connection_Update_Complete_Event, 44 HCI_LE_Create_Connection_Command, 45 HCI_LE_Extended_Create_Connection_Command, 46 HCI_LE_Read_Buffer_Size_Command, 47 HCI_LE_Read_Remote_Features_Command, 48 HCI_LE_Read_Remote_Features_Complete_Event, 49 HCI_LE_Remove_Device_From_Filter_Accept_List_Command, 50 HCI_LE_Set_Advertising_Data_Command, 51 HCI_LE_Set_Advertising_Parameters_Command, 52 HCI_LE_Set_Default_PHY_Command, 53 HCI_LE_Set_Event_Mask_Command, 54 HCI_LE_Set_Extended_Advertising_Enable_Command, 55 HCI_LE_Set_Extended_Scan_Parameters_Command, 56 HCI_LE_Set_Random_Address_Command, 57 HCI_LE_Set_Scan_Enable_Command, 58 HCI_LE_Set_Scan_Parameters_Command, 59 HCI_LE_Setup_ISO_Data_Path_Command, 60 HCI_Number_Of_Completed_Packets_Event, 61 HCI_Packet, 62 HCI_PIN_Code_Request_Reply_Command, 63 HCI_Read_Local_Supported_Codecs_Command, 64 HCI_Read_Local_Supported_Codecs_V2_Command, 65 HCI_Read_Local_Supported_Commands_Command, 66 HCI_Read_Local_Supported_Features_Command, 67 HCI_Read_Local_Version_Information_Command, 68 HCI_Reset_Command, 69 HCI_Set_Event_Mask_Command, 70) 71 72 73# ----------------------------------------------------------------------------- 74# pylint: disable=invalid-name 75 76 77def basic_check(x): 78 packet = x.to_bytes() 79 print(packet.hex()) 80 parsed = HCI_Packet.from_bytes(packet) 81 x_str = str(x) 82 parsed_str = str(parsed) 83 print(x_str) 84 parsed_bytes = parsed.to_bytes() 85 assert x_str == parsed_str 86 assert packet == parsed_bytes 87 88 89# ----------------------------------------------------------------------------- 90def test_HCI_Event(): 91 event = HCI_Event(0xF9) 92 basic_check(event) 93 94 event = HCI_Event(0xF8, bytes.fromhex('AABBCC')) 95 basic_check(event) 96 97 98# ----------------------------------------------------------------------------- 99def test_HCI_LE_Connection_Complete_Event(): 100 address = Address('00:11:22:33:44:55') 101 event = HCI_LE_Connection_Complete_Event( 102 status=HCI_SUCCESS, 103 connection_handle=1, 104 role=1, 105 peer_address_type=1, 106 peer_address=address, 107 connection_interval=3, 108 peripheral_latency=4, 109 supervision_timeout=5, 110 central_clock_accuracy=6, 111 ) 112 basic_check(event) 113 114 115# ----------------------------------------------------------------------------- 116def test_HCI_LE_Advertising_Report_Event(): 117 address = Address('00:11:22:33:44:55/P') 118 report = HCI_LE_Advertising_Report_Event.Report( 119 HCI_LE_Advertising_Report_Event.Report.FIELDS, 120 event_type=HCI_LE_Advertising_Report_Event.ADV_IND, 121 address_type=Address.PUBLIC_DEVICE_ADDRESS, 122 address=address, 123 data=bytes.fromhex( 124 '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03' 125 ), 126 rssi=100, 127 ) 128 event = HCI_LE_Advertising_Report_Event([report]) 129 basic_check(event) 130 131 132# ----------------------------------------------------------------------------- 133def test_HCI_LE_Read_Remote_Features_Complete_Event(): 134 event = HCI_LE_Read_Remote_Features_Complete_Event( 135 status=HCI_SUCCESS, 136 connection_handle=0x007, 137 le_features=bytes.fromhex('0011223344556677'), 138 ) 139 basic_check(event) 140 141 142# ----------------------------------------------------------------------------- 143def test_HCI_LE_Connection_Update_Complete_Event(): 144 event = HCI_LE_Connection_Update_Complete_Event( 145 status=HCI_SUCCESS, 146 connection_handle=0x007, 147 connection_interval=10, 148 peripheral_latency=3, 149 supervision_timeout=5, 150 ) 151 basic_check(event) 152 153 154# ----------------------------------------------------------------------------- 155def test_HCI_LE_Channel_Selection_Algorithm_Event(): 156 event = HCI_LE_Channel_Selection_Algorithm_Event( 157 connection_handle=7, channel_selection_algorithm=1 158 ) 159 basic_check(event) 160 161 162# ----------------------------------------------------------------------------- 163def test_HCI_Command_Complete_Event(): 164 # With a serializable object 165 event = HCI_Command_Complete_Event( 166 num_hci_command_packets=34, 167 command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND, 168 return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters( 169 status=0, 170 hc_le_acl_data_packet_length=1234, 171 hc_total_num_le_acl_data_packets=56, 172 ), 173 ) 174 basic_check(event) 175 176 # With an arbitrary byte array 177 event = HCI_Command_Complete_Event( 178 num_hci_command_packets=1, 179 command_opcode=HCI_RESET_COMMAND, 180 return_parameters=bytes([1, 2, 3, 4]), 181 ) 182 basic_check(event) 183 184 # With a simple status as a 1-byte array 185 event = HCI_Command_Complete_Event( 186 num_hci_command_packets=1, 187 command_opcode=HCI_RESET_COMMAND, 188 return_parameters=bytes([7]), 189 ) 190 basic_check(event) 191 event = HCI_Packet.from_bytes(event.to_bytes()) 192 assert event.return_parameters == 7 193 194 # With a simple status as an integer status 195 event = HCI_Command_Complete_Event( 196 num_hci_command_packets=1, command_opcode=HCI_RESET_COMMAND, return_parameters=9 197 ) 198 basic_check(event) 199 assert event.return_parameters == 9 200 201 202# ----------------------------------------------------------------------------- 203def test_HCI_Command_Status_Event(): 204 event = HCI_Command_Status_Event( 205 status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND 206 ) 207 basic_check(event) 208 209 210# ----------------------------------------------------------------------------- 211def test_HCI_Number_Of_Completed_Packets_Event(): 212 event = HCI_Number_Of_Completed_Packets_Event([(1, 2), (3, 4)]) 213 basic_check(event) 214 215 216# ----------------------------------------------------------------------------- 217def test_HCI_Command(): 218 command = HCI_Command(0x5566) 219 basic_check(command) 220 221 command = HCI_Command(0x5566, bytes.fromhex('AABBCC')) 222 basic_check(command) 223 224 225# ----------------------------------------------------------------------------- 226def test_HCI_PIN_Code_Request_Reply_Command(): 227 pin_code = b'1234' 228 pin_code_length = len(pin_code) 229 # here to make the test pass, we need to 230 # pad pin_code, as HCI_Object.format_fields 231 # does not do it for us 232 padded_pin_code = pin_code + bytes(16 - pin_code_length) 233 command = HCI_PIN_Code_Request_Reply_Command( 234 bd_addr=Address( 235 '00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS 236 ), 237 pin_code_length=pin_code_length, 238 pin_code=padded_pin_code, 239 ) 240 basic_check(command) 241 242 243def test_HCI_Reset_Command(): 244 command = HCI_Reset_Command() 245 basic_check(command) 246 247 248# ----------------------------------------------------------------------------- 249def test_HCI_Read_Local_Version_Information_Command(): 250 command = HCI_Read_Local_Version_Information_Command() 251 basic_check(command) 252 253 254# ----------------------------------------------------------------------------- 255def test_HCI_Read_Local_Supported_Commands_Command(): 256 command = HCI_Read_Local_Supported_Commands_Command() 257 basic_check(command) 258 259 260# ----------------------------------------------------------------------------- 261def test_HCI_Read_Local_Supported_Features_Command(): 262 command = HCI_Read_Local_Supported_Features_Command() 263 basic_check(command) 264 265 266# ----------------------------------------------------------------------------- 267def test_HCI_Disconnect_Command(): 268 command = HCI_Disconnect_Command(connection_handle=123, reason=0x11) 269 basic_check(command) 270 271 272# ----------------------------------------------------------------------------- 273def test_HCI_Set_Event_Mask_Command(): 274 command = HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('0011223344556677')) 275 basic_check(command) 276 277 278# ----------------------------------------------------------------------------- 279def test_HCI_LE_Set_Event_Mask_Command(): 280 command = HCI_LE_Set_Event_Mask_Command( 281 le_event_mask=HCI_LE_Set_Event_Mask_Command.mask( 282 [ 283 HCI_LE_CONNECTION_COMPLETE_EVENT, 284 HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, 285 ] 286 ) 287 ) 288 assert command.le_event_mask == bytes.fromhex('0100000000010000') 289 basic_check(command) 290 291 292# ----------------------------------------------------------------------------- 293def test_HCI_LE_Set_Random_Address_Command(): 294 command = HCI_LE_Set_Random_Address_Command( 295 random_address=Address('00:11:22:33:44:55') 296 ) 297 basic_check(command) 298 299 300# ----------------------------------------------------------------------------- 301def test_HCI_LE_Set_Advertising_Parameters_Command(): 302 command = HCI_LE_Set_Advertising_Parameters_Command( 303 advertising_interval_min=20, 304 advertising_interval_max=30, 305 advertising_type=HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND, 306 own_address_type=Address.PUBLIC_DEVICE_ADDRESS, 307 peer_address_type=Address.RANDOM_DEVICE_ADDRESS, 308 peer_address=Address('00:11:22:33:44:55'), 309 advertising_channel_map=0x03, 310 advertising_filter_policy=1, 311 ) 312 basic_check(command) 313 314 315# ----------------------------------------------------------------------------- 316def test_HCI_LE_Set_Advertising_Data_Command(): 317 command = HCI_LE_Set_Advertising_Data_Command( 318 advertising_data=bytes.fromhex('AABBCC') 319 ) 320 basic_check(command) 321 322 323# ----------------------------------------------------------------------------- 324def test_HCI_LE_Set_Scan_Parameters_Command(): 325 command = HCI_LE_Set_Scan_Parameters_Command( 326 le_scan_type=1, 327 le_scan_interval=20, 328 le_scan_window=10, 329 own_address_type=1, 330 scanning_filter_policy=0, 331 ) 332 basic_check(command) 333 334 335# ----------------------------------------------------------------------------- 336def test_HCI_LE_Set_Scan_Enable_Command(): 337 command = HCI_LE_Set_Scan_Enable_Command(le_scan_enable=1, filter_duplicates=0) 338 basic_check(command) 339 340 341# ----------------------------------------------------------------------------- 342def test_HCI_LE_Create_Connection_Command(): 343 command = HCI_LE_Create_Connection_Command( 344 le_scan_interval=4, 345 le_scan_window=5, 346 initiator_filter_policy=1, 347 peer_address_type=1, 348 peer_address=Address('00:11:22:33:44:55'), 349 own_address_type=2, 350 connection_interval_min=7, 351 connection_interval_max=8, 352 max_latency=9, 353 supervision_timeout=10, 354 min_ce_length=11, 355 max_ce_length=12, 356 ) 357 basic_check(command) 358 359 360# ----------------------------------------------------------------------------- 361def test_HCI_LE_Extended_Create_Connection_Command(): 362 command = HCI_LE_Extended_Create_Connection_Command( 363 initiator_filter_policy=0, 364 own_address_type=0, 365 peer_address_type=1, 366 peer_address=Address('00:11:22:33:44:55'), 367 initiating_phys=3, 368 scan_intervals=(10, 11), 369 scan_windows=(12, 13), 370 connection_interval_mins=(14, 15), 371 connection_interval_maxs=(16, 17), 372 max_latencies=(18, 19), 373 supervision_timeouts=(20, 21), 374 min_ce_lengths=(100, 101), 375 max_ce_lengths=(102, 103), 376 ) 377 basic_check(command) 378 379 380# ----------------------------------------------------------------------------- 381def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command(): 382 command = HCI_LE_Add_Device_To_Filter_Accept_List_Command( 383 address_type=1, address=Address('00:11:22:33:44:55') 384 ) 385 basic_check(command) 386 387 388# ----------------------------------------------------------------------------- 389def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command(): 390 command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command( 391 address_type=1, address=Address('00:11:22:33:44:55') 392 ) 393 basic_check(command) 394 395 396# ----------------------------------------------------------------------------- 397def test_HCI_LE_Connection_Update_Command(): 398 command = HCI_LE_Connection_Update_Command( 399 connection_handle=0x0002, 400 connection_interval_min=10, 401 connection_interval_max=20, 402 max_latency=7, 403 supervision_timeout=3, 404 min_ce_length=100, 405 max_ce_length=200, 406 ) 407 basic_check(command) 408 409 410# ----------------------------------------------------------------------------- 411def test_HCI_LE_Read_Remote_Features_Command(): 412 command = HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002) 413 basic_check(command) 414 415 416# ----------------------------------------------------------------------------- 417def test_HCI_LE_Set_Default_PHY_Command(): 418 command = HCI_LE_Set_Default_PHY_Command(all_phys=0, tx_phys=1, rx_phys=1) 419 basic_check(command) 420 421 422# ----------------------------------------------------------------------------- 423def test_HCI_LE_Set_Extended_Scan_Parameters_Command(): 424 command = HCI_LE_Set_Extended_Scan_Parameters_Command( 425 own_address_type=Address.RANDOM_DEVICE_ADDRESS, 426 # pylint: disable-next=line-too-long 427 scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, 428 scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4), 429 scan_types=[ 430 HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, 431 HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, 432 HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING, 433 ], 434 scan_intervals=[1, 2, 3], 435 scan_windows=[4, 5, 6], 436 ) 437 basic_check(command) 438 439 440# ----------------------------------------------------------------------------- 441def test_HCI_LE_Set_Extended_Advertising_Enable_Command(): 442 command = HCI_Packet.from_bytes( 443 bytes.fromhex('0139200e010301050008020600090307000a') 444 ) 445 assert command.enable == 1 446 assert command.advertising_handles == [1, 2, 3] 447 assert command.durations == [5, 6, 7] 448 assert command.max_extended_advertising_events == [8, 9, 10] 449 450 command = HCI_LE_Set_Extended_Advertising_Enable_Command( 451 enable=1, 452 advertising_handles=[1, 2, 3], 453 durations=[5, 6, 7], 454 max_extended_advertising_events=[8, 9, 10], 455 ) 456 basic_check(command) 457 458 459# ----------------------------------------------------------------------------- 460def test_HCI_LE_Setup_ISO_Data_Path_Command(): 461 command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000')) 462 463 assert command.connection_handle == 0x0060 464 assert command.data_path_direction == 0x00 465 assert command.data_path_id == 0x01 466 assert command.codec_id == CodingFormat(CodecID.TRANSPARENT) 467 assert command.controller_delay == 0 468 assert command.codec_configuration == b'' 469 470 command = HCI_LE_Setup_ISO_Data_Path_Command( 471 connection_handle=0x0060, 472 data_path_direction=0x00, 473 data_path_id=0x01, 474 codec_id=CodingFormat(CodecID.TRANSPARENT), 475 controller_delay=0x00, 476 codec_configuration=b'', 477 ) 478 basic_check(command) 479 480 481# ----------------------------------------------------------------------------- 482def test_HCI_Read_Local_Supported_Codecs_Command_Complete(): 483 returned_parameters = ( 484 HCI_Read_Local_Supported_Codecs_Command.parse_return_parameters( 485 bytes([HCI_SUCCESS, 3, CodecID.A_LOG, CodecID.CVSD, CodecID.LINEAR_PCM, 0]) 486 ) 487 ) 488 assert returned_parameters.standard_codec_ids == [ 489 CodecID.A_LOG, 490 CodecID.CVSD, 491 CodecID.LINEAR_PCM, 492 ] 493 494 495# ----------------------------------------------------------------------------- 496def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete(): 497 returned_parameters = ( 498 HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters( 499 bytes( 500 [ 501 HCI_SUCCESS, 502 3, 503 CodecID.A_LOG, 504 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL, 505 CodecID.CVSD, 506 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO, 507 CodecID.LINEAR_PCM, 508 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS, 509 0, 510 ] 511 ) 512 ) 513 ) 514 assert returned_parameters.standard_codec_ids == [ 515 CodecID.A_LOG, 516 CodecID.CVSD, 517 CodecID.LINEAR_PCM, 518 ] 519 assert returned_parameters.standard_codec_transports == [ 520 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL, 521 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO, 522 HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS, 523 ] 524 525 526# ----------------------------------------------------------------------------- 527def test_address(): 528 a = Address('C4:F2:17:1A:1D:BB') 529 assert not a.is_public 530 assert a.is_random 531 assert a.address_type == Address.RANDOM_DEVICE_ADDRESS 532 assert not a.is_resolvable 533 assert not a.is_resolved 534 assert a.is_static 535 536 537# ----------------------------------------------------------------------------- 538def test_custom(): 539 data = bytes([0x77, 0x02, 0x01, 0x03]) 540 packet = HCI_CustomPacket(data) 541 assert packet.hci_packet_type == 0x77 542 assert packet.payload == data 543 544 545# ----------------------------------------------------------------------------- 546def test_iso_data_packet(): 547 data = bytes.fromhex( 548 '05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697' 549 '52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2' 550 ) 551 packet = HCI_IsoDataPacket.from_bytes(data) 552 assert packet.connection_handle == 0x0061 553 assert packet.packet_status_flag == 0 554 assert packet.pb_flag == 0x02 555 assert packet.ts_flag == 0x01 556 assert packet.data_total_length == 68 557 assert packet.time_stamp == 2716911914 558 assert packet.packet_sequence_number == 147 559 assert packet.iso_sdu_length == 60 560 assert packet.iso_sdu_fragment == bytes.fromhex( 561 'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3' 562 '6281bc77ed6a3206d984bcdabee6be831c699cb50e2' 563 ) 564 565 assert packet.to_bytes() == data 566 567 568# ----------------------------------------------------------------------------- 569def run_test_events(): 570 test_HCI_Event() 571 test_HCI_LE_Connection_Complete_Event() 572 test_HCI_LE_Advertising_Report_Event() 573 test_HCI_LE_Connection_Update_Complete_Event() 574 test_HCI_LE_Read_Remote_Features_Complete_Event() 575 test_HCI_LE_Channel_Selection_Algorithm_Event() 576 test_HCI_Command_Complete_Event() 577 test_HCI_Command_Status_Event() 578 test_HCI_Number_Of_Completed_Packets_Event() 579 580 581# ----------------------------------------------------------------------------- 582def run_test_commands(): 583 test_HCI_Command() 584 test_HCI_Reset_Command() 585 test_HCI_PIN_Code_Request_Reply_Command() 586 test_HCI_Read_Local_Version_Information_Command() 587 test_HCI_Read_Local_Supported_Commands_Command() 588 test_HCI_Read_Local_Supported_Features_Command() 589 test_HCI_Disconnect_Command() 590 test_HCI_Set_Event_Mask_Command() 591 test_HCI_LE_Set_Event_Mask_Command() 592 test_HCI_LE_Set_Random_Address_Command() 593 test_HCI_LE_Set_Advertising_Parameters_Command() 594 test_HCI_LE_Set_Advertising_Data_Command() 595 test_HCI_LE_Set_Scan_Parameters_Command() 596 test_HCI_LE_Set_Scan_Enable_Command() 597 test_HCI_LE_Create_Connection_Command() 598 test_HCI_LE_Extended_Create_Connection_Command() 599 test_HCI_LE_Add_Device_To_Filter_Accept_List_Command() 600 test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command() 601 test_HCI_LE_Connection_Update_Command() 602 test_HCI_LE_Read_Remote_Features_Command() 603 test_HCI_LE_Set_Default_PHY_Command() 604 test_HCI_LE_Set_Extended_Scan_Parameters_Command() 605 test_HCI_LE_Set_Extended_Advertising_Enable_Command() 606 test_HCI_LE_Setup_ISO_Data_Path_Command() 607 608 609# ----------------------------------------------------------------------------- 610if __name__ == '__main__': 611 run_test_events() 612 run_test_commands() 613 test_address() 614 test_custom() 615 test_iso_data_packet() 616