#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Original file:
    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/bt_gatt_utils.py
"""

import logging
import pprint
from queue import Empty

from blueberry.utils.bt_gatt_constants import GattCallbackError
from blueberry.utils.bt_gatt_constants import GattCallbackString
from blueberry.utils.bt_gatt_constants import GattCharacteristic
from blueberry.utils.bt_gatt_constants import GattConnectionState
from blueberry.utils.bt_gatt_constants import GattDescriptor
from blueberry.utils.bt_gatt_constants import GattPhyMask
from blueberry.utils.bt_gatt_constants import GattServiceType
from blueberry.utils.bt_gatt_constants import GattTransport
from blueberry.utils.bt_test_utils import BtTestUtilsError
from mobly.controllers.android_device import AndroidDevice
from mobly.controllers.android_device_lib.event_dispatcher import EventDispatcher
from mobly.controllers.android_device_lib.sl4a_client import Sl4aClient

# Default number of seconds to wait for an SL4A callback event.
default_timeout = 10
log = logging


class GattTestUtilsError(Exception):
    """Raised when a GATT helper in this module cannot complete its operation."""


def _pop_connection_event(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout):
    """Pops the connection-change event; on timeout closes the client and raises."""
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        return central.ed.pop_event(expected_event, timeout)
    except Empty:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Expected event: {}".format(expected_event))


def _ensure_connected(central: AndroidDevice, bluetooth_gatt, event):
    """Closes the client and raises unless the event reports STATE_CONNECTED."""
    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))


def setup_gatt_connection(central: AndroidDevice,
                          mac_address,
                          autoconnect,
                          transport=GattTransport.TRANSPORT_AUTO,
                          opportunistic=False,
                          timeout_seconds=default_timeout):
    """Opens a GATT client connection from the central and waits until it is connected.

    Args:
        central: device acting as the GATT client.
        mac_address: address of the peripheral to connect to.
        autoconnect: forwarded to gattClientConnectGatt.
        transport: forwarded to gattClientConnectGatt.
        opportunistic: forwarded to gattClientConnectGatt.
        timeout_seconds: how long to wait for the connection-change event.

    Returns:
        A (bluetooth_gatt, gatt_callback) tuple of the handles created for the
        connection.

    Raises:
        GattTestUtilsError: no connection-change event arrived in time, or the
            event reported a state other than STATE_CONNECTED. The GATT client
            is closed (best effort) before raising.
    """
    gatt_callback = central.sl4a.gattCreateGattCallback()
    log.info("Gatt Connect to mac address {}.".format(mac_address))
    bluetooth_gatt = central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect, transport,
                                                        opportunistic, GattPhyMask.PHY_LE_1M_MASK)
    event = _pop_connection_event(central, gatt_callback, bluetooth_gatt, timeout_seconds)
    log.info("Got connection event {}".format(event))
    _ensure_connected(central, bluetooth_gatt, event)
    return bluetooth_gatt, gatt_callback


def wait_for_gatt_connection(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout):
    """Waits for an already-requested GATT connection to reach STATE_CONNECTED.

    Args:
        central: device acting as the GATT client.
        gatt_callback: callback handle the connection was requested with.
        bluetooth_gatt: GATT client handle; closed (best effort) on failure.
        timeout: how long to wait for the connection-change event.

    Raises:
        GattTestUtilsError: no connection-change event arrived in time, or the
            event reported a state other than STATE_CONNECTED.
    """
    event = _pop_connection_event(central, gatt_callback, bluetooth_gatt, timeout)
    # The client is closed exactly once on failure; a second close attempt
    # would only ever hit the swallow-and-log path.
    _ensure_connected(central, bluetooth_gatt, event)


def close_gatt_client(central: AndroidDevice, bluetooth_gatt):
    """Closes the GATT client, logging (not raising) if the close fails.

    Args:
        central: device acting as the GATT client.
        bluetooth_gatt: GATT client handle to close.
    """
    try:
        central.sl4a.gattClientClose(bluetooth_gatt)
    except Exception:
        # Deliberately best effort: this runs on error paths where the
        # original failure is the one worth reporting.
        log.debug("Failed to close gatt client.")


def disconnect_gatt_connection(central: AndroidDevice, bluetooth_gatt, gatt_callback):
    """Requests a GATT disconnect and waits for the disconnected event.

    Args:
        central: device acting as the GATT client.
        bluetooth_gatt: GATT client handle to disconnect.
        gatt_callback: callback handle the connection was created with.

    Raises:
        GattTestUtilsError: see wait_for_gatt_disconnect_event.
    """
    central.sl4a.gattClientDisconnect(bluetooth_gatt)
    wait_for_gatt_disconnect_event(central, gatt_callback)


def wait_for_gatt_disconnect_event(central: AndroidDevice, gatt_callback):
    """Waits up to default_timeout seconds for a STATE_DISCONNECTED event.

    Args:
        central: device acting as the GATT client.
        gatt_callback: callback handle the connection was created with.

    Raises:
        GattTestUtilsError: no connection-change event arrived in time, or the
            event reported a state other than STATE_DISCONNECTED.
    """
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, default_timeout)
    except Empty:
        raise GattTestUtilsError(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
    found_state = event['data']['State']
    expected_state = GattConnectionState.STATE_DISCONNECTED
    if found_state != expected_state:
        # Report the expected *state* (not the event name) next to the found state.
        raise GattTestUtilsError("GATT connection state change expected {}, found {}".format(
            expected_state, found_state))


def run_continuous_write_descriptor(cen_droid: Sl4aClient,
                                    cen_ed: EventDispatcher,
                                    per_droid: Sl4aClient,
                                    per_ed: EventDispatcher,
                                    gatt_server,
                                    gatt_server_callback,
                                    bluetooth_gatt,
                                    services_count,
                                    discovered_services_index,
                                    number_of_iterations=100000):
    """Repeatedly writes a fixed value to every discovered descriptor.

    For each iteration, every descriptor of every characteristic of every
    discovered service is written from the central; the peripheral's write
    request is answered with a response and the central's write-complete
    event is awaited.

    Args:
        cen_droid: SL4A client of the central.
        cen_ed: event dispatcher of the central.
        per_droid: SL4A client of the peripheral.
        per_ed: event dispatcher of the peripheral.
        gatt_server: GATT server handle on the peripheral.
        gatt_server_callback: callback handle of the GATT server.
        bluetooth_gatt: GATT client handle on the central.
        services_count: number of discovered services to iterate over.
        discovered_services_index: handle of the discovered-services list.
        number_of_iterations: how many full passes to perform.

    Returns:
        False as soon as the peripheral does not receive a descriptor write
        request within default_timeout seconds; otherwise None once all
        iterations have run.
    """
    log.info("Starting continuous write")
    bt_device_id = 0
    status = 1
    offset = 1
    test_value = [1, 2, 3, 4, 5, 6, 7]
    test_value_return = [1, 2, 3]
    # Event names depend only on the handles, so build them once.
    write_request_event = GattCallbackString.DESC_WRITE_REQ.format(gatt_server_callback)
    write_done_event = GattCallbackString.DESC_WRITE.format(bluetooth_gatt)
    for _ in range(number_of_iterations):
        try:
            for i in range(services_count):
                characteristic_uuids = cen_droid.gattClientGetDiscoveredCharacteristicUuids(
                    discovered_services_index, i)
                log.info(characteristic_uuids)
                for characteristic in characteristic_uuids:
                    descriptor_uuids = cen_droid.gattClientGetDiscoveredDescriptorUuids(
                        discovered_services_index, i, characteristic)
                    log.info(descriptor_uuids)
                    for descriptor in descriptor_uuids:
                        cen_droid.gattClientDescriptorSetValue(bluetooth_gatt, discovered_services_index, i,
                                                               characteristic, descriptor, test_value)
                        cen_droid.gattClientWriteDescriptor(bluetooth_gatt, discovered_services_index, i,
                                                            characteristic, descriptor)
                        try:
                            event = per_ed.pop_event(write_request_event, default_timeout)
                        except Empty:
                            log.error(GattCallbackError.DESC_WRITE_REQ_ERR.format(write_request_event))
                            return False
                        request_id = event['data']['requestId']
                        found_value = event['data']['value']
                        if found_value != test_value:
                            # A mismatch is only logged; the response is still sent.
                            log.error("Values didn't match. Found: {}, Expected: "
                                      "{}".format(found_value, test_value))
                        per_droid.gattServerSendResponse(gatt_server, bt_device_id, request_id, status, offset,
                                                         test_value_return)
                        try:
                            cen_ed.pop_event(write_done_event, default_timeout)
                        except Empty:
                            log.error(GattCallbackError.DESC_WRITE_ERR.format(write_done_event))
                            # Caught by the enclosing handler below, which logs
                            # it and moves on to the next iteration.
                            raise Exception("Thread ended prematurely.")
        except Exception as err:
            log.error("Continuing but found exception: {}".format(err))


def _read_write_descriptor_specs():
    """Returns the two read/write descriptor specs shared by the setup helpers."""
    read_write = GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0]
    return [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': read_write,
        },
        {
            'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
            'property': read_write,
        },
    ]


def setup_characteristics_and_descriptors_read_write(droid: Sl4aClient):
    """Creates three characteristics (write, write/read, notify/read) and two descriptors.

    Args:
        droid: SL4A client of the device hosting the GATT server.

    Returns:
        A (characteristic_list, descriptor_list) tuple of the created handles.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
            'permission': GattCharacteristic.PERMISSION_WRITE
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
    ]
    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, _read_write_descriptor_specs())
    return characteristic_list, descriptor_list


