1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import struct
21import time
22import logging
23import enum
24import warnings
25from pyee import EventEmitter
26from typing import (
27    Any,
28    Awaitable,
29    Dict,
30    Type,
31    Tuple,
32    Optional,
33    Callable,
34    List,
35    AsyncGenerator,
36    Iterable,
37    Union,
38    SupportsBytes,
39    cast,
40)
41
42from .core import (
43    BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
44    InvalidStateError,
45    ProtocolError,
46    InvalidArgumentError,
47    name_or_number,
48)
49from .a2dp import (
50    A2DP_CODEC_TYPE_NAMES,
51    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
52    A2DP_NON_A2DP_CODEC_TYPE,
53    A2DP_SBC_CODEC_TYPE,
54    AacMediaCodecInformation,
55    SbcMediaCodecInformation,
56    VendorSpecificMediaCodecInformation,
57)
58from . import sdp, device, l2cap
59from .colors import color
60
61# -----------------------------------------------------------------------------
62# Logging
63# -----------------------------------------------------------------------------
64logger = logging.getLogger(__name__)
65
66
67# -----------------------------------------------------------------------------
68# Constants
69# -----------------------------------------------------------------------------
70# fmt: off
71# pylint: disable=line-too-long
72
# L2CAP PSM (Protocol/Service Multiplexer) value used for AVDTP channels
AVDTP_PSM = 0x0019

# Default value for the signaling response timer (RTX_SIG_TIMER).
# NOTE(review): the code that consumes this timeout is outside this section.
AVDTP_DEFAULT_RTX_SIG_TIMER = 5  # Seconds
76
# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set)
# These are the values carried in the 6-bit signal identifier field of a
# signaling packet header.
AVDTP_DISCOVER             = 0x01
AVDTP_GET_CAPABILITIES     = 0x02
AVDTP_SET_CONFIGURATION    = 0x03
AVDTP_GET_CONFIGURATION    = 0x04
AVDTP_RECONFIGURE          = 0x05
AVDTP_OPEN                 = 0x06
AVDTP_START                = 0x07
AVDTP_CLOSE                = 0x08
AVDTP_SUSPEND              = 0x09
AVDTP_ABORT                = 0x0A
AVDTP_SECURITY_CONTROL     = 0x0B
AVDTP_GET_ALL_CAPABILITIES = 0x0C
AVDTP_DELAYREPORT          = 0x0D

# Forward lookup: signal identifier -> symbolic name (used for display)
AVDTP_SIGNAL_NAMES = {
    AVDTP_DISCOVER:             'AVDTP_DISCOVER',
    AVDTP_GET_CAPABILITIES:     'AVDTP_GET_CAPABILITIES',
    AVDTP_SET_CONFIGURATION:    'AVDTP_SET_CONFIGURATION',
    AVDTP_GET_CONFIGURATION:    'AVDTP_GET_CONFIGURATION',
    AVDTP_RECONFIGURE:          'AVDTP_RECONFIGURE',
    AVDTP_OPEN:                 'AVDTP_OPEN',
    AVDTP_START:                'AVDTP_START',
    AVDTP_CLOSE:                'AVDTP_CLOSE',
    AVDTP_SUSPEND:              'AVDTP_SUSPEND',
    AVDTP_ABORT:                'AVDTP_ABORT',
    AVDTP_SECURITY_CONTROL:     'AVDTP_SECURITY_CONTROL',
    AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES',
    AVDTP_DELAYREPORT:          'AVDTP_DELAYREPORT'
}

# Reverse lookup: symbolic name -> signal identifier.
# Derived from the forward table, so both mappings always agree; the entries
# keep the same (ascending identifier) order as the forward table.
AVDTP_SIGNAL_IDENTIFIERS = {
    signal_name: signal_identifier
    for signal_identifier, signal_name in AVDTP_SIGNAL_NAMES.items()
}
123
# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables)
# These values are what reject messages carry in their error code byte.
AVDTP_BAD_HEADER_FORMAT_ERROR          = 0x01
AVDTP_BAD_LENGTH_ERROR                 = 0x11
AVDTP_BAD_ACP_SEID_ERROR               = 0x12
AVDTP_SEP_IN_USE_ERROR                 = 0x13
AVDTP_SEP_NOT_IN_USE_ERROR             = 0x14
AVDTP_BAD_SERV_CATEGORY_ERROR          = 0x17
AVDTP_BAD_PAYLOAD_FORMAT_ERROR         = 0x18
AVDTP_NOT_SUPPORTED_COMMAND_ERROR      = 0x19
AVDTP_INVALID_CAPABILITIES_ERROR       = 0x1A
AVDTP_BAD_RECOVERY_TYPE_ERROR          = 0x22
AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23
AVDTP_BAD_RECOVERY_FORMAT_ERROR        = 0x25
AVDTP_BAD_ROHC_FORMAT_ERROR            = 0x26
AVDTP_BAD_CP_FORMAT_ERROR              = 0x27
AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR    = 0x28
AVDTP_UNSUPPORTED_CONFIGURATION_ERROR  = 0x29
AVDTP_BAD_STATE_ERROR                  = 0x31

# Error code -> symbolic name, used when formatting reject messages for display
AVDTP_ERROR_NAMES = {
    AVDTP_BAD_HEADER_FORMAT_ERROR:          'AVDTP_BAD_HEADER_FORMAT_ERROR',
    AVDTP_BAD_LENGTH_ERROR:                 'AVDTP_BAD_LENGTH_ERROR',
    AVDTP_BAD_ACP_SEID_ERROR:               'AVDTP_BAD_ACP_SEID_ERROR',
    AVDTP_SEP_IN_USE_ERROR:                 'AVDTP_SEP_IN_USE_ERROR',
    AVDTP_SEP_NOT_IN_USE_ERROR:             'AVDTP_SEP_NOT_IN_USE_ERROR',
    AVDTP_BAD_SERV_CATEGORY_ERROR:          'AVDTP_BAD_SERV_CATEGORY_ERROR',
    AVDTP_BAD_PAYLOAD_FORMAT_ERROR:         'AVDTP_BAD_PAYLOAD_FORMAT_ERROR',
    AVDTP_NOT_SUPPORTED_COMMAND_ERROR:      'AVDTP_NOT_SUPPORTED_COMMAND_ERROR',
    AVDTP_INVALID_CAPABILITIES_ERROR:       'AVDTP_INVALID_CAPABILITIES_ERROR',
    AVDTP_BAD_RECOVERY_TYPE_ERROR:          'AVDTP_BAD_RECOVERY_TYPE_ERROR',
    AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR',
    AVDTP_BAD_RECOVERY_FORMAT_ERROR:        'AVDTP_BAD_RECOVERY_FORMAT_ERROR',
    AVDTP_BAD_ROHC_FORMAT_ERROR:            'AVDTP_BAD_ROHC_FORMAT_ERROR',
    AVDTP_BAD_CP_FORMAT_ERROR:              'AVDTP_BAD_CP_FORMAT_ERROR',
    AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR:    'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR',
    AVDTP_UNSUPPORTED_CONFIGURATION_ERROR:  'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR',
    AVDTP_BAD_STATE_ERROR:                  'AVDTP_BAD_STATE_ERROR'
}
162
# Media types (values of the media type field of stream end point information
# and of media codec capabilities)
AVDTP_AUDIO_MEDIA_TYPE      = 0x00
AVDTP_VIDEO_MEDIA_TYPE      = 0x01
AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02

# Media type -> symbolic name, used for display
AVDTP_MEDIA_TYPE_NAMES = {
    AVDTP_AUDIO_MEDIA_TYPE:      'AVDTP_AUDIO_MEDIA_TYPE',
    AVDTP_VIDEO_MEDIA_TYPE:      'AVDTP_VIDEO_MEDIA_TYPE',
    AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE'
}
172
# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP))
# One-bit field of the stream end point information: 0 for a source, 1 for a sink
AVDTP_TSEP_SRC = 0x00
AVDTP_TSEP_SNK = 0x01

# TSEP value -> symbolic name, used for display
AVDTP_TSEP_NAMES = {
    AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC',
    AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK'
}
181
# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values)
# A service category is the first byte of each serialized service capability
# entry (it is followed by a length byte and the capability bytes).
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY    = 0x01
AVDTP_REPORTING_SERVICE_CATEGORY          = 0x02
AVDTP_RECOVERY_SERVICE_CATEGORY           = 0x03
AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04
AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05
AVDTP_MULTIPLEXING_SERVICE_CATEGORY       = 0x06
AVDTP_MEDIA_CODEC_SERVICE_CATEGORY        = 0x07
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY    = 0x08

# Service category -> symbolic name, used for display
AVDTP_SERVICE_CATEGORY_NAMES = {
    AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:    'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY',
    AVDTP_REPORTING_SERVICE_CATEGORY:          'AVDTP_REPORTING_SERVICE_CATEGORY',
    AVDTP_RECOVERY_SERVICE_CATEGORY:           'AVDTP_RECOVERY_SERVICE_CATEGORY',
    AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY',
    AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY',
    AVDTP_MULTIPLEXING_SERVICE_CATEGORY:       'AVDTP_MULTIPLEXING_SERVICE_CATEGORY',
    AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:        'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY',
    AVDTP_DELAY_REPORTING_SERVICE_CATEGORY:    'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY'
}
202
# States (AVDTP spec - 9.1 State Definitions)
# NOTE(review): the state machine that uses these values is outside this section.
AVDTP_IDLE_STATE       = 0x00
AVDTP_CONFIGURED_STATE = 0x01
AVDTP_OPEN_STATE       = 0x02
AVDTP_STREAMING_STATE  = 0x03
AVDTP_CLOSING_STATE    = 0x04
AVDTP_ABORTING_STATE   = 0x05

# State value -> symbolic name
AVDTP_STATE_NAMES = {
    AVDTP_IDLE_STATE:       'AVDTP_IDLE_STATE',
    AVDTP_CONFIGURED_STATE: 'AVDTP_CONFIGURED_STATE',
    AVDTP_OPEN_STATE:       'AVDTP_OPEN_STATE',
    AVDTP_STREAMING_STATE:  'AVDTP_STREAMING_STATE',
    AVDTP_CLOSING_STATE:    'AVDTP_CLOSING_STATE',
    AVDTP_ABORTING_STATE:   'AVDTP_ABORTING_STATE'
}
219
220# fmt: on
221# pylint: enable=line-too-long
222# pylint: disable=invalid-name
223
224
225# -----------------------------------------------------------------------------
async def find_avdtp_service_with_sdp_client(
    sdp_client: sdp.Client,
) -> Optional[Tuple[int, int]]:
    '''
    Look up the AVDTP version advertised by a peer, using an SDP client that is
    already connected.

    The version is read from the second element of the first profile descriptor
    found that is a sequence with at least two elements; the descriptor's first
    element is not inspected.

    Returns a `(major, minor)` tuple, or None if no such descriptor is found.
    '''

    profile_list_attribute_id = sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID

    # Ask for the profile descriptor list of every record that lists the
    # Advanced Audio Distribution service class
    records = await sdp_client.search_attributes(
        [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE],
        [profile_list_attribute_id],
    )

    for record in records:
        descriptor_list = sdp.ServiceAttribute.find_attribute_in_list(
            record, profile_list_attribute_id
        )
        if not descriptor_list:
            continue

        for descriptor in descriptor_list.value:
            if descriptor.type != sdp.DataElement.SEQUENCE:
                continue
            if len(descriptor.value) < 2:
                continue

            # The version is a 16-bit value: major in the high byte, minor in
            # the low byte
            version = descriptor.value[1].value
            return (version >> 8, version & 0xFF)

    return None
