1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import logging
17
18from pairing.br_edr.test_base import BREDRPairTestBase
19
20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
21from avatar.aio import asynchronous
22from bumble.colors import color
23from bumble.core import BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, UUID, ConnectionError, ProtocolError
24from bumble.hci import Address
25from bumble.l2cap import ClassicChannelSpec
26from bumble.rfcomm import Client
27
28from bumble.sdp import (
29    SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
30    SDP_ALL_ATTRIBUTES_RANGE,
31    SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
32    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
33    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
34    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
35    Client as SDPClient,
36)
37
38from mobly.asserts import assert_equal, assert_is_not_none, assert_raises, fail
39from pandora_experimental.rfcomm_grpc_aio import RFCOMM as AioRFCOMM
40from pandora.security_pb2 import PairingEventAnswer
41
class ServiceAccessTempBondingTest(BREDRPairTestBase):  # type: ignore[misc]
    """
    Verify that access to services (on BR/EDR transport) from a peer device
    connected via a temporary bonding is properly arbitrated.

    Before each test, ``setup_test`` connects the Bumble reference device
    (``self.ref``) to the Android DUT (``self.dut``) and pairs them with
    bonding disabled. The tests then try to reach individual services over
    that ACL connection and check whether access is granted or rejected.
    """

    # Upper bound, in seconds, on how long the pairing procedure may take.
    _PAIRING_TIMEOUT_S = 10.0

    def _setup_devices(self):
        """Fill in the reference device configuration used by this test.

        Only missing keys are populated (``setdefault``), so values already
        present in the configuration are left untouched.
        """
        self.ref.config.setdefault('classic_enabled', True)
        self.ref.config.setdefault('classic_ssp_enabled', True)
        self.ref.config.setdefault('classic_sc_enabled', True)
        self.ref.config.setdefault(
            'server',
            {
                'io_capability': 'no_output_no_input',
                # create a temp bonding
                'pairing_bonding_enable': False,
                'pairing_mitm_enable': False,
                'pairing_sc_enable': True,
            },
        )

    async def start_service_access(
        self,
        initiator_acl_connection,
        responder_acl_connection,
    ):
        """No-op: this method is not used in this test class.

        NOTE(review): presumably required by BREDRPairTestBase; confirm
        against the base class.
        """

    async def accept_pairing(self):
        """Accept the pending pairing request on the bumble side.

        Waits for the next event on ``self.bumble_pairing_stream``, checks
        that the pairing method is ``just_works`` and confirms it.

        Any unexpected exception is reported as a test failure that includes
        the original error. The pairing-method assertion is deliberately kept
        outside of the ``try`` blocks so that its own failure message is not
        replaced by a generic one.
        """
        try:
            bumble_ev = await anext(self.bumble_pairing_stream)
            pairing_method = bumble_ev.method_variant()
        except Exception as error:
            fail(f'no exception should have happened during pairing: {error!r}')

        assert_equal(pairing_method, 'just_works')

        try:
            # accept the pairing from bumble
            self.bumble_pairing_stream.send_nowait(PairingEventAnswer(event=bumble_ev, confirm=True))
        except Exception as error:
            fail(f'no exception should have happened during pairing: {error!r}')

        # pairing on android side is auto-accepted
        # so no pairing event will be triggered here
        # ignore it now

    @asynchronous
    async def setup_test(self) -> None:
        """Reset both devices, connect bumble to android and pair them.

        On success ``self.bumble_raw_acl_connection`` holds the bumble-side
        connection object to the android device, which the tests use to
        reach individual services.
        """
        await asyncio.gather(self.dut.reset(), self.ref.reset())

        # bumble (ref) initiates both the ACL connection and the pairing
        self.acl_initiator = self.ref
        self.acl_responder = self.dut
        self.pairing_initiator = self.ref
        self.pairing_responder = self.dut

        self.prepare_pairing()

        # first initiate an ACL connection from bumble to android
        bumble_res, android_res = await self.start_acl_connection()

        pairing_task = asyncio.create_task(
            self.start_pairing(
                initiator_acl_connection=bumble_res.connection,
                responder_acl_connection=android_res.connection,
            )
        )
        try:
            await self.accept_pairing()
            await asyncio.wait_for(pairing_task, timeout=self._PAIRING_TIMEOUT_S)
        finally:
            # do not leave the pairing task pending if accepting failed
            if not pairing_task.done():
                pairing_task.cancel()

        android_addr = Address.from_string_for_transport(str(self.dut.address), Address.PUBLIC_DEVICE_ADDRESS)
        self.bumble_raw_acl_connection = self.ref.device.find_connection_by_bd_addr(android_addr)
        # fail here with a clear message rather than later with an
        # AttributeError on None inside a test
        assert_is_not_none(
            self.bumble_raw_acl_connection,
            'no bumble ACL connection found for the android device',
        )

    async def _open_l2cap_channel(self, psm):
        """Open a classic L2CAP channel to the DUT on the given PSM and return it."""
        return await self.bumble_raw_acl_connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=psm))

    @asynchronous
    async def test_access_sdp_service(self) -> None:
        """Opening an L2CAP channel on the SDP PSM (0x0001) must succeed."""
        sdp_psm = 0x0001
        try:
            await self._open_l2cap_channel(sdp_psm)
        except Exception as error:
            fail(f"access to SDP service should be allowed: {error!r}")

    @asynchronous
    async def test_access_rfcomm_service(self) -> None:
        """Opening an L2CAP channel on the RFCOMM PSM (0x0003) must succeed."""
        rfc_psm = 0x0003
        try:
            await self._open_l2cap_channel(rfc_psm)
        except Exception as error:
            fail(f"access to RFCOMM service should be allowed: {error!r}")

    @asynchronous
    async def test_access_rfcomm_mx_secure_service(self) -> None:
        """Opening the HFP RFCOMM channel must be rejected with ConnectionError."""
        rfcomm_client = Client(self.bumble_raw_acl_connection)
        rfcomm_mux = await rfcomm_client.start()

        # hfp rfcomm mx service
        # it is a secure service exposed in the layer of rfcomm
        # access should be blocked
        hfp_rfcomm_chan = 0x0002
        with assert_raises(ConnectionError):
            await rfcomm_mux.open_dlc(hfp_rfcomm_chan)

    def _parse_rfcomm_channel_from_sdp_service_attributes(self, attributes):
        """Extract the RFCOMM channel number from SDP service attributes.

        The SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID attribute of an insecure
        rfcomm service record should look like this
        id=SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        value=SEQUENCE([
                SEQUENCE([UUID(UUID-16:0100 (L2CAP))]),
                SEQUENCE([
                    UUID(UUID-16:0003 (RFCOMM)),
                    UNSIGNED_INTEGER(7#1)])
                ])

        Every attribute is logged. Descriptor lists that are too short to
        have the shape above are skipped instead of raising IndexError.

        Returns:
            The value of the second element of the RFCOMM descriptor (the
            channel number in the layout above), or None if no attribute
            matches.
        """
        for attribute in attributes:
            logging.info("attribute: %s", attribute.to_string(with_colors=True))
            if attribute.id != SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                continue

            protocols = attribute.value.value
            if len(protocols) < 2:
                continue

            l2cap_descriptor = protocols[0].value
            rfcomm_descriptor = protocols[1].value
            if len(l2cap_descriptor) < 1 or len(rfcomm_descriptor) < 2:
                continue

            if (
                l2cap_descriptor[0].value == BT_L2CAP_PROTOCOL_ID
                and rfcomm_descriptor[0].value == BT_RFCOMM_PROTOCOL_ID
            ):
                return rfcomm_descriptor[1].value

        return None

    async def _lookup_rfcomm_channel_with_sdp(self, uuid):
        """Look up, via SDP, the RFCOMM channel of the service with this UUID.

        Args:
            uuid: UUID string of the service to search for on the peer.

        Returns:
            The RFCOMM channel parsed from the first matching service record
            that exposes one.

        Raises:
            Exception: if the peer reports no service record for the UUID.

        The test fails (``assert_is_not_none``) if none of the returned
        records yields an RFCOMM channel. The SDP client is disconnected on
        every exit path.
        """
        sdp_client = SDPClient(self.bumble_raw_acl_connection)
        await sdp_client.connect()

        try:
            service_record_handles = await sdp_client.search_services([UUID(uuid)])
            if not service_record_handles:
                raise Exception(color('service not found on peer device!!!!', 'red'))

            rfcomm_channel = None
            for service_record_handle in service_record_handles:
                attributes = await sdp_client.get_attributes(service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE])

                logging.info(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
                rfcomm_channel = self._parse_rfcomm_channel_from_sdp_service_attributes(attributes)
                if rfcomm_channel is not None:
                    break

            assert_is_not_none(rfcomm_channel)
            return rfcomm_channel
        finally:
            await sdp_client.disconnect()

    @asynchronous
    async def test_access_rfcomm_mx_insecure_service(self) -> None:
        """Opening an insecure RFCOMM server channel on the DUT must succeed."""
        uuid = "F6FB4732-A802-487D-A9FA-9664D5C91F13"
        name = "test_rfcomm_server"

        # StartServer implementation on Android uses
        # listenUsingInsecureRfcommWithServiceRecord
        dut_rfcomm = AioRFCOMM(self.dut.aio.channel)
        # NOTE(review): the call is not awaited, so nothing here guarantees
        # the server is registered before the SDP lookup below - confirm
        # whether awaiting the response is safe and desirable.
        _ = dut_rfcomm.StartServer(name=name, uuid=uuid)

        rfc_channel = await self._lookup_rfcomm_channel_with_sdp(uuid)
        rfcomm_client = Client(self.bumble_raw_acl_connection)
        rfcomm_mux = await rfcomm_client.start()

        try:
            await rfcomm_mux.open_dlc(rfc_channel)
        except Exception as error:
            fail(f"access to insecure rfcomm service should be allowed: {error!r}")

    @asynchronous
    async def test_access_hid_control_service(self) -> None:
        """Opening the HID control PSM (0x0011) must be rejected with ProtocolError."""
        # HID control service (secure)
        # should be blocked
        hid_control_psm = 0x0011
        with assert_raises(ProtocolError):
            await self._open_l2cap_channel(hid_control_psm)

    @asynchronous
    async def test_access_hid_interrupt_service(self) -> None:
        """Opening the HID interrupt PSM (0x0013) must be rejected with ProtocolError."""
        # HID interrupt service (secure)
        # should be blocked
        hid_interrupt_psm = 0x0013
        with assert_raises(ProtocolError):
            await self._open_l2cap_channel(hid_interrupt_psm)