def setup_multiple_services(peripheral: AndroidDevice):
    """Opens a GATT server on the peripheral and registers three primary services.

    Every service receives the same three characteristics; the second and
    third characteristic each get one descriptor attached.

    Args:
        peripheral: device hosting the GATT server.

    Returns:
        A (gatt_server_callback, gatt_server) tuple.

    Raises:
        GattTestUtilsError: a service-added event did not arrive within
            default_timeout seconds. The GATT server is closed before raising.
    """
    per_droid, per_ed = peripheral.sl4a, peripheral.sl4a.ed
    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
    characteristic_list, descriptor_list = setup_characteristics_and_descriptors_read_write(per_droid)
    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[0])
    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[1])
    service_uuids = (
        "00000000-0000-1000-8000-00805f9b34fb",
        "FFFFFFFF-0000-1000-8000-00805f9b34fb",
        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
    )
    # All services are created up front, before any of them is populated.
    services = [
        per_droid.gattServerCreateService(uuid, GattServiceType.SERVICE_TYPE_PRIMARY) for uuid in service_uuids
    ]
    expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
    for service in services:
        for characteristic in characteristic_list:
            per_droid.gattServerAddCharacteristicToService(service, characteristic)
        per_droid.gattServerAddService(gatt_server, service)
        try:
            per_ed.pop_event(expected_event, default_timeout)
        except Empty:
            peripheral.sl4a.gattServerClose(gatt_server)
            raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
    return gatt_server_callback, gatt_server


