# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging

from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.att import ATT_Error, ATT_INSUFFICIENT_ENCRYPTION_ERROR
from bumble.gatt import (
    Service,
    Characteristic,
    CharacteristicValue,
    Descriptor,
    GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_DEVICE_INFORMATION_SERVICE,
)


# -----------------------------------------------------------------------------
class Listener(Device.Listener, Connection.Listener):
    """Print connection and disconnection events for a device."""

    def __init__(self, device):
        """Remember the device this listener is attached to."""
        self.device = device

    def on_connection(self, connection):
        """Print the new connection and install this object as its listener."""
        print(f'=== Connected to {connection}')
        # The same object serves as both the device listener and the
        # per-connection listener, so on_disconnection() below is reached
        # through this assignment.
        connection.listener = self

    def on_disconnection(self, reason):
        """Print the reason reported for the disconnection."""
        print(f'### Disconnected, reason={reason}')


def my_custom_read(connection):
    """Read callback: return an ASCII greeting that embeds the connection."""
    print('----- READ from', connection)
    return bytes(f'Hello {connection}', 'ascii')


def my_custom_write(connection, value):
    """Write callback: print the written value; nothing is stored."""
    print(f'----- WRITE from {connection}: {value}')


def my_custom_read_with_error(connection):
    """Read callback that requires an encrypted connection.

    Returns the single byte 123 when ``connection.is_encrypted`` is true.

    Raises:
        ATT_Error: with ATT_INSUFFICIENT_ENCRYPTION_ERROR when the connection
            is not encrypted.
    """
    print('----- READ from', connection, '[returning error]')
    if connection.is_encrypted:
        return bytes([123])

    raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)


def my_custom_write_with_error(connection, value):
    """Write callback that requires an encrypted connection.

    The written value is only printed, never stored.

    Raises:
        ATT_Error: with ATT_INSUFFICIENT_ENCRYPTION_ERROR when the connection
            is not encrypted.
    """
    print(f'----- WRITE from {connection}: {value}', '[returning error]')
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the example GATT server.

    Command line: ``<device-config> <transport-spec> [<bluetooth-address>]``.
    With fewer than two arguments the usage text is printed and the function
    returns. When a bluetooth address is given the device connects to it;
    otherwise it starts advertising.
    """
    if len(sys.argv) < 3:
        print(
            'Usage: run_gatt_server.py <device-config> <transport-spec> '
            '[<bluetooth-address>]'
        )
        print('example: run_gatt_server.py device1.json usb:0 E1:CA:72:48:C4:E8')
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< connected')

        # Create a device to manage the host
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.listener = Listener(device)

        # Add a few entries to the device's GATT server
        descriptor = Descriptor(
            GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
            Descriptor.READABLE,
            'My Description',
        )
        manufacturer_name_characteristic = Characteristic(
            GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
            Characteristic.Properties.READ,
            Characteristic.READABLE,
            'Fitbit',
            [descriptor],
        )
        device_info_service = Service(
            GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic]
        )
        custom_service1 = Service(
            '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
            [
                # Value served by the plain read/write callbacks.
                Characteristic(
                    'D901B45B-4916-412E-ACCA-376ECB603B2C',
                    Characteristic.Properties.READ | Characteristic.Properties.WRITE,
                    Characteristic.READABLE | Characteristic.WRITEABLE,
                    CharacteristicValue(read=my_custom_read, write=my_custom_write),
                ),
                # Value whose callbacks raise an ATT error on an unencrypted
                # connection.
                Characteristic(
                    '552957FB-CF1F-4A31-9535-E78847E1A714',
                    Characteristic.Properties.READ | Characteristic.Properties.WRITE,
                    Characteristic.READABLE | Characteristic.WRITEABLE,
                    CharacteristicValue(
                        read=my_custom_read_with_error, write=my_custom_write_with_error
                    ),
                ),
                # Static value, declared with the NOTIFY property.
                Characteristic(
                    '486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
                    Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
                    Characteristic.READABLE,
                    'hello',
                ),
            ],
        )
        device.add_services([device_info_service, custom_service1])

        # Debug print
        for attribute in device.gatt_server.attributes:
            print(attribute)

        # Get things going
        await device.power_on()

        # Connect to a peer
        if len(sys.argv) > 3:
            target_address = sys.argv[3]
            print(f'=== Connecting to {target_address}...')
            await device.connect(target_address)
        else:
            await device.start_advertising(auto_restart=True)

        # Stay inside the transport context until the source terminates.
        await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
# Log level comes from the BUMBLE_LOGLEVEL environment variable (default DEBUG).
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())