1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import hci_packets as hci
17import link_layer_packets as ll
18import unittest
19from hci_packets import ErrorCode
20from py.bluetooth import Address
21from py.controller import ControllerTest, generate_rpa
22
23
class Test(ControllerTest):

    # Check that an active scanner recovers when a scan request goes
    # unanswered: after the scan request timeout has elapsed, the next
    # scannable advertisement must trigger a new scan request, and the scan
    # response sent for that second request must be reported to the host.
    async def test(self):
        # Test parameters. Only the MAX interval and window values are
        # referenced by the commands below.
        LL_scanner_scanInterval_MIN = 0x2000
        LL_scanner_scanInterval_MAX = 0x2000
        LL_scanner_scanWindow_MIN = 0x200
        LL_scanner_scanWindow_MAX = 0x200
        LL_scanner_Adv_Channel_Map = 0x7

        peer_address = Address('aa:bb:cc:dd:ee:ff')
        controller = self.controller

        # Packet factories. Each call builds a fresh packet object so that
        # every send / expect step works on its own instance.

        # Scannable undirected advertisement emitted by the peer.
        def adv_scan_ind():
            return ll.LeLegacyAdvertisingPdu(
                source_address=peer_address,
                advertising_address_type=ll.AddressType.RANDOM,
                advertising_type=ll.LegacyAdvertisingType.ADV_SCAN_IND,
                advertising_data=[],
            )

        # Advertising report expected on the HCI side for the peer, with the
        # given event type. The PDUs are sent with rssi=-16, and 0xf0 is the
        # same value written as an unsigned byte.
        def advertising_report(event_type):
            response = hci.LeAdvertisingResponse(
                event_type=event_type,
                address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
                address=peer_address,
                advertising_data=[],
                rssi=0xf0,
            )
            return hci.LeAdvertisingReport(responses=[response])

        # Scan request expected from the controller towards the peer.
        def scan_request():
            return ll.LeScan(
                source_address=controller.address,
                destination_address=peer_address,
                advertising_address_type=ll.AddressType.RANDOM,
                scanning_address_type=ll.AddressType.PUBLIC,
            )

        # Configure active scanning that accepts all advertisers.
        set_scan_parameters = hci.LeSetScanParameters(
            le_scan_type=hci.LeScanType.ACTIVE,
            le_scan_interval=LL_scanner_scanInterval_MAX,
            le_scan_window=LL_scanner_scanWindow_MAX,
            own_address_type=hci.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS,
            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
        )
        controller.send_cmd(set_scan_parameters)
        await self.expect_evt(
            hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # Start scanning with duplicate filtering disabled.
        set_scan_enable = hci.LeSetScanEnable(
            le_scan_enable=hci.Enable.ENABLED,
            filter_duplicates=hci.Enable.DISABLED,
        )
        controller.send_cmd(set_scan_enable)
        await self.expect_evt(
            hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # First round: the advertisement is reported and a scan request is
        # sent to the peer.
        controller.send_ll(adv_scan_ind(), rssi=-16)
        await self.expect_evt(advertising_report(hci.AdvertisingEventType.ADV_SCAN_IND))
        await self.expect_ll(scan_request())

        # No response is sent for the duration of the scan request timeout.
        # The next scan request must be correctly handled.
        await asyncio.sleep(1.0)

        # Second round: the same advertisement must again be reported and
        # followed by a new scan request.
        controller.send_ll(adv_scan_ind(), rssi=-16)
        await self.expect_evt(advertising_report(hci.AdvertisingEventType.ADV_SCAN_IND))
        await self.expect_ll(scan_request())

        # This time the peer answers; the scan response must be reported.
        scan_response = ll.LeScanResponse(
            source_address=peer_address,
            advertising_address_type=ll.AddressType.RANDOM,
            scan_response_data=[],
        )
        controller.send_ll(scan_response, rssi=-16)
        await self.expect_evt(advertising_report(hci.AdvertisingEventType.SCAN_RESPONSE))
112