1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21from typing import Callable, Iterable, Optional 22 23import click 24 25from bumble.core import ProtocolError 26from bumble.colors import color 27from bumble.device import Device, Peer 28from bumble.gatt import Service 29from bumble.profiles.device_information_service import DeviceInformationServiceProxy 30from bumble.profiles.battery_service import BatteryServiceProxy 31from bumble.profiles.gap import GenericAccessServiceProxy 32from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy 33from bumble.transport import open_transport_or_link 34 35 36# ----------------------------------------------------------------------------- 37async def try_show(function: Callable, *args, **kwargs) -> None: 38 try: 39 await function(*args, **kwargs) 40 except ProtocolError as error: 41 print(color('ERROR:', 'red'), error) 42 43 44# ----------------------------------------------------------------------------- 45def show_services(services: Iterable[Service]) -> None: 46 for service in services: 47 print(color(str(service), 'cyan')) 48 49 for characteristic in service.characteristics: 50 print(color(' ' + str(characteristic), 'magenta')) 51 52 53# ----------------------------------------------------------------------------- 54async def show_gap_information( 55 gap_service: GenericAccessServiceProxy, 56): 57 print(color('### Generic Access Profile', 'yellow')) 58 59 if gap_service.device_name: 60 print( 61 color(' Device Name:', 'green'), 62 await gap_service.device_name.read_value(), 63 ) 64 65 if gap_service.appearance: 66 print( 67 color(' Appearance: ', 'green'), 68 await gap_service.appearance.read_value(), 69 ) 70 71 print() 72 73 74# ----------------------------------------------------------------------------- 75async def show_device_information( 76 device_information_service: DeviceInformationServiceProxy, 77): 78 print(color('### Device Information', 'yellow')) 79 80 if device_information_service.manufacturer_name: 81 print( 82 color(' Manufacturer Name:', 'green'), 83 await device_information_service.manufacturer_name.read_value(), 84 ) 85 86 if device_information_service.model_number: 87 print( 88 color(' Model Number: ', 'green'), 89 await device_information_service.model_number.read_value(), 90 ) 91 92 if device_information_service.serial_number: 93 print( 94 color(' Serial Number: ', 'green'), 95 await device_information_service.serial_number.read_value(), 96 ) 97 98 if device_information_service.firmware_revision: 99 print( 100 color(' Firmware Revision:', 'green'), 101 await device_information_service.firmware_revision.read_value(), 102 ) 103 104 print() 105 106 107# ----------------------------------------------------------------------------- 108async def show_battery_level( 109 battery_service: BatteryServiceProxy, 110): 111 print(color('### Battery Information', 'yellow')) 112 113 if battery_service.battery_level: 114 print( 115 color(' Battery Level:', 'green'), 116 await battery_service.battery_level.read_value(), 117 ) 118 119 print() 120 121 122# ----------------------------------------------------------------------------- 123async def show_tmas( 124 tmas: TelephonyAndMediaAudioServiceProxy, 125): 126 print(color('### Telephony And Media Audio Service', 'yellow')) 127 128 if tmas.role: 129 print( 130 color(' Role:', 'green'), 131 await tmas.role.read_value(), 132 ) 133 134 print() 135 136 137# ----------------------------------------------------------------------------- 138async def show_device_info(peer, done: Optional[asyncio.Future]) -> None: 139 try: 140 # Discover all services 141 print(color('### Discovering Services and Characteristics', 'magenta')) 142 await peer.discover_services() 143 for service in peer.services: 144 await service.discover_characteristics() 145 146 print(color('=== Services ===', 'yellow')) 147 show_services(peer.services) 148 print() 149 150 if gap_service := peer.create_service_proxy(GenericAccessServiceProxy): 151 await try_show(show_gap_information, gap_service) 152 153 if device_information_service := peer.create_service_proxy( 154 DeviceInformationServiceProxy 155 ): 156 await try_show(show_device_information, device_information_service) 157 158 if battery_service := peer.create_service_proxy(BatteryServiceProxy): 159 await try_show(show_battery_level, battery_service) 160 161 if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy): 162 await try_show(show_tmas, tmas) 163 164 if done is not None: 165 done.set_result(None) 166 except asyncio.CancelledError: 167 print(color('!!! Operation canceled', 'red')) 168 169 170# ----------------------------------------------------------------------------- 171async def async_main(device_config, encrypt, transport, address_or_name): 172 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 173 174 # Create a device 175 if device_config: 176 device = Device.from_config_file_with_hci( 177 device_config, hci_source, hci_sink 178 ) 179 else: 180 device = Device.with_hci( 181 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink 182 ) 183 await device.power_on() 184 185 if address_or_name: 186 # Connect to the target peer 187 print(color('>>> Connecting...', 'green')) 188 connection = await device.connect(address_or_name) 189 print(color('>>> Connected', 'green')) 190 191 # Encrypt the connection if required 192 if encrypt: 193 print(color('+++ Encrypting connection...', 'blue')) 194 await connection.encrypt() 195 print(color('+++ Encryption established', 'blue')) 196 197 await show_device_info(Peer(connection), None) 198 else: 199 # Wait for a connection 200 done = asyncio.get_running_loop().create_future() 201 device.on( 202 'connection', 203 lambda connection: asyncio.create_task( 204 show_device_info(Peer(connection), done) 205 ), 206 ) 207 await device.start_advertising(auto_restart=True) 208 209 print(color('### Waiting for connection...', 'blue')) 210 await done 211 212 213# ----------------------------------------------------------------------------- 214@click.command() 215@click.option('--device-config', help='Device configuration', type=click.Path()) 216@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False) 217@click.argument('transport') 218@click.argument('address-or-name', required=False) 219def main(device_config, encrypt, transport, address_or_name): 220 """ 221 Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified, 222 wait for an incoming connection. 223 """ 224 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 225 asyncio.run(async_main(device_config, encrypt, transport, address_or_name)) 226 227 228# ----------------------------------------------------------------------------- 229if __name__ == '__main__': 230 main() 231