# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.host import Host
from bumble.link import LocalLink
from bumble.gatt import (
    Service,
    Characteristic,
    Descriptor,
    GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_DEVICE_INFORMATION_SERVICE,
)
from bumble.gatt_client import show_services


# -----------------------------------------------------------------------------
class ServerListener(Device.Listener):
    """Listener attached to the server device; reports connections on stdout."""

    def on_connection(self, connection):
        # Presumably invoked by the device once a peer has connected.
        print(f'### Server: connected to {connection}')


# -----------------------------------------------------------------------------
def _create_device(device_name, controller_name, address, link):
    """Build a controller/host/device stack attached to `link`.

    The device is returned without being powered on.
    """
    controller = Controller(controller_name, link=link)
    host = Host()
    host.controller = controller
    return Device(device_name, address=Address(address), host=host)


# -----------------------------------------------------------------------------
def _build_device_info_service():
    """Create a Device Information service with one readable characteristic.

    The characteristic is a manufacturer name string that carries a single
    readable user-description descriptor.
    """
    user_description = Descriptor(
        GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
        Descriptor.READABLE,
        'My Description',
    )
    manufacturer_name = Characteristic(
        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        "Fitbit",
        [user_description],
    )
    return Service(GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name])


# -----------------------------------------------------------------------------
async def _discover_and_show_services(connection):
    """Discover services, characteristics and descriptors, then print them.

    Returns the `Peer` wrapping `connection` so it can be reused by the caller.
    """
    print('=== Discovering services')
    peer = Peer(connection)
    await peer.discover_services()
    # Discovery is done one request at a time, in the order items were found.
    for found_service in peer.services:
        await found_service.discover_characteristics()
        for found_characteristic in found_service.characteristics:
            await found_characteristic.discover_descriptors()

    print('=== Services discovered')
    show_services(peer.services)
    return peer


# -----------------------------------------------------------------------------
async def _discover_and_list_attributes(peer):
    """Discover all attributes of `peer`, print each one and return them."""
    print('=== Discovering attributes')
    found = await peer.discover_attributes()
    for entry in found:
        print(entry)
    print('=== Attributes discovered')
    return found


# -----------------------------------------------------------------------------
async def _read_and_print_values(attributes):
    """Read every attribute and print its value, or the error if it fails."""
    for entry in attributes:
        try:
            raw = await entry.read_value()
            print(color(f'0x{entry.handle:04X} = {raw.hex()}', 'green'))
        except ProtocolError as read_error:
            # A failed read is reported and the remaining attributes are
            # still attempted.
            print(color(f'cannot read {entry.handle:04X}:', 'red'), read_error)


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run a GATT client and a GATT server against each other over a local link.

    Both devices are created in this process and share one `LocalLink`. The
    server exposes a Device Information service; the client connects,
    discovers services and attributes, reads every attribute, and then the
    coroutine waits indefinitely.
    """
    # Both stacks share a single local link
    shared_link = LocalLink()

    # Client side
    client = _create_device(
        "client", "client controller", 'F0:F1:F2:F3:F4:F5', shared_link
    )
    await client.power_on()

    # Server side
    server = _create_device(
        "server", "server controller", 'F6:F7:F8:F9:FA:FB', shared_link
    )
    server.listener = ServerListener()
    await server.power_on()

    # Populate the server's GATT database
    server.add_service(_build_device_info_service())

    # Connect the client to the server
    link_to_server = await client.connect(server.random_address)
    print(f'=== Client: connected to {link_to_server}')

    # Explore the server from the client side
    peer = await _discover_and_show_services(link_to_server)
    discovered = await _discover_and_list_attributes(peer)
    await _read_and_print_values(discovered)

    # Nothing ever resolves this future, so awaiting it keeps the program
    # running until it is interrupted.
    never_done = asyncio.get_running_loop().create_future()
    await never_done


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())