1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import pyee 19 20from bumble.device import Device 21from bumble.hci import HCI_Reset_Command 22 23 24# ----------------------------------------------------------------------------- 25class Scanner(pyee.EventEmitter): 26 """ 27 Scanner web app 28 29 Emitted events: 30 update: Emit when new `ScanEntry` are available. 31 """ 32 33 class ScanEntry: 34 def __init__(self, advertisement): 35 self.address = advertisement.address.to_string(False) 36 self.address_type = ( 37 'Public', 38 'Random', 39 'Public Identity', 40 'Random Identity', 41 )[advertisement.address.address_type] 42 self.rssi = advertisement.rssi 43 self.data = advertisement.data.to_string('\n') 44 45 def __init__(self, hci_source, hci_sink): 46 super().__init__() 47 self.device = Device.with_hci( 48 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink 49 ) 50 self.scan_entries = {} 51 self.device.on('advertisement', self.on_advertisement) 52 53 async def start(self): 54 print('### Starting Scanner') 55 self.scan_entries = {} 56 self.emit('update', self.scan_entries) 57 await self.device.power_on() 58 await self.device.start_scanning() 59 print('### Scanner started') 60 61 async def stop(self): 62 # TODO: replace this once a proper reset is implemented in the lib. 63 await self.device.host.send_command(HCI_Reset_Command()) 64 await self.device.power_off() 65 print('### Scanner stopped') 66 67 def on_advertisement(self, advertisement): 68 self.scan_entries[advertisement.address] = self.ScanEntry(advertisement) 69 self.emit('update', self.scan_entries) 70 71 72# ----------------------------------------------------------------------------- 73def main(hci_source, hci_sink): 74 return Scanner(hci_source, hci_sink) 75