# Copyright 2021-2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import enum import logging from typing import Dict, List from bumble.core import BT_BR_EDR_TRANSPORT, CommandTimeoutError from bumble.device import Device, DeviceConfiguration from bumble.pairing import PairingConfig from bumble.sdp import ServiceAttribute from bumble.avdtp import ( AVDTP_AUDIO_MEDIA_TYPE, Listener, MediaCodecCapabilities, MediaPacket, Protocol, ) from bumble.a2dp import ( make_audio_sink_service_sdp_records, MPEG_2_AAC_LC_OBJECT_TYPE, A2DP_SBC_CODEC_TYPE, A2DP_MPEG_2_4_AAC_CODEC_TYPE, SBC_MONO_CHANNEL_MODE, SBC_DUAL_CHANNEL_MODE, SBC_SNR_ALLOCATION_METHOD, SBC_LOUDNESS_ALLOCATION_METHOD, SBC_STEREO_CHANNEL_MODE, SBC_JOINT_STEREO_CHANNEL_MODE, SbcMediaCodecInformation, AacMediaCodecInformation, ) from bumble.utils import AsyncRunner from bumble.codecs import AacAudioRtpPacket from bumble.hci import HCI_Reset_Command # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- class AudioExtractor: @staticmethod def create(codec: str): if codec == 'aac': return AacAudioExtractor() if codec == 'sbc': return SbcAudioExtractor() def extract_audio(self, packet: MediaPacket) -> bytes: raise NotImplementedError() # ----------------------------------------------------------------------------- class AacAudioExtractor: def extract_audio(self, packet: MediaPacket) -> bytes: return AacAudioRtpPacket(packet.payload).to_adts() # ----------------------------------------------------------------------------- class SbcAudioExtractor: def extract_audio(self, packet: MediaPacket) -> bytes: # header = packet.payload[0] # fragmented = header >> 7 # start = (header >> 6) & 0x01 # last = (header >> 5) & 0x01 # number_of_frames = header & 0x0F # TODO: support fragmented payloads return packet.payload[1:] # ----------------------------------------------------------------------------- class Speaker: class StreamState(enum.Enum): IDLE = 0 STOPPED = 1 STARTED = 2 SUSPENDED = 3 def __init__(self, hci_source, hci_sink, codec): self.hci_source = hci_source self.hci_sink = hci_sink self.js_listeners = {} self.codec = codec self.device = None self.connection = None self.avdtp_listener = None self.packets_received = 0 self.bytes_received = 0 self.stream_state = Speaker.StreamState.IDLE self.audio_extractor = AudioExtractor.create(codec) def sdp_records(self) -> Dict[int, List[ServiceAttribute]]: service_record_handle = 0x00010001 return { service_record_handle: make_audio_sink_service_sdp_records( service_record_handle ) } def codec_capabilities(self) -> MediaCodecCapabilities: if self.codec == 'aac': return self.aac_codec_capabilities() if self.codec == 'sbc': return self.sbc_codec_capabilities() raise RuntimeError('unsupported codec') def aac_codec_capabilities(self) -> MediaCodecCapabilities: return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE, media_codec_information=AacMediaCodecInformation.from_lists( object_types=[MPEG_2_AAC_LC_OBJECT_TYPE], sampling_frequencies=[48000, 44100], channels=[1, 2], vbr=1, bitrate=256000, ), ) def sbc_codec_capabilities(self) -> MediaCodecCapabilities: return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE, media_codec_information=SbcMediaCodecInformation.from_lists( sampling_frequencies=[48000, 44100, 32000, 16000], channel_modes=[ SBC_MONO_CHANNEL_MODE, SBC_DUAL_CHANNEL_MODE, SBC_STEREO_CHANNEL_MODE, SBC_JOINT_STEREO_CHANNEL_MODE, ], block_lengths=[4, 8, 12, 16], subbands=[4, 8], allocation_methods=[ SBC_LOUDNESS_ALLOCATION_METHOD, SBC_SNR_ALLOCATION_METHOD, ], minimum_bitpool_value=2, maximum_bitpool_value=53, ), ) def on_key_store_update(self): print("Key Store updated") self.emit('key_store_update') def on_bluetooth_connection(self, connection): print(f'Connection: {connection}') self.connection = connection connection.on('disconnection', self.on_bluetooth_disconnection) peer_name = '' if connection.peer_name is None else connection.peer_name peer_address = connection.peer_address.to_string(False) self.emit('connection', {'peer_name': peer_name, 'peer_address': peer_address}) def on_bluetooth_disconnection(self, reason): print(f'Disconnection ({reason})') self.connection = None self.emit('disconnection', None) def on_avdtp_connection(self, protocol): print('Audio Stream Open') # Add a sink endpoint to the server sink = protocol.add_sink(self.codec_capabilities()) sink.on('start', self.on_sink_start) sink.on('stop', self.on_sink_stop) sink.on('suspend', self.on_sink_suspend) sink.on('configuration', lambda: self.on_sink_configuration(sink.configuration)) sink.on('rtp_packet', self.on_rtp_packet) sink.on('rtp_channel_open', self.on_rtp_channel_open) sink.on('rtp_channel_close', self.on_rtp_channel_close) # Listen for close events protocol.on('close', self.on_avdtp_close) def on_avdtp_close(self): print("Audio Stream Closed") def on_sink_start(self): print("Sink Started") self.stream_state = self.StreamState.STARTED self.emit('start', None) def on_sink_stop(self): print("Sink Stopped") self.stream_state = self.StreamState.STOPPED self.emit('stop', None) def on_sink_suspend(self): print("Sink Suspended") self.stream_state = self.StreamState.SUSPENDED self.emit('suspend', None) def on_sink_configuration(self, config): print("Sink Configuration:") print('\n'.join([" " + str(capability) for capability in config])) def on_rtp_channel_open(self): print("RTP Channel Open") def on_rtp_channel_close(self): print("RTP Channel Closed") self.stream_state = self.StreamState.IDLE def on_rtp_packet(self, packet): self.packets_received += 1 self.bytes_received += len(packet.payload) self.emit("audio", self.audio_extractor.extract_audio(packet)) async def connect(self, address): # Connect to the source print(f'=== Connecting to {address}...') connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT) print(f'=== Connected to {connection.peer_address}') # Request authentication print('*** Authenticating...') await connection.authenticate() print('*** Authenticated') # Enable encryption print('*** Enabling encryption...') await connection.encrypt() print('*** Encryption on') protocol = await Protocol.connect(connection) self.avdtp_listener.set_server(connection, protocol) self.on_avdtp_connection(protocol) async def discover_remote_endpoints(self, protocol): endpoints = await protocol.discover_remote_endpoints() print(f'@@@ Found {len(endpoints)} endpoints') for endpoint in endpoints: print('@@@', endpoint) def on(self, event_name, listener): self.js_listeners[event_name] = listener def emit(self, event_name, event=None): if listener := self.js_listeners.get(event_name): listener(event) async def run(self, connect_address): # Create a device device_config = DeviceConfiguration() device_config.name = "Bumble Speaker" device_config.class_of_device = 0x240414 device_config.keystore = "JsonKeyStore:/bumble/keystore.json" device_config.classic_enabled = True device_config.le_enabled = False self.device = Device.from_config_with_hci( device_config, self.hci_source, self.hci_sink ) # Setup the SDP to expose the sink service self.device.sdp_service_records = self.sdp_records() # Don't require MITM when pairing. self.device.pairing_config_factory = lambda connection: PairingConfig( mitm=False ) # Start the controller await self.device.power_on() # Listen for Bluetooth connections self.device.on('connection', self.on_bluetooth_connection) # Listen for changes to the key store self.device.on('key_store_update', self.on_key_store_update) # Create a listener to wait for AVDTP connections self.avdtp_listener = Listener.for_device(self.device) self.avdtp_listener.on('connection', self.on_avdtp_connection) print(f'Speaker ready to play, codec={self.codec}') if connect_address: # Connect to the source try: await self.connect(connect_address) except CommandTimeoutError: print("Connection timed out") return else: # We'll wait for a connection print("Waiting for connection...") async def start(self): await self.run(None) async def stop(self): # TODO: replace this once a proper reset is implemented in the lib. await self.device.host.send_command(HCI_Reset_Command()) await self.device.power_off() print('Speaker stopped') # ----------------------------------------------------------------------------- def main(hci_source, hci_sink): return Speaker(hci_source, hci_sink, "aac")