253
254
255# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(
    connection: device.Connection,
) -> Optional[Tuple[int, int]]:
    '''
    Find an AVDTP service, for a connection, and return its version,
    or None if none is found.

    A temporary SDP client is connected over `connection` for the duration of
    the search, and is always disconnected afterwards, including when the
    search raises.

    Returns a `(major, minor)` tuple, or None.
    '''

    sdp_client = sdp.Client(connection)
    await sdp_client.connect()
    try:
        return await find_avdtp_service_with_sdp_client(sdp_client)
    finally:
        # Release the SDP channel even if the search failed, so that a failed
        # lookup does not leave the channel open
        await sdp_client.disconnect()
270
271
272# -----------------------------------------------------------------------------
class RealtimeClock:
    '''
    Clock based on the system time and on the asyncio event loop.
    '''

    def now(self) -> float:
        '''Return the current time, in seconds, as given by `time.time()`.'''
        current_time = time.time()
        return current_time

    async def sleep(self, duration: float) -> None:
        '''Suspend the calling coroutine for `duration` seconds.'''
        await asyncio.sleep(duration)
279
280
281# -----------------------------------------------------------------------------
class MediaPacket:
    '''
    RTP media packet: a 12-byte fixed header, followed by a list of 32-bit CSRC
    identifiers, followed by the payload.

    The header extension and the padding are not interpreted: the `extension`
    and `padding` bits are stored as-is, and everything that follows the CSRC
    list is treated as payload.
    '''

    @staticmethod
    def from_bytes(data: bytes) -> MediaPacket:
        '''
        Parse a serialized packet.

        Args:
          data: the serialized packet (header, CSRC list and payload).

        Returns:
          the parsed packet.

        Raises:
          IndexError or struct.error: if `data` is too short to contain the
            header and the CSRC list it announces.
        '''
        version = (data[0] >> 6) & 0x03
        padding = (data[0] >> 5) & 0x01
        extension = (data[0] >> 4) & 0x01
        csrc_count = data[0] & 0x0F
        marker = (data[1] >> 7) & 0x01
        payload_type = data[1] & 0x7F
        sequence_number, timestamp, ssrc = struct.unpack_from('>HII', data, 2)
        # Each CSRC identifier is 4 bytes wide, so entry `i` starts 4 * i bytes
        # after the end of the fixed header
        csrc_list = [
            struct.unpack_from('>I', data, 12 + 4 * i)[0] for i in range(csrc_count)
        ]
        payload = data[12 + csrc_count * 4 :]

        return MediaPacket(
            version,
            padding,
            extension,
            marker,
            sequence_number,
            timestamp,
            ssrc,
            csrc_list,
            payload_type,
            payload,
        )

    def __init__(
        self,
        version: int,
        padding: int,
        extension: int,
        marker: int,
        sequence_number: int,
        timestamp: int,
        ssrc: int,
        csrc_list: List[int],
        payload_type: int,
        payload: bytes,
    ) -> None:
        '''
        Args:
          version: 2-bit version field.
          padding: 1-bit padding flag.
          extension: 1-bit extension flag.
          marker: 1-bit marker flag.
          sequence_number: sequence number (truncated to 16 bits).
          timestamp: timestamp (truncated to 32 bits).
          ssrc: 32-bit synchronization source identifier.
          csrc_list: list of 32-bit contributing source identifiers.
          payload_type: 7-bit payload type.
          payload: bytes that follow the header and the CSRC list.
        '''
        self.version = version
        self.padding = padding
        self.extension = extension
        self.marker = marker
        # Wrap around instead of failing when counters exceed their field size
        self.sequence_number = sequence_number & 0xFFFF
        self.timestamp = timestamp & 0xFFFFFFFF
        self.ssrc = ssrc
        self.csrc_list = csrc_list
        self.payload_type = payload_type
        self.payload = payload

    def __bytes__(self) -> bytes:
        '''Serialize the packet (header, CSRC list, then payload).'''
        flags = bytes(
            [
                self.version << 6
                | self.padding << 5
                | self.extension << 4
                | len(self.csrc_list),
                self.marker << 7 | self.payload_type,
            ]
        )
        fixed_fields = struct.pack(
            '>HII',
            self.sequence_number,
            self.timestamp,
            self.ssrc,
        )
        csrcs = b''.join(struct.pack('>I', csrc) for csrc in self.csrc_list)
        return flags + fixed_fields + csrcs + self.payload

    def __str__(self) -> str:
        return (
            f'RTP(v={self.version},'
            f'p={self.padding},'
            f'x={self.extension},'
            f'm={self.marker},'
            f'pt={self.payload_type},'
            f'sn={self.sequence_number},'
            f'ts={self.timestamp},'
            f'ssrc={self.ssrc},'
            f'csrcs={self.csrc_list},'
            f'payload_size={len(self.payload)})'
        )
368
369
370# -----------------------------------------------------------------------------
class MediaPacketPump:
    '''
    Sends the packets produced by an async generator over an L2CAP channel,
    pacing them according to their timestamps.

    Each packet must expose a `timestamp_seconds` attribute and be convertible
    to bytes with `bytes(packet)`. The first packet is sent immediately; each
    following packet is sent when the clock reaches the time of the first
    packet plus the difference between the two packets' `timestamp_seconds`.
    '''

    pump_task: Optional[asyncio.Task]

    def __init__(
        self, packets: AsyncGenerator, clock: RealtimeClock = RealtimeClock()
    ) -> None:
        '''
        Args:
          packets: async generator of packets to send.
          clock: object with `now()` and an async `sleep()`, used for pacing.
        '''
        self.packets = packets
        self.clock = clock
        self.pump_task = None

    async def start(self, rtp_channel: l2cap.ClassicChannel) -> None:
        '''
        Start sending packets to `rtp_channel` from a background task.
        '''

        async def pump_packets():
            # `start_time` stays None until the first packet is seen. None is
            # used rather than 0 because a clock may legitimately return 0.
            start_time = None
            start_timestamp = 0

            try:
                logger.debug('pump starting')
                async for packet in self.packets:
                    # Capture the timestamp of the first packet
                    if start_time is None:
                        start_time = self.clock.now()
                        start_timestamp = packet.timestamp_seconds

                    # Wait until we can send
                    when = start_time + (packet.timestamp_seconds - start_timestamp)
                    now = self.clock.now()
                    if when > now:
                        delay = when - now
                        logger.debug(f'waiting for {delay}')
                        await self.clock.sleep(delay)

                    # Emit
                    rtp_channel.send_pdu(bytes(packet))
                    logger.debug(
                        f'{color(">>> sending RTP packet:", "green")} {packet}'
                    )
            except asyncio.CancelledError:
                logger.debug('pump canceled')

        # Pump packets
        self.pump_task = asyncio.create_task(pump_packets())

    async def stop(self) -> None:
        '''
        Stop the pump, if it is running, and wait for its task to finish.

        An exception raised by the pump task itself (other than its
        cancellation) is re-raised here.
        '''
        # Detach the task first, so that the pump is marked as stopped even if
        # awaiting the task raises
        pump_task, self.pump_task = self.pump_task, None
        if pump_task is None:
            return

        pump_task.cancel()
        try:
            await pump_task
        except asyncio.CancelledError:
            # A task that is canceled before its first step never enters the
            # pump's own `try` block, so the cancellation surfaces here
            logger.debug('pump canceled before it started')
419
420
421# -----------------------------------------------------------------------------
class MessageAssembler:
    '''
    Reassembles signaling messages from the packets received on the signaling
    channel, and invokes a callback for each complete message.

    A message is either carried by one single packet, or fragmented into a
    start packet, zero or more continue packets, and an end packet. The start
    packet announces the total number of packets of the message.
    '''

    message: Optional[bytes]

    def __init__(self, callback: Callable[[int, Message], Any]) -> None:
        '''
        Args:
          callback: called as `callback(transaction_label, message)` for each
            complete message. Exceptions it raises are logged, not propagated.
        '''
        self.callback = callback
        self.reset()

    def reset(self) -> None:
        '''Forget any partially assembled message.'''
        self.transaction_label = 0
        self.message = None
        self.message_type = Message.MessageType.COMMAND
        self.signal_identifier = 0
        self.number_of_signal_packets = 0
        self.packet_count = 0

    def on_pdu(self, pdu: bytes) -> None:
        '''
        Process one received packet.

        Malformed or unexpected packets are logged and ignored (dropping the
        message being assembled when it can no longer be completed).
        '''
        if not pdu:
            logger.warning('empty PDU')
            return

        transaction_label = pdu[0] >> 4
        packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
        message_type = Message.MessageType(pdu[0] & 3)

        logger.debug(
            f'transaction_label={transaction_label}, '
            f'packet_type={packet_type.name}, '
            f'message_type={message_type.name}'
        )
        if packet_type in (
            Protocol.PacketType.SINGLE_PACKET,
            Protocol.PacketType.START_PACKET,
        ):
            # A single packet has a 2-byte header; a start packet has a third
            # header byte (the number of signal packets)
            is_single = packet_type == Protocol.PacketType.SINGLE_PACKET
            if len(pdu) < (2 if is_single else 3):
                logger.warning('truncated packet')
                return

            if self.message is not None:
                # The previous message has not been terminated
                logger.warning(
                    'received a start or single packet when expecting an end or '
                    'continuation'
                )
                self.reset()

            self.transaction_label = transaction_label
            self.signal_identifier = pdu[1] & 0x3F
            self.message_type = message_type
            # This packet is the first of its message. The count is set here,
            # after any reset, so that it is neither lost by the reset nor
            # inflated by packets that were ignored earlier.
            self.packet_count = 1

            if is_single:
                self.message = pdu[2:]
                self.on_message_complete()
            else:
                self.number_of_signal_packets = pdu[2]
                self.message = pdu[3:]
        elif packet_type in (
            Protocol.PacketType.CONTINUE_PACKET,
            Protocol.PacketType.END_PACKET,
        ):
            if self.message is None:
                # No start packet was received for this message
                logger.warning('unexpected continuation')
                return

            if transaction_label != self.transaction_label:
                logger.warning(
                    f'transaction label mismatch: expected {self.transaction_label}, '
                    f'received {transaction_label}'
                )
                return

            if message_type != self.message_type:
                logger.warning(
                    f'message type mismatch: expected {self.message_type}, '
                    f'received {message_type}'
                )
                return

            # Only packets that are accepted count towards the announced total
            self.packet_count += 1
            self.message += pdu[1:]

            if packet_type == Protocol.PacketType.END_PACKET:
                if self.packet_count != self.number_of_signal_packets:
                    logger.warning(
                        'incomplete fragmented message: '
                        f'expected {self.number_of_signal_packets} packets, '
                        f'received {self.packet_count}'
                    )
                    self.reset()
                    return

                self.on_message_complete()
            else:
                if self.packet_count > self.number_of_signal_packets:
                    logger.warning(
                        'too many packets: '
                        f'expected {self.number_of_signal_packets}, '
                        f'received {self.packet_count}'
                    )
                    self.reset()
                    return

    def on_message_complete(self) -> None:
        '''
        Build a `Message` from the assembled bytes and pass it to the callback.

        The assembler is always reset afterwards, including when the message
        cannot be built (in which case the exception propagates).
        '''
        try:
            message = Message.create(
                self.signal_identifier, self.message_type, self.message or b''
            )
            try:
                self.callback(self.transaction_label, message)
            except Exception as error:
                logger.exception(color(f'!!! exception in callback: {error}', 'red'))
        finally:
            self.reset()
