1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19import asyncio
20import os
21import sys
22
23from bumble import hci, transport
24from bumble.bridge import HCI_Bridge
25
26# -----------------------------------------------------------------------------
27# Logging
28# -----------------------------------------------------------------------------
# Module-level logger named after this module; used for the debug message
# emitted when a host command is short-circuited.
logger = logging.getLogger(__name__)
30
31
32# -----------------------------------------------------------------------------
33# Main
34# -----------------------------------------------------------------------------
def _parse_command_short_circuits(spec):
    """Parse a comma-separated list of HCI command op codes.

    Each entry is either ``<ogf>:<ocf>`` (two hexadecimal numbers, combined
    with ``hci.hci_command_op_code``) or a single hexadecimal op code.
    Empty entries (for example, from a trailing comma) are skipped.

    Args:
        spec: The raw list, e.g. ``'0x3f:0x0070,0x3f:0x0074'``.

    Returns:
        A frozenset of op codes.

    Raises:
        ValueError: If an entry cannot be parsed as hexadecimal.
    """
    op_codes = set()
    for entry in spec.split(','):
        entry = entry.strip()
        if not entry:
            continue
        ogf, separator, ocf = entry.partition(':')
        try:
            if separator:
                op_code = hci.hci_command_op_code(int(ogf, 16), int(ocf, 16))
            else:
                op_code = int(entry, 16)
        except ValueError as error:
            raise ValueError(f'invalid command op code: {entry!r}') from error
        op_codes.add(op_code)
    return frozenset(op_codes)


async def async_main():
    """Bridge HCI traffic between a host transport and a controller transport.

    Arguments are read from ``sys.argv``:
      1. the host transport spec,
      2. the controller transport spec,
      3. (optional) a comma-separated list of command op codes to
         short-circuit, each written as ``<ogf>:<ocf>`` or as a single
         hexadecimal op code.

    With fewer than two transport specs, a usage message is printed and the
    function returns. Otherwise both transports are opened, an ``HCI_Bridge``
    is created between them, and the coroutine waits on a future that is never
    resolved, so it runs until cancelled.

    Raises:
        ValueError: If the short-circuit list contains an invalid entry. The
            list is parsed before any transport is opened, so a bad list
            fails without connecting to anything.
    """
    if len(sys.argv) < 3:
        print(
            'Usage: hci_bridge.py <host-transport-spec> <controller-transport-spec> '
            '[command-short-circuit-list]'
        )
        print(
            'example: python hci_bridge.py udp:0.0.0.0:9000,127.0.0.1:9001 '
            'serial:/dev/tty.usbmodem0006839912171,1000000 '
            '0x3f:0x0070,0x3f:0x0074,0x3f:0x0077,0x3f:0x0078'
        )
        return

    # Validate the command line up front: a malformed op code list should be
    # reported before the transports are opened rather than after.
    if len(sys.argv) >= 4:
        command_short_circuits = _parse_command_short_circuits(sys.argv[3])
    else:
        command_short_circuits = frozenset()

    def host_to_controller_filter(hci_packet):
        """Answer short-circuited commands with a synthetic success event.

        Returns ``(event_bytes, True)`` for a command whose op code is in the
        short-circuit set, and ``None`` for every other packet.
        NOTE(review): ``None`` presumably tells HCI_Bridge to forward the
        packet unchanged -- confirm against bumble.bridge.
        """
        if (
            hci_packet.hci_packet_type == hci.HCI_COMMAND_PACKET
            and hci_packet.op_code in command_short_circuits
        ):
            # Respond with a success response
            logger.debug('short-circuiting packet')
            response = hci.HCI_Command_Complete_Event(
                num_hci_command_packets=1,
                command_opcode=hci_packet.op_code,
                return_parameters=bytes([hci.HCI_SUCCESS]),
            )
            # Return a packet with 'respond to sender' set to True
            return (response.to_bytes(), True)

        return None

    print('>>> connecting to HCI...')
    async with await transport.open_transport_or_link(sys.argv[1]) as (
        hci_host_source,
        hci_host_sink,
    ):
        print('>>> connected')

        print('>>> connecting to HCI...')
        async with await transport.open_transport_or_link(sys.argv[2]) as (
            hci_controller_source,
            hci_controller_sink,
        ):
            print('>>> connected')

            # Only the host-to-controller direction is filtered; the last
            # argument (the other filter slot) is left as None.
            _ = HCI_Bridge(
                hci_host_source,
                hci_host_sink,
                hci_controller_source,
                hci_controller_sink,
                host_to_controller_filter,
                None,
            )

            # Wait on a future that nothing resolves, keeping both transports
            # open until the task is cancelled.
            await asyncio.get_running_loop().create_future()
99
100
101# -----------------------------------------------------------------------------
def main():
    """Set up logging and run the bridge coroutine to completion.

    The log level is taken from the ``BUMBLE_LOGLEVEL`` environment variable
    (upper-cased), defaulting to ``INFO`` when the variable is not set.
    """
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(async_main())
105
106
107# -----------------------------------------------------------------------------
# Run the bridge only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()
110