1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from bumble.avdtp import Listener as AvdtpListener, MediaCodecCapabilities, AVDTP_AUDIO_MEDIA_TYPE
16from bumble.avrcp import Protocol as AvrcpProtocol, make_target_service_sdp_records, make_controller_service_sdp_records
17from bumble.a2dp import (A2DP_SBC_CODEC_TYPE, SBC_DUAL_CHANNEL_MODE, SBC_JOINT_STEREO_CHANNEL_MODE,
18                         SBC_LOUDNESS_ALLOCATION_METHOD, SBC_MONO_CHANNEL_MODE, SBC_SNR_ALLOCATION_METHOD,
19                         SBC_STEREO_CHANNEL_MODE, SbcMediaCodecInformation, make_audio_sink_service_sdp_records,
20                         make_audio_source_service_sdp_records)
21from bumble.device import Device
22from pandora_experimental.avrcp_grpc_aio import AVRCPServicer
23
24
25class AvrcpService(AVRCPServicer):
26    device: Device
27
28    def __init__(self, device: Device) -> None:
29        super().__init__()
30        self.device = device
31
32        sdp_records = {
33            0x00010002: make_audio_source_service_sdp_records(0x00010002),  # A2DP Source
34            0x00010003: make_audio_sink_service_sdp_records(0x00010003),  # A2DP Sink
35            0x00010004: make_controller_service_sdp_records(0x00010004),  # AVRCP Controller
36            0x00010005: make_target_service_sdp_records(0x00010005),  # AVRCP Target
37        }
38        self.device.sdp_service_records.update(sdp_records)
39
40        # Register AVDTP L2cap
41        avdtp_listener = AvdtpListener.for_device(device)
42
43        def on_avdtp_connection(server) -> None:  # type: ignore
44            server.add_sink(codec_capabilities())  # type: ignore
45
46        avdtp_listener.on('connection', on_avdtp_connection)  # type: ignore
47
48        # Register AVRCP L2cap
49        avrcp_protocol = AvrcpProtocol(delegate=None)
50        avrcp_protocol.listen(device)
51
52
53def codec_capabilities() -> MediaCodecCapabilities:
54    """Codec capabilities for the Bumble sink devices."""
55
56    return MediaCodecCapabilities(
57        media_type=AVDTP_AUDIO_MEDIA_TYPE,
58        media_codec_type=A2DP_SBC_CODEC_TYPE,
59        media_codec_information=SbcMediaCodecInformation.from_lists(
60            sampling_frequencies=[48000, 44100, 32000, 16000],
61            channel_modes=[
62                SBC_MONO_CHANNEL_MODE,
63                SBC_DUAL_CHANNEL_MODE,
64                SBC_STEREO_CHANNEL_MODE,
65                SBC_JOINT_STEREO_CHANNEL_MODE,
66            ],
67            block_lengths=[4, 8, 12, 16],
68            subbands=[4, 8],
69            allocation_methods=[
70                SBC_LOUDNESS_ALLOCATION_METHOD,
71                SBC_SNR_ALLOCATION_METHOD,
72            ],
73            minimum_bitpool_value=2,
74            maximum_bitpool_value=53,
75        ),
76    )
77