526
527
528# -----------------------------------------------------------------------------
class ServiceCapabilities:
    '''
    One service capability entry: a service category and the bytes that
    describe the capability for that category.

    When serialized, an entry is a category byte, a length byte, and the
    capability bytes.
    '''

    @staticmethod
    def create(
        service_category: int, service_capabilities_bytes: bytes
    ) -> ServiceCapabilities:
        '''
        Build an instance of the class that matches `service_category`, from
        serialized capability bytes.
        '''
        # Media codec capabilities have a dedicated subclass; every other
        # category uses the generic class
        cls: Type[ServiceCapabilities] = (
            MediaCodecCapabilities
            if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY
            else ServiceCapabilities
        )

        # The constructors take decoded fields, so bypass them and let the
        # instance decode itself from the raw bytes
        capability = cls.__new__(cls)
        capability.service_category = service_category
        capability.service_capabilities_bytes = service_capabilities_bytes
        capability.init_from_bytes()

        return capability

    @staticmethod
    def parse_capabilities(payload: bytes) -> List[ServiceCapabilities]:
        '''
        Parse a sequence of serialized entries into a list of capabilities.
        '''
        parsed = []
        remaining = payload
        while remaining:
            category, length = remaining[0], remaining[1]
            end_of_entry = 2 + length
            parsed.append(
                ServiceCapabilities.create(category, remaining[2:end_of_entry])
            )
            remaining = remaining[end_of_entry:]

        return parsed

    @staticmethod
    def serialize_capabilities(capabilities: Iterable[ServiceCapabilities]) -> bytes:
        '''
        Serialize capabilities, one entry after the other.
        '''
        return b''.join(
            bytes([item.service_category, len(item.service_capabilities_bytes)])
            + item.service_capabilities_bytes
            for item in capabilities
        )

    def init_from_bytes(self) -> None:
        '''
        Decode `service_capabilities_bytes`. The generic class has nothing to
        decode; subclasses override this.
        '''

    def __init__(
        self, service_category: int, service_capabilities_bytes: bytes = b''
    ) -> None:
        self.service_category = service_category
        self.service_capabilities_bytes = service_capabilities_bytes

    def to_string(self, details: Optional[List[str]] = None) -> str:
        '''
        Format the category name followed by the optional `details`.
        '''
        category_name = name_or_number(
            AVDTP_SERVICE_CATEGORY_NAMES, self.service_category
        )
        attributes = ','.join([category_name] + (details or []))
        return f'ServiceCapabilities({attributes})'

    def __str__(self) -> str:
        raw = self.service_capabilities_bytes
        return self.to_string([raw.hex()] if raw else [])
596
597
598# -----------------------------------------------------------------------------
class MediaCodecCapabilities(ServiceCapabilities):
    '''
    Service capabilities for the media codec category: a media type byte, a
    codec type byte, and codec-specific information.

    The codec information is decoded into a codec information object for the
    SBC, MPEG-2,4 AAC and vendor-specific codec types; for any other codec type
    it is kept as raw bytes.
    '''

    media_codec_information: Union[bytes, SupportsBytes]
    media_type: int
    media_codec_type: int

    def init_from_bytes(self) -> None:
        raw = self.service_capabilities_bytes
        self.media_type = raw[0]
        self.media_codec_type = raw[1]
        self.media_codec_information = raw[2:]

        # Codec types for which the codec information can be decoded
        decoders = {
            A2DP_SBC_CODEC_TYPE: SbcMediaCodecInformation,
            A2DP_MPEG_2_4_AAC_CODEC_TYPE: AacMediaCodecInformation,
            A2DP_NON_A2DP_CODEC_TYPE: VendorSpecificMediaCodecInformation,
        }
        decoder = decoders.get(self.media_codec_type)
        if decoder is not None:
            self.media_codec_information = decoder.from_bytes(
                self.media_codec_information
            )

    def __init__(
        self,
        media_type: int,
        media_codec_type: int,
        media_codec_information: Union[bytes, SupportsBytes],
    ) -> None:
        serialized = bytes([media_type, media_codec_type]) + bytes(
            media_codec_information
        )
        super().__init__(AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, serialized)
        self.media_type = media_type
        self.media_codec_type = media_codec_type
        self.media_codec_information = media_codec_information

    def __str__(self) -> str:
        # Raw bytes are shown in hex, decoded objects with their own str()
        information = self.media_codec_information
        if isinstance(information, bytes):
            codec_info = information.hex()
        else:
            codec_info = str(information)

        media_type_name = name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)
        codec_name = name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)
        return self.to_string(
            [
                f'media_type={media_type_name}',
                f'codec={codec_name}',
                f'codec_info={codec_info}',
            ]
        )
651
652
653# -----------------------------------------------------------------------------
class EndPointInfo:
    '''
    Description of a stream end point, serialized as 2 bytes:
      - first byte: SEID in the upper 6 bits, then the in-use flag (bit 1)
      - second byte: media type in the upper 4 bits, then the TSEP (bit 3)
    '''

    @staticmethod
    def from_bytes(payload: bytes) -> EndPointInfo:
        '''Parse the 2-byte serialized form.'''
        first_byte = payload[0]
        second_byte = payload[1]
        seid = first_byte >> 2
        in_use = (first_byte >> 1) & 1
        media_type = second_byte >> 4
        tsep = (second_byte >> 3) & 1
        return EndPointInfo(seid, in_use, media_type, tsep)

    def __bytes__(self) -> bytes:
        '''Serialize to the 2-byte form.'''
        first_byte = (self.seid << 2) | (self.in_use << 1)
        second_byte = (self.media_type << 4) | (self.tsep << 3)
        return bytes([first_byte, second_byte])

    def __init__(self, seid: int, in_use: int, media_type: int, tsep: int) -> None:
        self.seid = seid
        self.in_use = in_use
        self.media_type = media_type
        self.tsep = tsep
671
672
673# -----------------------------------------------------------------------------
class Message:  # pylint:disable=attribute-defined-outside-init
    '''
    Base class for signaling messages.

    A message has a signal identifier, a message type, and a payload. Concrete
    message classes are registered with the `Message.subclass` decorator, and
    received messages are instantiated with `Message.create`.
    '''

    class MessageType(enum.IntEnum):
        COMMAND = 0
        GENERAL_REJECT = 1
        RESPONSE_ACCEPT = 2
        RESPONSE_REJECT = 3

    # Subclasses, by signal identifier and message type
    subclasses: Dict[int, Dict[int, Type[Message]]] = {}
    message_type: MessageType
    signal_identifier: int

    @staticmethod
    def subclass(subclass):
        '''
        Class decorator that sets `signal_identifier` and `message_type` on a
        message class, based on the class name, and registers the class.

        The class name must be `General_Reject`, or end with `_Command`,
        `_Response` or `_Reject`. In the latter cases, the part of the name
        before the suffix selects the first entry of AVDTP_SIGNAL_IDENTIFIERS
        whose name ends with it (ignoring case).

        Raises:
          InvalidArgumentError: if the class name has none of these forms.
        '''
        # Infer the signal identifier and message subtype from the class name
        name = subclass.__name__
        if name == 'General_Reject':
            subclass.signal_identifier = 0
            signal_identifier_str = None
            # NOTE(review): this is COMMAND rather than GENERAL_REJECT; confirm
            # against the code that sends General_Reject before changing it.
            message_type = Message.MessageType.COMMAND
        elif name.endswith('_Command'):
            signal_identifier_str = name[: -len('_Command')]
            message_type = Message.MessageType.COMMAND
        elif name.endswith('_Response'):
            signal_identifier_str = name[: -len('_Response')]
            message_type = Message.MessageType.RESPONSE_ACCEPT
        elif name.endswith('_Reject'):
            signal_identifier_str = name[: -len('_Reject')]
            message_type = Message.MessageType.RESPONSE_REJECT
        else:
            raise InvalidArgumentError('invalid class name')

        subclass.message_type = message_type

        if signal_identifier_str is not None:
            suffix = signal_identifier_str.lower()
            for signal_name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items():
                if signal_name.lower().endswith(suffix):
                    subclass.signal_identifier = signal_identifier
                    break

            # Register the subclass
            Message.subclasses.setdefault(subclass.signal_identifier, {})[
                subclass.message_type
            ] = subclass

        return subclass

    # Factory method to create a subclass based on the signal identifier and message
    # type
    @staticmethod
    def create(
        signal_identifier: int, message_type: MessageType, payload: bytes
    ) -> Message:
        '''
        Instantiate a message from a received payload.

        If a class is registered for `signal_identifier` and `message_type`, an
        instance of that class is returned. Otherwise, a `Simple_Reject` is
        returned for reject responses, and a plain `Message` for anything else.
        In all cases the instance is built from `payload`, without calling the
        constructor of a registered class or of `Simple_Reject` (those take
        decoded fields, not a payload).
        '''
        # Look for a registered subclass
        registered = Message.subclasses.get(signal_identifier)
        if registered:
            subclass = registered.get(message_type)
            if subclass:
                instance = subclass.__new__(subclass)
                instance.payload = payload
                instance.init_from_payload()
                return instance

        # Instantiate the appropriate class based on the message type
        if message_type == Message.MessageType.RESPONSE_REJECT:
            # Assume a simple reject message.
            # Simple_Reject's constructor expects an error code, not a payload,
            # so the instance is created without calling it.
            instance = Simple_Reject.__new__(Simple_Reject)
            instance.payload = payload
            instance.init_from_payload()
        else:
            instance = Message(payload)
        instance.signal_identifier = signal_identifier
        instance.message_type = message_type
        return instance

    def init_from_payload(self) -> None:
        '''
        Decode `self.payload` into fields. The base class has no fields;
        subclasses override this.
        '''

    def __init__(self, payload: bytes = b'') -> None:
        self.payload = payload

    def to_string(self, details: Union[str, Iterable[str]]) -> str:
        '''
        Format the message name, followed by `details`: on the same line when
        `details` is a string, one indented line per item otherwise.
        '''
        base = color(
            f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_'
            f'{self.message_type.name}',
            'yellow',
        )

        if details:
            if isinstance(details, str):
                return f'{base}: {details}'

            return (
                base
                + ':\n'
                + '\n'.join(['  ' + color(detail, 'cyan') for detail in details])
            )

        return base

    def __str__(self) -> str:
        return self.to_string(self.payload.hex())


