1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import pyee
19
20from bumble.device import Device
21from bumble.hci import HCI_Reset_Command
22
23
24# -----------------------------------------------------------------------------
25class Scanner(pyee.EventEmitter):
26    """
27    Scanner web app
28
29    Emitted events:
30        update: Emit when new `ScanEntry` are available.
31    """
32
33    class ScanEntry:
34        def __init__(self, advertisement):
35            self.address = advertisement.address.to_string(False)
36            self.address_type = (
37                'Public',
38                'Random',
39                'Public Identity',
40                'Random Identity',
41            )[advertisement.address.address_type]
42            self.rssi = advertisement.rssi
43            self.data = advertisement.data.to_string('\n')
44
45    def __init__(self, hci_source, hci_sink):
46        super().__init__()
47        self.device = Device.with_hci(
48            'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
49        )
50        self.scan_entries = {}
51        self.device.on('advertisement', self.on_advertisement)
52
53    async def start(self):
54        print('### Starting Scanner')
55        self.scan_entries = {}
56        self.emit('update', self.scan_entries)
57        await self.device.power_on()
58        await self.device.start_scanning()
59        print('### Scanner started')
60
61    async def stop(self):
62        # TODO: replace this once a proper reset is implemented in the lib.
63        await self.device.host.send_command(HCI_Reset_Command())
64        await self.device.power_off()
65        print('### Scanner stopped')
66
67    def on_advertisement(self, advertisement):
68        self.scan_entries[advertisement.address] = self.ScanEntry(advertisement)
69        self.emit('update', self.scan_entries)
70
71
72# -----------------------------------------------------------------------------
73def main(hci_source, hci_sink):
74    return Scanner(hci_source, hci_sink)
75