xref: /aosp_15_r20/external/autotest/client/common_lib/cros/bluetooth/bluetooth_sdp_socket.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import socket
12import struct
13
14import btsocket
15from six.moves import range
16
17SDP_HDR_FORMAT        = '>BHH'
18SDP_HDR_SIZE          = struct.calcsize(SDP_HDR_FORMAT)
19SDP_TID_CNT           = 1 << 16
20SDP_MAX_UUIDS_CNT     = 12
21SDP_BODY_CNT_FORMAT   = '>HH'
22SDP_BODY_CNT_SIZE     = struct.calcsize(SDP_BODY_CNT_FORMAT)
23BLUETOOTH_BASE_UUID   = 0x0000000000001000800000805F9B34FB
24
25# Constants are taken from lib/sdp.h in BlueZ source.
26SDP_RESPONSE_TIMEOUT    = 20
27SDP_REQ_BUFFER_SIZE     = 2048
28SDP_RSP_BUFFER_SIZE     = 65535
29SDP_PDU_CHUNK_SIZE      = 1024
30
31SDP_PSM                 = 0x0001
32
33SDP_UUID                = 0x0001
34
35SDP_DATA_NIL            = 0x00
36SDP_UINT8               = 0x08
37SDP_UINT16              = 0x09
38SDP_UINT32              = 0x0A
39SDP_UINT64              = 0x0B
40SDP_UINT128             = 0x0C
41SDP_INT8                = 0x10
42SDP_INT16               = 0x11
43SDP_INT32               = 0x12
44SDP_INT64               = 0x13
45SDP_INT128              = 0x14
46SDP_UUID_UNSPEC         = 0x18
47SDP_UUID16              = 0x19
48SDP_UUID32              = 0x1A
49SDP_UUID128             = 0x1C
50SDP_TEXT_STR_UNSPEC     = 0x20
51SDP_TEXT_STR8           = 0x25
52SDP_TEXT_STR16          = 0x26
53SDP_TEXT_STR32          = 0x27
54SDP_BOOL                = 0x28
55SDP_SEQ_UNSPEC          = 0x30
56SDP_SEQ8                = 0x35
57SDP_SEQ16               = 0x36
58SDP_SEQ32               = 0x37
59SDP_ALT_UNSPEC          = 0x38
60SDP_ALT8                = 0x3D
61SDP_ALT16               = 0x3E
62SDP_ALT32               = 0x3F
63SDP_URL_STR_UNSPEC      = 0x40
64SDP_URL_STR8            = 0x45
65SDP_URL_STR16           = 0x46
66SDP_URL_STR32           = 0x47
67
68SDP_ERROR_RSP           = 0x01
69SDP_SVC_SEARCH_REQ      = 0x02
70SDP_SVC_SEARCH_RSP      = 0x03
71SDP_SVC_ATTR_REQ        = 0x04
72SDP_SVC_ATTR_RSP        = 0x05
73SDP_SVC_SEARCH_ATTR_REQ = 0x06
74SDP_SVC_SEARCH_ATTR_RSP = 0x07
75
76
77class BluetoothSDPSocketError(Exception):
78    """Error raised for SDP-related issues with BluetoothSDPSocket."""
79    pass
80
81
82class BluetoothSDPSocket(btsocket.socket):
83    """Bluetooth SDP Socket.
84
85    BluetoothSDPSocket wraps the btsocket.socket() class to implement
86    the necessary send and receive methods for the SDP protocol.
87
88    """
89
90    def __init__(self):
91        super(BluetoothSDPSocket, self).__init__(family=btsocket.AF_BLUETOOTH,
92                                                 type=socket.SOCK_SEQPACKET,
93                                                 proto=btsocket.BTPROTO_L2CAP)
94        self.tid = 0
95
96
97    def gen_tid(self):
98        """Generate new Transaction ID
99
100        @return Transaction ID
101
102        """
103        self.tid = (self.tid + 1) % SDP_TID_CNT
104        return self.tid
105
106
107    def connect(self, address):
108        """Connect to device with the given address
109
110        @param address: Bluetooth address.
111
112        """
113        try:
114            super(BluetoothSDPSocket, self).connect((address, SDP_PSM))
115        except btsocket.error as e:
116            logging.error('Error connecting to %s: %s', address, e)
117            raise BluetoothSDPSocketError('Error connecting to host: %s' % e)
118        except btsocket.timeout as e:
119            logging.error('Timeout connecting to %s: %s', address, e)
120            raise BluetoothSDPSocketError('Timeout connecting to host: %s' % e)
121
122    def send_request(self, code, tid, data, forced_pdu_size=None):
123        """Send a request to the socket.
124
125        @param code: Request code.
126        @param tid: Transaction ID.
127        @param data: Parameters as bytearray or str.
128        @param forced_pdu_size: Use certain PDU size parameter instead of
129               calculating actual length of sequence.
130
131        @raise BluetoothSDPSocketError: if 'send' to the socket didn't succeed.
132
133        """
134        size = len(data)
135        if forced_pdu_size != None:
136            size = forced_pdu_size
137        msg = struct.pack(SDP_HDR_FORMAT, code, tid, size) + data
138
139        length = self.send(msg)
140        if length != len(msg):
141            raise BluetoothSDPSocketError('Short write on socket')
142
143
144    def recv_response(self):
145        """Receive a single response from the socket.
146
147        The response data is not parsed.
148
149        Use settimeout() to set whether this method will block if there is no
150        reply, return immediately or wait for a specific length of time before
151        timing out and raising TimeoutError.
152
153        @return tuple of (code, tid, data)
154        @raise BluetoothSDPSocketError: if the received packet is too small or
155               if size of the packet differs from size written in header
156
157        """
158        # Read the response from the socket.
159        response = self.recv(SDP_RSP_BUFFER_SIZE)
160
161        if len(response) < SDP_HDR_SIZE:
162            raise BluetoothSDPSocketError('Short read on socket')
163
164        code, tid, length = struct.unpack_from(SDP_HDR_FORMAT, response)
165        data = response[SDP_HDR_SIZE:]
166
167        if length != len(data):
168            raise BluetoothSDPSocketError('Short read on socket')
169
170        return code, tid, data
171
172
173    def send_request_and_wait(self, req_code, req_data, forced_pdu_size=None):
174        """Send a request to the socket and wait for the response.
175
176        The response data is not parsed.
177
178        @param req_code: Request code.
179        @param req_data: Parameters as bytearray or str.
180        @param forced_pdu_size: Use certain PDU size parameter instead of
181               calculating actual length of sequence.
182
183        Use settimeout() to set whether this method will block if there is no
184        reply, return immediately or wait for a specific length of time before
185        timing out and raising TimeoutError.
186
187        @return tuple of (rsp_code, data)
188        @raise BluetoothSDPSocketError: if Transaction ID of the response
189               doesn't match to Transaction ID sent in request
190
191        """
192        req_tid = self.gen_tid()
193        self.send_request(req_code, req_tid, req_data, forced_pdu_size)
194        rsp_code, rsp_tid, rsp_data = self.recv_response()
195
196        if req_tid != rsp_tid:
197            raise BluetoothSDPSocketError("Transaction IDs for request and "
198                                          "response don't match")
199
200        return rsp_code, rsp_data
201
202
203    def _pack_list(self, data_element_list):
204        """Preappend a list with required header.
205
206        Size of the header is chosen to be minimal possible.
207
208        @param data_element_list: List to be packed.
209
210        @return packed list as a str
211        @raise BluetoothSDPSocketError: if size of the list is larger than or
212               equal to 2^32 bytes, which is not supported in SDP transactions
213
214        """
215        size = len(data_element_list)
216        if size < (1 << 8):
217            header = struct.pack('>BB', SDP_SEQ8, size)
218        elif size < (1 << 16):
219            header = struct.pack('>BH', SDP_SEQ16, size)
220        elif size < (1 << 32):
221            header = struct.pack('>BI', SDP_SEQ32, size)
222        else:
223            raise BluetoothSDPSocketError('List is too long')
224        return header + data_element_list.encode('utf-8')
225
226
227    def _pack_uuids(self, uuids, preferred_size):
228        """Pack a list of UUIDs to a binary sequence
229
230        @param uuids: List of UUIDs (as integers).
231        @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
232
233        @return packed list as a str
234        @raise BluetoothSDPSocketError: the given preferred size is not
235               supported by SDP
236
237        """
238        if preferred_size not in (16, 32, 128):
239            raise BluetoothSDPSocketError('Unsupported UUID size: %d; '
240                                          'Supported values are: 16, 32, 128'
241                                          % preferred_size)
242
243        res = ''
244        for uuid in uuids:
245            # Fall back to 128 bits if the UUID doesn't fit into preferred_size.
246            if uuid >= (1 << preferred_size) or preferred_size == 128:
247                uuid128 = uuid
248                if uuid < (1 << 32):
249                    uuid128 = (uuid128 << 96) + BLUETOOTH_BASE_UUID
250                packed_uuid = struct.pack('>BQQ', SDP_UUID128, uuid128 >> 64,
251                                          uuid128 & ((1 << 64) - 1))
252            elif preferred_size == 16:
253                packed_uuid = struct.pack('>BH', SDP_UUID16, uuid)
254            elif preferred_size == 32:
255                packed_uuid = struct.pack('>BI', SDP_UUID32, uuid)
256
257            res += packed_uuid
258
259        res = self._pack_list(res)
260
261        return res
262
263
264    def _unpack_uuids(self, response):
265        """Unpack SDP response
266
267        @param response: body of raw SDP response.
268
269        @return tuple of (uuids, cont_state)
270
271        """
272        total_cnt, cur_cnt = struct.unpack_from(SDP_BODY_CNT_FORMAT, response)
273        scanned = SDP_BODY_CNT_SIZE
274        uuids = []
275        for i in range(cur_cnt):
276            uuid, = struct.unpack_from('>I', response, scanned)
277            uuids.append(uuid)
278            scanned += 4
279
280        cont_state = response[scanned:]
281        return uuids, cont_state
282
283
284    def _unpack_error_code(self, response):
285        """Unpack Error Code from SDP error response
286
287        @param response: Body of raw SDP response.
288
289        @return Error Code as int
290
291        """
292        error_code, = struct.unpack_from('>H', response)
293        return error_code
294
295
296    def service_search_request(self, uuids, max_rec_cnt, preferred_size=32,
297                               forced_pdu_size=None, invalid_request=False):
298        """Send a Service Search Request
299
300        @param uuids: List of UUIDs (as integers) to look for.
301        @param max_rec_cnt: Maximum count of returned service records.
302        @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
303        @param forced_pdu_size: Use certain PDU size parameter instead of
304               calculating actual length of sequence.
305        @param invalid_request: Whether to send request with intentionally
306               invalid syntax for testing purposes (bool flag).
307
308        @return list of found services' service record handles or Error Code
309        @raise BluetoothSDPSocketError: arguments do not match the SDP
310               restrictions or if the response has an incorrect code
311
312        """
313        if max_rec_cnt < 1 or max_rec_cnt > 65535:
314            raise BluetoothSDPSocketError('MaximumServiceRecordCount must be '
315                                          'between 1 and 65535, inclusive')
316
317        if len(uuids) > SDP_MAX_UUIDS_CNT:
318            raise BluetoothSDPSocketError('Too many UUIDs')
319
320        pattern = self._pack_uuids(uuids, preferred_size) + struct.pack(
321                  '>H', max_rec_cnt)
322        cont_state = '\0'
323        handles = []
324
325        while True:
326            request = pattern + cont_state.encode('utf-8')
327
328            # Request without any continuation state is an example of invalid
329            # request syntax.
330            if invalid_request:
331                request = pattern
332
333            code, response = self.send_request_and_wait(
334                    SDP_SVC_SEARCH_REQ, request, forced_pdu_size)
335
336            if code == SDP_ERROR_RSP:
337                return self._unpack_error_code(response)
338
339            if code != SDP_SVC_SEARCH_RSP:
340                raise BluetoothSDPSocketError('Incorrect response code')
341
342            cur_list, cont_state = self._unpack_uuids(response)
343            handles.extend(cur_list)
344            if cont_state == '\0':
345                break
346
347        return handles
348
349
350    def _pack_attr_ids(self, attr_ids):
351        """Pack a list of Attribute IDs to a binary sequence
352
353        @param attr_ids: List of Attribute IDs.
354
355        @return packed list as a str
356        @raise BluetoothSDPSocketError: if list of UUIDs after packing is larger
357               than or equal to 2^32 bytes
358
359        """
360        attr_ids.sort()
361        res = ''
362        for attr_id in attr_ids:
363            # Each element could be either a single Attribute ID or
364            # a range of IDs.
365            if isinstance(attr_id, list):
366                packed_attr_id = struct.pack('>BHH', SDP_UINT32,
367                                             attr_id[0], attr_id[1])
368            else:
369                packed_attr_id = struct.pack('>BH', SDP_UINT16, attr_id)
370
371            res += packed_attr_id
372
373        res = self._pack_list(res)
374
375        return res
376
377
378    def _unpack_int(self, data, cnt):
379        """Unpack an unsigned integer of cnt bytes
380
381        @param data: raw data to be parsed
382        @param cnt: size of integer
383
384        @return unsigned integer
385
386        """
387        res = 0
388        for i in range(cnt):
389            res = (res << 8) | ord(data[i])
390        return res
391
392
393    def _unpack_sdp_data_element(self, data):
394        """Unpack a data element from a raw response
395
396        @param data: raw data to be parsed
397
398        @return tuple (result, scanned bytes)
399
400        """
401        header, = struct.unpack_from('>B', data)
402        data_type = header >> 3
403        data_size = header & 7
404        scanned = 1
405        data = data[1:]
406        if data_type == 0:
407            if data_size != 0:
408                raise BluetoothSDPSocketError('Invalid size descriptor')
409            return None, scanned
410        elif data_type <= 3 or data_type == 5:
411            if (data_size > 4 or
412                data_type == 3 and (data_size == 0 or data_size == 3) or
413                data_type == 5 and data_size != 0):
414                raise BluetoothSDPSocketError('Invalid size descriptor')
415
416            int_size = 1 << data_size
417            res = self._unpack_int(data, int_size)
418
419            # Consider negative integers.
420            if data_type == 2 and (ord(data[0]) & 128) != 0:
421                res = res - (1 << (int_size * 8))
422
423            # Consider booleans.
424            if data_type == 5:
425                res = res != 0
426
427            scanned += int_size
428            return res, scanned
429        elif data_type == 4 or data_type == 8:
430            if data_size < 5 or data_size > 7:
431                raise BluetoothSDPSocketError('Invalid size descriptor')
432
433            int_size = 1 << (data_size - 5)
434            str_size = self._unpack_int(data, int_size)
435
436            res = data[int_size : int_size + str_size]
437            scanned += int_size + str_size
438            return res, scanned
439        elif data_type == 6 or data_type == 7:
440            if data_size < 5 or data_size > 7:
441                raise BluetoothSDPSocketError('Invalid size descriptor')
442
443            int_size = 1 << (data_size - 5)
444            total_size = self._unpack_int(data, int_size)
445
446            data = data[int_size:]
447            scanned += int_size + total_size
448
449            res = []
450            cur_size = 0
451            while cur_size < total_size:
452                elem, elem_size = self._unpack_sdp_data_element(data)
453                res.append(elem)
454                data = data[elem_size:]
455                cur_size += elem_size
456
457            return res, scanned
458        else:
459            raise BluetoothSDPSocketError('Invalid size descriptor')
460
461
462    def service_attribute_request(self, handle, max_attr_byte_count, attr_ids,
463                                  forced_pdu_size=None, invalid_request=None):
464        """Send a Service Attribute Request
465
466        @param handle: service record from which attribute values are to be
467               retrieved.
468        @param max_attr_byte_count: maximum number of bytes of attribute data to
469               be returned in the response to this request.
470        @param attr_ids: a list, where each element is either an attribute ID
471               or a range of attribute IDs.
472        @param forced_pdu_size: Use certain PDU size parameter instead of
473               calculating actual length of sequence.
474        @param invalid_request: Whether to send request with intentionally
475               invalid syntax for testing purposes (string with raw request).
476
477        @return list of found attributes IDs and their values or Error Code
478        @raise BluetoothSDPSocketError: arguments do not match the SDP
479               restrictions or if the response has an incorrect code
480
481        """
482        if max_attr_byte_count < 7 or max_attr_byte_count > 65535:
483            raise BluetoothSDPSocketError('MaximumAttributeByteCount must be '
484                                          'between 7 and 65535, inclusive')
485
486        pattern = (struct.pack('>I', handle) +
487                   struct.pack('>H', max_attr_byte_count) +
488                   self._pack_attr_ids(attr_ids))
489        cont_state = '\0'
490        complete_response = ''
491
492        while True:
493            request = (invalid_request if invalid_request else pattern +
494                       cont_state.encode('utf-8'))
495
496            code, response = self.send_request_and_wait(
497                    SDP_SVC_ATTR_REQ, request, forced_pdu_size)
498
499            if code == SDP_ERROR_RSP:
500                return self._unpack_error_code(response)
501
502            if code != SDP_SVC_ATTR_RSP:
503                raise BluetoothSDPSocketError('Incorrect response code')
504
505            response_byte_count, = struct.unpack_from('>H', response)
506            if response_byte_count > max_attr_byte_count:
507                raise BluetoothSDPSocketError('AttributeListByteCount exceeds'
508                                              'MaximumAttributeByteCount')
509
510            response = response[2:]
511            complete_response += response[:response_byte_count]
512            cont_state = response[response_byte_count:]
513
514            if cont_state == '\0':
515                break
516
517        id_values_list = self._unpack_sdp_data_element(complete_response)[0]
518        if len(id_values_list) % 2 == 1:
519            raise BluetoothSDPSocketError('Length of returned list is odd')
520
521        return id_values_list
522
523
524    def service_search_attribute_request(self, uuids, max_attr_byte_count,
525                                         attr_ids, preferred_size=32,
526                                         forced_pdu_size=None,
527                                         invalid_request=None):
528        """Send a Service Search Attribute Request
529
530        @param uuids: list of UUIDs (as integers) to look for.
531        @param max_attr_byte_count: maximum number of bytes of attribute data to
532               be returned in the response to this request.
533        @param attr_ids: a list, where each element is either an attribute ID
534               or a range of attribute IDs.
535        @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
536        @param forced_pdu_size: Use certain PDU size parameter instead of
537               calculating actual length of sequence.
538        @param invalid_request: Whether to send request with intentionally
539               invalid syntax for testing purposes (string to be prepended
540               to correct request).
541
542        @return list of found attributes IDs and their values or Error Code
543        @raise BluetoothSDPSocketError: arguments do not match the SDP
544               restrictions or if the response has an incorrect code
545
546        """
547        if len(uuids) > SDP_MAX_UUIDS_CNT:
548            raise BluetoothSDPSocketError('Too many UUIDs')
549
550        if max_attr_byte_count < 7 or max_attr_byte_count > 65535:
551            raise BluetoothSDPSocketError('MaximumAttributeByteCount must be '
552                                          'between 7 and 65535, inclusive')
553
554        pattern = (self._pack_uuids(uuids, preferred_size) +
555                   struct.pack('>H', max_attr_byte_count) +
556                   self._pack_attr_ids(attr_ids))
557        cont_state = '\0'
558        complete_response = ''
559
560        while True:
561            request = pattern + cont_state.encode('utf-8')
562            if invalid_request:
563                request = invalid_request + request
564
565            code, response = self.send_request_and_wait(
566                    SDP_SVC_SEARCH_ATTR_REQ, request, forced_pdu_size)
567
568            if code == SDP_ERROR_RSP:
569                return self._unpack_error_code(response)
570
571            if code != SDP_SVC_SEARCH_ATTR_RSP:
572                raise BluetoothSDPSocketError('Incorrect response code')
573
574            response_byte_count, = struct.unpack_from('>H', response)
575            if response_byte_count > max_attr_byte_count:
576                raise BluetoothSDPSocketError('AttributeListByteCount exceeds'
577                                              'MaximumAttributeByteCount')
578
579            response = response[2:]
580            complete_response += response[:response_byte_count]
581            cont_state = response[response_byte_count:]
582
583            if cont_state == '\0':
584                break
585
586        id_values_list = self._unpack_sdp_data_element(complete_response)[0]
587
588        return id_values_list
589