1# Copyright 2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import logging 17 18from pairing.br_edr.test_base import BREDRPairTestBase 19 20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices 21from avatar.aio import asynchronous 22from bumble.colors import color 23from bumble.core import BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, UUID, ConnectionError, ProtocolError 24from bumble.hci import Address 25from bumble.l2cap import ClassicChannelSpec 26from bumble.rfcomm import Client 27 28from bumble.sdp import ( 29 SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 30 SDP_ALL_ATTRIBUTES_RANGE, 31 SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, 32 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 33 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 34 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 35 Client as SDPClient, 36) 37 38from mobly.asserts import assert_equal, assert_is_not_none, assert_raises, fail 39from pandora_experimental.rfcomm_grpc_aio import RFCOMM as AioRFCOMM 40from pandora.security_pb2 import PairingEventAnswer 41 42class ServiceAccessTempBondingTest(BREDRPairTestBase): # type: ignore[misc] 43 """ 44 This test verifies that access to services (on BR/EDR transport) from 45 a peer device via a temporary bonding is properly arbitrated. 46 """ 47 48 def _setup_devices(self): 49 self.ref.config.setdefault('classic_enabled', True) 50 self.ref.config.setdefault('classic_ssp_enabled', True) 51 self.ref.config.setdefault('classic_sc_enabled', True) 52 self.ref.config.setdefault( 53 'server', 54 { 55 'io_capability': 'no_output_no_input', 56 # create a temp bonding 57 'pairing_bonding_enable': False, 58 'pairing_mitm_enable': False, 59 'pairing_sc_enable': True, 60 }, 61 ) 62 63 async def start_service_access( 64 self, 65 initiator_acl_connection, 66 responder_acl_connection, 67 ): 68 ''' 69 this method is not used in this test class 70 ''' 71 pass 72 73 async def accept_pairing(self): 74 try: 75 bumble_ev = await anext(self.bumble_pairing_stream) 76 assert_equal(bumble_ev.method_variant(), 'just_works') 77 bumble_ev_answer = PairingEventAnswer(event=bumble_ev, confirm=True) 78 79 # accept the pairing from bumble 80 self.bumble_pairing_stream.send_nowait(bumble_ev_answer) 81 82 # pairing on android side is auto-accepted 83 # so no pairing event will be triggered here 84 # ignore it now 85 except: 86 fail('no exception should have happened during pairing') 87 88 @asynchronous 89 async def setup_test(self) -> None: 90 await asyncio.gather(self.dut.reset(), self.ref.reset()) 91 92 self.acl_initiator = self.ref 93 self.acl_responder = self.dut 94 self.pairing_initiator = self.ref 95 self.pairing_responder = self.dut 96 97 self.prepare_pairing() 98 99 # first initiate an ACL connection from bumble to android 100 bumble_res, android_res = await self.start_acl_connection() 101 102 pairing_task = asyncio.create_task( 103 self.start_pairing( 104 initiator_acl_connection=bumble_res.connection, 105 responder_acl_connection=android_res.connection, 106 ) 107 ) 108 await self.accept_pairing() 109 await asyncio.wait_for(pairing_task, timeout=10.0) 110 111 android_addr = Address.from_string_for_transport(str(self.dut.address), Address.PUBLIC_DEVICE_ADDRESS) 112 self.bumble_raw_acl_connection = self.ref.device.find_connection_by_bd_addr(android_addr) 113 114 @asynchronous 115 async def test_access_sdp_service(self): 116 sdp_psm = 0x0001 117 sdp_channel = self.bumble_raw_acl_connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=sdp_psm)) 118 try: 119 _ = await sdp_channel 120 except: 121 fail("access to SDP service should be allowed") 122 123 @asynchronous 124 async def test_access_rfcomm_service(self): 125 rfc_psm = 0x0003 126 rfcomm_channel = self.bumble_raw_acl_connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=rfc_psm)) 127 try: 128 _ = await rfcomm_channel 129 except: 130 fail("access to RFCOMM service should be allowed") 131 132 @asynchronous 133 async def test_access_rfcomm_mx_secure_service(self): 134 rfcomm_client = Client(self.bumble_raw_acl_connection) 135 rfcomm_mux = await rfcomm_client.start() 136 137 # hfp rfcomm mx service 138 # it is a secure service exposed in the layer of rfcomm 139 # access should be blocked 140 hfp_rfcomm_chan = 0x0002 141 with assert_raises(ConnectionError): 142 _ = await rfcomm_mux.open_dlc(hfp_rfcomm_chan) 143 144 def _parse_rfcomm_channel_from_sdp_service_attributes(self, attributes): 145 ''' 146 The SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID attribute of an insecure 147 rfcomm service record should look like this 148 id=SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 149 value=SEQUENCE([ 150 SEQUENCE([UUID(UUID-16:0100 (L2CAP))]), 151 SEQUENCE([ 152 UUID(UUID-16:0003 (RFCOMM)), 153 UNSIGNED_INTEGER(7#1)]) 154 ]) 155 ''' 156 157 for attribute in attributes: 158 print(f"attribute: {attribute.to_string(with_colors=True)}") 159 if attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID and len(attribute.value.value) >= 2: 160 proto0 = attribute.value.value[0] 161 proto1 = attribute.value.value[1] 162 163 if proto0.value[0].value == BT_L2CAP_PROTOCOL_ID and proto1.value[0].value == BT_RFCOMM_PROTOCOL_ID: 164 return proto1.value[1].value 165 166 return None 167 168 async def _lookup_rfcomm_channel_with_sdp(self, uuid): 169 sdp_client = SDPClient(self.bumble_raw_acl_connection) 170 await sdp_client.connect() 171 172 service_record_handles = await sdp_client.search_services([UUID(uuid)]) 173 174 if len(service_record_handles) < 1: 175 await sdp_client.disconnect() 176 raise Exception(color(f'service not found on peer device!!!!', 'red')) 177 178 ret = None 179 for service_record_handle in service_record_handles: 180 attributes = await sdp_client.get_attributes(service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]) 181 182 print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow')) 183 ret = self._parse_rfcomm_channel_from_sdp_service_attributes(attributes) 184 if ret is None: 185 continue 186 else: 187 break 188 189 assert_is_not_none(ret) 190 await sdp_client.disconnect() 191 return ret 192 193 @asynchronous 194 async def test_access_rfcomm_mx_insecure_service(self): 195 uuid = "F6FB4732-A802-487D-A9FA-9664D5C91F13" 196 name = "test_rfcomm_server" 197 198 # StartServer implementation on Android uses 199 # listenUsingInsecureRfcommWithServiceRecord 200 dut_rfcomm = AioRFCOMM(self.dut.aio.channel) 201 _ = dut_rfcomm.StartServer(name=name, uuid=uuid) 202 203 rfc_channel = await self._lookup_rfcomm_channel_with_sdp(uuid) 204 rfcomm_client = Client(self.bumble_raw_acl_connection) 205 rfcomm_mux = await rfcomm_client.start() 206 207 try: 208 _ = await rfcomm_mux.open_dlc(rfc_channel) 209 except: 210 fail("access to insecure rfcomm service should be allowed") 211 212 @asynchronous 213 async def test_access_hid_control_service(self): 214 # HID control service (secure) 215 # should be blocked 216 with assert_raises(ProtocolError): 217 hid_control_psm = 0x0011 218 connector_hid_control = self.bumble_raw_acl_connection.create_l2cap_channel( 219 spec=ClassicChannelSpec(psm=hid_control_psm) 220 ) 221 _ = await connector_hid_control 222 223 @asynchronous 224 async def test_access_hid_interrupt_service(self): 225 # HID interrupt service (secure) 226 # should be blocked 227 with assert_raises(ProtocolError): 228 hid_interrupt_psm = 0x0013 229 connector_hid_interrupt = self.bumble_raw_acl_connection.create_l2cap_channel( 230 spec=ClassicChannelSpec(psm=hid_interrupt_psm) 231 ) 232 _ = await connector_hid_interrupt 233