1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from bumble.colors import color
24
25import bumble.core
26from bumble.device import Device
27from bumble.transport import open_transport_or_link
28from bumble.core import (
29    BT_L2CAP_PROTOCOL_ID,
30    BT_RFCOMM_PROTOCOL_ID,
31    BT_BR_EDR_TRANSPORT,
32)
33from bumble.rfcomm import Client
34from bumble.sdp import (
35    Client as SDP_Client,
36    DataElement,
37    ServiceAttribute,
38    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
39    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
40    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
41)
42
43
44# -----------------------------------------------------------------------------
45async def list_rfcomm_channels(connection):
46    # Connect to the SDP Server
47    sdp_client = SDP_Client(connection)
48    await sdp_client.connect()
49
50    # Search for services with an L2CAP service attribute
51    search_result = await sdp_client.search_attributes(
52        [BT_L2CAP_PROTOCOL_ID],
53        [
54            SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
55            SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
56            SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
57        ],
58    )
59    print(color('==================================', 'blue'))
60    print(color('RFCOMM Services:', 'yellow'))
61    # pylint: disable-next=too-many-nested-blocks
62    for attribute_list in search_result:
63        # Look for the RFCOMM Channel number
64        protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
65            attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
66        )
67        if protocol_descriptor_list:
68            for protocol_descriptor in protocol_descriptor_list.value:
69                if len(protocol_descriptor.value) >= 2:
70                    if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
71                        print(color('SERVICE:', 'green'))
72                        print(
73                            color('  RFCOMM Channel:', 'cyan'),
74                            protocol_descriptor.value[1].value,
75                        )
76
77                        # List profiles
78                        bluetooth_profile_descriptor_list = (
79                            ServiceAttribute.find_attribute_in_list(
80                                attribute_list,
81                                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
82                            )
83                        )
84                        if bluetooth_profile_descriptor_list:
85                            if bluetooth_profile_descriptor_list.value:
86                                if (
87                                    bluetooth_profile_descriptor_list.value[0].type
88                                    == DataElement.SEQUENCE
89                                ):
90                                    bluetooth_profile_descriptors = (
91                                        bluetooth_profile_descriptor_list.value
92                                    )
93                                else:
94                                    # Sometimes, instead of a list of lists, we just
95                                    # find a list. Fix that
96                                    bluetooth_profile_descriptors = [
97                                        bluetooth_profile_descriptor_list
98                                    ]
99
100                                print(color('  Profiles:', 'green'))
101                                for (
102                                    bluetooth_profile_descriptor
103                                ) in bluetooth_profile_descriptors:
104                                    version_major = (
105                                        bluetooth_profile_descriptor.value[1].value >> 8
106                                    )
107                                    version_minor = (
108                                        bluetooth_profile_descriptor.value[1].value
109                                        & 0xFF
110                                    )
111                                    print(
112                                        '    '
113                                        f'{bluetooth_profile_descriptor.value[0].value}'
114                                        f' - version {version_major}.{version_minor}'
115                                    )
116
117                        # List service classes
118                        service_class_id_list = ServiceAttribute.find_attribute_in_list(
119                            attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
120                        )
121                        if service_class_id_list:
122                            if service_class_id_list.value:
123                                print(color('  Service Classes:', 'green'))
124                                for service_class_id in service_class_id_list.value:
125                                    print('   ', service_class_id.value)
126
127    await sdp_client.disconnect()
128
129
130# -----------------------------------------------------------------------------
131class TcpServerProtocol(asyncio.Protocol):
132    def __init__(self, rfcomm_session):
133        self.rfcomm_session = rfcomm_session
134        self.transport = None
135
136    def connection_made(self, transport):
137        peer_name = transport.get_extra_info('peer_name')
138        print(f'<<< TCP Server: connection from {peer_name}')
139        self.transport = transport
140        self.rfcomm_session.sink = self.rfcomm_data_received
141
142    def rfcomm_data_received(self, data):
143        print(f'<<< RFCOMM Data: {data.hex()}')
144        if self.transport:
145            self.transport.write(data)
146        else:
147            print('!!! no TCP connection, dropping data')
148
149    def data_received(self, data):
150        print(f'<<< TCP Server: data received: {len(data)} bytes - {data.hex()}')
151        self.rfcomm_session.write(data)
152
153
154# -----------------------------------------------------------------------------
155async def tcp_server(tcp_port, rfcomm_session):
156    print(f'$$$ Starting TCP server on port {tcp_port}')
157
158    server = await asyncio.get_running_loop().create_server(
159        lambda: TcpServerProtocol(rfcomm_session), '127.0.0.1', tcp_port
160    )
161    await asyncio.get_running_loop().create_future()
162
163    async with server:
164        await server.serve_forever()
165
166
167# -----------------------------------------------------------------------------
168async def main() -> None:
169    if len(sys.argv) < 5:
170        print(
171            'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
172            '<bluetooth-address> <channel>|discover [tcp-port]'
173        )
174        print(
175            '  specifying a channel number, or "discover" to list all RFCOMM channels'
176        )
177        print('example: run_rfcomm_client.py classic1.json usb:0 E1:CA:72:48:C4:E8 8')
178        return
179
180    print('<<< connecting to HCI...')
181    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
182        print('<<< connected')
183
184        # Create a device
185        device = Device.from_config_file_with_hci(
186            sys.argv[1], hci_transport.source, hci_transport.sink
187        )
188        device.classic_enabled = True
189        await device.power_on()
190
191        # Connect to a peer
192        target_address = sys.argv[3]
193        print(f'=== Connecting to {target_address}...')
194        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
195        print(f'=== Connected to {connection.peer_address}!')
196
197        channel_str = sys.argv[4]
198        if channel_str == 'discover':
199            await list_rfcomm_channels(connection)
200            return
201
202        # Request authentication
203        print('*** Authenticating...')
204        await connection.authenticate()
205        print('*** Authenticated')
206
207        # Enable encryption
208        print('*** Enabling encryption...')
209        await connection.encrypt()
210        print('*** Encryption on')
211
212        # Create a client and start it
213        print('@@@ Starting RFCOMM client...')
214        rfcomm_client = Client(connection)
215        rfcomm_mux = await rfcomm_client.start()
216        print('@@@ Started')
217
218        channel = int(channel_str)
219        print(f'### Opening session for channel {channel}...')
220        try:
221            session = await rfcomm_mux.open_dlc(channel)
222            print('### Session open', session)
223        except bumble.core.ConnectionError as error:
224            print(f'### Session open failed: {error}')
225            await rfcomm_mux.disconnect()
226            print('@@@ Disconnected from RFCOMM server')
227            return
228
229        if len(sys.argv) == 6:
230            # A TCP port was specified, start listening
231            tcp_port = int(sys.argv[5])
232            asyncio.create_task(tcp_server(tcp_port, session))
233
234        await hci_transport.source.wait_for_termination()
235
236
237# -----------------------------------------------------------------------------
238logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
239asyncio.run(main())
240