1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21from typing import Callable, Iterable, Optional
22
23import click
24
25from bumble.core import ProtocolError
26from bumble.colors import color
27from bumble.device import Device, Peer
28from bumble.gatt import Service
29from bumble.profiles.device_information_service import DeviceInformationServiceProxy
30from bumble.profiles.battery_service import BatteryServiceProxy
31from bumble.profiles.gap import GenericAccessServiceProxy
32from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
33from bumble.transport import open_transport_or_link
34
35
36# -----------------------------------------------------------------------------
37async def try_show(function: Callable, *args, **kwargs) -> None:
38    try:
39        await function(*args, **kwargs)
40    except ProtocolError as error:
41        print(color('ERROR:', 'red'), error)
42
43
44# -----------------------------------------------------------------------------
45def show_services(services: Iterable[Service]) -> None:
46    for service in services:
47        print(color(str(service), 'cyan'))
48
49        for characteristic in service.characteristics:
50            print(color('  ' + str(characteristic), 'magenta'))
51
52
53# -----------------------------------------------------------------------------
54async def show_gap_information(
55    gap_service: GenericAccessServiceProxy,
56):
57    print(color('### Generic Access Profile', 'yellow'))
58
59    if gap_service.device_name:
60        print(
61            color(' Device Name:', 'green'),
62            await gap_service.device_name.read_value(),
63        )
64
65    if gap_service.appearance:
66        print(
67            color(' Appearance: ', 'green'),
68            await gap_service.appearance.read_value(),
69        )
70
71    print()
72
73
74# -----------------------------------------------------------------------------
75async def show_device_information(
76    device_information_service: DeviceInformationServiceProxy,
77):
78    print(color('### Device Information', 'yellow'))
79
80    if device_information_service.manufacturer_name:
81        print(
82            color('  Manufacturer Name:', 'green'),
83            await device_information_service.manufacturer_name.read_value(),
84        )
85
86    if device_information_service.model_number:
87        print(
88            color('  Model Number:     ', 'green'),
89            await device_information_service.model_number.read_value(),
90        )
91
92    if device_information_service.serial_number:
93        print(
94            color('  Serial Number:    ', 'green'),
95            await device_information_service.serial_number.read_value(),
96        )
97
98    if device_information_service.firmware_revision:
99        print(
100            color('  Firmware Revision:', 'green'),
101            await device_information_service.firmware_revision.read_value(),
102        )
103
104    print()
105
106
107# -----------------------------------------------------------------------------
108async def show_battery_level(
109    battery_service: BatteryServiceProxy,
110):
111    print(color('### Battery Information', 'yellow'))
112
113    if battery_service.battery_level:
114        print(
115            color('  Battery Level:', 'green'),
116            await battery_service.battery_level.read_value(),
117        )
118
119    print()
120
121
122# -----------------------------------------------------------------------------
123async def show_tmas(
124    tmas: TelephonyAndMediaAudioServiceProxy,
125):
126    print(color('### Telephony And Media Audio Service', 'yellow'))
127
128    if tmas.role:
129        print(
130            color('  Role:', 'green'),
131            await tmas.role.read_value(),
132        )
133
134    print()
135
136
137# -----------------------------------------------------------------------------
138async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
139    try:
140        # Discover all services
141        print(color('### Discovering Services and Characteristics', 'magenta'))
142        await peer.discover_services()
143        for service in peer.services:
144            await service.discover_characteristics()
145
146        print(color('=== Services ===', 'yellow'))
147        show_services(peer.services)
148        print()
149
150        if gap_service := peer.create_service_proxy(GenericAccessServiceProxy):
151            await try_show(show_gap_information, gap_service)
152
153        if device_information_service := peer.create_service_proxy(
154            DeviceInformationServiceProxy
155        ):
156            await try_show(show_device_information, device_information_service)
157
158        if battery_service := peer.create_service_proxy(BatteryServiceProxy):
159            await try_show(show_battery_level, battery_service)
160
161        if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy):
162            await try_show(show_tmas, tmas)
163
164        if done is not None:
165            done.set_result(None)
166    except asyncio.CancelledError:
167        print(color('!!! Operation canceled', 'red'))
168
169
170# -----------------------------------------------------------------------------
171async def async_main(device_config, encrypt, transport, address_or_name):
172    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
173
174        # Create a device
175        if device_config:
176            device = Device.from_config_file_with_hci(
177                device_config, hci_source, hci_sink
178            )
179        else:
180            device = Device.with_hci(
181                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
182            )
183        await device.power_on()
184
185        if address_or_name:
186            # Connect to the target peer
187            print(color('>>> Connecting...', 'green'))
188            connection = await device.connect(address_or_name)
189            print(color('>>> Connected', 'green'))
190
191            # Encrypt the connection if required
192            if encrypt:
193                print(color('+++ Encrypting connection...', 'blue'))
194                await connection.encrypt()
195                print(color('+++ Encryption established', 'blue'))
196
197            await show_device_info(Peer(connection), None)
198        else:
199            # Wait for a connection
200            done = asyncio.get_running_loop().create_future()
201            device.on(
202                'connection',
203                lambda connection: asyncio.create_task(
204                    show_device_info(Peer(connection), done)
205                ),
206            )
207            await device.start_advertising(auto_restart=True)
208
209            print(color('### Waiting for connection...', 'blue'))
210            await done
211
212
213# -----------------------------------------------------------------------------
214@click.command()
215@click.option('--device-config', help='Device configuration', type=click.Path())
216@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False)
217@click.argument('transport')
218@click.argument('address-or-name', required=False)
219def main(device_config, encrypt, transport, address_or_name):
220    """
221    Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
222    wait for an incoming connection.
223    """
224    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
225    asyncio.run(async_main(device_config, encrypt, transport, address_or_name))
226
227
228# -----------------------------------------------------------------------------
229if __name__ == '__main__':
230    main()
231