# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from bumble.device import Connection as BumbleConnection
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from pandora._utils import AioStream
from pandora.host_pb2 import AdvertiseResponse
from pandora.host_pb2 import Connection
from pandora.host_pb2 import OwnAddressType
from pandora.host_pb2 import ScanningResponse
from typing import Optional, Tuple


def get_raw_connection_handle(device: PandoraDevice, connection: Connection) -> int:
    """Decode the cookie of `connection` as a big-endian integer.

    Args:
        device: Device owning the connection; asserted to be a
            `BumblePandoraDevice`.
        connection: Pandora connection whose `cookie.value` bytes are decoded.

    Returns:
        The integer obtained from the cookie bytes (big-endian).

    Raises:
        AssertionError: If `device` is not a `BumblePandoraDevice`.
    """
    assert isinstance(device, BumblePandoraDevice)
    cookie_bytes = connection.cookie.value
    return int.from_bytes(cookie_bytes, byteorder='big')


def get_raw_connection(device: PandoraDevice, connection: Connection) -> Optional[BumbleConnection]:
    """Look up the Bumble connection object matching a Pandora `connection`.

    The handle is derived with `get_raw_connection_handle` and passed to the
    `lookup_connection` method of the underlying Bumble device.

    Args:
        device: Device owning the connection; asserted to be a
            `BumblePandoraDevice`.
        connection: Pandora connection to resolve.

    Returns:
        Whatever `lookup_connection` returns for the handle (annotated as an
        optional `BumbleConnection`).

    Raises:
        AssertionError: If `device` is not a `BumblePandoraDevice`.
    """
    assert isinstance(device, BumblePandoraDevice)
    # Resolve the bound method first, then compute the handle, mirroring the
    # evaluation order of a single nested call expression.
    lookup = device.device.lookup_connection
    return lookup(get_raw_connection_handle(device, connection))


async def connect(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
    """Connect `initiator` to `acceptor` and return both connection objects.

    `Connect` is issued on the initiator and `WaitConnection` on the acceptor;
    both are awaited concurrently.

    Args:
        initiator: Device that issues the `Connect` request.
        acceptor: Device that waits for the incoming connection.

    Returns:
        A tuple `(initiator_connection, acceptor_connection)`.

    Raises:
        AssertionError (or the mobly assertion failure): If either response
            does not report the 'connection' result variant, or if either
            connection is None.
    """
    outgoing = initiator.aio.host.Connect(address=acceptor.address)
    incoming = acceptor.aio.host.WaitConnection(address=initiator.address)
    initiator_result, acceptor_result = await asyncio.gather(outgoing, incoming)

    # Both sides must report a successful connection, initiator checked first.
    for result in (initiator_result, acceptor_result):
        assert_equal(result.result_variant(), 'connection')

    initiator_connection = initiator_result.connection
    assert initiator_connection is not None
    acceptor_connection = acceptor_result.connection
    assert acceptor_connection is not None
    return initiator_connection, acceptor_connection


async def connect_le(
    initiator: PandoraDevice,
    acceptor: AioStream[AdvertiseResponse],
    scan: ScanningResponse,
    own_address_type: OwnAddressType,
    cancel_advertisement: bool = True,
) -> Tuple[Connection, Connection]:
    """Issue `ConnectLE` from `initiator` while awaiting the advertiser's stream.

    The `ConnectLE` request and the next item of the `acceptor` stream are
    awaited concurrently.

    Args:
        initiator: Device that issues the `ConnectLE` request.
        acceptor: Advertisement response stream; its next item supplies the
            second connection of the returned tuple.
        scan: Scanning response whose `address_asdict()` entries are forwarded
            as keyword arguments to `ConnectLE`.
        own_address_type: Forwarded to `ConnectLE` as `own_address_type`.
        cancel_advertisement: When true, `acceptor.cancel()` is called once
            both awaited results are available.

    Returns:
        A tuple `(initiator_connection, acceptor_connection)`.

    Raises:
        AssertionError (or the mobly assertion failure): If the initiator
            response does not report the 'connection' result variant or its
            connection is missing/falsy.
    """
    connect_call = initiator.aio.host.ConnectLE(own_address_type=own_address_type, **scan.address_asdict())
    next_advertise_response = anext(aiter(acceptor))  # pytype: disable=name-error
    initiator_result, advertise_response = await asyncio.gather(connect_call, next_advertise_response)

    if cancel_advertisement:
        acceptor.cancel()

    assert_equal(initiator_result.result_variant(), 'connection')
    initiator_connection = initiator_result.connection
    assert_is_not_none(initiator_connection)
    # Plain assert kept alongside the mobly check (presumably so static type
    # checkers can narrow the value).
    assert initiator_connection
    return initiator_connection, advertise_response.connection