# -----------------------------------------------------------------------------
class Simple_Command(Message):
    '''
    Command message with just one seid

    The seid is carried in the upper 6 bits of the first payload byte.
    '''

    def init_from_payload(self) -> None:
        self.acp_seid = self.payload[0] >> 2

    def __init__(self, seid: int) -> None:
        super().__init__(payload=bytes([seid << 2]))
        self.acp_seid = seid

    def __str__(self) -> str:
        return self.to_string([f'ACP SEID: {self.acp_seid}'])


# -----------------------------------------------------------------------------
class Simple_Reject(Message):
    '''
    Reject messages with just an error code

    The error code is the first payload byte.
    '''

    def init_from_payload(self) -> None:
        self.error_code = self.payload[0]

    def __init__(self, error_code: int) -> None:
        super().__init__(payload=bytes([error_code]))
        self.error_code = error_code

    def __str__(self) -> str:
        details = [f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}']
        return self.to_string(details)
810
811
812# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command

    This command defines no fields of its own: it uses the payload handling of
    the `Message` base class unchanged.
    '''
818
819
820# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response

    The payload is a sequence of 2-byte stream end point descriptions.
    '''

    endpoints: List[EndPointInfo]

    def init_from_payload(self):
        # One end point per complete pair of bytes; a trailing single byte, if
        # any, is ignored
        self.endpoints = [
            EndPointInfo.from_bytes(self.payload[offset : offset + 2])
            for offset in range(0, len(self.payload) - 1, 2)
        ]

    def __init__(self, endpoints):
        serialized = b''.join(bytes(endpoint) for endpoint in endpoints)
        super().__init__(payload=serialized)
        self.endpoints = endpoints

    def __str__(self) -> str:
        details = []
        for endpoint in self.endpoints:
            media_type = name_or_number(AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type)
            tsep = name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)
            details += [
                f'ACP SEID: {endpoint.seid}',
                f'  in_use:     {endpoint.in_use}',
                f'  media_type: {media_type}',
                f'  tsep:       {tsep}',
            ]
        return self.to_string(details)
854
855
856# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command

    Carries a single ACP SEID; all behavior is inherited from Simple_Command.
    '''
862
863
864# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.7.2 Get Capabilities Response

    The payload is the serialized list of service capabilities.
    '''

    def __init__(self, capabilities):
        serialized = ServiceCapabilities.serialize_capabilities(capabilities)
        super().__init__(payload=serialized)
        self.capabilities = capabilities

    def init_from_payload(self):
        parsed = ServiceCapabilities.parse_capabilities(self.payload)
        self.capabilities = parsed

    def __str__(self) -> str:
        # One detail line per capability
        return self.to_string(list(map(str, self.capabilities)))
883
884
885# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
891
892
893# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Command(Get_Capabilities_Command):
    '''
    See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command

    Same single-SEID layout as Get_Capabilities_Command, from which all
    behavior is inherited.
    '''
899
900
901# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Response(Get_Capabilities_Response):
    '''
    See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response

    Same capabilities payload as Get_Capabilities_Response, from which all
    behavior is inherited.
    '''
907
908
909# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
915
916
917# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command

    Payload layout: the ACP SEID and the INT SEID (each in the upper 6 bits of
    one byte), followed by the serialized service capabilities.
    '''

    def init_from_payload(self):
        self.acp_seid = self.payload[0] >> 2
        self.int_seid = self.payload[1] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[2:])

    def __init__(
        self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities]
    ) -> None:
        # Materialize the capabilities once: any iterable is accepted, and a
        # one-shot iterable (e.g. a generator) would otherwise be exhausted by
        # the serialization, leaving `self.capabilities` empty afterwards.
        capability_list = list(capabilities)
        super().__init__(
            payload=bytes([acp_seid << 2, int_seid << 2])
            + ServiceCapabilities.serialize_capabilities(capability_list)
        )
        self.acp_seid = acp_seid
        self.int_seid = int_seid
        self.capabilities = capability_list

    def __str__(self) -> str:
        details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}'] + [
            str(capability) for capability in self.capabilities
        ]
        return self.to_string(details)
945
946
947# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
953
954
955# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject

    The payload is two bytes: a service category followed by an error code.
    '''

    def __init__(self, service_category, error_code):
        encoded = bytes([service_category, error_code])
        super().__init__(payload=encoded)
        self.service_category = service_category
        self.error_code = error_code

    def init_from_payload(self):
        self.service_category = self.payload[0]
        self.error_code = self.payload[1]

    def __str__(self) -> str:
        category_name = name_or_number(
            AVDTP_SERVICE_CATEGORY_NAMES, self.service_category
        )
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string(
            [
                f'service_category: {category_name}',
                f'error_code:       {error_name}',
            ]
        )
983
984
985# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command

    Carries a single ACP SEID; all behavior is inherited from Simple_Command.
    '''
991
992
993# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response

    The payload is the serialized list of service capabilities.
    '''

    def init_from_payload(self):
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)

    def __init__(self, capabilities: Iterable[ServiceCapabilities]) -> None:
        # Materialize the capabilities once: any iterable is accepted, and a
        # one-shot iterable (e.g. a generator) would otherwise be exhausted by
        # the serialization, leaving `self.capabilities` empty afterwards.
        capability_list = list(capabilities)
        super().__init__(
            payload=ServiceCapabilities.serialize_capabilities(capability_list)
        )
        self.capabilities = capability_list

    def __str__(self) -> str:
        details = [str(capability) for capability in self.capabilities]
        return self.to_string(details)
1012
1013
1014# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
1020
1021
1022# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command

    Payload layout: the ACP SEID (upper 6 bits of the first byte), followed by
    the serialized service capabilities.
    '''

    def init_from_payload(self):
        # pylint: disable=attribute-defined-outside-init
        seid_byte = self.payload[0]
        capabilities_data = self.payload[1:]
        self.acp_seid = seid_byte >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(capabilities_data)

    def __str__(self) -> str:
        details = [f'ACP SEID: {self.acp_seid}']
        details.extend(str(capability) for capability in self.capabilities)
        return self.to_string(details)
1039
1040
1041# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1047
1048
1049# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Reject(Set_Configuration_Reject):
    '''
    See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject

    Carries a service category and an error code; all behavior is inherited
    from Set_Configuration_Reject.
    '''
1055
1056
1057# -----------------------------------------------------------------------------
@Message.subclass
class Open_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.12.1 Open Stream Command

    Carries a single ACP SEID; all behavior is inherited from Simple_Command.
    '''
1063
1064
1065# -----------------------------------------------------------------------------
@Message.subclass
class Open_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.12.2 Open Stream Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1071
1072
1073# -----------------------------------------------------------------------------
@Message.subclass
class Open_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
1079
1080
1081# -----------------------------------------------------------------------------
@Message.subclass
class Start_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.1 Start Stream Command

    The payload is a sequence of bytes, each carrying one ACP SEID in its
    upper 6 bits.
    '''

    def init_from_payload(self):
        self.acp_seids = [x >> 2 for x in self.payload]

    def __init__(self, seids: Iterable[int]) -> None:
        # Materialize the SEIDs once: any iterable is accepted, and a one-shot
        # iterable (e.g. a generator) would otherwise be exhausted by building
        # the payload, leaving `self.acp_seids` empty afterwards. This also
        # makes `acp_seids` a list, as it is when parsed from a payload.
        seid_list = list(seids)
        super().__init__(payload=bytes([seid << 2 for seid in seid_list]))
        self.acp_seids = seid_list

    def __str__(self) -> str:
        return self.to_string([f'ACP SEIDs: {self.acp_seids}'])
1097
1098
1099# -----------------------------------------------------------------------------
@Message.subclass
class Start_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.2 Start Stream Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1105
1106
1107# -----------------------------------------------------------------------------
@Message.subclass
class Start_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.3 Start Stream Reject

    The payload is two bytes: the ACP SEID (upper 6 bits of the first byte)
    followed by an error code.
    '''

    def __init__(self, acp_seid, error_code):
        encoded = bytes([acp_seid << 2, error_code])
        super().__init__(payload=encoded)
        self.acp_seid = acp_seid
        self.error_code = error_code

    def init_from_payload(self):
        seid_byte = self.payload[0]
        self.acp_seid = seid_byte >> 2
        self.error_code = self.payload[1]

    def __str__(self) -> str:
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string(
            [
                f'acp_seid:   {self.acp_seid}',
                f'error_code: {error_name}',
            ]
        )
1129
1130
1131# -----------------------------------------------------------------------------
@Message.subclass
class Close_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.14.1 Close Stream Command

    Carries a single ACP SEID; all behavior is inherited from Simple_Command.
    '''
1137
1138
1139# -----------------------------------------------------------------------------
@Message.subclass
class Close_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.14.2 Close Stream Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1145
1146
1147# -----------------------------------------------------------------------------
@Message.subclass
class Close_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
1153
1154
1155# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Command(Start_Command):
    '''
    See Bluetooth AVDTP spec - 8.15.1 Suspend Command

    Carries a list of ACP SEIDs; all behavior is inherited from Start_Command.
    '''
1161
1162
1163# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.15.2 Suspend Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1169
1170
1171# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Reject(Start_Reject):
    '''
    See Bluetooth AVDTP spec - 8.15.3 Suspend Reject

    Carries an ACP SEID and an error code; all behavior is inherited from
    Start_Reject.
    '''
1177
1178
1179# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.16.1 Abort Command

    Carries a single ACP SEID; all behavior is inherited from Simple_Command.
    '''