def setup_characteristics_and_descriptors_notify_read(droid: Sl4aClient):
    """Creates three characteristics (write, notify/read, notify/read) and two descriptors.

    Args:
        droid: SL4A client of the device hosting the GATT server.

    Returns:
        A (characteristic_list, descriptor_list) tuple of the created handles.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
            # 'permission' takes a PERMISSION_* constant, as in
            # setup_characteristics_and_descriptors_read_write; a PROPERTY_*
            # value here was a typo.
            'permission': GattCharacteristic.PERMISSION_WRITE
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
    ]
    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, _read_write_descriptor_specs())
    return characteristic_list, descriptor_list


def setup_gatt_characteristics(droid: Sl4aClient, input):
    """Creates one GATT characteristic per spec.

    Args:
        droid: SL4A client of the device hosting the GATT server.
        input: iterable of dicts with 'uuid', 'property' and 'permission' keys.

    Returns:
        List of created characteristic handles, in input order.
    """
    return [
        droid.gattServerCreateBluetoothGattCharacteristic(item['uuid'], item['property'], item['permission'])
        for item in input
    ]


def setup_gatt_descriptors(droid: Sl4aClient, input):
    """Creates one GATT descriptor per spec.

    Args:
        droid: SL4A client of the device hosting the GATT server.
        input: iterable of dicts with 'uuid' and 'property' keys.

    Returns:
        List of created descriptor handles, in input order.
    """
    descriptor_list = [
        droid.gattServerCreateBluetoothGattDescriptor(item['uuid'], item['property']) for item in input
    ]
    log.info("setup descriptor list: {}".format(descriptor_list))
    return descriptor_list


def setup_gatt_mtu(central: AndroidDevice, bluetooth_gatt, gatt_callback, mtu):
    """utility function to set mtu for GATT connection.

    Steps:
        1. Request mtu change.
        2. Check if the mtu is changed to the new value

    Args:
        central: test device for client to scan.
        bluetooth_gatt: GATT object
        gatt_callback: callback handle the MTU-changed event is reported on
        mtu: new mtu value to be set

    Returns:
        If success, return True.
        if fail, return False
    """
    central.sl4a.gattClientRequestMtu(bluetooth_gatt, mtu)
    expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
    try:
        mtu_event = central.ed.pop_event(expected_event, default_timeout)
    except Empty:
        log.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
        return False
    mtu_size_found = mtu_event['data']['MTU']
    if mtu_size_found != mtu:
        log.error("MTU size found: {}, expected: {}".format(mtu_size_found, mtu))
        return False
    return True


def log_gatt_server_uuids(central: AndroidDevice, discovered_services_index, bluetooth_gatt=None):
    """Logs every discovered service, characteristic and descriptor UUID.

    Args:
        central: device acting as the GATT client.
        discovered_services_index: handle of the discovered-services list.
        bluetooth_gatt: optional GATT client handle. When given, instance ids
            are looked up and logged (in hex) next to each characteristic and
            descriptor UUID.
    """
    sl4a = central.sl4a
    # Compare against None rather than truthiness so that a handle of 0 is
    # still treated as "provided".
    with_handles = bluetooth_gatt is not None
    services_count = sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
    for i in range(services_count):
        service = sl4a.gattClientGetDiscoveredServiceUuid(discovered_services_index, i)
        log.info("Discovered service uuid {}".format(service))
        characteristic_uuids = sl4a.gattClientGetDiscoveredCharacteristicUuids(discovered_services_index, i)
        for j, characteristic_uuid in enumerate(characteristic_uuids):
            descriptor_uuids = sl4a.gattClientGetDiscoveredDescriptorUuidsByIndex(discovered_services_index, i, j)
            if with_handles:
                char_inst_id = sl4a.gattClientGetCharacteristicInstanceId(bluetooth_gatt,
                                                                          discovered_services_index, i, j)
                log.info("Discovered characteristic handle uuid: {} {}".format(
                    hex(char_inst_id), characteristic_uuid))
                for k, descriptor_uuid in enumerate(descriptor_uuids):
                    desc_inst_id = sl4a.gattClientGetDescriptorInstanceId(bluetooth_gatt,
                                                                          discovered_services_index, i, j, k)
                    log.info("Discovered descriptor handle uuid: {} {}".format(hex(desc_inst_id), descriptor_uuid))
            else:
                log.info("Discovered characteristic uuid: {}".format(characteristic_uuid))
                for descriptor_uuid in descriptor_uuids:
                    log.info("Discovered descriptor uuid {}".format(descriptor_uuid))