1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from bumble.colors import color
24from bumble.device import Device
25from bumble.transport import open_transport_or_link
26from bumble.core import (
27    BT_BR_EDR_TRANSPORT,
28    BT_AVDTP_PROTOCOL_ID,
29    BT_AUDIO_SINK_SERVICE,
30    BT_L2CAP_PROTOCOL_ID,
31)
32from bumble.avdtp import Protocol as AVDTP_Protocol
33from bumble.a2dp import make_audio_source_service_sdp_records
34from bumble.sdp import (
35    Client as SDP_Client,
36    ServiceAttribute,
37    DataElement,
38    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
39    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
40    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
41)
42
43
44# -----------------------------------------------------------------------------
45def sdp_records():
46    service_record_handle = 0x00010001
47    return {
48        service_record_handle: make_audio_source_service_sdp_records(
49            service_record_handle
50        )
51    }
52
53
54# -----------------------------------------------------------------------------
55# pylint: disable-next=too-many-nested-blocks
56async def find_a2dp_service(connection):
57    # Connect to the SDP Server
58    sdp_client = SDP_Client(connection)
59    await sdp_client.connect()
60
61    # Search for services with an Audio Sink service class
62    search_result = await sdp_client.search_attributes(
63        [BT_AUDIO_SINK_SERVICE],
64        [
65            SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
66            SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
67            SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
68        ],
69    )
70
71    print(color('==================================', 'blue'))
72    print(color('A2DP Sink Services:', 'yellow'))
73
74    service_version = None
75
76    for attribute_list in search_result:
77        print(color('SERVICE:', 'green'))
78
79        # Service classes
80        service_class_id_list = ServiceAttribute.find_attribute_in_list(
81            attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
82        )
83        if service_class_id_list:
84            if service_class_id_list.value:
85                print(color('  Service Classes:', 'green'))
86                for service_class_id in service_class_id_list.value:
87                    print('   ', service_class_id.value)
88
89        # Protocol info
90        protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
91            attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
92        )
93        if protocol_descriptor_list:
94            print(color('  Protocol:', 'green'))
95            for protocol_descriptor in protocol_descriptor_list.value:
96                if protocol_descriptor.value[0].value == BT_L2CAP_PROTOCOL_ID:
97                    if len(protocol_descriptor.value) >= 2:
98                        psm = protocol_descriptor.value[1].value
99                        print(f'{color("    L2CAP PSM:", "cyan")}     {psm}')
100                elif protocol_descriptor.value[0].value == BT_AVDTP_PROTOCOL_ID:
101                    if len(protocol_descriptor.value) >= 2:
102                        avdtp_version_major = protocol_descriptor.value[1].value >> 8
103                        avdtp_version_minor = protocol_descriptor.value[1].value & 0xFF
104                        print(
105                            f'{color("    AVDTP Version:", "cyan")} '
106                            f'{avdtp_version_major}.{avdtp_version_minor}'
107                        )
108                        service_version = (avdtp_version_major, avdtp_version_minor)
109
110        # Profile info
111        bluetooth_profile_descriptor_list = ServiceAttribute.find_attribute_in_list(
112            attribute_list, SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
113        )
114        if bluetooth_profile_descriptor_list:
115            if bluetooth_profile_descriptor_list.value:
116                if (
117                    bluetooth_profile_descriptor_list.value[0].type
118                    == DataElement.SEQUENCE
119                ):
120                    bluetooth_profile_descriptors = (
121                        bluetooth_profile_descriptor_list.value
122                    )
123                else:
124                    # Sometimes, instead of a list of lists, we just find a list.
125                    # Fix that.
126                    bluetooth_profile_descriptors = [bluetooth_profile_descriptor_list]
127
128                print(color('  Profiles:', 'green'))
129                for bluetooth_profile_descriptor in bluetooth_profile_descriptors:
130                    version_major = bluetooth_profile_descriptor.value[1].value >> 8
131                    version_minor = bluetooth_profile_descriptor.value[1].value & 0xFF
132                    print(
133                        f'    {bluetooth_profile_descriptor.value[0].value}'
134                        f' - version {version_major}.{version_minor}'
135                    )
136
137    await sdp_client.disconnect()
138    return service_version
139
140
141# -----------------------------------------------------------------------------
142async def main() -> None:
143    if len(sys.argv) < 4:
144        print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
145        print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
146        return
147
148    print('<<< connecting to HCI...')
149    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
150        print('<<< connected')
151
152        # Create a device
153        device = Device.from_config_file_with_hci(
154            sys.argv[1], hci_transport.source, hci_transport.sink
155        )
156        device.classic_enabled = True
157
158        # Start the controller
159        await device.power_on()
160
161        # Setup the SDP to expose a SRC service, in case the remote device queries us
162        # back
163        device.sdp_service_records = sdp_records()
164
165        # Connect to a peer
166        target_address = sys.argv[3]
167        print(f'=== Connecting to {target_address}...')
168        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
169        print(f'=== Connected to {connection.peer_address}!')
170
171        # Request authentication
172        print('*** Authenticating...')
173        await connection.authenticate()
174        print('*** Authenticated')
175
176        # Enable encryption
177        print('*** Enabling encryption...')
178        await connection.encrypt()
179        print('*** Encryption on')
180
181        # Look for an A2DP service
182        avdtp_version = await find_a2dp_service(connection)
183        if not avdtp_version:
184            print(color('!!! no AVDTP service found'))
185            return
186        print(f'AVDTP version: {avdtp_version[0]}.{avdtp_version[1]}')
187
188        # Create a client to interact with the remote device
189        client = await AVDTP_Protocol.connect(connection, avdtp_version)
190
191        # Discover all endpoints on the remote device
192        endpoints = list(await client.discover_remote_endpoints())
193        print(f'@@@ Found {len(endpoints)} endpoints')
194        for endpoint in endpoints:
195            print('@@@', endpoint)
196
197
198# -----------------------------------------------------------------------------
199logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
200asyncio.run(main())
201