1185
1186
1187# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.16.2 Abort Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1193
1194
1195# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.1 Security Control Command

    The ACP SEID is carried in the upper 6 bits of the first payload byte,
    like in the other commands that address a single endpoint.
    '''

    def init_from_payload(self):
        # pylint: disable=attribute-defined-outside-init
        # Protocol.on_security_control_command looks up the local endpoint
        # through `acp_seid`, so it must be extracted when the message is
        # parsed. An empty payload maps to SEID 0, which never matches a local
        # endpoint (local SEIDs start at 1) and is therefore rejected there.
        self.acp_seid = self.payload[0] >> 2 if self.payload else 0
1201
1202
1203# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.2 Security Control Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1209
1210
1211# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.17.3 Security Control Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
1217
1218
1219# -----------------------------------------------------------------------------
@Message.subclass
class General_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.18 General Reject
    '''

    def to_string(self, details):
        # `details` is ignored: the message is always rendered as a fixed label
        label = color('GENERAL_REJECT', 'yellow')
        return label
1228
1229
1230# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.1 Delay Report Command

    Payload layout: the ACP SEID (upper 6 bits of the first byte), followed by
    the delay as a 16-bit big-endian value.
    '''

    def init_from_payload(self):
        # pylint: disable=attribute-defined-outside-init
        seid_byte = self.payload[0]
        self.acp_seid = seid_byte >> 2

        # Combine the two delay bytes, most significant byte first
        delay_high = self.payload[1]
        delay_low = self.payload[2]
        self.delay = (delay_high << 8) | delay_low

    def __str__(self) -> str:
        details = [f'ACP_SEID: {self.acp_seid}', f'delay:    {self.delay}']
        return self.to_string(details)
1244
1245
1246# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.2 Delay Report Response

    Defines no fields of its own: construction, parsing and formatting are
    all inherited from Message.
    '''
