1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22
23from bumble.colors import color
24from bumble.device import Device
25from bumble.transport import open_transport_or_link
26from bumble.keys import JsonKeyStore
27from bumble.smp import AddressResolver
28from bumble.device import Advertisement
29from bumble.hci import Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
30
31
32# -----------------------------------------------------------------------------
33def make_rssi_bar(rssi):
34    DISPLAY_MIN_RSSI = -105
35    DISPLAY_MAX_RSSI = -30
36    DEFAULT_RSSI_BAR_WIDTH = 30
37
38    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
39    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
40    bar_width = min(max(bar_width, 0), 1)
41    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
42    return ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
43
44
45# -----------------------------------------------------------------------------
46class AdvertisementPrinter:
47    def __init__(self, min_rssi, resolver):
48        self.min_rssi = min_rssi
49        self.resolver = resolver
50
51    def print_advertisement(self, advertisement):
52        address = advertisement.address
53        address_color = 'yellow' if advertisement.is_connectable else 'red'
54
55        if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
56            return
57
58        address_qualifier = ''
59        resolution_qualifier = ''
60        if self.resolver and advertisement.address.is_resolvable:
61            resolved = self.resolver.resolve(advertisement.address)
62            if resolved is not None:
63                resolution_qualifier = f'(resolved from {advertisement.address})'
64                address = resolved
65
66        address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
67            address.address_type
68        ]
69        if address.address_type in (
70            Address.RANDOM_IDENTITY_ADDRESS,
71            Address.PUBLIC_IDENTITY_ADDRESS,
72        ):
73            type_color = 'yellow'
74        else:
75            if address.is_public:
76                type_color = 'cyan'
77            elif address.is_static:
78                type_color = 'green'
79                address_qualifier = '(static)'
80            elif address.is_resolvable:
81                type_color = 'magenta'
82                address_qualifier = '(resolvable)'
83            else:
84                type_color = 'blue'
85                address_qualifier = '(non-resolvable)'
86
87        separator = '\n  '
88        rssi_bar = make_rssi_bar(advertisement.rssi)
89        if not advertisement.is_legacy:
90            phy_info = (
91                f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
92                f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
93                f'{separator}'
94            )
95        else:
96            phy_info = ''
97
98        print(
99            f'>>> {color(address, address_color)} '
100            f'[{color(address_type_string, type_color)}]{address_qualifier}'
101            f'{resolution_qualifier}:{separator}'
102            f'{phy_info}'
103            f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
104            f'{advertisement.data.to_string(separator)}\n'
105        )
106
107    def on_advertisement(self, advertisement):
108        self.print_advertisement(advertisement)
109
110    def on_advertising_report(self, report):
111        print(f'{color("EVENT", "green")}: {report.event_type_string()}')
112        self.print_advertisement(Advertisement.from_advertising_report(report))
113
114
115# -----------------------------------------------------------------------------
116async def scan(
117    min_rssi,
118    passive,
119    scan_interval,
120    scan_window,
121    phy,
122    filter_duplicates,
123    raw,
124    irks,
125    keystore_file,
126    device_config,
127    transport,
128):
129    print('<<< connecting to HCI...')
130    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
131        print('<<< connected')
132
133        if device_config:
134            device = Device.from_config_file_with_hci(
135                device_config, hci_source, hci_sink
136            )
137        else:
138            device = Device.with_hci(
139                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
140            )
141
142        await device.power_on()
143
144        if keystore_file:
145            device.keystore = JsonKeyStore.from_device(device, filename=keystore_file)
146
147        if device.keystore:
148            resolving_keys = await device.keystore.get_resolving_keys()
149        else:
150            resolving_keys = []
151
152        for irk_and_address in irks:
153            if ':' not in irk_and_address:
154                raise ValueError('invalid IRK:ADDRESS value')
155            irk_hex, address_str = irk_and_address.split(':', 1)
156            resolving_keys.append(
157                (
158                    bytes.fromhex(irk_hex),
159                    Address(address_str, Address.RANDOM_DEVICE_ADDRESS),
160                )
161            )
162
163        resolver = AddressResolver(resolving_keys) if resolving_keys else None
164
165        printer = AdvertisementPrinter(min_rssi, resolver)
166        if raw:
167            device.host.on('advertising_report', printer.on_advertising_report)
168        else:
169            device.on('advertisement', printer.on_advertisement)
170
171        if phy is None:
172            scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
173        else:
174            scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
175
176        await device.start_scanning(
177            active=(not passive),
178            scan_interval=scan_interval,
179            scan_window=scan_window,
180            filter_duplicates=filter_duplicates,
181            scanning_phys=scanning_phys,
182        )
183
184        await hci_source.wait_for_termination()
185
186
187# -----------------------------------------------------------------------------
188@click.command()
189@click.option('--min-rssi', type=int, help='Minimum RSSI value')
190@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
191@click.option('--scan-interval', type=int, default=60, help='Scan interval')
192@click.option('--scan-window', type=int, default=60, help='Scan window')
193@click.option(
194    '--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY'
195)
196@click.option(
197    '--filter-duplicates',
198    type=bool,
199    default=True,
200    help='Filter duplicates at the controller level',
201)
202@click.option(
203    '--raw',
204    is_flag=True,
205    default=False,
206    help='Listen for raw advertising reports instead of processed ones',
207)
208@click.option(
209    '--irk',
210    metavar='<IRK_HEX>:<ADDRESS>',
211    help=(
212        'Use this IRK for resolving private addresses ' '(may be used more than once)'
213    ),
214    multiple=True,
215)
216@click.option(
217    '--keystore-file',
218    metavar='FILE_PATH',
219    help='Keystore file to use when resolving addresses',
220)
221@click.option(
222    '--device-config',
223    metavar='FILE_PATH',
224    help='Device config file for the scanning device',
225)
226@click.argument('transport')
227def main(
228    min_rssi,
229    passive,
230    scan_interval,
231    scan_window,
232    phy,
233    filter_duplicates,
234    raw,
235    irk,
236    keystore_file,
237    device_config,
238    transport,
239):
240    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
241    asyncio.run(
242        scan(
243            min_rssi,
244            passive,
245            scan_interval,
246            scan_window,
247            phy,
248            filter_duplicates,
249            raw,
250            irk,
251            keystore_file,
252            device_config,
253            transport,
254        )
255    )
256
257
258# -----------------------------------------------------------------------------
259if __name__ == '__main__':
260    main()
261