1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import dataclasses
21import enum
22import logging
23import struct
24
25from collections import deque
26from pyee import EventEmitter
27from typing import (
28    Dict,
29    Type,
30    List,
31    Optional,
32    Tuple,
33    Callable,
34    Any,
35    Union,
36    Deque,
37    Iterable,
38    SupportsBytes,
39    TYPE_CHECKING,
40)
41
42from .utils import deprecated
43from .colors import color
44from .core import (
45    BT_CENTRAL_ROLE,
46    InvalidStateError,
47    InvalidArgumentError,
48    InvalidPacketError,
49    OutOfResourcesError,
50    ProtocolError,
51)
52from .hci import (
53    HCI_LE_Connection_Update_Command,
54    HCI_Object,
55    key_with_value,
56    name_or_number,
57)
58
59if TYPE_CHECKING:
60    from bumble.device import Connection
61    from bumble.host import Host
62
63# -----------------------------------------------------------------------------
64# Logging
65# -----------------------------------------------------------------------------
66logger = logging.getLogger(__name__)
67
68
69# -----------------------------------------------------------------------------
70# Constants
71# -----------------------------------------------------------------------------
72# fmt: off
73# pylint: disable=line-too-long
74
75L2CAP_SIGNALING_CID    = 0x01
76L2CAP_LE_SIGNALING_CID = 0x05
77
78L2CAP_MIN_LE_MTU     = 23
79L2CAP_MIN_BR_EDR_MTU = 48
80L2CAP_MAX_BR_EDR_MTU = 65535
81
82L2CAP_DEFAULT_MTU = 2048  # Default value for the MTU we are willing to accept
83
84L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024
85
86# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links
87L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040
88L2CAP_ACL_U_DYNAMIC_CID_RANGE_END   = 0xFFFF
89
90# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link
91L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040
92L2CAP_LE_U_DYNAMIC_CID_RANGE_END   = 0x007F
93
94# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage
95L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001
96L2CAP_PSM_DYNAMIC_RANGE_END   = 0xFFFF
97
98# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges
99L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080
100L2CAP_LE_PSM_DYNAMIC_RANGE_END   = 0x00FF
101
102# Frame types
103L2CAP_COMMAND_REJECT                       = 0x01
104L2CAP_CONNECTION_REQUEST                   = 0x02
105L2CAP_CONNECTION_RESPONSE                  = 0x03
106L2CAP_CONFIGURE_REQUEST                    = 0x04
107L2CAP_CONFIGURE_RESPONSE                   = 0x05
108L2CAP_DISCONNECTION_REQUEST                = 0x06
109L2CAP_DISCONNECTION_RESPONSE               = 0x07
110L2CAP_ECHO_REQUEST                         = 0x08
111L2CAP_ECHO_RESPONSE                        = 0x09
112L2CAP_INFORMATION_REQUEST                  = 0x0A
113L2CAP_INFORMATION_RESPONSE                 = 0x0B
114L2CAP_CREATE_CHANNEL_REQUEST               = 0x0C
115L2CAP_CREATE_CHANNEL_RESPONSE              = 0x0D
116L2CAP_MOVE_CHANNEL_REQUEST                 = 0x0E
117L2CAP_MOVE_CHANNEL_RESPONSE                = 0x0F
118L2CAP_MOVE_CHANNEL_CONFIRMATION            = 0x10
119L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE   = 0x11
120L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST  = 0x12
121L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13
122L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST   = 0x14
123L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE  = 0x15
124L2CAP_LE_FLOW_CONTROL_CREDIT               = 0x16
125
126L2CAP_CONTROL_FRAME_NAMES = {
127    L2CAP_COMMAND_REJECT:                       'L2CAP_COMMAND_REJECT',
128    L2CAP_CONNECTION_REQUEST:                   'L2CAP_CONNECTION_REQUEST',
129    L2CAP_CONNECTION_RESPONSE:                  'L2CAP_CONNECTION_RESPONSE',
130    L2CAP_CONFIGURE_REQUEST:                    'L2CAP_CONFIGURE_REQUEST',
131    L2CAP_CONFIGURE_RESPONSE:                   'L2CAP_CONFIGURE_RESPONSE',
132    L2CAP_DISCONNECTION_REQUEST:                'L2CAP_DISCONNECTION_REQUEST',
133    L2CAP_DISCONNECTION_RESPONSE:               'L2CAP_DISCONNECTION_RESPONSE',
134    L2CAP_ECHO_REQUEST:                         'L2CAP_ECHO_REQUEST',
135    L2CAP_ECHO_RESPONSE:                        'L2CAP_ECHO_RESPONSE',
136    L2CAP_INFORMATION_REQUEST:                  'L2CAP_INFORMATION_REQUEST',
137    L2CAP_INFORMATION_RESPONSE:                 'L2CAP_INFORMATION_RESPONSE',
138    L2CAP_CREATE_CHANNEL_REQUEST:               'L2CAP_CREATE_CHANNEL_REQUEST',
139    L2CAP_CREATE_CHANNEL_RESPONSE:              'L2CAP_CREATE_CHANNEL_RESPONSE',
140    L2CAP_MOVE_CHANNEL_REQUEST:                 'L2CAP_MOVE_CHANNEL_REQUEST',
141    L2CAP_MOVE_CHANNEL_RESPONSE:                'L2CAP_MOVE_CHANNEL_RESPONSE',
142    L2CAP_MOVE_CHANNEL_CONFIRMATION:            'L2CAP_MOVE_CHANNEL_CONFIRMATION',
143    L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE:   'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE',
144    L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST:  'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST',
145    L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE',
146    L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST:   'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST',
147    L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE:  'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE',
148    L2CAP_LE_FLOW_CONTROL_CREDIT:               'L2CAP_LE_FLOW_CONTROL_CREDIT'
149}
150
151L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
152L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
153
154L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000
155L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001
156L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
157
158L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS             = 65535
159L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU                 = 23
160L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU                 = 65535
161L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS                 = 23
162L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS                 = 65533
163L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU             = 2048
164L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS             = 2048
165L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
166
167L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
168
169L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
170
171# fmt: on
172# pylint: enable=line-too-long
173
174
175# -----------------------------------------------------------------------------
176# Classes
177# -----------------------------------------------------------------------------
178# pylint: disable=invalid-name
179
180
181@dataclasses.dataclass
182class ClassicChannelSpec:
183    psm: Optional[int] = None
184    mtu: int = L2CAP_DEFAULT_MTU
185
186
187@dataclasses.dataclass
188class LeCreditBasedChannelSpec:
189    psm: Optional[int] = None
190    mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
191    mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
192    max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
193
194    def __post_init__(self):
195        if (
196            self.max_credits < 1
197            or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
198        ):
199            raise InvalidArgumentError('max credits out of range')
200        if (
201            self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
202            or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
203        ):
204            raise InvalidArgumentError('MTU out of range')
205        if (
206            self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
207            or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
208        ):
209            raise InvalidArgumentError('MPS out of range')
210
211
212class L2CAP_PDU:
213    '''
214    See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT
215    '''
216
217    @staticmethod
218    def from_bytes(data: bytes) -> L2CAP_PDU:
219        # Check parameters
220        if len(data) < 4:
221            raise InvalidPacketError('not enough data for L2CAP header')
222
223        _, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0)
224        l2cap_pdu_payload = data[4:]
225
226        return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
227
228    def to_bytes(self) -> bytes:
229        header = struct.pack('<HH', len(self.payload), self.cid)
230        return header + self.payload
231
232    def __init__(self, cid: int, payload: bytes) -> None:
233        self.cid = cid
234        self.payload = payload
235
236    def __bytes__(self) -> bytes:
237        return self.to_bytes()
238
239    def __str__(self) -> str:
240        return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
241
242
243# -----------------------------------------------------------------------------
244class L2CAP_Control_Frame:
245    '''
246    See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
247    '''
248
249    classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
250    code = 0
251    name: str
252
253    @staticmethod
254    def from_bytes(pdu: bytes) -> L2CAP_Control_Frame:
255        code = pdu[0]
256
257        cls = L2CAP_Control_Frame.classes.get(code)
258        if cls is None:
259            instance = L2CAP_Control_Frame(pdu)
260            instance.name = L2CAP_Control_Frame.code_name(code)
261            instance.code = code
262            return instance
263        self = cls.__new__(cls)
264        L2CAP_Control_Frame.__init__(self, pdu)
265        self.identifier = pdu[1]
266        length = struct.unpack_from('<H', pdu, 2)[0]
267        if length + 4 != len(pdu):
268            logger.warning(
269                color(
270                    f'!!! length mismatch: expected {len(pdu) - 4} but got {length}',
271                    'red',
272                )
273            )
274        if hasattr(self, 'fields'):
275            self.init_from_bytes(pdu, 4)
276        return self
277
278    @staticmethod
279    def code_name(code: int) -> str:
280        return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code)
281
282    @staticmethod
283    def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]:
284        options = []
285        while len(data) >= 2:
286            value_type = data[0]
287            length = data[1]
288            value = data[2 : 2 + length]
289            data = data[2 + length :]
290            options.append((value_type, value))
291
292        return options
293
294    @staticmethod
295    def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes:
296        return b''.join(
297            [bytes([option[0], len(option[1])]) + option[1] for option in options]
298        )
299
300    @staticmethod
301    def subclass(fields):
302        def inner(cls):
303            cls.name = cls.__name__.upper()
304            cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name)
305            if cls.code is None:
306                raise KeyError(
307                    f'Control Frame name {cls.name} '
308                    'not found in L2CAP_CONTROL_FRAME_NAMES'
309                )
310            cls.fields = fields
311
312            # Register a factory for this class
313            L2CAP_Control_Frame.classes[cls.code] = cls
314
315            return cls
316
317        return inner
318
319    def __init__(self, pdu=None, **kwargs) -> None:
320        self.identifier = kwargs.get('identifier', 0)
321        if hasattr(self, 'fields'):
322            if kwargs:
323                HCI_Object.init_from_fields(self, self.fields, kwargs)
324            if pdu is None:
325                data = HCI_Object.dict_to_bytes(kwargs, self.fields)
326                pdu = (
327                    bytes([self.code, self.identifier])
328                    + struct.pack('<H', len(data))
329                    + data
330                )
331        self.pdu = pdu
332
333    def init_from_bytes(self, pdu, offset):
334        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
335
336    def to_bytes(self) -> bytes:
337        return self.pdu
338
339    def __bytes__(self) -> bytes:
340        return self.to_bytes()
341
342    def __str__(self) -> str:
343        result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
344        if fields := getattr(self, 'fields', None):
345            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
346        else:
347            if len(self.pdu) > 1:
348                result += f': {self.pdu.hex()}'
349        return result
350
351
352# -----------------------------------------------------------------------------
353@L2CAP_Control_Frame.subclass(
354    # pylint: disable=unnecessary-lambda
355    [
356        (
357            'reason',
358            {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)},
359        ),
360        ('data', '*'),
361    ]
362)
363class L2CAP_Command_Reject(L2CAP_Control_Frame):
364    '''
365    See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT
366    '''
367
368    COMMAND_NOT_UNDERSTOOD = 0x0000
369    SIGNALING_MTU_EXCEEDED = 0x0001
370    INVALID_CID_IN_REQUEST = 0x0002
371
372    REASON_NAMES = {
373        COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD',
374        SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED',
375        INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST',
376    }
377
378    @staticmethod
379    def reason_name(reason: int) -> str:
380        return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
381
382
383# -----------------------------------------------------------------------------
384@L2CAP_Control_Frame.subclass(
385    # pylint: disable=unnecessary-lambda
386    [
387        (
388            'psm',
389            {
390                'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm(
391                    data, offset
392                ),
393                'serializer': lambda value: L2CAP_Connection_Request.serialize_psm(
394                    value
395                ),
396            },
397        ),
398        ('source_cid', 2),
399    ]
400)
401class L2CAP_Connection_Request(L2CAP_Control_Frame):
402    '''
403    See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
404    '''
405
406    psm: int
407    source_cid: int
408
409    @staticmethod
410    def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
411        psm_length = 2
412        psm = data[offset] | data[offset + 1] << 8
413
414        # The PSM field extends until the first even octet (inclusive)
415        while data[offset + psm_length - 1] % 2 == 1:
416            psm |= data[offset + psm_length] << (8 * psm_length)
417            psm_length += 1
418
419        return offset + psm_length, psm
420
421    @staticmethod
422    def serialize_psm(psm: int) -> bytes:
423        serialized = struct.pack('<H', psm & 0xFFFF)
424        psm >>= 16
425        while psm:
426            serialized += bytes([psm & 0xFF])
427            psm >>= 8
428
429        return serialized
430
431
432# -----------------------------------------------------------------------------
433@L2CAP_Control_Frame.subclass(
434    # pylint: disable=unnecessary-lambda
435    [
436        ('destination_cid', 2),
437        ('source_cid', 2),
438        (
439            'result',
440            {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)},
441        ),
442        ('status', 2),
443    ]
444)
445class L2CAP_Connection_Response(L2CAP_Control_Frame):
446    '''
447    See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
448    '''
449
450    source_cid: int
451    destination_cid: int
452    status: int
453    result: int
454
455    CONNECTION_SUCCESSFUL = 0x0000
456    CONNECTION_PENDING = 0x0001
457    CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
458    CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003
459    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
460    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006
461    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007
462    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
463
464    # pylint: disable=line-too-long
465    RESULT_NAMES = {
466        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
467        CONNECTION_PENDING: 'CONNECTION_PENDING',
468        CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED',
469        CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK',
470        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
471        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
472        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
473        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
474    }
475
476    @staticmethod
477    def result_name(result: int) -> str:
478        return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result)
479
480
481# -----------------------------------------------------------------------------
482@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')])
483class L2CAP_Configure_Request(L2CAP_Control_Frame):
484    '''
485    See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST
486    '''
487
488
489# -----------------------------------------------------------------------------
490@L2CAP_Control_Frame.subclass(
491    # pylint: disable=unnecessary-lambda
492    [
493        ('source_cid', 2),
494        ('flags', 2),
495        (
496            'result',
497            {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)},
498        ),
499        ('options', '*'),
500    ]
501)
502class L2CAP_Configure_Response(L2CAP_Control_Frame):
503    '''
504    See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE
505    '''
506
507    SUCCESS = 0x0000
508    FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001
509    FAILURE_REJECTED = 0x0002
510    FAILURE_UNKNOWN_OPTIONS = 0x0003
511    PENDING = 0x0004
512    FAILURE_FLOW_SPEC_REJECTED = 0x0005
513
514    RESULT_NAMES = {
515        SUCCESS: 'SUCCESS',
516        FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS',
517        FAILURE_REJECTED: 'FAILURE_REJECTED',
518        FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS',
519        PENDING: 'PENDING',
520        FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED',
521    }
522
523    @staticmethod
524    def result_name(result: int) -> str:
525        return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
526
527
528# -----------------------------------------------------------------------------
529@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
530class L2CAP_Disconnection_Request(L2CAP_Control_Frame):
531    '''
532    See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST
533    '''
534
535
536# -----------------------------------------------------------------------------
537@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)])
538class L2CAP_Disconnection_Response(L2CAP_Control_Frame):
539    '''
540    See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE
541    '''
542
543
544# -----------------------------------------------------------------------------
545@L2CAP_Control_Frame.subclass([('data', '*')])
546class L2CAP_Echo_Request(L2CAP_Control_Frame):
547    '''
548    See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST
549    '''
550
551
552# -----------------------------------------------------------------------------
553@L2CAP_Control_Frame.subclass([('data', '*')])
554class L2CAP_Echo_Response(L2CAP_Control_Frame):
555    '''
556    See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE
557    '''
558
559
560# -----------------------------------------------------------------------------
561@L2CAP_Control_Frame.subclass(
562    [
563        (
564            'info_type',
565            {
566                'size': 2,
567                # pylint: disable-next=unnecessary-lambda
568                'mapper': lambda x: L2CAP_Information_Request.info_type_name(x),
569            },
570        )
571    ]
572)
573class L2CAP_Information_Request(L2CAP_Control_Frame):
574    '''
575    See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
576    '''
577
578    CONNECTIONLESS_MTU = 0x0001
579    EXTENDED_FEATURES_SUPPORTED = 0x0002
580    FIXED_CHANNELS_SUPPORTED = 0x0003
581
582    EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
583    EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
584    EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
585    EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
586    EXTENDED_FEATURE_STREAMING_MODE = 0x0010
587    EXTENDED_FEATURE_FCS_OPTION = 0x0020
588    EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
589    EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
590    EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
591    EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
592    EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
593
594    INFO_TYPE_NAMES = {
595        CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
596        EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
597        FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED',
598    }
599
600    @staticmethod
601    def info_type_name(info_type: int) -> str:
602        return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
603
604
605# -----------------------------------------------------------------------------
606@L2CAP_Control_Frame.subclass(
607    [
608        ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}),
609        (
610            'result',
611            # pylint: disable-next=unnecessary-lambda
612            {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)},
613        ),
614        ('data', '*'),
615    ]
616)
617class L2CAP_Information_Response(L2CAP_Control_Frame):
618    '''
619    See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
620    '''
621
622    SUCCESS = 0x00
623    NOT_SUPPORTED = 0x01
624
625    RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'}
626
627    @staticmethod
628    def result_name(result: int) -> str:
629        return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
630
631
632# -----------------------------------------------------------------------------
633@L2CAP_Control_Frame.subclass(
634    [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)]
635)
636class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
637    '''
638    See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST
639    '''
640
641
642# -----------------------------------------------------------------------------
643@L2CAP_Control_Frame.subclass([('result', 2)])
644class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame):
645    '''
646    See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE
647    '''
648
649
650# -----------------------------------------------------------------------------
651@L2CAP_Control_Frame.subclass(
652    [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)]
653)
654class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
655    '''
656    See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST
657    (CODE 0x14)
658    '''
659
660    source_cid: int
661
662
663# -----------------------------------------------------------------------------
664@L2CAP_Control_Frame.subclass(
665    # pylint: disable=unnecessary-lambda,line-too-long
666    [
667        ('destination_cid', 2),
668        ('mtu', 2),
669        ('mps', 2),
670        ('initial_credits', 2),
671        (
672            'result',
673            {
674                'size': 2,
675                'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name(
676                    x
677                ),
678            },
679        ),
680    ]
681)
682class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
683    '''
684    See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE
685    (CODE 0x15)
686    '''
687
688    CONNECTION_SUCCESSFUL = 0x0000
689    CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002
690    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
691    CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
692    CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
693    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007
694    CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
695    CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009
696    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
697    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
698
699    # pylint: disable=line-too-long
700    RESULT_NAMES = {
701        CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
702        CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
703        CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
704        CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION',
705        CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION',
706        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE',
707        CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION',
708        CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
709        CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED',
710        CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS',
711    }
712
713    @staticmethod
714    def result_name(result: int) -> str:
715        return name_or_number(
716            L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result
717        )
718
719
720# -----------------------------------------------------------------------------
721@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)])
722class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
723    '''
724    See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16)
725    '''
726
727
728# -----------------------------------------------------------------------------
729class ClassicChannel(EventEmitter):
730    class State(enum.IntEnum):
731        # States
732        CLOSED = 0x00
733        WAIT_CONNECT = 0x01
734        WAIT_CONNECT_RSP = 0x02
735        OPEN = 0x03
736        WAIT_DISCONNECT = 0x04
737        WAIT_CREATE = 0x05
738        WAIT_CREATE_RSP = 0x06
739        WAIT_MOVE = 0x07
740        WAIT_MOVE_RSP = 0x08
741        WAIT_MOVE_CONFIRM = 0x09
742        WAIT_CONFIRM_RSP = 0x0A
743
744        # CONFIG substates
745        WAIT_CONFIG = 0x10
746        WAIT_SEND_CONFIG = 0x11
747        WAIT_CONFIG_REQ_RSP = 0x12
748        WAIT_CONFIG_RSP = 0x13
749        WAIT_CONFIG_REQ = 0x14
750        WAIT_IND_FINAL_RSP = 0x15
751        WAIT_FINAL_RSP = 0x16
752        WAIT_CONTROL_IND = 0x17
753
754    connection_result: Optional[asyncio.Future[None]]
755    disconnection_result: Optional[asyncio.Future[None]]
756    response: Optional[asyncio.Future[bytes]]
757    sink: Optional[Callable[[bytes], Any]]
758    state: State
759    connection: Connection
760    mtu: int
761    peer_mtu: int
762
763    def __init__(
764        self,
765        manager: ChannelManager,
766        connection: Connection,
767        signaling_cid: int,
768        psm: int,
769        source_cid: int,
770        mtu: int,
771    ) -> None:
772        super().__init__()
773        self.manager = manager
774        self.connection = connection
775        self.signaling_cid = signaling_cid
776        self.state = self.State.CLOSED
777        self.mtu = mtu
778        self.peer_mtu = L2CAP_MIN_BR_EDR_MTU
779        self.psm = psm
780        self.source_cid = source_cid
781        self.destination_cid = 0
782        self.response = None
783        self.connection_result = None
784        self.disconnection_result = None
785        self.sink = None
786
787    def _change_state(self, new_state: State) -> None:
788        logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
789        self.state = new_state
790
791    def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
792        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
793
794    def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
795        self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
796
797    async def send_request(self, request: SupportsBytes) -> bytes:
798        # Check that there isn't already a request pending
799        if self.response:
800            raise InvalidStateError('request already pending')
801        if self.state != self.State.OPEN:
802            raise InvalidStateError('channel not open')
803
804        self.response = asyncio.get_running_loop().create_future()
805        self.send_pdu(request)
806        return await self.response
807
808    def on_pdu(self, pdu: bytes) -> None:
809        if self.response:
810            self.response.set_result(pdu)
811            self.response = None
812        elif self.sink:
813            # pylint: disable=not-callable
814            self.sink(pdu)
815        else:
816            logger.warning(
817                color('received pdu without a pending request or sink', 'red')
818            )
819
820    async def connect(self) -> None:
821        if self.state != self.State.CLOSED:
822            raise InvalidStateError('invalid state')
823
824        # Check that we can start a new connection
825        if self.connection_result:
826            raise InvalidStateError('connection already pending')
827
828        self._change_state(self.State.WAIT_CONNECT_RSP)
829        self.send_control_frame(
830            L2CAP_Connection_Request(
831                identifier=self.manager.next_identifier(self.connection),
832                psm=self.psm,
833                source_cid=self.source_cid,
834            )
835        )
836
837        # Create a future to wait for the state machine to get to a success or error
838        # state
839        self.connection_result = asyncio.get_running_loop().create_future()
840
841        # Wait for the connection to succeed or fail
842        try:
843            return await self.connection.abort_on(
844                'disconnection', self.connection_result
845            )
846        finally:
847            self.connection_result = None
848
849    async def disconnect(self) -> None:
850        if self.state != self.State.OPEN:
851            raise InvalidStateError('invalid state')
852
853        self._change_state(self.State.WAIT_DISCONNECT)
854        self.send_control_frame(
855            L2CAP_Disconnection_Request(
856                identifier=self.manager.next_identifier(self.connection),
857                destination_cid=self.destination_cid,
858                source_cid=self.source_cid,
859            )
860        )
861
862        # Create a future to wait for the state machine to get to a success or error
863        # state
864        self.disconnection_result = asyncio.get_running_loop().create_future()
865        return await self.disconnection_result
866
867    def abort(self) -> None:
868        if self.state == self.State.OPEN:
869            self._change_state(self.State.CLOSED)
870            self.emit('close')
871
872    def send_configure_request(self) -> None:
873        options = L2CAP_Control_Frame.encode_configuration_options(
874            [
875                (
876                    L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
877                    struct.pack('<H', self.mtu),
878                )
879            ]
880        )
881        self.send_control_frame(
882            L2CAP_Configure_Request(
883                identifier=self.manager.next_identifier(self.connection),
884                destination_cid=self.destination_cid,
885                flags=0x0000,
886                options=options,
887            )
888        )
889
890    def on_connection_request(self, request) -> None:
891        self.destination_cid = request.source_cid
892        self._change_state(self.State.WAIT_CONNECT)
893        self.send_control_frame(
894            L2CAP_Connection_Response(
895                identifier=request.identifier,
896                destination_cid=self.source_cid,
897                source_cid=self.destination_cid,
898                result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL,
899                status=0x0000,
900            )
901        )
902        self._change_state(self.State.WAIT_CONFIG)
903        self.send_configure_request()
904        self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
905
906    def on_connection_response(self, response):
907        if self.state != self.State.WAIT_CONNECT_RSP:
908            logger.warning(color('invalid state', 'red'))
909            return
910
911        if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL:
912            self.destination_cid = response.destination_cid
913            self._change_state(self.State.WAIT_CONFIG)
914            self.send_configure_request()
915            self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
916        elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING:
917            pass
918        else:
919            self._change_state(self.State.CLOSED)
920            self.connection_result.set_exception(
921                ProtocolError(
922                    response.result,
923                    'l2cap',
924                    L2CAP_Connection_Response.result_name(response.result),
925                )
926            )
927            self.connection_result = None
928
929    def on_configure_request(self, request) -> None:
930        if self.state not in (
931            self.State.WAIT_CONFIG,
932            self.State.WAIT_CONFIG_REQ,
933            self.State.WAIT_CONFIG_REQ_RSP,
934        ):
935            logger.warning(color('invalid state', 'red'))
936            return
937
938        # Decode the options
939        options = L2CAP_Control_Frame.decode_configuration_options(request.options)
940        for option in options:
941            if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
942                self.peer_mtu = struct.unpack('<H', option[1])[0]
943                logger.debug(f'peer MTU = {self.peer_mtu}')
944
945        self.send_control_frame(
946            L2CAP_Configure_Response(
947                identifier=request.identifier,
948                source_cid=self.destination_cid,
949                flags=0x0000,
950                result=L2CAP_Configure_Response.SUCCESS,
951                options=request.options,  # TODO: don't accept everything blindly
952            )
953        )
954        if self.state == self.State.WAIT_CONFIG:
955            self._change_state(self.State.WAIT_SEND_CONFIG)
956            self.send_configure_request()
957            self._change_state(self.State.WAIT_CONFIG_RSP)
958        elif self.state == self.State.WAIT_CONFIG_REQ:
959            self._change_state(self.State.OPEN)
960            if self.connection_result:
961                self.connection_result.set_result(None)
962                self.connection_result = None
963            self.emit('open')
964        elif self.state == self.State.WAIT_CONFIG_REQ_RSP:
965            self._change_state(self.State.WAIT_CONFIG_RSP)
966
967    def on_configure_response(self, response) -> None:
968        if response.result == L2CAP_Configure_Response.SUCCESS:
969            if self.state == self.State.WAIT_CONFIG_REQ_RSP:
970                self._change_state(self.State.WAIT_CONFIG_REQ)
971            elif self.state in (
972                self.State.WAIT_CONFIG_RSP,
973                self.State.WAIT_CONTROL_IND,
974            ):
975                self._change_state(self.State.OPEN)
976                if self.connection_result:
977                    self.connection_result.set_result(None)
978                    self.connection_result = None
979                self.emit('open')
980            else:
981                logger.warning(color('invalid state', 'red'))
982        elif (
983            response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS
984        ):
985            # Re-configure with what's suggested in the response
986            self.send_control_frame(
987                L2CAP_Configure_Request(
988                    identifier=self.manager.next_identifier(self.connection),
989                    destination_cid=self.destination_cid,
990                    flags=0x0000,
991                    options=response.options,
992                )
993            )
994        else:
995            logger.warning(
996                color(
997                    '!!! configuration rejected: '
998                    f'{L2CAP_Configure_Response.result_name(response.result)}',
999                    'red',
1000                )
1001            )
1002            # TODO: decide how to fail gracefully
1003
1004    def on_disconnection_request(self, request) -> None:
1005        if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT):
1006            self.send_control_frame(
1007                L2CAP_Disconnection_Response(
1008                    identifier=request.identifier,
1009                    destination_cid=request.destination_cid,
1010                    source_cid=request.source_cid,
1011                )
1012            )
1013            self._change_state(self.State.CLOSED)
1014            self.emit('close')
1015            self.manager.on_channel_closed(self)
1016        else:
1017            logger.warning(color('invalid state', 'red'))
1018
1019    def on_disconnection_response(self, response) -> None:
1020        if self.state != self.State.WAIT_DISCONNECT:
1021            logger.warning(color('invalid state', 'red'))
1022            return
1023
1024        if (
1025            response.destination_cid != self.destination_cid
1026            or response.source_cid != self.source_cid
1027        ):
1028            logger.warning('unexpected source or destination CID')
1029            return
1030
1031        self._change_state(self.State.CLOSED)
1032        if self.disconnection_result:
1033            self.disconnection_result.set_result(None)
1034            self.disconnection_result = None
1035        self.emit('close')
1036        self.manager.on_channel_closed(self)
1037
1038    def __str__(self) -> str:
1039        return (
1040            f'Channel({self.source_cid}->{self.destination_cid}, '
1041            f'PSM={self.psm}, '
1042            f'MTU={self.mtu}/{self.peer_mtu}, '
1043            f'state={self.state.name})'
1044        )
1045
1046
1047# -----------------------------------------------------------------------------
1048class LeCreditBasedChannel(EventEmitter):
1049    """
1050    LE Credit-based Connection Oriented Channel
1051    """
1052
1053    class State(enum.IntEnum):
1054        INIT = 0
1055        CONNECTED = 1
1056        CONNECTING = 2
1057        DISCONNECTING = 3
1058        DISCONNECTED = 4
1059        CONNECTION_ERROR = 5
1060
1061    out_queue: Deque[bytes]
1062    connection_result: Optional[asyncio.Future[LeCreditBasedChannel]]
1063    disconnection_result: Optional[asyncio.Future[None]]
1064    in_sdu: Optional[bytes]
1065    out_sdu: Optional[bytes]
1066    state: State
1067    connection: Connection
1068    sink: Optional[Callable[[bytes], Any]]
1069
1070    def __init__(
1071        self,
1072        manager: ChannelManager,
1073        connection: Connection,
1074        le_psm: int,
1075        source_cid: int,
1076        destination_cid: int,
1077        mtu: int,
1078        mps: int,
1079        credits: int,  # pylint: disable=redefined-builtin
1080        peer_mtu: int,
1081        peer_mps: int,
1082        peer_credits: int,
1083        connected: bool,
1084    ) -> None:
1085        super().__init__()
1086        self.manager = manager
1087        self.connection = connection
1088        self.le_psm = le_psm
1089        self.source_cid = source_cid
1090        self.destination_cid = destination_cid
1091        self.mtu = mtu
1092        self.mps = mps
1093        self.credits = credits
1094        self.peer_mtu = peer_mtu
1095        self.peer_mps = peer_mps
1096        self.peer_credits = peer_credits
1097        self.peer_max_credits = self.peer_credits
1098        self.peer_credits_threshold = self.peer_max_credits // 2
1099        self.in_sdu = None
1100        self.in_sdu_length = 0
1101        self.out_queue = deque()
1102        self.out_sdu = None
1103        self.sink = None
1104        self.connected = False
1105        self.connection_result = None
1106        self.disconnection_result = None
1107        self.drained = asyncio.Event()
1108
1109        self.drained.set()
1110
1111        if connected:
1112            self.state = self.State.CONNECTED
1113        else:
1114            self.state = self.State.INIT
1115
1116    def _change_state(self, new_state: State) -> None:
1117        logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
1118        self.state = new_state
1119
1120        if new_state == self.State.CONNECTED:
1121            self.emit('open')
1122        elif new_state == self.State.DISCONNECTED:
1123            self.emit('close')
1124
1125    def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
1126        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
1127
1128    def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
1129        self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
1130
1131    async def connect(self) -> LeCreditBasedChannel:
1132        # Check that we're in the right state
1133        if self.state != self.State.INIT:
1134            raise InvalidStateError('not in a connectable state')
1135
1136        # Check that we can start a new connection
1137        identifier = self.manager.next_identifier(self.connection)
1138        if identifier in self.manager.le_coc_requests:
1139            raise InvalidStateError('too many concurrent connection requests')
1140
1141        self._change_state(self.State.CONNECTING)
1142        request = L2CAP_LE_Credit_Based_Connection_Request(
1143            identifier=identifier,
1144            le_psm=self.le_psm,
1145            source_cid=self.source_cid,
1146            mtu=self.mtu,
1147            mps=self.mps,
1148            initial_credits=self.peer_credits,
1149        )
1150        self.manager.le_coc_requests[identifier] = request
1151        self.send_control_frame(request)
1152
1153        # Create a future to wait for the response
1154        self.connection_result = asyncio.get_running_loop().create_future()
1155
1156        # Wait for the connection to succeed or fail
1157        return await self.connection_result
1158
1159    async def disconnect(self) -> None:
1160        # Check that we're connected
1161        if self.state != self.State.CONNECTED:
1162            raise InvalidStateError('not connected')
1163
1164        self._change_state(self.State.DISCONNECTING)
1165        self.flush_output()
1166        self.send_control_frame(
1167            L2CAP_Disconnection_Request(
1168                identifier=self.manager.next_identifier(self.connection),
1169                destination_cid=self.destination_cid,
1170                source_cid=self.source_cid,
1171            )
1172        )
1173
1174        # Create a future to wait for the state machine to get to a success or error
1175        # state
1176        self.disconnection_result = asyncio.get_running_loop().create_future()
1177        return await self.disconnection_result
1178
1179    def abort(self) -> None:
1180        if self.state == self.State.CONNECTED:
1181            self._change_state(self.State.DISCONNECTED)
1182
1183    def on_pdu(self, pdu: bytes) -> None:
1184        if self.sink is None:
1185            logger.warning('received pdu without a sink')
1186            return
1187
1188        if self.state != self.State.CONNECTED:
1189            logger.warning('received PDU while not connected, dropping')
1190
1191        # Manage the peer credits
1192        if self.peer_credits == 0:
1193            logger.warning('received LE frame when peer out of credits')
1194        else:
1195            self.peer_credits -= 1
1196            if self.peer_credits <= self.peer_credits_threshold:
1197                # The credits fell below the threshold, replenish them to the max
1198                self.send_control_frame(
1199                    L2CAP_LE_Flow_Control_Credit(
1200                        identifier=self.manager.next_identifier(self.connection),
1201                        cid=self.source_cid,
1202                        credits=self.peer_max_credits - self.peer_credits,
1203                    )
1204                )
1205                self.peer_credits = self.peer_max_credits
1206
1207        # Check if this starts a new SDU
1208        if self.in_sdu is None:
1209            # Start a new SDU
1210            self.in_sdu = pdu
1211        else:
1212            # Continue an SDU
1213            self.in_sdu += pdu
1214
1215        # Check if the SDU is complete
1216        if self.in_sdu_length == 0:
1217            # We don't know the size yet, check if we have received the header to
1218            # compute it
1219            if len(self.in_sdu) >= 2:
1220                self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0]
1221        if self.in_sdu_length == 0:
1222            # We'll compute it later
1223            return
1224        if len(self.in_sdu) < 2 + self.in_sdu_length:
1225            # Not complete yet
1226            logger.debug(
1227                f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received'
1228            )
1229            return
1230        if len(self.in_sdu) != 2 + self.in_sdu_length:
1231            # Overflow
1232            logger.warning(
1233                f'SDU overflow: sdu_length={self.in_sdu_length}, '
1234                f'received {len(self.in_sdu) - 2}'
1235            )
1236            # TODO: we should disconnect
1237            self.in_sdu = None
1238            self.in_sdu_length = 0
1239            return
1240
1241        # Send the SDU to the sink
1242        logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes')
1243        self.sink(self.in_sdu[2:])  # pylint: disable=not-callable
1244
1245        # Prepare for a new SDU
1246        self.in_sdu = None
1247        self.in_sdu_length = 0
1248
1249    def on_connection_response(self, response) -> None:
1250        # Look for a matching pending response result
1251        if self.connection_result is None:
1252            logger.warning(
1253                f'received unexpected connection response (id={response.identifier})'
1254            )
1255            return
1256
1257        if (
1258            response.result
1259            == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL
1260        ):
1261            self.destination_cid = response.destination_cid
1262            self.peer_mtu = response.mtu
1263            self.peer_mps = response.mps
1264            self.credits = response.initial_credits
1265            self.connected = True
1266            self.connection_result.set_result(self)
1267            self._change_state(self.State.CONNECTED)
1268        else:
1269            self.connection_result.set_exception(
1270                ProtocolError(
1271                    response.result,
1272                    'l2cap',
1273                    L2CAP_LE_Credit_Based_Connection_Response.result_name(
1274                        response.result
1275                    ),
1276                )
1277            )
1278            self._change_state(self.State.CONNECTION_ERROR)
1279
1280        # Cleanup
1281        self.connection_result = None
1282
1283    def on_credits(self, credits: int) -> None:  # pylint: disable=redefined-builtin
1284        self.credits += credits
1285        logger.debug(f'received {credits} credits, total = {self.credits}')
1286
1287        # Try to send more data if we have any queued up
1288        self.process_output()
1289
1290    def on_disconnection_request(self, request) -> None:
1291        self.send_control_frame(
1292            L2CAP_Disconnection_Response(
1293                identifier=request.identifier,
1294                destination_cid=request.destination_cid,
1295                source_cid=request.source_cid,
1296            )
1297        )
1298        self._change_state(self.State.DISCONNECTED)
1299        self.flush_output()
1300
1301    def on_disconnection_response(self, response) -> None:
1302        if self.state != self.State.DISCONNECTING:
1303            logger.warning(color('invalid state', 'red'))
1304            return
1305
1306        if (
1307            response.destination_cid != self.destination_cid
1308            or response.source_cid != self.source_cid
1309        ):
1310            logger.warning('unexpected source or destination CID')
1311            return
1312
1313        self._change_state(self.State.DISCONNECTED)
1314        if self.disconnection_result:
1315            self.disconnection_result.set_result(None)
1316            self.disconnection_result = None
1317
1318    def flush_output(self) -> None:
1319        self.out_queue.clear()
1320        self.out_sdu = None
1321
1322    def process_output(self) -> None:
1323        while self.credits > 0:
1324            if self.out_sdu is not None:
1325                # Finish the current SDU
1326                packet = self.out_sdu[: self.peer_mps]
1327                self.send_pdu(packet)
1328                self.credits -= 1
1329                logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left')
1330                if len(packet) == len(self.out_sdu):
1331                    # We sent everything
1332                    self.out_sdu = None
1333                else:
1334                    # Keep what's still left to send
1335                    self.out_sdu = self.out_sdu[len(packet) :]
1336                continue
1337
1338            if self.out_queue:
1339                # Create the next SDU (2 bytes header plus up to MTU bytes payload)
1340                logger.debug(
1341                    f'assembling SDU from {len(self.out_queue)} packets in output queue'
1342                )
1343                payload = b''
1344                while self.out_queue and len(payload) < self.peer_mtu:
1345                    # We can add more data to the payload
1346                    chunk = self.out_queue[0][: self.peer_mtu - len(payload)]
1347                    payload += chunk
1348                    self.out_queue[0] = self.out_queue[0][len(chunk) :]
1349                    if len(self.out_queue[0]) == 0:
1350                        # We consumed the entire buffer, remove it
1351                        self.out_queue.popleft()
1352                        logger.debug(
1353                            f'packet completed, {len(self.out_queue)} left in queue'
1354                        )
1355
1356                # Construct the SDU with its header
1357                assert len(payload) != 0
1358                logger.debug(f'SDU complete: {len(payload)} payload bytes')
1359                self.out_sdu = struct.pack('<H', len(payload)) + payload
1360            else:
1361                # Nothing left to send for now
1362                self.drained.set()
1363                return
1364
1365    def write(self, data: bytes) -> None:
1366        if self.state != self.State.CONNECTED:
1367            logger.warning('not connected, dropping data')
1368            return
1369
1370        # Queue the data
1371        self.out_queue.append(data)
1372        self.drained.clear()
1373        logger.debug(
1374            f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue'
1375        )
1376
1377        # Send what we can
1378        self.process_output()
1379
1380    async def drain(self) -> None:
1381        await self.drained.wait()
1382
1383    def pause_reading(self) -> None:
1384        # TODO: not implemented yet
1385        pass
1386
1387    def resume_reading(self) -> None:
1388        # TODO: not implemented yet
1389        pass
1390
1391    def __str__(self) -> str:
1392        return (
1393            f'CoC({self.source_cid}->{self.destination_cid}, '
1394            f'State={self.state.name}, '
1395            f'PSM={self.le_psm}, '
1396            f'MTU={self.mtu}/{self.peer_mtu}, '
1397            f'MPS={self.mps}/{self.peer_mps}, '
1398            f'credits={self.credits}/{self.peer_credits})'
1399        )
1400
1401
1402# -----------------------------------------------------------------------------
1403class ClassicChannelServer(EventEmitter):
1404    def __init__(
1405        self,
1406        manager: ChannelManager,
1407        psm: int,
1408        handler: Optional[Callable[[ClassicChannel], Any]],
1409        mtu: int,
1410    ) -> None:
1411        super().__init__()
1412        self.manager = manager
1413        self.handler = handler
1414        self.psm = psm
1415        self.mtu = mtu
1416
1417    def on_connection(self, channel: ClassicChannel) -> None:
1418        self.emit('connection', channel)
1419        if self.handler:
1420            self.handler(channel)
1421
1422    def close(self) -> None:
1423        if self.psm in self.manager.servers:
1424            del self.manager.servers[self.psm]
1425
1426
1427# -----------------------------------------------------------------------------
1428class LeCreditBasedChannelServer(EventEmitter):
1429    def __init__(
1430        self,
1431        manager: ChannelManager,
1432        psm: int,
1433        handler: Optional[Callable[[LeCreditBasedChannel], Any]],
1434        max_credits: int,
1435        mtu: int,
1436        mps: int,
1437    ) -> None:
1438        super().__init__()
1439        self.manager = manager
1440        self.handler = handler
1441        self.psm = psm
1442        self.max_credits = max_credits
1443        self.mtu = mtu
1444        self.mps = mps
1445
1446    def on_connection(self, channel: LeCreditBasedChannel) -> None:
1447        self.emit('connection', channel)
1448        if self.handler:
1449            self.handler(channel)
1450
1451    def close(self) -> None:
1452        if self.psm in self.manager.le_coc_servers:
1453            del self.manager.le_coc_servers[self.psm]
1454
1455
1456# -----------------------------------------------------------------------------
1457class ChannelManager:
1458    identifiers: Dict[int, int]
1459    channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
1460    servers: Dict[int, ClassicChannelServer]
1461    le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]]
1462    le_coc_servers: Dict[int, LeCreditBasedChannelServer]
1463    le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
1464    fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
1465    _host: Optional[Host]
1466    connection_parameters_update_response: Optional[asyncio.Future[int]]
1467
1468    def __init__(
1469        self,
1470        extended_features: Iterable[int] = (),
1471        connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU,
1472    ) -> None:
1473        self._host = None
1474        self.identifiers = {}  # Incrementing identifier values by connection
1475        self.channels = {}  # All channels, mapped by connection and source cid
1476        self.fixed_channels = {  # Fixed channel handlers, mapped by cid
1477            L2CAP_SIGNALING_CID: None,
1478            L2CAP_LE_SIGNALING_CID: None,
1479        }
1480        self.servers = {}  # Servers accepting connections, by PSM
1481        self.le_coc_channels = (
1482            {}
1483        )  # LE CoC channels, mapped by connection and destination cid
1484        self.le_coc_servers = {}  # LE CoC - Servers accepting connections, by PSM
1485        self.le_coc_requests = {}  # LE CoC connection requests, by identifier
1486        self.extended_features = extended_features
1487        self.connectionless_mtu = connectionless_mtu
1488        self.connection_parameters_update_response = None
1489
1490    @property
1491    def host(self) -> Host:
1492        assert self._host
1493        return self._host
1494
1495    @host.setter
1496    def host(self, host: Host) -> None:
1497        if self._host is not None:
1498            self._host.remove_listener('disconnection', self.on_disconnection)
1499        self._host = host
1500        if host is not None:
1501            host.on('disconnection', self.on_disconnection)
1502
1503    def find_channel(self, connection_handle: int, cid: int):
1504        if connection_channels := self.channels.get(connection_handle):
1505            return connection_channels.get(cid)
1506
1507        return None
1508
1509    def find_le_coc_channel(self, connection_handle: int, cid: int):
1510        if connection_channels := self.le_coc_channels.get(connection_handle):
1511            return connection_channels.get(cid)
1512
1513        return None
1514
1515    @staticmethod
1516    def find_free_br_edr_cid(channels: Iterable[int]) -> int:
1517        # Pick the smallest valid CID that's not already in the list
1518        # (not necessarily the most efficient algorithm, but the list of CID is
1519        # very small in practice)
1520        for cid in range(
1521            L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1
1522        ):
1523            if cid not in channels:
1524                return cid
1525
1526        raise OutOfResourcesError('no free CID available')
1527
1528    @staticmethod
1529    def find_free_le_cid(channels: Iterable[int]) -> int:
1530        # Pick the smallest valid CID that's not already in the list
1531        # (not necessarily the most efficient algorithm, but the list of CID is
1532        # very small in practice)
1533        for cid in range(
1534            L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1
1535        ):
1536            if cid not in channels:
1537                return cid
1538
1539        raise OutOfResourcesError('no free CID')
1540
1541    def next_identifier(self, connection: Connection) -> int:
1542        identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
1543        self.identifiers[connection.handle] = identifier
1544        return identifier
1545
1546    def register_fixed_channel(
1547        self, cid: int, handler: Callable[[int, bytes], Any]
1548    ) -> None:
1549        self.fixed_channels[cid] = handler
1550
1551    def deregister_fixed_channel(self, cid: int) -> None:
1552        if cid in self.fixed_channels:
1553            del self.fixed_channels[cid]
1554
1555    @deprecated("Please use create_classic_server")
1556    def register_server(
1557        self,
1558        psm: int,
1559        server: Callable[[ClassicChannel], Any],
1560    ) -> int:
1561        return self.create_classic_server(
1562            handler=server, spec=ClassicChannelSpec(psm=psm)
1563        ).psm
1564
1565    def create_classic_server(
1566        self,
1567        spec: ClassicChannelSpec,
1568        handler: Optional[Callable[[ClassicChannel], Any]] = None,
1569    ) -> ClassicChannelServer:
1570        if not spec.psm:
1571            # Find a free PSM
1572            for candidate in range(
1573                L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
1574            ):
1575                if (candidate >> 8) % 2 == 1:
1576                    continue
1577                if candidate in self.servers:
1578                    continue
1579                spec.psm = candidate
1580                break
1581            else:
1582                raise InvalidStateError('no free PSM')
1583        else:
1584            # Check that the PSM isn't already in use
1585            if spec.psm in self.servers:
1586                raise InvalidArgumentError('PSM already in use')
1587
1588            # Check that the PSM is valid
1589            if spec.psm % 2 == 0:
1590                raise InvalidArgumentError('invalid PSM (not odd)')
1591            check = spec.psm >> 8
1592            while check:
1593                if check % 2 != 0:
1594                    raise InvalidArgumentError('invalid PSM')
1595                check >>= 8
1596
1597        self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu)
1598
1599        return self.servers[spec.psm]
1600
1601    @deprecated("Please use create_le_credit_based_server()")
1602    def register_le_coc_server(
1603        self,
1604        psm: int,
1605        server: Callable[[LeCreditBasedChannel], Any],
1606        max_credits: int,
1607        mtu: int,
1608        mps: int,
1609    ) -> int:
1610        return self.create_le_credit_based_server(
1611            spec=LeCreditBasedChannelSpec(
1612                psm=None if psm == 0 else psm, mtu=mtu, mps=mps, max_credits=max_credits
1613            ),
1614            handler=server,
1615        ).psm
1616
1617    def create_le_credit_based_server(
1618        self,
1619        spec: LeCreditBasedChannelSpec,
1620        handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None,
1621    ) -> LeCreditBasedChannelServer:
1622        if not spec.psm:
1623            # Find a free PSM
1624            for candidate in range(
1625                L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
1626            ):
1627                if candidate in self.le_coc_servers:
1628                    continue
1629                spec.psm = candidate
1630                break
1631            else:
1632                raise InvalidStateError('no free PSM')
1633        else:
1634            # Check that the PSM isn't already in use
1635            if spec.psm in self.le_coc_servers:
1636                raise InvalidArgumentError('PSM already in use')
1637
1638        self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer(
1639            self,
1640            spec.psm,
1641            handler,
1642            max_credits=spec.max_credits,
1643            mtu=spec.mtu,
1644            mps=spec.mps,
1645        )
1646
1647        return self.le_coc_servers[spec.psm]
1648
1649    def on_disconnection(self, connection_handle: int, _reason: int) -> None:
1650        logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
1651        if connection_handle in self.channels:
1652            for _, channel in self.channels[connection_handle].items():
1653                channel.abort()
1654            del self.channels[connection_handle]
1655        if connection_handle in self.le_coc_channels:
1656            for _, channel in self.le_coc_channels[connection_handle].items():
1657                channel.abort()
1658            del self.le_coc_channels[connection_handle]
1659        if connection_handle in self.identifiers:
1660            del self.identifiers[connection_handle]
1661
1662    def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
1663        pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
1664        pdu_bytes = bytes(pdu)
1665        logger.debug(
1666            f'{color(">>> Sending L2CAP PDU", "blue")} '
1667            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1668            f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
1669        )
1670        self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
1671
1672    def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
1673        if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
1674            # Parse the L2CAP payload into a Control Frame object
1675            control_frame = L2CAP_Control_Frame.from_bytes(pdu)
1676
1677            self.on_control_frame(connection, cid, control_frame)
1678        elif cid in self.fixed_channels:
1679            handler = self.fixed_channels[cid]
1680            assert handler is not None
1681            handler(connection.handle, pdu)
1682        else:
1683            if (channel := self.find_channel(connection.handle, cid)) is None:
1684                logger.warning(
1685                    color(
1686                        f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'
1687                    )
1688                )
1689                return
1690
1691            channel.on_pdu(pdu)
1692
1693    def send_control_frame(
1694        self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
1695    ) -> None:
1696        logger.debug(
1697            f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
1698            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1699            f'{connection.peer_address}:\n{control_frame}'
1700        )
1701        self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
1702
1703    def on_control_frame(
1704        self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
1705    ) -> None:
1706        logger.debug(
1707            f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
1708            f'on connection [0x{connection.handle:04X}] (CID={cid}) '
1709            f'{connection.peer_address}:\n{control_frame}'
1710        )
1711
1712        # Find the handler method
1713        handler_name = f'on_{control_frame.name.lower()}'
1714        handler = getattr(self, handler_name, None)
1715        if handler:
1716            try:
1717                handler(connection, cid, control_frame)
1718            except Exception as error:
1719                logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
1720                self.send_control_frame(
1721                    connection,
1722                    cid,
1723                    L2CAP_Command_Reject(
1724                        identifier=control_frame.identifier,
1725                        reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1726                        data=b'',
1727                    ),
1728                )
1729                raise error
1730        else:
1731            logger.error(color('Channel Manager command not handled???', 'red'))
1732            self.send_control_frame(
1733                connection,
1734                cid,
1735                L2CAP_Command_Reject(
1736                    identifier=control_frame.identifier,
1737                    reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
1738                    data=b'',
1739                ),
1740            )
1741
1742    def on_l2cap_command_reject(
1743        self, _connection: Connection, _cid: int, packet
1744    ) -> None:
1745        logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
1746
1747    def on_l2cap_connection_request(
1748        self, connection: Connection, cid: int, request
1749    ) -> None:
1750        # Check if there's a server for this PSM
1751        server = self.servers.get(request.psm)
1752        if server:
1753            # Find a free CID for this new channel
1754            connection_channels = self.channels.setdefault(connection.handle, {})
1755            source_cid = self.find_free_br_edr_cid(connection_channels)
1756            if source_cid is None:  # Should never happen!
1757                self.send_control_frame(
1758                    connection,
1759                    cid,
1760                    L2CAP_Connection_Response(
1761                        identifier=request.identifier,
1762                        destination_cid=request.source_cid,
1763                        source_cid=0,
1764                        # pylint: disable=line-too-long
1765                        result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
1766                        status=0x0000,
1767                    ),
1768                )
1769                return
1770
1771            # Create a new channel
1772            logger.debug(
1773                f'creating server channel with cid={source_cid} for psm {request.psm}'
1774            )
1775            channel = ClassicChannel(
1776                self, connection, cid, request.psm, source_cid, server.mtu
1777            )
1778            connection_channels[source_cid] = channel
1779
1780            # Notify
1781            server.on_connection(channel)
1782            channel.on_connection_request(request)
1783        else:
1784            logger.warning(
1785                f'No server for connection 0x{connection.handle:04X} '
1786                f'on PSM {request.psm}'
1787            )
1788            self.send_control_frame(
1789                connection,
1790                cid,
1791                L2CAP_Connection_Response(
1792                    identifier=request.identifier,
1793                    destination_cid=request.source_cid,
1794                    source_cid=0,
1795                    # pylint: disable=line-too-long
1796                    result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
1797                    status=0x0000,
1798                ),
1799            )
1800
1801    def on_l2cap_connection_response(
1802        self, connection: Connection, cid: int, response
1803    ) -> None:
1804        if (
1805            channel := self.find_channel(connection.handle, response.source_cid)
1806        ) is None:
1807            logger.warning(
1808                color(
1809                    f'channel {response.source_cid} not found for '
1810                    f'0x{connection.handle:04X}:{cid}',
1811                    'red',
1812                )
1813            )
1814            return
1815
1816        channel.on_connection_response(response)
1817
1818    def on_l2cap_configure_request(
1819        self, connection: Connection, cid: int, request
1820    ) -> None:
1821        if (
1822            channel := self.find_channel(connection.handle, request.destination_cid)
1823        ) is None:
1824            logger.warning(
1825                color(
1826                    f'channel {request.destination_cid} not found for '
1827                    f'0x{connection.handle:04X}:{cid}',
1828                    'red',
1829                )
1830            )
1831            return
1832
1833        channel.on_configure_request(request)
1834
1835    def on_l2cap_configure_response(
1836        self, connection: Connection, cid: int, response
1837    ) -> None:
1838        if (
1839            channel := self.find_channel(connection.handle, response.source_cid)
1840        ) is None:
1841            logger.warning(
1842                color(
1843                    f'channel {response.source_cid} not found for '
1844                    f'0x{connection.handle:04X}:{cid}',
1845                    'red',
1846                )
1847            )
1848            return
1849
1850        channel.on_configure_response(response)
1851
1852    def on_l2cap_disconnection_request(
1853        self, connection: Connection, cid: int, request
1854    ) -> None:
1855        if (
1856            channel := self.find_channel(connection.handle, request.destination_cid)
1857        ) is None:
1858            logger.warning(
1859                color(
1860                    f'channel {request.destination_cid} not found for '
1861                    f'0x{connection.handle:04X}:{cid}',
1862                    'red',
1863                )
1864            )
1865            return
1866
1867        channel.on_disconnection_request(request)
1868
1869    def on_l2cap_disconnection_response(
1870        self, connection: Connection, cid: int, response
1871    ) -> None:
1872        if (
1873            channel := self.find_channel(connection.handle, response.source_cid)
1874        ) is None:
1875            logger.warning(
1876                color(
1877                    f'channel {response.source_cid} not found for '
1878                    f'0x{connection.handle:04X}:{cid}',
1879                    'red',
1880                )
1881            )
1882            return
1883
1884        channel.on_disconnection_response(response)
1885
1886    def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None:
1887        logger.debug(f'<<< Echo request: data={request.data.hex()}')
1888        self.send_control_frame(
1889            connection,
1890            cid,
1891            L2CAP_Echo_Response(identifier=request.identifier, data=request.data),
1892        )
1893
1894    def on_l2cap_echo_response(
1895        self, _connection: Connection, _cid: int, response
1896    ) -> None:
1897        logger.debug(f'<<< Echo response: data={response.data.hex()}')
1898        # TODO notify listeners
1899
1900    def on_l2cap_information_request(
1901        self, connection: Connection, cid: int, request
1902    ) -> None:
1903        if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
1904            result = L2CAP_Information_Response.SUCCESS
1905            data = self.connectionless_mtu.to_bytes(2, 'little')
1906        elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
1907            result = L2CAP_Information_Response.SUCCESS
1908            data = sum(self.extended_features).to_bytes(4, 'little')
1909        elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
1910            result = L2CAP_Information_Response.SUCCESS
1911            data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
1912        else:
1913            result = L2CAP_Information_Response.NOT_SUPPORTED
1914
1915        self.send_control_frame(
1916            connection,
1917            cid,
1918            L2CAP_Information_Response(
1919                identifier=request.identifier,
1920                info_type=request.info_type,
1921                result=result,
1922                data=data,
1923            ),
1924        )
1925
1926    def on_l2cap_connection_parameter_update_request(
1927        self, connection: Connection, cid: int, request
1928    ):
1929        if connection.role == BT_CENTRAL_ROLE:
1930            self.send_control_frame(
1931                connection,
1932                cid,
1933                L2CAP_Connection_Parameter_Update_Response(
1934                    identifier=request.identifier,
1935                    result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT,
1936                ),
1937            )
1938            self.host.send_command_sync(
1939                HCI_LE_Connection_Update_Command(
1940                    connection_handle=connection.handle,
1941                    connection_interval_min=request.interval_min,
1942                    connection_interval_max=request.interval_max,
1943                    max_latency=request.latency,
1944                    supervision_timeout=request.timeout,
1945                    min_ce_length=0,
1946                    max_ce_length=0,
1947                )
1948            )
1949        else:
1950            self.send_control_frame(
1951                connection,
1952                cid,
1953                L2CAP_Connection_Parameter_Update_Response(
1954                    identifier=request.identifier,
1955                    result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT,
1956                ),
1957            )
1958
1959    async def update_connection_parameters(
1960        self,
1961        connection: Connection,
1962        interval_min: int,
1963        interval_max: int,
1964        latency: int,
1965        timeout: int,
1966    ) -> int:
1967        # Check that there isn't already a request pending
1968        if self.connection_parameters_update_response:
1969            raise InvalidStateError('request already pending')
1970        self.connection_parameters_update_response = (
1971            asyncio.get_running_loop().create_future()
1972        )
1973        self.send_control_frame(
1974            connection,
1975            L2CAP_LE_SIGNALING_CID,
1976            L2CAP_Connection_Parameter_Update_Request(
1977                interval_min=interval_min,
1978                interval_max=interval_max,
1979                latency=latency,
1980                timeout=timeout,
1981            ),
1982        )
1983        return await self.connection_parameters_update_response
1984
1985    def on_l2cap_connection_parameter_update_response(
1986        self, connection: Connection, cid: int, response
1987    ) -> None:
1988        if self.connection_parameters_update_response:
1989            self.connection_parameters_update_response.set_result(response.result)
1990            self.connection_parameters_update_response = None
1991        else:
1992            logger.warning(
1993                color(
1994                    'received l2cap_connection_parameter_update_response without a pending request',
1995                    'red',
1996                )
1997            )
1998
1999    def on_l2cap_le_credit_based_connection_request(
2000        self, connection: Connection, cid: int, request
2001    ) -> None:
2002        if request.le_psm in self.le_coc_servers:
2003            server = self.le_coc_servers[request.le_psm]
2004
2005            # Check that the CID isn't already used
2006            le_connection_channels = self.le_coc_channels.setdefault(
2007                connection.handle, {}
2008            )
2009            if request.source_cid in le_connection_channels:
2010                logger.warning(f'source CID {request.source_cid} already in use')
2011                self.send_control_frame(
2012                    connection,
2013                    cid,
2014                    L2CAP_LE_Credit_Based_Connection_Response(
2015                        identifier=request.identifier,
2016                        destination_cid=0,
2017                        mtu=server.mtu,
2018                        mps=server.mps,
2019                        initial_credits=0,
2020                        # pylint: disable=line-too-long
2021                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
2022                    ),
2023                )
2024                return
2025
2026            # Find a free CID for this new channel
2027            connection_channels = self.channels.setdefault(connection.handle, {})
2028            source_cid = self.find_free_le_cid(connection_channels)
2029            if source_cid is None:  # Should never happen!
2030                self.send_control_frame(
2031                    connection,
2032                    cid,
2033                    L2CAP_LE_Credit_Based_Connection_Response(
2034                        identifier=request.identifier,
2035                        destination_cid=0,
2036                        mtu=server.mtu,
2037                        mps=server.mps,
2038                        initial_credits=0,
2039                        # pylint: disable=line-too-long
2040                        result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
2041                    ),
2042                )
2043                return
2044
2045            # Create a new channel
2046            logger.debug(
2047                f'creating LE CoC server channel with cid={source_cid} for psm '
2048                f'{request.le_psm}'
2049            )
2050            channel = LeCreditBasedChannel(
2051                self,
2052                connection,
2053                request.le_psm,
2054                source_cid,
2055                request.source_cid,
2056                server.mtu,
2057                server.mps,
2058                request.initial_credits,
2059                request.mtu,
2060                request.mps,
2061                server.max_credits,
2062                True,
2063            )
2064            connection_channels[source_cid] = channel
2065            le_connection_channels[request.source_cid] = channel
2066
2067            # Respond
2068            self.send_control_frame(
2069                connection,
2070                cid,
2071                L2CAP_LE_Credit_Based_Connection_Response(
2072                    identifier=request.identifier,
2073                    destination_cid=source_cid,
2074                    mtu=server.mtu,
2075                    mps=server.mps,
2076                    initial_credits=server.max_credits,
2077                    # pylint: disable=line-too-long
2078                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL,
2079                ),
2080            )
2081
2082            # Notify
2083            server.on_connection(channel)
2084        else:
2085            logger.info(
2086                f'No LE server for connection 0x{connection.handle:04X} '
2087                f'on PSM {request.le_psm}'
2088            )
2089            self.send_control_frame(
2090                connection,
2091                cid,
2092                L2CAP_LE_Credit_Based_Connection_Response(
2093                    identifier=request.identifier,
2094                    destination_cid=0,
2095                    mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
2096                    mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
2097                    initial_credits=0,
2098                    # pylint: disable=line-too-long
2099                    result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
2100                ),
2101            )
2102
2103    def on_l2cap_le_credit_based_connection_response(
2104        self, connection: Connection, _cid: int, response
2105    ) -> None:
2106        # Find the pending request by identifier
2107        request = self.le_coc_requests.get(response.identifier)
2108        if request is None:
2109            logger.warning(color('!!! received response for unknown request', 'red'))
2110            return
2111        del self.le_coc_requests[response.identifier]
2112
2113        # Find the channel for this request
2114        channel = self.find_channel(connection.handle, request.source_cid)
2115        if channel is None:
2116            logger.warning(
2117                color(
2118                    'received connection response for an unknown channel '
2119                    f'(cid={request.source_cid})',
2120                    'red',
2121                )
2122            )
2123            return
2124
2125        # Process the response
2126        channel.on_connection_response(response)
2127
2128    def on_l2cap_le_flow_control_credit(
2129        self, connection: Connection, _cid: int, credit
2130    ) -> None:
2131        channel = self.find_le_coc_channel(connection.handle, credit.cid)
2132        if channel is None:
2133            logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
2134            return
2135
2136        channel.on_credits(credit.credits)
2137
2138    def on_channel_closed(self, channel: ClassicChannel) -> None:
2139        connection_channels = self.channels.get(channel.connection.handle)
2140        if connection_channels:
2141            if channel.source_cid in connection_channels:
2142                del connection_channels[channel.source_cid]
2143
2144    @deprecated("Please use create_le_credit_based_channel()")
2145    async def open_le_coc(
2146        self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int
2147    ) -> LeCreditBasedChannel:
2148        return await self.create_le_credit_based_channel(
2149            connection=connection,
2150            spec=LeCreditBasedChannelSpec(
2151                psm=psm, max_credits=max_credits, mtu=mtu, mps=mps
2152            ),
2153        )
2154
2155    async def create_le_credit_based_channel(
2156        self,
2157        connection: Connection,
2158        spec: LeCreditBasedChannelSpec,
2159    ) -> LeCreditBasedChannel:
2160        # Find a free CID for the new channel
2161        connection_channels = self.channels.setdefault(connection.handle, {})
2162        source_cid = self.find_free_le_cid(connection_channels)
2163        if source_cid is None:  # Should never happen!
2164            raise OutOfResourcesError('all CIDs already in use')
2165
2166        if spec.psm is None:
2167            raise InvalidArgumentError('PSM cannot be None')
2168
2169        # Create the channel
2170        logger.debug(f'creating coc channel with cid={source_cid} for psm {spec.psm}')
2171        channel = LeCreditBasedChannel(
2172            manager=self,
2173            connection=connection,
2174            le_psm=spec.psm,
2175            source_cid=source_cid,
2176            destination_cid=0,
2177            mtu=spec.mtu,
2178            mps=spec.mps,
2179            credits=0,
2180            peer_mtu=0,
2181            peer_mps=0,
2182            peer_credits=spec.max_credits,
2183            connected=False,
2184        )
2185        connection_channels[source_cid] = channel
2186
2187        # Connect
2188        try:
2189            await channel.connect()
2190        except Exception as error:
2191            logger.warning(f'connection failed: {error}')
2192            del connection_channels[source_cid]
2193            raise
2194
2195        # Remember the channel by source CID and destination CID
2196        le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
2197        le_connection_channels[channel.destination_cid] = channel
2198
2199        return channel
2200
2201    @deprecated("Please use create_classic_channel()")
2202    async def connect(self, connection: Connection, psm: int) -> ClassicChannel:
2203        return await self.create_classic_channel(
2204            connection=connection, spec=ClassicChannelSpec(psm=psm)
2205        )
2206
2207    async def create_classic_channel(
2208        self, connection: Connection, spec: ClassicChannelSpec
2209    ) -> ClassicChannel:
2210        # NOTE: this implementation hard-codes BR/EDR
2211
2212        # Find a free CID for a new channel
2213        connection_channels = self.channels.setdefault(connection.handle, {})
2214        source_cid = self.find_free_br_edr_cid(connection_channels)
2215        if source_cid is None:  # Should never happen!
2216            raise OutOfResourcesError('all CIDs already in use')
2217
2218        if spec.psm is None:
2219            raise InvalidArgumentError('PSM cannot be None')
2220
2221        # Create the channel
2222        logger.debug(
2223            f'creating client channel with cid={source_cid} for psm {spec.psm}'
2224        )
2225        channel = ClassicChannel(
2226            self,
2227            connection,
2228            L2CAP_SIGNALING_CID,
2229            spec.psm,
2230            source_cid,
2231            spec.mtu,
2232        )
2233        connection_channels[source_cid] = channel
2234
2235        # Connect
2236        try:
2237            await channel.connect()
2238        except BaseException as e:
2239            del connection_channels[source_cid]
2240            raise e
2241
2242        return channel
2243
2244
2245# -----------------------------------------------------------------------------
2246# Deprecated Classes
2247# -----------------------------------------------------------------------------
2248
2249
2250class Channel(ClassicChannel):
2251    @deprecated("Please use ClassicChannel")
2252    def __init__(self, *args, **kwargs) -> None:
2253        super().__init__(*args, **kwargs)
2254
2255
2256class LeConnectionOrientedChannel(LeCreditBasedChannel):
2257    @deprecated("Please use LeCreditBasedChannel")
2258    def __init__(self, *args, **kwargs) -> None:
2259        super().__init__(*args, **kwargs)
2260