1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16
17from avatar import asynchronous
18
19from mobly.asserts import fail
20
21from pairing.br_edr.test_base import BREDRPairTestBase
22
23class BREDRSSPPairTestBase(BREDRPairTestBase):
24
25    @asynchronous
26    async def test_general_pairing(self) -> None:
27
28        self.acl_initiator = self.ref
29        self.acl_responder = self.dut
30        self.pairing_initiator = self.ref
31        self.pairing_responder = self.dut
32        self.service_initiator = self.ref
33        self.service_responder = self.dut
34
35        self.prepare_pairing()
36
37        # first initiate an ACL connection from bumble to android
38        bumble_res, android_res = await self.start_acl_connection()
39
40        service_access_task = asyncio.create_task(
41            self.start_service_access(bumble_res.connection, android_res.connection)
42        )
43
44        # at this point, android expects bumble to initiate pairing to compete the
45        # service access
46        pairing_task = asyncio.create_task(self.start_pairing(bumble_res.connection, android_res.connection))
47
48        await self.accept_pairing()
49
50        try:
51            await asyncio.wait_for(service_access_task, timeout=10.0)
52            await asyncio.wait_for(pairing_task, timeout=10.0)
53        except:
54            fail("service access and pairing should have succeeded")
55