# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import hci_packets as hci
import link_layer_packets as ll
import unittest
from hci_packets import ErrorCode
from py.bluetooth import Address
from py.controller import ControllerTest, generate_rpa


class Test(ControllerTest):

    async def _advertise_and_expect_scan_request(self, peer_address):
        """Inject one scannable advertisement and check the controller's reaction.

        Sends an empty ADV_SCAN_IND legacy advertising PDU from `peer_address`
        (random address type), then expects, in order:
          1. an HCI LE Advertising Report describing that PDU, and
          2. a link-layer scan request addressed to `peer_address`.
        """
        controller = self.controller

        controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_address,
                                                     advertising_address_type=ll.AddressType.RANDOM,
                                                     advertising_type=ll.LegacyAdvertisingType.ADV_SCAN_IND,
                                                     advertising_data=[]),
                           rssi=-16)

        # 0xf0 is the 8-bit two's complement encoding of the -16 RSSI used above.
        await self.expect_evt(
            hci.LeAdvertisingReport(responses=[
                hci.LeAdvertisingResponse(event_type=hci.AdvertisingEventType.ADV_SCAN_IND,
                                          address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
                                          address=peer_address,
                                          advertising_data=[],
                                          rssi=0xf0)
            ]))

        await self.expect_ll(
            ll.LeScan(source_address=controller.address,
                      destination_address=peer_address,
                      advertising_address_type=ll.AddressType.RANDOM,
                      scanning_address_type=ll.AddressType.PUBLIC))

    # Verify that the scanner gracefully handles missing scan responses
    # after a timeout.
    async def test(self):
        # Test parameters.
        LL_scanner_scanInterval_MAX = 0x2000
        LL_scanner_scanWindow_MAX = 0x200

        controller = self.controller
        peer_address = Address('aa:bb:cc:dd:ee:ff')

        # Configure and enable active scanning with no filtering.
        controller.send_cmd(
            hci.LeSetScanParameters(le_scan_type=hci.LeScanType.ACTIVE,
                                    le_scan_interval=LL_scanner_scanInterval_MAX,
                                    le_scan_window=LL_scanner_scanWindow_MAX,
                                    own_address_type=hci.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS,
                                    scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL))

        await self.expect_evt(hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        controller.send_cmd(
            hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED))

        await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # First advertisement: the controller sends a scan request, which is
        # deliberately left unanswered.
        await self._advertise_and_expect_scan_request(peer_address)

        # No response is sent for the duration of the scan request timeout.
        # The next scan request must be correctly handled.
        await asyncio.sleep(1.0)

        # Second advertisement: the controller must issue a new scan request.
        await self._advertise_and_expect_scan_request(peer_address)

        # This time the peer answers; the scan response must be reported.
        controller.send_ll(ll.LeScanResponse(source_address=peer_address,
                                             advertising_address_type=ll.AddressType.RANDOM,
                                             scan_response_data=[]),
                           rssi=-16)

        await self.expect_evt(
            hci.LeAdvertisingReport(responses=[
                hci.LeAdvertisingResponse(event_type=hci.AdvertisingEventType.SCAN_RESPONSE,
                                          address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
                                          address=peer_address,
                                          advertising_data=[],
                                          rssi=0xf0)
            ]))