1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16
17from avatar import BumblePandoraDevice
18from avatar import PandoraDevice
19from bumble.device import Connection as BumbleConnection
20from mobly.asserts import assert_equal  # type: ignore
21from mobly.asserts import assert_is_not_none  # type: ignore
22from pandora._utils import AioStream
23from pandora.host_pb2 import AdvertiseResponse
24from pandora.host_pb2 import Connection
25from pandora.host_pb2 import OwnAddressType
26from pandora.host_pb2 import ScanningResponse
27from typing import Optional, Tuple
28
29
30def get_raw_connection_handle(device: PandoraDevice, connection: Connection) -> int:
31    assert isinstance(device, BumblePandoraDevice)
32    return int.from_bytes(connection.cookie.value, 'big')
33
34
35def get_raw_connection(device: PandoraDevice, connection: Connection) -> Optional[BumbleConnection]:
36    assert isinstance(device, BumblePandoraDevice)
37    return device.device.lookup_connection(get_raw_connection_handle(device, connection))
38
39
40async def connect(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
41    init_res, wait_res = await asyncio.gather(
42        initiator.aio.host.Connect(address=acceptor.address),
43        acceptor.aio.host.WaitConnection(address=initiator.address),
44    )
45    assert_equal(init_res.result_variant(), 'connection')
46    assert_equal(wait_res.result_variant(), 'connection')
47    assert init_res.connection is not None and wait_res.connection is not None
48    return init_res.connection, wait_res.connection
49
50
51async def connect_le(
52    initiator: PandoraDevice,
53    acceptor: AioStream[AdvertiseResponse],
54    scan: ScanningResponse,
55    own_address_type: OwnAddressType,
56    cancel_advertisement: bool = True,
57) -> Tuple[Connection, Connection]:
58    (init_res, wait_res) = await asyncio.gather(
59        initiator.aio.host.ConnectLE(own_address_type=own_address_type, **scan.address_asdict()),
60        anext(aiter(acceptor)),  # pytype: disable=name-error
61    )
62    if cancel_advertisement:
63        acceptor.cancel()
64    assert_equal(init_res.result_variant(), 'connection')
65    assert_is_not_none(init_res.connection)
66    assert init_res.connection
67    return init_res.connection, wait_res.connection
68