1252
1253
1254# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject

    Carries a single error code; all behavior is inherited from Simple_Reject.
    '''
1260
1261
1262# -----------------------------------------------------------------------------
1263class Protocol(EventEmitter):
1264    local_endpoints: List[LocalStreamEndPoint]
1265    remote_endpoints: Dict[int, DiscoveredStreamEndPoint]
1266    streams: Dict[int, Stream]
1267    transaction_results: List[Optional[asyncio.Future[Message]]]
1268    channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]]
1269
    class PacketType(enum.IntEnum):
        '''
        Packet type values, placed in bits 2-3 of the first header byte of
        each signaling packet (see `send_message`).
        '''

        SINGLE_PACKET = 0
        START_PACKET = 1
        CONTINUE_PACKET = 2
        END_PACKET = 3
1275
1276    @staticmethod
1277    def packet_type_name(packet_type):
1278        return name_or_number(Protocol.PACKET_TYPE_NAMES, packet_type)
1279
1280    @staticmethod
1281    async def connect(
1282        connection: device.Connection, version: Tuple[int, int] = (1, 3)
1283    ) -> Protocol:
1284        channel = await connection.create_l2cap_channel(
1285            spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
1286        )
1287        protocol = Protocol(channel, version)
1288
1289        return protocol
1290
1291    def __init__(
1292        self, l2cap_channel: l2cap.ClassicChannel, version: Tuple[int, int] = (1, 3)
1293    ) -> None:
1294        super().__init__()
1295        self.l2cap_channel = l2cap_channel
1296        self.version = version
1297        self.rtx_sig_timer = AVDTP_DEFAULT_RTX_SIG_TIMER
1298        self.message_assembler = MessageAssembler(self.on_message)
1299        self.transaction_results = [None] * 16  # Futures for up to 16 transactions
1300        self.transaction_semaphore = asyncio.Semaphore(16)
1301        self.transaction_count = 0
1302        self.channel_acceptor = None
1303        self.local_endpoints = []  # Local endpoints, with contiguous seid values
1304        self.remote_endpoints = {}  # Remote stream endpoints, by seid
1305        self.streams = {}  # Streams, by seid
1306
1307        # Register to receive PDUs from the channel
1308        l2cap_channel.sink = self.on_pdu
1309        l2cap_channel.on('open', self.on_l2cap_channel_open)
1310        l2cap_channel.on('close', self.on_l2cap_channel_close)
1311
1312    def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]:
1313        if 0 < seid <= len(self.local_endpoints):
1314            return self.local_endpoints[seid - 1]
1315
1316        return None
1317
1318    def add_source(
1319        self, codec_capabilities: MediaCodecCapabilities, packet_pump: MediaPacketPump
1320    ) -> LocalSource:
1321        seid = len(self.local_endpoints) + 1
1322        source = LocalSource(self, seid, codec_capabilities, packet_pump)
1323        self.local_endpoints.append(source)
1324
1325        return source
1326
1327    def add_sink(self, codec_capabilities: MediaCodecCapabilities) -> LocalSink:
1328        seid = len(self.local_endpoints) + 1
1329        sink = LocalSink(self, seid, codec_capabilities)
1330        self.local_endpoints.append(sink)
1331
1332        return sink
1333
1334    async def create_stream(
1335        self, source: LocalStreamEndPoint, sink: StreamEndPointProxy
1336    ) -> Stream:
1337        # Check that the source isn't already used in a stream
1338        if source.in_use:
1339            raise InvalidStateError('source already in use')
1340
1341        # Create or reuse a new stream to associate the source and the sink
1342        if source.seid in self.streams:
1343            stream = self.streams[source.seid]
1344        else:
1345            stream = Stream(self, source, sink)
1346            self.streams[source.seid] = stream
1347
1348        # The stream can now be configured
1349        await stream.configure()
1350
1351        return stream
1352
1353    async def discover_remote_endpoints(self) -> Iterable[DiscoveredStreamEndPoint]:
1354        self.remote_endpoints = {}
1355
1356        response: Discover_Response = await self.send_command(Discover_Command())
1357        for endpoint_entry in response.endpoints:
1358            logger.debug(
1359                f'getting endpoint capabilities for endpoint {endpoint_entry.seid}'
1360            )
1361            get_capabilities_response = await self.get_capabilities(endpoint_entry.seid)
1362            endpoint = DiscoveredStreamEndPoint(
1363                self,
1364                endpoint_entry.seid,
1365                endpoint_entry.media_type,
1366                endpoint_entry.tsep,
1367                endpoint_entry.in_use,
1368                get_capabilities_response.capabilities,
1369            )
1370            self.remote_endpoints[endpoint_entry.seid] = endpoint
1371
1372        return self.remote_endpoints.values()
1373
1374    def find_remote_sink_by_codec(
1375        self, media_type: int, codec_type: int
1376    ) -> Optional[DiscoveredStreamEndPoint]:
1377        for endpoint in self.remote_endpoints.values():
1378            if (
1379                not endpoint.in_use
1380                and endpoint.media_type == media_type
1381                and endpoint.tsep == AVDTP_TSEP_SNK
1382            ):
1383                has_media_transport = False
1384                has_codec = False
1385                for capabilities in endpoint.capabilities:
1386                    if (
1387                        capabilities.service_category
1388                        == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY
1389                    ):
1390                        has_media_transport = True
1391                    elif (
1392                        capabilities.service_category
1393                        == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY
1394                    ):
1395                        codec_capabilities = cast(MediaCodecCapabilities, capabilities)
1396                        if (
1397                            codec_capabilities.media_type == AVDTP_AUDIO_MEDIA_TYPE
1398                            and codec_capabilities.media_codec_type == codec_type
1399                        ):
1400                            has_codec = True
1401                if has_media_transport and has_codec:
1402                    return endpoint
1403
1404        return None
1405
1406    def on_pdu(self, pdu: bytes) -> None:
1407        self.message_assembler.on_pdu(pdu)
1408
1409    def on_message(self, transaction_label: int, message: Message) -> None:
1410        logger.debug(
1411            f'{color("<<< Received AVDTP message", "magenta")}: '
1412            f'[{transaction_label}] {message}'
1413        )
1414
1415        # Check that the identifier is not reserved
1416        if message.signal_identifier == 0:
1417            logger.warning('!!! reserved signal identifier')
1418            return
1419
1420        # Check that the identifier is valid
1421        if (
1422            message.signal_identifier < 0
1423            or message.signal_identifier > AVDTP_DELAYREPORT
1424        ):
1425            logger.warning('!!! invalid signal identifier')
1426            self.send_message(transaction_label, General_Reject())
1427
1428        if message.message_type == Message.MessageType.COMMAND:
1429            # Command
1430            signal_name = (
1431                AVDTP_SIGNAL_NAMES.get(message.signal_identifier, "")
1432                .replace("AVDTP_", "")
1433                .lower()
1434            )
1435            handler_name = f'on_{signal_name}_command'
1436            handler = getattr(self, handler_name, None)
1437            if handler:
1438                try:
1439                    response = handler(message)
1440                    self.send_message(transaction_label, response)
1441                except Exception as error:
1442                    logger.warning(
1443                        f'{color("!!! Exception in handler:", "red")} {error}'
1444                    )
1445            else:
1446                logger.warning('unhandled command')
1447        else:
1448            # Response, look for a pending transaction with the same label
1449            transaction_result = self.transaction_results[transaction_label]
1450            if transaction_result is None:
1451                logger.warning(color('!!! no pending transaction for label', 'red'))
1452                return
1453
1454            transaction_result.set_result(message)
1455            self.transaction_results[transaction_label] = None
1456            self.transaction_semaphore.release()
1457
1458    def on_l2cap_connection(self, channel):
1459        # Forward the channel to the endpoint that's expecting it
1460        if self.channel_acceptor is None:
1461            logger.warning(color('!!! l2cap connection with no acceptor', 'red'))
1462            return
1463        self.channel_acceptor.on_l2cap_connection(channel)
1464
1465    def on_l2cap_channel_open(self):
1466        logger.debug(color('<<< L2CAP channel open', 'magenta'))
1467        self.emit('open')
1468
1469    def on_l2cap_channel_close(self):
1470        logger.debug(color('<<< L2CAP channel close', 'magenta'))
1471        self.emit('close')
1472
1473    def send_message(self, transaction_label: int, message: Message) -> None:
1474        logger.debug(
1475            f'{color(">>> Sending AVDTP message", "magenta")}: '
1476            f'[{transaction_label}] {message}'
1477        )
1478        max_fragment_size = (
1479            self.l2cap_channel.peer_mtu - 3
1480        )  # Enough space for a 3-byte start packet header
1481        payload = message.payload
1482        if len(payload) + 2 <= self.l2cap_channel.peer_mtu:
1483            # Fits in a single packet
1484            packet_type = self.PacketType.SINGLE_PACKET
1485        else:
1486            packet_type = self.PacketType.START_PACKET
1487
1488        done = False
1489        while not done:
1490            first_header_byte = (
1491                transaction_label << 4 | packet_type << 2 | message.message_type
1492            )
1493
1494            if packet_type == self.PacketType.SINGLE_PACKET:
1495                header = bytes([first_header_byte, message.signal_identifier])
1496            elif packet_type == self.PacketType.START_PACKET:
1497                packet_count = (
1498                    max_fragment_size - 1 + len(payload)
1499                ) // max_fragment_size
1500                header = bytes(
1501                    [first_header_byte, message.signal_identifier, packet_count]
1502                )
1503            else:
1504                header = bytes([first_header_byte])
1505
1506            # Send one packet
1507            self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
1508
1509            # Prepare for the next packet
1510            payload = payload[max_fragment_size:]
1511            if payload:
1512                packet_type = (
1513                    self.PacketType.CONTINUE_PACKET
1514                    if len(payload) > max_fragment_size
1515                    else self.PacketType.END_PACKET
1516                )
1517            else:
1518                done = True
1519
1520    async def send_command(self, command: Message):
1521        # TODO: support timeouts
1522        # Send the command
1523        (transaction_label, transaction_result) = await self.start_transaction()
1524        self.send_message(transaction_label, command)
1525
1526        # Wait for the response
1527        response = await transaction_result
1528
1529        # Check for errors
1530        if response.message_type in (
1531            Message.MessageType.GENERAL_REJECT,
1532            Message.MessageType.RESPONSE_REJECT,
1533        ):
1534            assert hasattr(response, 'error_code')
1535            raise ProtocolError(response.error_code, 'avdtp')
1536
1537        return response
1538
1539    async def start_transaction(self) -> Tuple[int, asyncio.Future[Message]]:
1540        # Wait until we can start a new transaction
1541        await self.transaction_semaphore.acquire()
1542
1543        # Look for the next free entry to store the transaction result
1544        for i in range(16):
1545            transaction_label = (self.transaction_count + i) % 16
1546            if self.transaction_results[transaction_label] is None:
1547                transaction_result = asyncio.get_running_loop().create_future()
1548                self.transaction_results[transaction_label] = transaction_result
1549                self.transaction_count += 1
1550                return (transaction_label, transaction_result)
1551
1552        assert False  # Should never reach this
1553
1554    async def get_capabilities(self, seid: int) -> Union[
1555        Get_Capabilities_Response,
1556        Get_All_Capabilities_Response,
1557    ]:
1558        if self.version > (1, 2):
1559            return await self.send_command(Get_All_Capabilities_Command(seid))
1560
1561        return await self.send_command(Get_Capabilities_Command(seid))
1562
1563    async def set_configuration(
1564        self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities]
1565    ) -> Set_Configuration_Response:
1566        return await self.send_command(
1567            Set_Configuration_Command(acp_seid, int_seid, capabilities)
1568        )
1569
    async def get_configuration(self, seid: int) -> Iterable[ServiceCapabilities]:
        '''
        Send a Get Configuration command for the endpoint `seid`.

        Returns the `capabilities` attribute of the response, not the response
        message itself.
        '''
        response = await self.send_command(Get_Configuration_Command(seid))
        return response.capabilities
1573
1574    async def open(self, seid: int) -> Open_Response:
1575        return await self.send_command(Open_Command(seid))
1576
1577    async def start(self, seids: Iterable[int]) -> Start_Response:
1578        return await self.send_command(Start_Command(seids))
1579
1580    async def suspend(self, seids: Iterable[int]) -> Suspend_Response:
1581        return await self.send_command(Suspend_Command(seids))
1582
1583    async def close(self, seid: int) -> Close_Response:
1584        return await self.send_command(Close_Command(seid))
1585
1586    async def abort(self, seid: int) -> Abort_Response:
1587        return await self.send_command(Abort_Command(seid))
1588
1589    def on_discover_command(self, _command):
1590        endpoint_infos = [
1591            EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep)
1592            for endpoint in self.local_endpoints
1593        ]
1594        return Discover_Response(endpoint_infos)
1595
1596    def on_get_capabilities_command(self, command):
1597        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1598        if endpoint is None:
1599            return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1600
1601        return Get_Capabilities_Response(endpoint.capabilities)
1602
1603    def on_get_all_capabilities_command(self, command):
1604        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1605        if endpoint is None:
1606            return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1607
1608        return Get_All_Capabilities_Response(endpoint.capabilities)
1609
1610    def on_set_configuration_command(self, command):
1611        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1612        if endpoint is None:
1613            return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1614
1615        # Check that the local endpoint isn't in use
1616        if endpoint.in_use:
1617            return Set_Configuration_Reject(AVDTP_SEP_IN_USE_ERROR)
1618
1619        # Create a stream object for the pair of endpoints
1620        stream = Stream(self, endpoint, StreamEndPointProxy(self, command.int_seid))
1621        self.streams[command.acp_seid] = stream
1622
1623        result = stream.on_set_configuration_command(command.capabilities)
1624        return result or Set_Configuration_Response()
1625
1626    def on_get_configuration_command(self, command):
1627        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1628        if endpoint is None:
1629            return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1630        if endpoint.stream is None:
1631            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1632
1633        return endpoint.stream.on_get_configuration_command()
1634
1635    def on_reconfigure_command(self, command):
1636        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1637        if endpoint is None:
1638            return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)
1639        if endpoint.stream is None:
1640            return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR)
1641
1642        result = endpoint.stream.on_reconfigure_command(command.capabilities)
1643        return result or Reconfigure_Response()
1644
1645    def on_open_command(self, command):
1646        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1647        if endpoint is None:
1648            return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1649        if endpoint.stream is None:
1650            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1651
1652        result = endpoint.stream.on_open_command()
1653        return result or Open_Response()
1654
1655    def on_start_command(self, command):
1656        for seid in command.acp_seids:
1657            endpoint = self.get_local_endpoint_by_seid(seid)
1658            if endpoint is None:
1659                return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1660            if endpoint.stream is None:
1661                return Start_Reject(AVDTP_BAD_STATE_ERROR)
1662
1663        # Start all streams
1664        # TODO: deal with partial failures
1665        for seid in command.acp_seids:
1666            endpoint = self.get_local_endpoint_by_seid(seid)
1667            result = endpoint.stream.on_start_command()
1668            if result is not None:
1669                return result
1670
1671        return Start_Response()
1672
1673    def on_suspend_command(self, command):
1674        for seid in command.acp_seids:
1675            endpoint = self.get_local_endpoint_by_seid(seid)
1676            if endpoint is None:
1677                return Suspend_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1678            if endpoint.stream is None:
1679                return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)
1680
1681        # Suspend all streams
1682        # TODO: deal with partial failures
1683        for seid in command.acp_seids:
1684            endpoint = self.get_local_endpoint_by_seid(seid)
1685            result = endpoint.stream.on_suspend_command()
1686            if result is not None:
1687                return result
1688
1689        return Suspend_Response()
1690
1691    def on_close_command(self, command):
1692        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1693        if endpoint is None:
1694            return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1695        if endpoint.stream is None:
1696            return Close_Reject(AVDTP_BAD_STATE_ERROR)
1697
1698        result = endpoint.stream.on_close_command()
1699        return result or Close_Response()
1700
1701    def on_abort_command(self, command):
1702        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1703        if endpoint is None or endpoint.stream is None:
1704            return Abort_Response()
1705
1706        endpoint.stream.on_abort_command()
1707        return Abort_Response()
1708
1709    def on_security_control_command(self, command):
1710        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1711        if endpoint is None:
1712            return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1713
1714        result = endpoint.on_security_control_command(command.payload)
1715        return result or Security_Control_Response()
1716
1717    def on_delayreport_command(self, command):
1718        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1719        if endpoint is None:
1720            return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1721
1722        result = endpoint.on_delayreport_command(command.delay)
1723        return result or DelayReport_Response()
1724
1725
1726# -----------------------------------------------------------------------------
class Listener(EventEmitter):
    '''
    Accepts incoming AVDTP L2CAP channels and tracks one Protocol per connection.

    The first channel received on a connection is treated as the
    command/response channel: once it opens, a Protocol is created for it,
    recorded under the connection handle and announced with a 'connection'
    event. Channels received later on a connection that already has a Protocol
    are handed to that Protocol.
    '''

    servers: Dict[int, Protocol]

    def __init__(self, registrar=None, version=(1, 3)):
        '''
        registrar: deprecated. When truthy, it is called with
          `self.on_l2cap_connection` (and a DeprecationWarning is issued);
          use `Listener.for_device()` instead.
        version: value passed to every Protocol created by this listener.
        '''
        super().__init__()
        self.version = version
        self.servers = {}  # Servers, by connection handle

        # Listen for incoming L2CAP connections
        if not registrar:
            return
        warnings.warn("Please use Listener.for_device()", DeprecationWarning)
        registrar(self.on_l2cap_connection)

    @classmethod
    def for_device(
        cls, device: device.Device, version: Tuple[int, int] = (1, 3)
    ) -> Listener:
        '''
        Create a listener fed by a new L2CAP server, on the AVDTP PSM, created
        on `device`.
        '''
        listener = Listener(registrar=None, version=version)
        avdtp_spec = l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
        server = device.create_l2cap_server(spec=avdtp_spec)
        server.on('connection', listener.on_l2cap_connection)
        return listener

    @staticmethod
    def create_registrar(device: device.Device):
        '''
        Deprecated, use `Listener.for_device()`.

        Returns a function that, given a channel handler, creates an L2CAP
        server on the AVDTP PSM of `device` with that handler.
        '''
        warnings.warn("Please use Listener.for_device()", DeprecationWarning)

        def register(handler: Callable[[l2cap.ClassicChannel], None]) -> None:
            avdtp_spec = l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
            device.create_l2cap_server(avdtp_spec, handler)

        return register

    def set_server(self, connection: device.Connection, server: Protocol) -> None:
        '''Record `server` as the Protocol for `connection` (keyed by its handle).'''
        self.servers[connection.handle] = server

    def remove_server(self, connection: device.Connection) -> None:
        '''Forget the Protocol recorded for `connection`, if there is one.'''
        self.servers.pop(connection.handle, None)

    def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
        '''
        Dispatch an incoming L2CAP channel, depending on whether its connection
        already has a Protocol.
        '''
        banner = color("<<< incoming L2CAP connection:", "magenta")
        logger.debug(f'{banner} {channel}')

        handle = channel.connection.handle
        if handle in self.servers:
            # This is a channel for a stream endpoint
            self.servers[handle].on_l2cap_connection(channel)
            return

        # This is a new command/response channel
        def on_channel_open():
            logger.debug('setting up new Protocol for the connection')
            protocol = Protocol(channel, self.version)
            self.set_server(channel.connection, protocol)
            self.emit('connection', protocol)

        def on_channel_close():
            logger.debug('removing Protocol for the connection')
            self.remove_server(channel.connection)

        channel.on('open', on_channel_open)
        channel.on('close', on_channel_close)
1788
1789
1790# -----------------------------------------------------------------------------
class Stream:
    '''
    Pair of a local and a remote stream endpoint that can stream from one to the
    other.

    A stream tracks its AVDTP state (initially IDLE) and the L2CAP channel used
    for RTP packets. The async methods (`configure`, `open`, `start`, `stop`,
    `close`) drive the remote endpoint, while the `on_*_command` methods react
    to commands received for the local endpoint. Unless stated otherwise, an
    `on_*_command` method returns a reject message to refuse the command, or
    None to let the caller build the default response.
    '''

    rtp_channel: Optional[l2cap.ClassicChannel]

    @staticmethod
    def state_name(state: int) -> str:
        '''Return a printable name for `state`, looked up in AVDTP_STATE_NAMES.'''
        return name_or_number(AVDTP_STATE_NAMES, state)

    def change_state(self, state: int) -> None:
        '''Log the transition, then make `state` the current state.'''
        logger.debug(f'{self} state change -> {color(self.state_name(state), "cyan")}')
        self.state = state

    def send_media_packet(self, packet: MediaPacket) -> None:
        '''
        Send `packet`, serialized with `bytes()`, as one PDU on the RTP channel.

        The RTP channel must exist (this is asserted).
        '''
        assert self.rtp_channel
        self.rtp_channel.send_pdu(bytes(packet))

    async def configure(self) -> None:
        '''
        Send the local endpoint's configuration to the remote endpoint, then
        move to the CONFIGURED state.

        Raises InvalidStateError if the stream is not in the IDLE state.
        '''
        if self.state != AVDTP_IDLE_STATE:
            raise InvalidStateError('current state is not IDLE')

        await self.remote_endpoint.set_configuration(
            self.local_endpoint.seid, self.local_endpoint.configuration
        )
        self.change_state(AVDTP_CONFIGURED_STATE)

    async def open(self) -> None:
        '''
        Open the remote endpoint, then create the L2CAP channel for RTP packets.

        The state becomes OPEN once the remote open request has completed,
        before the RTP channel is created.

        Raises InvalidStateError if the stream is not in the CONFIGURED state.
        '''
        if self.state != AVDTP_CONFIGURED_STATE:
            raise InvalidStateError('current state is not CONFIGURED')

        logger.debug('opening remote endpoint')
        await self.remote_endpoint.open()

        self.change_state(AVDTP_OPEN_STATE)

        # Create a channel for RTP packets
        self.rtp_channel = (
            await self.protocol.l2cap_channel.connection.create_l2cap_channel(
                l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
            )
        )

    async def start(self) -> None:
        '''
        Start the remote endpoint, then the local endpoint, and move to the
        STREAMING state. A stream that is only CONFIGURED is opened first.

        Raises InvalidStateError if the stream is not in the OPEN state (after
        the optional auto-open).
        '''
        # Auto-open if needed
        if self.state == AVDTP_CONFIGURED_STATE:
            await self.open()

        if self.state != AVDTP_OPEN_STATE:
            raise InvalidStateError('current state is not OPEN')

        logger.debug('starting remote endpoint')
        await self.remote_endpoint.start()

        logger.debug('starting local endpoint')
        await self.local_endpoint.start()

        self.change_state(AVDTP_STREAMING_STATE)

    async def stop(self) -> None:
        '''
        Stop the local endpoint, then the remote endpoint, and go back to the
        OPEN state.

        Raises InvalidStateError if the stream is not in the STREAMING state.
        '''
        if self.state != AVDTP_STREAMING_STATE:
            raise InvalidStateError('current state is not STREAMING')

        logger.debug('stopping local endpoint')
        await self.local_endpoint.stop()

        logger.debug('stopping remote endpoint')
        await self.remote_endpoint.stop()

        self.change_state(AVDTP_OPEN_STATE)

    async def close(self) -> None:
        '''
        Close the local endpoint, then the remote endpoint, disconnect the RTP
        channel if there is one, mark the local endpoint as not in use and
        return to the IDLE state (going through CLOSING).

        Raises InvalidStateError if the stream is neither OPEN nor STREAMING.
        '''
        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
            raise InvalidStateError('current state is not OPEN or STREAMING')

        logger.debug('closing local endpoint')
        await self.local_endpoint.close()

        logger.debug('closing remote endpoint')
        await self.remote_endpoint.close()

        # Release any channels we may have created
        self.change_state(AVDTP_CLOSING_STATE)
        if self.rtp_channel:
            await self.rtp_channel.disconnect()
            self.rtp_channel = None

        # Release the endpoint
        self.local_endpoint.in_use = 0

        self.change_state(AVDTP_IDLE_STATE)

    def on_set_configuration_command(self, configuration):
        '''
        Handle a received Set Configuration command.

        Only accepted in the IDLE state. The local endpoint may refuse the
        configuration by returning a non-None value, which is returned as-is;
        otherwise the stream moves to the CONFIGURED state.
        '''
        if self.state != AVDTP_IDLE_STATE:
            # NOTE(review): Reconfigure_Reject is built with
            # (service_category, error_code) in this file; confirm that
            # Set_Configuration_Reject does not need a service category too.
            return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_set_configuration_command(configuration)
        if result is not None:
            return result

        self.change_state(AVDTP_CONFIGURED_STATE)
        return None

    def on_get_configuration_command(self, configuration=None):
        '''
        Handle a received Get Configuration command.

        Returns whatever the local endpoint's handler returns when the stream is
        CONFIGURED, OPEN or STREAMING, and a Get_Configuration_Reject with
        AVDTP_BAD_STATE_ERROR in any other state.

        `configuration` is ignored. It is only kept, as an optional parameter,
        for backward compatibility: the command dispatcher calls this method
        without arguments and the local endpoint's handler accepts none.
        '''
        if self.state not in (
            AVDTP_CONFIGURED_STATE,
            AVDTP_OPEN_STATE,
            AVDTP_STREAMING_STATE,
        ):
            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

        return self.local_endpoint.on_get_configuration_command()

    def on_reconfigure_command(self, configuration):
        '''
        Handle a received Reconfigure command.

        Only accepted in the OPEN state. Returns the local endpoint's answer
        when it is not None, else None. The state is not changed.
        '''
        if self.state != AVDTP_OPEN_STATE:
            # Reconfigure_Reject takes (service_category, error_code); 0 is the
            # service category used by the other rejects for this command
            return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_reconfigure_command(configuration)
        if result is not None:
            return result

        return None

    def on_open_command(self):
        '''
        Handle a received Open command.

        Only accepted in the CONFIGURED state. Unless the local endpoint returns
        a non-None value (returned as-is), the stream registers itself as the
        protocol's channel acceptor, so that it receives the next incoming
        channel, and moves to the OPEN state.
        '''
        if self.state != AVDTP_CONFIGURED_STATE:
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_open_command()
        if result is not None:
            return result

        # Register to accept the next channel
        self.protocol.channel_acceptor = self

        self.change_state(AVDTP_OPEN_STATE)
        return None

    def on_start_command(self):
        '''
        Handle a received Start command.

        Only accepted in the OPEN state and once the RTP channel exists;
        otherwise a Start_Reject carrying the local SEID and
        AVDTP_BAD_STATE_ERROR is returned. Unless the local endpoint returns a
        non-None value (returned as-is), the stream moves to the STREAMING
        state.
        '''
        if self.state != AVDTP_OPEN_STATE:
            return Start_Reject(self.local_endpoint.seid, AVDTP_BAD_STATE_ERROR)

        # Check that we have an RTP channel
        if self.rtp_channel is None:
            logger.warning('received start command before RTP channel establishment')
            return Start_Reject(self.local_endpoint.seid, AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_start_command()
        if result is not None:
            return result

        self.change_state(AVDTP_STREAMING_STATE)
        return None

    def on_suspend_command(self):
        '''
        Handle a received Suspend command.

        Only accepted in the STREAMING state; otherwise a Suspend_Reject
        carrying the local SEID and AVDTP_BAD_STATE_ERROR is returned. Unless
        the local endpoint returns a non-None value (returned as-is), the stream
        goes back to the OPEN state.
        '''
        if self.state != AVDTP_STREAMING_STATE:
            return Suspend_Reject(self.local_endpoint.seid, AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_suspend_command()
        if result is not None:
            return result

        self.change_state(AVDTP_OPEN_STATE)
        return None

    def on_close_command(self):
        '''
        Handle a received Close command.

        Only accepted in the OPEN or STREAMING state; otherwise a Close_Reject
        with AVDTP_BAD_STATE_ERROR is returned. Unless the local endpoint
        returns a non-None value (returned as-is), the stream moves to the
        CLOSING state, and straight on to IDLE when there is no RTP channel to
        wait for.
        '''
        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
            return Close_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_close_command()
        if result is not None:
            return result

        self.change_state(AVDTP_CLOSING_STATE)

        if self.rtp_channel is None:
            # No channel to release, we're done
            self.change_state(AVDTP_IDLE_STATE)
        else:
            # TODO: set a timer as we wait for the RTP channel to be closed
            pass

        return None

    def on_abort_command(self):
        '''
        Handle a received Abort command (nothing is returned).

        The stream goes straight to IDLE when there is no RTP channel, and to
        ABORTING otherwise, where it stays until the RTP channel is closed.
        '''
        if self.rtp_channel is None:
            # No need to wait
            self.change_state(AVDTP_IDLE_STATE)
        else:
            # Wait for the RTP channel to be closed
            self.change_state(AVDTP_ABORTING_STATE)

    def on_l2cap_connection(self, channel):
        '''
        Adopt an incoming L2CAP channel as the RTP channel, subscribe to its
        'open' and 'close' events, and stop accepting further channels.
        '''
        logger.debug(color('<<< stream channel connected', 'magenta'))
        self.rtp_channel = channel
        channel.on('open', self.on_l2cap_channel_open)
        channel.on('close', self.on_l2cap_channel_close)

        # We don't need more channels
        self.protocol.channel_acceptor = None

    def on_l2cap_channel_open(self):
        '''Tell the local endpoint that the RTP channel is open.'''
        logger.debug(color('<<< stream channel open', 'magenta'))
        self.local_endpoint.on_rtp_channel_open()

    def on_l2cap_channel_close(self):
        '''
        Tell the local endpoint that the RTP channel is closed, mark it as not
        in use and drop the channel. The stream returns to IDLE if it was
        CLOSING or ABORTING; in any other state, a warning is logged and the
        state is left unchanged.
        '''
        logger.debug(color('<<< stream channel closed', 'magenta'))
        self.local_endpoint.on_rtp_channel_close()
        self.local_endpoint.in_use = 0
        self.rtp_channel = None

        if self.state in (AVDTP_CLOSING_STATE, AVDTP_ABORTING_STATE):
            self.change_state(AVDTP_IDLE_STATE)
        else:
            logger.warning('unexpected channel close while not CLOSING or ABORTING')

    def __init__(
        self,
        protocol: Protocol,
        local_endpoint: LocalStreamEndPoint,
        remote_endpoint: StreamEndPointProxy,
    ) -> None:
        '''
        Create a stream in the IDLE state, with no RTP channel.

        The stream attaches itself to `local_endpoint` (as its `stream`) and
        marks that endpoint as in use.

        remote_endpoint must be a subclass of StreamEndPointProxy
        '''
        self.protocol = protocol
        self.local_endpoint = local_endpoint
        self.remote_endpoint = remote_endpoint
        self.rtp_channel = None
        self.state = AVDTP_IDLE_STATE

        local_endpoint.stream = self
        local_endpoint.in_use = 1

    def __str__(self) -> str:
        return (
            f'Stream({self.local_endpoint.seid} -> '
            f'{self.remote_endpoint.seid} {self.state_name(self.state)})'
        )
2031
2032
2033# -----------------------------------------------------------------------------
class StreamEndPoint:
    '''
    Plain description of a stream endpoint: its SEID, media type, TSEP, in-use
    flag and service capabilities.
    '''

    def __init__(
        self,
        seid: int,
        media_type: int,
        tsep: int,
        in_use: int,
        capabilities: Iterable[ServiceCapabilities],
    ) -> None:
        '''Store the given values as attributes of the same names.'''
        self.seid, self.media_type, self.tsep = seid, media_type, tsep
        self.in_use = in_use
        self.capabilities = capabilities

    def __str__(self) -> str:
        '''
        Multi-line rendering, one attribute per line, with the capabilities
        listed one per line between square brackets.
        '''
        capability_lines = '\n'.join(f'    {item}' for item in self.capabilities)
        lines = [
            'SEP(',
            f'  seid={self.seid}',
            f'  media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}',
            f'  tsep={name_or_number(AVDTP_TSEP_NAMES, self.tsep)}',
            f'  in_use={self.in_use}',
            '  capabilities=[',
            # Kept as a single entry: an empty capability list still yields a line
            capability_lines,
            '  ]',
            ')',
        ]
        return '\n'.join(lines)
2065
2066
2067# -----------------------------------------------------------------------------
class StreamEndPointProxy:
    '''
    Handle on a stream endpoint, identified by its SEID, whose operations are
    all delegated to a Protocol object. Each method returns what the
    corresponding Protocol method returns.
    '''

    def __init__(self, protocol: Protocol, seid: int) -> None:
        self.protocol = protocol
        self.seid = seid

    async def set_configuration(
        self, int_seid: int, configuration: Iterable[ServiceCapabilities]
    ) -> Set_Configuration_Response:
        '''Ask the protocol to configure this endpoint for `int_seid`.'''
        response = await self.protocol.set_configuration(
            self.seid, int_seid, configuration
        )
        return response

    async def open(self) -> Open_Response:
        '''Ask the protocol to open this endpoint.'''
        response = await self.protocol.open(self.seid)
        return response

    async def start(self) -> Start_Response:
        '''Ask the protocol to start this endpoint (passed as a one-item list).'''
        seids = [self.seid]
        return await self.protocol.start(seids)

    async def stop(self) -> Suspend_Response:
        '''Ask the protocol to suspend this endpoint (passed as a one-item list).'''
        seids = [self.seid]
        return await self.protocol.suspend(seids)

    async def close(self) -> Close_Response:
        '''Ask the protocol to close this endpoint.'''
        response = await self.protocol.close(self.seid)
        return response

    async def abort(self) -> Abort_Response:
        '''Ask the protocol to abort this endpoint.'''
        response = await self.protocol.abort(self.seid)
        return response
2092
2093
2094# -----------------------------------------------------------------------------
class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy):
    '''
    Stream endpoint that is both described (StreamEndPoint attributes) and
    controllable through a Protocol (StreamEndPointProxy methods).
    '''

    def __init__(
        self,
        protocol: Protocol,
        seid: int,
        media_type: int,
        tsep: int,
        in_use: int,
        capabilities: Iterable[ServiceCapabilities],
    ) -> None:
        # Each base class is initialized explicitly, with its own arguments
        StreamEndPoint.__init__(
            self,
            seid=seid,
            media_type=media_type,
            tsep=tsep,
            in_use=in_use,
            capabilities=capabilities,
        )
        StreamEndPointProxy.__init__(self, protocol=protocol, seid=seid)
2107
2108
2109# -----------------------------------------------------------------------------
class LocalStreamEndPoint(StreamEndPoint, EventEmitter):
    '''
    Stream endpoint hosted locally.

    The `on_*` handlers report what happens to the endpoint by emitting
    events: 'configuration', 'open', 'start', 'suspend', 'close', 'abort',
    'delay_report' (with the delay), 'security_control' (with the payload),
    'rtp_channel_open' and 'rtp_channel_close'. Except for
    `on_get_configuration_command`, they return None.

    `start`, `stop`, `close` and `on_reconfigure_command` do nothing here;
    subclasses override what they need.
    '''

    # Stream this endpoint is currently part of, if any
    stream: Optional[Stream]

    def __init__(
        self,
        protocol: Protocol,
        seid: int,
        media_type: int,
        tsep: int,
        capabilities: Iterable[ServiceCapabilities],
        configuration: Optional[Iterable[ServiceCapabilities]] = None,
    ):
        '''
        The endpoint starts out not in use and without a stream. When
        `configuration` is None, the current configuration is an empty list.
        '''
        StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities)
        EventEmitter.__init__(self)
        self.protocol = protocol
        self.configuration = configuration if configuration is not None else []
        self.stream = None

    async def start(self):
        pass

    async def stop(self):
        pass

    async def close(self):
        pass

    def on_reconfigure_command(self, command):
        pass

    def on_set_configuration_command(self, configuration):
        '''Adopt `configuration` as the current one and emit 'configuration'.'''
        logger.debug(
            '<<< received configuration: '
            f'{",".join([str(capability) for capability in configuration])}'
        )
        self.configuration = configuration
        self.emit('configuration')

    def on_get_configuration_command(self):
        '''Return a Get_Configuration_Response with the current configuration.'''
        return Get_Configuration_Response(self.configuration)

    def on_open_command(self):
        self.emit('open')

    def on_start_command(self):
        self.emit('start')

    def on_suspend_command(self):
        self.emit('suspend')

    def on_close_command(self):
        self.emit('close')

    def on_abort_command(self):
        self.emit('abort')

    def on_security_control_command(self, payload):
        '''
        Emit 'security_control' with the command payload.

        The command dispatcher in this file calls this method on the endpoint
        itself. Without this default, a Security Control command would fail
        with an AttributeError instead of being answered. Nothing is returned,
        which the dispatcher turns into its default response.
        '''
        self.emit('security_control', payload)

    def on_delayreport_command(self, delay: int):
        self.emit('delay_report', delay)

    def on_rtp_channel_open(self):
        self.emit('rtp_channel_open')

    def on_rtp_channel_close(self):
        self.emit('rtp_channel_close')
2174
2175
2176# -----------------------------------------------------------------------------
class LocalSource(LocalStreamEndPoint):
    '''
    Local source endpoint (TSEP is AVDTP_TSEP_SRC), optionally driven by a
    packet pump.

    Its capabilities are a media transport capability plus the given codec
    capabilities; the same list is used as its initial configuration.
    '''

    def __init__(
        self,
        protocol: Protocol,
        seid: int,
        codec_capabilities: MediaCodecCapabilities,
        packet_pump: MediaPacketPump,
    ) -> None:
        capabilities = [
            ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
            codec_capabilities,
        ]
        super().__init__(
            protocol,
            seid,
            codec_capabilities.media_type,
            AVDTP_TSEP_SRC,
            capabilities,
            capabilities,
        )
        self.packet_pump = packet_pump

        # Strong references to the start/stop tasks still running: the event
        # loop itself only keeps weak references to tasks.
        self._pending_tasks = set()

    async def start(self) -> None:
        '''
        Start the packet pump on the stream's RTP channel.

        'start' is only emitted when the pump is not started, i.e. when there
        is no pump, no stream or no RTP channel.
        '''
        if self.packet_pump and self.stream and self.stream.rtp_channel:
            return await self.packet_pump.start(self.stream.rtp_channel)

        self.emit('start')

    async def stop(self) -> None:
        '''Stop the packet pump; 'stop' is only emitted when there is no pump.'''
        if self.packet_pump:
            return await self.packet_pump.stop()

        self.emit('stop')

    def _run_in_background(self, coroutine) -> None:
        '''Schedule `coroutine` as a task that is kept referenced until done.'''
        task = asyncio.create_task(coroutine)
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def on_start_command(self):
        '''Schedule `start()` without waiting for it; returns None.'''
        self._run_in_background(self.start())

    def on_suspend_command(self):
        '''Schedule `stop()` without waiting for it; returns None.'''
        self._run_in_background(self.stop())
2216
2217
2218# -----------------------------------------------------------------------------
class LocalSink(LocalStreamEndPoint):
    '''
    Local sink endpoint (TSEP is AVDTP_TSEP_SNK).

    Its capabilities are a media transport capability plus the given codec
    capabilities. What arrives on the RTP channel is parsed as a MediaPacket
    and re-emitted as an 'rtp_packet' event.
    '''

    def __init__(
        self, protocol: Protocol, seid: int, codec_capabilities: MediaCodecCapabilities
    ) -> None:
        media_transport = ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
        super().__init__(
            protocol,
            seid,
            codec_capabilities.media_type,
            AVDTP_TSEP_SNK,
            [media_transport, codec_capabilities],
        )

    def on_rtp_channel_open(self):
        '''Install `on_avdtp_packet` as the sink of the stream's RTP channel.'''
        logger.debug(color('<<< RTP channel open', 'magenta'))
        rtp_channel = self.stream.rtp_channel
        rtp_channel.sink = self.on_avdtp_packet
        super().on_rtp_channel_open()

    def on_rtp_channel_close(self):
        logger.debug(color('<<< RTP channel close', 'magenta'))
        super().on_rtp_channel_close()

    def on_avdtp_packet(self, packet):
        '''Parse `packet` as a MediaPacket, log it and emit it as 'rtp_packet'.'''
        rtp_packet = MediaPacket.from_bytes(packet)

        # Only the first 16 bytes of the payload are logged
        banner = color("<<< RTP Packet:", "green")
        payload_preview = rtp_packet.payload[:16].hex()
        logger.debug(f'{banner} {rtp_packet} {payload_preview}')

        self.emit('rtp_packet', rtp_packet)
2251