1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Original file:
18    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/bt_gatt_utils.py
19"""
20
21import logging
22import pprint
23from queue import Empty
24
25from blueberry.utils.bt_gatt_constants import GattCallbackError
26from blueberry.utils.bt_gatt_constants import GattCallbackString
27from blueberry.utils.bt_gatt_constants import GattCharacteristic
28from blueberry.utils.bt_gatt_constants import GattConnectionState
29from blueberry.utils.bt_gatt_constants import GattDescriptor
30from blueberry.utils.bt_gatt_constants import GattPhyMask
31from blueberry.utils.bt_gatt_constants import GattServiceType
32from blueberry.utils.bt_gatt_constants import GattTransport
33from blueberry.utils.bt_test_utils import BtTestUtilsError
34from mobly.controllers.android_device import AndroidDevice
35from mobly.controllers.android_device_lib.event_dispatcher import EventDispatcher
36from mobly.controllers.android_device_lib.sl4a_client import Sl4aClient
37
# Default number of seconds the helpers below wait for an expected event.
default_timeout = 10
# Alias for the logging module, so log.info()/log.error() go through its
# module-level functions.
log = logging
40
41
class GattTestUtilsError(Exception):
    """Raised by the GATT helpers when an expected event is missing or reports an unexpected state."""
44
45
def setup_gatt_connection(central: AndroidDevice,
                          mac_address,
                          autoconnect,
                          transport=GattTransport.TRANSPORT_AUTO,
                          opportunistic=False,
                          timeout_seconds=default_timeout):
    """Open a GATT client connection and wait for it to report connected.

    A new GATT callback is created on ``central``, a connection to
    ``mac_address`` is requested with the LE 1M PHY mask, and the matching
    connection-change event is awaited.

    Args:
        central: device whose SL4A client issues the GATT client calls.
        mac_address: address handed to ``gattClientConnectGatt``.
        autoconnect: forwarded unchanged to ``gattClientConnectGatt``.
        transport: forwarded unchanged to ``gattClientConnectGatt``.
        opportunistic: forwarded unchanged to ``gattClientConnectGatt``.
        timeout_seconds: how long to wait for the connection-change event.

    Returns:
        A ``(bluetooth_gatt, gatt_callback)`` tuple holding the values
        returned by ``gattClientConnectGatt`` and ``gattCreateGattCallback``.

    Raises:
        GattTestUtilsError: if no connection-change event arrives in time or
            the event reports a state other than connected. The GATT client
            is closed before raising.
    """
    callback_id = central.sl4a.gattCreateGattCallback()
    log.info("Gatt Connect to mac address {}.".format(mac_address))
    gatt_id = central.sl4a.gattClientConnectGatt(
        callback_id,
        mac_address,
        autoconnect,
        transport,
        opportunistic,
        GattPhyMask.PHY_LE_1M_MASK,
    )
    conn_change_event = GattCallbackString.GATT_CONN_CHANGE.format(callback_id)
    try:
        event = central.ed.pop_event(conn_change_event, timeout_seconds)
    except Empty:
        # Do not leave a half-open client behind when the event never shows up.
        close_gatt_client(central, gatt_id)
        raise GattTestUtilsError(
            "Could not establish a connection to peripheral. Expected event: {}".format(conn_change_event))
    logging.info("Got connection event {}".format(event))
    if event['data']['State'] == GattConnectionState.STATE_CONNECTED:
        return gatt_id, callback_id
    close_gatt_client(central, gatt_id)
    raise GattTestUtilsError(
        "Could not establish a connection to peripheral. Event Details: {}".format(pprint.pformat(event)))
69
70
def wait_for_gatt_connection(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout):
    """Wait for an already requested GATT connection to report connected.

    Args:
        central: device whose event dispatcher is polled.
        gatt_callback: value formatted into the expected connection-change
            event name.
        bluetooth_gatt: GATT client handle that is closed on failure.
        timeout: how long to wait for the connection-change event.

    Raises:
        GattTestUtilsError: if no connection-change event arrives in time or
            the event reports a state other than connected. The GATT client
            is closed before raising.
    """
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, timeout=timeout)
    except Empty:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Expected event: {}".format(expected_event))
    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
        # close_gatt_client() already swallows and logs close failures, so a
        # single call is enough; a second explicit close is not needed.
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))
87
88
def close_gatt_client(central: AndroidDevice, bluetooth_gatt):
    """Close a GATT client on a best-effort basis.

    Args:
        central: device whose SL4A client owns the GATT client.
        bluetooth_gatt: GATT client handle passed to ``gattClientClose``.

    Any exception raised while closing is logged at debug level (including
    its traceback) and otherwise ignored, so this is safe to call from
    error-handling paths.
    """
    try:
        central.sl4a.gattClientClose(bluetooth_gatt)
    except Exception:
        # Keep the cause in the log; callers rely on this never raising.
        log.debug("Failed to close gatt client.", exc_info=True)
94
95
def disconnect_gatt_connection(central: AndroidDevice, bluetooth_gatt, gatt_callback):
    """Disconnect a GATT client and wait for the disconnect event.

    Args:
        central: device whose SL4A client issues the disconnect.
        bluetooth_gatt: GATT client handle passed to ``gattClientDisconnect``.
        gatt_callback: callback value used to match the disconnect event.

    Raises:
        GattTestUtilsError: propagated from ``wait_for_gatt_disconnect_event``.
    """
    sl4a = central.sl4a
    sl4a.gattClientDisconnect(bluetooth_gatt)
    wait_for_gatt_disconnect_event(central, gatt_callback)
100
101
def wait_for_gatt_disconnect_event(central: AndroidDevice, gatt_callback):
    """Wait for a connection-change event that reports the disconnected state.

    Args:
        central: device whose event dispatcher is polled.
        gatt_callback: value formatted into the expected connection-change
            event name.

    Raises:
        GattTestUtilsError: if no connection-change event arrives within
            ``default_timeout`` or the event reports a state other than
            disconnected.
    """
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, default_timeout)
    except Empty:
        raise GattTestUtilsError(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
    found_state = event['data']['State']
    expected_state = GattConnectionState.STATE_DISCONNECTED
    if found_state != expected_state:
        # Report the expected *state*, not the event name, next to the state found.
        raise GattTestUtilsError("GATT connection state change expected {}, found {}".format(
            expected_state, found_state))
114
115
def run_continuous_write_descriptor(cen_droid: Sl4aClient,
                                    cen_ed: EventDispatcher,
                                    per_droid: Sl4aClient,
                                    per_ed: EventDispatcher,
                                    gatt_server,
                                    gatt_server_callback,
                                    bluetooth_gatt,
                                    services_count,
                                    discovered_services_index,
                                    number_of_iterations=100000):
    """Repeatedly write a fixed value to every discovered descriptor.

    Each iteration walks every service index below ``services_count``, every
    characteristic UUID of that service and every descriptor UUID of that
    characteristic. For each descriptor the client writes a fixed value, the
    server-side write request is popped and answered, and the client-side
    write event is awaited.

    Args:
        cen_droid: SL4A client used for the GATT client calls.
        cen_ed: event dispatcher polled for the client-side write event.
        per_droid: SL4A client used to send the server response.
        per_ed: event dispatcher polled for the server-side write request.
        gatt_server: GATT server handle passed to ``gattServerSendResponse``.
        gatt_server_callback: value formatted into the expected
            write-request event name.
        bluetooth_gatt: GATT client handle; also formatted into the expected
            client write event name.
        services_count: number of service indexes to walk.
        discovered_services_index: discovered-services index passed to the
            client calls.
        number_of_iterations: how many times the whole sweep is repeated.

    Returns:
        False as soon as a server-side write request event times out;
        otherwise None (implicitly) after all iterations have run.
    """
    log.info("Starting continuous write")
    payload = [1, 2, 3, 4, 5, 6, 7]
    response_payload = [1, 2, 3]
    # Fixed arguments for every gattServerSendResponse call.
    response_device_id = 0
    response_status = 1
    response_offset = 1
    for _ in range(number_of_iterations):
        try:
            for service_index in range(services_count):
                char_uuids = cen_droid.gattClientGetDiscoveredCharacteristicUuids(
                    discovered_services_index, service_index)
                log.info(char_uuids)
                for char_uuid in char_uuids:
                    desc_uuids = cen_droid.gattClientGetDiscoveredDescriptorUuids(
                        discovered_services_index, service_index, char_uuid)
                    log.info(desc_uuids)
                    for desc_uuid in desc_uuids:
                        # Same leading arguments for the set-value and write calls.
                        target = (bluetooth_gatt, discovered_services_index, service_index, char_uuid, desc_uuid)
                        cen_droid.gattClientDescriptorSetValue(*target, payload)
                        cen_droid.gattClientWriteDescriptor(*target)

                        write_req_event = GattCallbackString.DESC_WRITE_REQ.format(gatt_server_callback)
                        try:
                            request = per_ed.pop_event(write_req_event, default_timeout)
                        except Empty:
                            # A missing server-side request ends the whole run.
                            log.error(GattCallbackError.DESC_WRITE_REQ_ERR.format(write_req_event))
                            return False
                        request_data = request['data']
                        request_id = request_data['requestId']
                        received = request_data['value']
                        if received != payload:
                            # A mismatch is only reported; the response is still sent.
                            log.error("Values didn't match. Found: {}, Expected: {}".format(received, payload))
                        per_droid.gattServerSendResponse(gatt_server, response_device_id, request_id,
                                                         response_status, response_offset, response_payload)

                        write_done_event = GattCallbackString.DESC_WRITE.format(bluetooth_gatt)
                        try:
                            cen_ed.pop_event(write_done_event, default_timeout)
                        except Empty:
                            log.error(GattCallbackError.DESC_WRITE_ERR.format(write_done_event))
                            # Caught by the outer handler below, which logs it and
                            # moves on to the next iteration.
                            raise Exception("Thread ended prematurely.")
        except Exception as err:
            log.error("Continuing but found exception: {}".format(err))
169
170
def setup_characteristics_and_descriptors_read_write(droid: Sl4aClient):
    """Create the read/write characteristic and descriptor set.

    Builds three characteristic specs (write / write-no-response,
    write / read, notify / read) and two descriptor specs (read | write),
    then creates them through ``setup_gatt_characteristics`` and
    ``setup_gatt_descriptors``.

    Args:
        droid: SL4A client on which the objects are created.

    Returns:
        A ``(characteristic_list, descriptor_list)`` tuple with the values
        returned by the two setup helpers.
    """
    # (uuid, property, permission) for each characteristic, in creation order.
    characteristic_specs = (
        ("aa7edd5a-4d1d-4f0e-883a-d145616a1630",
         GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
         GattCharacteristic.PERMISSION_WRITE),
        ("21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
         GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_READ,
         GattCharacteristic.PERMISSION_READ),
        ("6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
         GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
         GattCharacteristic.PERMISSION_READ),
    )
    characteristic_input = [{
        'uuid': uuid,
        'property': prop,
        'permission': permission,
    } for uuid, prop, permission in characteristic_specs]

    # Both descriptors share the same read | write value.
    read_write = GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0]
    descriptor_uuids = (
        "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
        "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
    )
    descriptor_input = [{'uuid': uuid, 'property': read_write} for uuid in descriptor_uuids]

    return (setup_gatt_characteristics(droid, characteristic_input),
            setup_gatt_descriptors(droid, descriptor_input))
199
200
def setup_multiple_services(peripheral: AndroidDevice):
    """Open a GATT server on ``peripheral`` and register three primary services.

    The read/write characteristic set is created once, two descriptors are
    attached to the second and third characteristic, and the same
    characteristics are then added to each of three primary services. After
    each service is added to the server, the service-added event is awaited.

    Args:
        peripheral: device on which the GATT server is created.

    Returns:
        A ``(gatt_server_callback, gatt_server)`` tuple.

    Raises:
        GattTestUtilsError: if a service-added event does not arrive within
            ``default_timeout``. The GATT server is closed before raising.
    """
    per_droid, per_ed = peripheral.sl4a, peripheral.sl4a.ed
    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)

    characteristics, descriptors = setup_characteristics_and_descriptors_read_write(per_droid)
    per_droid.gattServerCharacteristicAddDescriptor(characteristics[1], descriptors[0])
    per_droid.gattServerCharacteristicAddDescriptor(characteristics[2], descriptors[1])

    # All three services are created up front, before any of them is populated.
    service_uuids = (
        "00000000-0000-1000-8000-00805f9b34fb",
        "FFFFFFFF-0000-1000-8000-00805f9b34fb",
        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
    )
    services = [
        per_droid.gattServerCreateService(uuid, GattServiceType.SERVICE_TYPE_PRIMARY) for uuid in service_uuids
    ]

    expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
    for service in services:
        for characteristic in characteristics:
            per_droid.gattServerAddCharacteristicToService(service, characteristic)
        per_droid.gattServerAddService(gatt_server, service)
        try:
            per_ed.pop_event(expected_event, default_timeout)
        except Empty:
            peripheral.sl4a.gattServerClose(gatt_server)
            raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
    return gatt_server_callback, gatt_server
240
241
def setup_characteristics_and_descriptors_notify_read(droid: Sl4aClient):
    """Create the notify/read characteristic and descriptor set.

    Builds three characteristic specs (write / write-no-response and two
    notify / read) and two descriptor specs (read | write), then creates
    them through ``setup_gatt_characteristics`` and
    ``setup_gatt_descriptors``.

    Args:
        droid: SL4A client on which the objects are created.

    Returns:
        A ``(characteristic_list, descriptor_list)`` tuple with the values
        returned by the two setup helpers.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
            # The permission field takes a PERMISSION_* constant, matching the
            # same entry in setup_characteristics_and_descriptors_read_write;
            # a PROPERTY_* flag here would be the wrong constant family.
            'permission': GattCharacteristic.PERMISSION_WRITE
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
    ]
    descriptor_input = [{
        'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }, {
        'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }]
    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
    return characteristic_list, descriptor_list
270
271
def setup_gatt_characteristics(droid: Sl4aClient, input):
    """Create one GATT characteristic per spec and collect the results.

    Args:
        droid: SL4A client on which the characteristics are created.
        input: iterable of mappings with 'uuid', 'property' and 'permission'
            keys.

    Returns:
        A list with the value returned by
        ``gattServerCreateBluetoothGattCharacteristic`` for each spec, in
        input order.
    """
    return [
        droid.gattServerCreateBluetoothGattCharacteristic(spec['uuid'], spec['property'], spec['permission'])
        for spec in input
    ]
278
279
def setup_gatt_descriptors(droid: Sl4aClient, input):
    """Create one GATT descriptor per spec, log the results and return them.

    Args:
        droid: SL4A client on which the descriptors are created.
        input: iterable of mappings with 'uuid' and 'property' keys.

    Returns:
        A list with the value returned by
        ``gattServerCreateBluetoothGattDescriptor`` for each spec, in input
        order.
    """
    created = [droid.gattServerCreateBluetoothGattDescriptor(spec['uuid'], spec['property']) for spec in input]
    log.info("setup descriptor list: {}".format(created))
    return created
290
291
def setup_gatt_mtu(central: AndroidDevice, bluetooth_gatt, gatt_callback, mtu):
    """Request a new MTU on a GATT connection and verify the reported value.

    Steps:
    1. Request the MTU change.
    2. Wait for the MTU-changed event and compare its MTU with the request.

    Args:
        central: device whose SL4A client issues the MTU request.
        bluetooth_gatt: GATT client handle the request is sent on.
        gatt_callback: value formatted into the expected MTU-changed event
            name.
        mtu: MTU value to request and to expect in the resulting event.

    Returns:
        True if the MTU-changed event arrives within ``default_timeout`` and
        reports ``mtu``; False if the event is missing or reports a
        different value.
    """
    central.sl4a.gattClientRequestMtu(bluetooth_gatt, mtu)
    expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
    try:
        mtu_event = central.ed.pop_event(expected_event, default_timeout)
    except Empty:
        log.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
        return False
    reported_mtu = mtu_event['data']['MTU']
    if reported_mtu == mtu:
        return True
    log.error("MTU size found: {}, expected: {}".format(reported_mtu, mtu))
    return False
320
321
def log_gatt_server_uuids(central: AndroidDevice, discovered_services_index, bluetooth_gatt=None):
    """Log the UUIDs of every discovered service, characteristic and descriptor.

    Args:
        central: device whose SL4A client is queried.
        discovered_services_index: discovered-services index passed to every
            query.
        bluetooth_gatt: optional GATT client handle. When given (any value
            other than None, including 0), the instance id of each
            characteristic and descriptor is queried and logged in hex next
            to its UUID; when None, only the UUIDs are logged.
    """
    sl4a = central.sl4a
    services_count = sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
    for i in range(services_count):
        service = sl4a.gattClientGetDiscoveredServiceUuid(discovered_services_index, i)
        log.info("Discovered service uuid {}".format(service))
        characteristic_uuids = sl4a.gattClientGetDiscoveredCharacteristicUuids(discovered_services_index, i)
        for j, characteristic_uuid in enumerate(characteristic_uuids):
            descriptor_uuids = sl4a.gattClientGetDiscoveredDescriptorUuidsByIndex(discovered_services_index, i, j)
            # Compare against None explicitly so that a handle of 0 is not
            # mistaken for "no handle supplied".
            if bluetooth_gatt is None:
                log.info("Discovered characteristic uuid: {}".format(characteristic_uuid))
                for descriptor_uuid in descriptor_uuids:
                    log.info("Discovered descriptor uuid {}".format(descriptor_uuid))
                continue
            char_inst_id = sl4a.gattClientGetCharacteristicInstanceId(bluetooth_gatt, discovered_services_index, i,
                                                                      j)
            log.info("Discovered characteristic handle uuid: {} {}".format(hex(char_inst_id), characteristic_uuid))
            for k, descriptor_uuid in enumerate(descriptor_uuids):
                desc_inst_id = sl4a.gattClientGetDescriptorInstanceId(bluetooth_gatt, discovered_services_index, i,
                                                                      j, k)
                log.info("Discovered descriptor handle uuid: {} {}".format(hex(desc_inst_id), descriptor_uuid))
344