# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Realtek USB dongles.
Based on various online bits of information, including the Linux kernel.
(see `drivers/bluetooth/btrtl.c`)
"""

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from dataclasses import dataclass
import asyncio
import enum
import logging
import math
import os
import pathlib
import platform
import struct
from typing import Tuple
import weakref


from bumble import core
from bumble.hci import (
    hci_vendor_command_op_code,
    STATUS_SPEC,
    HCI_SUCCESS,
    HCI_Command,
    HCI_Reset_Command,
    HCI_Read_Local_Version_Information_Command,
)
from bumble.drivers import common

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


class RtkFirmwareError(core.BaseBumbleError):
    """Error raised when RTK firmware initialization fails."""


# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# ROM identifiers. The driver matches these against the LMP subversion reported
# by the controller when selecting a driver entry.
RTK_ROM_LMP_8723A = 0x1200
RTK_ROM_LMP_8723B = 0x8723
RTK_ROM_LMP_8821A = 0x8821
RTK_ROM_LMP_8761A = 0x8761
RTK_ROM_LMP_8822B = 0x8822
RTK_ROM_LMP_8852A = 0x8852
RTK_CONFIG_MAGIC = 0x8723AB55

# Byte string that a firmware image in "epatch" format must start with.
RTK_EPATCH_SIGNATURE = b"Realtech"

# Size, in bytes, of one firmware fragment sent per download command.
RTK_FRAGMENT_LENGTH = 252

# Name of the environment variable that overrides the firmware search directory.
RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR"
# Directory checked for firmware files when running on Linux.
RTK_LINUX_FIRMWARE_DIR = "/lib/firmware/rtl_bt"


class RtlProjectId(enum.IntEnum):
    """
    Known Realtek project identifiers.

    The numeric values are the same as the keys of `RTK_PROJECT_ID_TO_ROM`.
    """

    PROJECT_ID_8723A = 0
    PROJECT_ID_8723B = 1
    PROJECT_ID_8821A = 2
    PROJECT_ID_8761A = 3
    PROJECT_ID_8822B = 8
    PROJECT_ID_8723D = 9
    PROJECT_ID_8821C = 10
    PROJECT_ID_8822C = 13
    PROJECT_ID_8761B = 14
    PROJECT_ID_8852A = 18
    PROJECT_ID_8852B = 20
    PROJECT_ID_8852C = 25


# Maps a project ID (see `RtlProjectId`) to the ROM identifier of the chip
# family it belongs to. Several project IDs share the same ROM identifier.
RTK_PROJECT_ID_TO_ROM = {
    0: RTK_ROM_LMP_8723A,  # PROJECT_ID_8723A
    1: RTK_ROM_LMP_8723B,  # PROJECT_ID_8723B
    2: RTK_ROM_LMP_8821A,  # PROJECT_ID_8821A
    3: RTK_ROM_LMP_8761A,  # PROJECT_ID_8761A
    8: RTK_ROM_LMP_8822B,  # PROJECT_ID_8822B
    9: RTK_ROM_LMP_8723B,  # PROJECT_ID_8723D
    10: RTK_ROM_LMP_8821A,  # PROJECT_ID_8821C
    13: RTK_ROM_LMP_8822B,  # PROJECT_ID_8822C
    14: RTK_ROM_LMP_8761A,  # PROJECT_ID_8761B
    18: RTK_ROM_LMP_8852A,  # PROJECT_ID_8852A
    20: RTK_ROM_LMP_8852A,  # PROJECT_ID_8852B
    25: RTK_ROM_LMP_8852A,  # PROJECT_ID_8852C
}

# List of USB (VendorID, ProductID) for Realtek-based devices.
RTK_USB_PRODUCTS = {
    # Realtek 8723AE
    (0x0930, 0x021D),
    (0x13D3, 0x3394),
    # Realtek 8723BE
    (0x0489, 0xE085),
    (0x0489, 0xE08B),
    (0x04F2, 0xB49F),
    (0x13D3, 0x3410),
    (0x13D3, 0x3416),
    (0x13D3, 0x3459),
    (0x13D3, 0x3494),
    # Realtek 8723BU
    (0x7392, 0xA611),
    # Realtek 8723DE
    (0x0BDA, 0xB009),
    (0x2FF8, 0xB011),
    # Realtek 8761BUV
    (0x0B05, 0x190E),
    (0x0BDA, 0x8771),
    (0x2230, 0x0016),
    (0x2357, 0x0604),
    (0x2550, 0x8761),
    (0x2B89, 0x8761),
    (0x7392, 0xC611),
    (0x0BDA, 0x877B),
    # Realtek 8821AE
    (0x0B05, 0x17DC),
    (0x13D3, 0x3414),
    (0x13D3, 0x3458),
    (0x13D3, 0x3461),
    (0x13D3, 0x3462),
    # Realtek 8821CE
    (0x0BDA, 0xB00C),
    (0x0BDA, 0xC822),
    (0x13D3, 0x3529),
    # Realtek 8822BE
    (0x0B05, 0x185C),
    (0x13D3, 0x3526),
    # Realtek 8822CE
    (0x04C5, 0x161F),
    (0x04CA, 0x4005),
    (0x0B05, 0x18EF),
    (0x0BDA, 0xB00C),
    (0x0BDA, 0xC123),
    (0x0BDA, 0xC822),
    (0x0CB5, 0xC547),
    (0x1358, 0xC123),
    (0x13D3, 0x3548),
    (0x13D3, 0x3549),
    (0x13D3, 0x3553),
    (0x13D3, 0x3555),
    (0x2FF8, 0x3051),
    # Realtek 8822CU
    (0x13D3, 0x3549),
    # Realtek 8852AE
    (0x04C5, 0x165C),
    (0x04CA, 0x4006),
    (0x0BDA, 0x2852),
    (0x0BDA, 0x385A),
    (0x0BDA, 0x4852),
    (0x0BDA, 0xC852),
    (0x0CB8, 0xC549),
    # Realtek 8852BE
    (0x0BDA, 0x887B),
    (0x0CB8, 0xC559),
    (0x13D3, 0x3571),
    # Realtek 8852CE
    (0x04C5, 0x1675),
    (0x04CA, 0x4007),
    (0x0CB8, 0xC558),
    (0x13D3, 0x3586),
    (0x13D3, 0x3587),
    (0x13D3, 0x3592),
}

# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
HCI_Command.register_commands(globals())


@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)])
class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
    """Vendor command with no parameters; returns a status and a 1-byte version."""


@HCI_Command.command(
    fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
    return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)],
)
class HCI_RTK_Download_Command(HCI_Command):
    """Vendor command carrying one firmware fragment and its 1-byte index."""


@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
    """Vendor command with no parameters (no response is awaited for it)."""


# -----------------------------------------------------------------------------
class Firmware:
    """
    Parsed view of a Realtek "epatch" firmware image.

    Attributes:
        project_id: project ID byte found by scanning the trailing records.
        version: 32-bit firmware version read from the header.
        patches: list of `(chip_id, payload, svn_version)` tuples, one per entry
            of the patch tables, where `payload` is the patch data with its last
            4 bytes replaced by `version`.
    """

    def __init__(self, firmware):
        """
        Parse a firmware image.

        Args:
            firmware: the complete firmware image, as bytes.

        Raises:
            RtkFirmwareError: if the image lacks the expected signatures, is too
                short, contains a 0-length record, or has no project ID.
        """
        extension_sig = bytes([0x51, 0x04, 0xFD, 0x77])

        if not firmware.startswith(RTK_EPATCH_SIGNATURE):
            raise RtkFirmwareError("Firmware does not start with epatch signature")

        if not firmware.endswith(extension_sig):
            raise RtkFirmwareError("Firmware does not end with extension sig")

        # The firmware should start with a 14 byte header.
        epatch_header_size = 14
        if len(firmware) < epatch_header_size:
            raise RtkFirmwareError("Firmware too short")

        # Look for the "project ID", starting from the end.
        # Records are read backwards: the two bytes just before `offset` are
        # (length, opcode), preceded by `length` bytes of data.
        offset = len(firmware) - len(extension_sig)
        project_id = -1
        while offset >= epatch_header_size:
            length, opcode = firmware[offset - 2 : offset]
            offset -= 2

            if opcode == 0xFF:
                # End
                break

            if length == 0:
                raise RtkFirmwareError("Invalid 0-length instruction")

            if opcode == 0 and length == 1:
                project_id = firmware[offset - 1]
                break

            offset -= length

        if project_id < 0:
            raise RtkFirmwareError("Project ID not found")

        self.project_id = project_id

        # Read the patch tables info.
        self.version, num_patches = struct.unpack("<IH", firmware[8:14])
        self.patches = []

        # The patches tables are laid out as:
        # <ChipID_1><ChipID_2>...<ChipID_N>  (16 bits each)
        # <PatchLength_1><PatchLength_2>...<PatchLength_N> (16 bits each)
        # <PatchOffset_1><PatchOffset_2>...<PatchOffset_N> (32 bits each)
        if epatch_header_size + 8 * num_patches > len(firmware):
            raise RtkFirmwareError("Firmware too short")
        chip_id_table_offset = epatch_header_size
        patch_length_table_offset = chip_id_table_offset + 2 * num_patches
        patch_offset_table_offset = chip_id_table_offset + 4 * num_patches
        for patch_index in range(num_patches):
            chip_id_offset = chip_id_table_offset + 2 * patch_index
            (chip_id,) = struct.unpack_from("<H", firmware, chip_id_offset)
            (patch_length,) = struct.unpack_from(
                "<H", firmware, patch_length_table_offset + 2 * patch_index
            )
            (patch_offset,) = struct.unpack_from(
                "<I", firmware, patch_offset_table_offset + 4 * patch_index
            )
            if patch_offset + patch_length > len(firmware):
                raise RtkFirmwareError("Firmware too short")

            # Get the SVN version for the patch
            (svn_version,) = struct.unpack_from(
                "<I", firmware, patch_offset + patch_length - 8
            )

            # Create a payload with the patch, replacing the last 4 bytes with
            # the firmware version.
            self.patches.append(
                (
                    chip_id,
                    firmware[patch_offset : patch_offset + patch_length - 4]
                    + struct.pack("<I", self.version),
                    svn_version,
                )
            )


class Driver(common.Driver):
    """
    Driver that loads firmware (and optional config) into a Realtek controller.

    Instances are normally obtained with `for_host`, which probes the
    controller and locates the firmware/config files.
    """

    @dataclass
    class DriverInfo:
        """Description of one supported controller variant."""

        # ROM identifier, matched against the controller's LMP subversion.
        rom: int
        # (hci_subversion, hci_version) pair, matched against the controller's
        # local version information.
        hci: Tuple[int, int]
        # When True, `for_host` gives up if no config file can be loaded.
        config_needed: bool
        # When True, the ROM version is read before selecting a patch.
        has_rom_version: bool
        # Not consulted by the code in this module.
        has_msft_ext: bool = False
        # File names looked up with `find_binary_path`.
        fw_name: str = ""
        config_name: str = ""

    # Response timeout, in seconds, for the first HCI reset sent when probing.
    POST_RESET_DELAY: float = 0.2

    DRIVER_INFOS = [
        # 8723A
        DriverInfo(
            rom=RTK_ROM_LMP_8723A,
            hci=(0x0B, 0x06),
            config_needed=False,
            has_rom_version=False,
            fw_name="rtl8723a_fw.bin",
            config_name="",
        ),
        # 8723B
        DriverInfo(
            rom=RTK_ROM_LMP_8723B,
            hci=(0x0B, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8723b_fw.bin",
            config_name="rtl8723b_config.bin",
        ),
        # 8723D
        DriverInfo(
            rom=RTK_ROM_LMP_8723B,
            hci=(0x0D, 0x08),
            config_needed=True,
            has_rom_version=True,
            fw_name="rtl8723d_fw.bin",
            config_name="rtl8723d_config.bin",
        ),
        # 8821A
        DriverInfo(
            rom=RTK_ROM_LMP_8821A,
            hci=(0x0A, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8821a_fw.bin",
            config_name="rtl8821a_config.bin",
        ),
        # 8821C
        DriverInfo(
            rom=RTK_ROM_LMP_8821A,
            hci=(0x0C, 0x08),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8821c_fw.bin",
            config_name="rtl8821c_config.bin",
        ),
        # 8761A
        DriverInfo(
            rom=RTK_ROM_LMP_8761A,
            hci=(0x0A, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8761a_fw.bin",
            config_name="rtl8761a_config.bin",
        ),
        # 8761BU
        DriverInfo(
            rom=RTK_ROM_LMP_8761A,
            hci=(0x0B, 0x0A),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8761bu_fw.bin",
            config_name="rtl8761bu_config.bin",
        ),
        # 8822C
        DriverInfo(
            rom=RTK_ROM_LMP_8822B,
            hci=(0x0C, 0x0A),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8822cu_fw.bin",
            config_name="rtl8822cu_config.bin",
        ),
        # 8822B
        DriverInfo(
            rom=RTK_ROM_LMP_8822B,
            hci=(0x0B, 0x07),
            config_needed=True,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8822b_fw.bin",
            config_name="rtl8822b_config.bin",
        ),
        # 8852A
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0x0A, 0x0B),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852au_fw.bin",
            config_name="rtl8852au_config.bin",
        ),
        # 8852B
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0xB, 0xB),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852bu_fw.bin",
            config_name="rtl8852bu_config.bin",
        ),
        # 8852C
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0x0C, 0x0C),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852cu_fw.bin",
            config_name="rtl8852cu_config.bin",
        ),
    ]

    # Time, in seconds, to wait after sending the "drop firmware" command.
    POST_DROP_DELAY = 0.2

    @staticmethod
    def find_driver_info(hci_version, hci_subversion, lmp_subversion):
        """
        Return the first `DRIVER_INFOS` entry matching the given versions.

        An entry matches when its `rom` equals `lmp_subversion` and its `hci`
        pair equals `(hci_subversion, hci_version)`.

        Returns:
            The matching `DriverInfo`, or None if there is no match.
        """
        for driver_info in Driver.DRIVER_INFOS:
            if driver_info.rom == lmp_subversion and driver_info.hci == (
                hci_subversion,
                hci_version,
            ):
                return driver_info

        return None

    @staticmethod
    def find_binary_path(file_name):
        """
        Locate a firmware or config file by name.

        If the `RTK_FIRMWARE_DIR_ENV` environment variable is set, only that
        directory is searched. Otherwise the search order is: the project data
        dir, the package's `rtk_fw` directory, the Linux system firmware
        directory (on Linux only), then the current working directory.

        Returns:
            The path of the first existing file found, or None.
        """
        # First check if an environment variable is set
        if RTK_FIRMWARE_DIR_ENV in os.environ:
            if (
                path := pathlib.Path(os.environ[RTK_FIRMWARE_DIR_ENV]) / file_name
            ).is_file():
                logger.debug(f"{file_name} found in env dir")
                return path

            # When the environment variable is set, don't look elsewhere
            return None

        # Then, look where the firmware download tool writes by default
        if (path := rtk_firmware_dir() / file_name).is_file():
            logger.debug(f"{file_name} found in project data dir")
            return path

        # Then, look in the package's driver directory
        if (path := pathlib.Path(__file__).parent / "rtk_fw" / file_name).is_file():
            logger.debug(f"{file_name} found in package dir")
            return path

        # On Linux, check the system's FW directory
        if (
            platform.system() == "Linux"
            and (path := pathlib.Path(RTK_LINUX_FIRMWARE_DIR) / file_name).is_file()
        ):
            logger.debug(f"{file_name} found in Linux system FW dir")
            return path

        # Finally look in the current directory
        if (path := pathlib.Path.cwd() / file_name).is_file():
            logger.debug(f"{file_name} found in CWD")
            return path

        return None

    @staticmethod
    def check(host):
        """
        Tell whether this driver applies to the given host.

        Returns:
            True if the host metadata forces the `rtk` driver or if its USB
            (vendor_id, product_id) pair is in `RTK_USB_PRODUCTS`; False
            otherwise, including when the metadata is missing or incomplete.
        """
        if not host.hci_metadata:
            logger.debug("USB metadata not found")
            return False

        if host.hci_metadata.get('driver') == 'rtk':
            # Forced driver
            return True

        vendor_id = host.hci_metadata.get("vendor_id")
        product_id = host.hci_metadata.get("product_id")
        if vendor_id is None or product_id is None:
            logger.debug("USB metadata not sufficient")
            return False

        if (vendor_id, product_id) not in RTK_USB_PRODUCTS:
            logger.debug(
                f"USB device ({vendor_id:04X}, {product_id:04X}) " "not in known list"
            )
            return False

        return True

    @classmethod
    async def driver_info_for_host(cls, host):
        """
        Reset the controller, read its local version and look up a driver entry.

        The first reset is sent with a response timeout of `POST_RESET_DELAY`;
        if that times out, the reset is sent once more without that timeout.

        Returns:
            The matching `DriverInfo`, or None if the version could not be
            probed or no entry matches.
        """
        try:
            await host.send_command(
                HCI_Reset_Command(),
                check_result=True,
                response_timeout=cls.POST_RESET_DELAY,
            )
            host.ready = True  # Needed to let the host know the controller is ready.
        except asyncio.exceptions.TimeoutError:
            logger.warning("timeout waiting for hci reset, retrying")
            await host.send_command(HCI_Reset_Command(), check_result=True)
            host.ready = True

        command = HCI_Read_Local_Version_Information_Command()
        response = await host.send_command(command, check_result=True)
        if response.command_opcode != command.op_code:
            logger.error("failed to probe local version information")
            return None

        local_version = response.return_parameters

        logger.debug(
            f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
            f"(0x{local_version.hci_version:02X}, "
            f"0x{local_version.hci_subversion:04X})"
        )

        driver_info = cls.find_driver_info(
            local_version.hci_version,
            local_version.hci_subversion,
            local_version.lmp_subversion,
        )
        if driver_info is None:
            # TODO: it seems that the Linux driver will send command (0x3f, 0x66)
            # in this case and then re-read the local version, then re-match.
            logger.debug("firmware already loaded or no known driver for this device")

        return driver_info

    @classmethod
    async def for_host(cls, host, force=False):
        """
        Create a driver for a host, if one is applicable and its files exist.

        Args:
            host: the host whose controller should be driven.
            force: when True, skip the `check` applicability test.

        Returns:
            A driver instance, or None if the driver does not apply, no driver
            entry matches, the firmware file is missing, or a required config
            file is missing.
        """
        # Check that a driver is needed for this host
        if not force and not cls.check(host):
            return None

        # Get the driver info
        driver_info = await cls.driver_info_for_host(host)
        if driver_info is None:
            return None

        # Load the firmware
        firmware_path = cls.find_binary_path(driver_info.fw_name)
        if not firmware_path:
            logger.warning(f"Firmware file {driver_info.fw_name} not found")
            logger.warning("See https://google.github.io/bumble/drivers/realtek.html")
            return None
        with open(firmware_path, "rb") as firmware_file:
            firmware = firmware_file.read()

        # Load the config
        config = None
        if driver_info.config_name:
            config_path = cls.find_binary_path(driver_info.config_name)
            if config_path:
                with open(config_path, "rb") as config_file:
                    config = config_file.read()
        if driver_info.config_needed and not config:
            logger.warning("Config needed, but no config file available")
            return None

        return cls(host, driver_info, firmware, config)

    def __init__(self, host, driver_info, firmware, config):
        """
        Args:
            host: the host to drive; only a weak proxy to it is kept.
            driver_info: the `DriverInfo` entry selected for the controller.
            firmware: raw firmware image bytes.
            config: raw config bytes, or None.
        """
        self.host = weakref.proxy(host)
        self.driver_info = driver_info
        self.firmware = firmware
        self.config = config

    @staticmethod
    async def drop_firmware(host):
        """Send the "drop firmware" command, then wait `POST_DROP_DELAY` seconds."""
        host.send_hci_packet(HCI_RTK_Drop_Firmware_Command())

        # Wait for the command to be effective (no response is sent)
        await asyncio.sleep(Driver.POST_DROP_DELAY)

    async def download_for_rtl8723a(self):
        """Validate the firmware for the 8723A ROM (the download is not implemented)."""
        # Check that the firmware image does not include an epatch signature.
        if RTK_EPATCH_SIGNATURE in self.firmware:
            logger.warning(
                "epatch signature found in firmware, it is probably the wrong firmware"
            )
            return

        # TODO: load the firmware

    @staticmethod
    def _download_index(fragment_index):
        """
        Return the 7-bit download index for a 0-based fragment number.

        This follows the counter used by the Linux driver (`btrtl.c`): the index
        counts 0..0x7F for the first 128 fragments, then wraps to 1 (not 0) and
        keeps cycling through 1..0x7F. The result never has bit 7 set, so it
        cannot be mistaken for the end marker.
        """
        if fragment_index <= 0x7F:
            return fragment_index
        return (fragment_index - 0x80) % 0x7F + 1

    async def download_for_rtl8723b(self):
        """
        Download the matching epatch patch (plus config, if any) to the controller.

        The patch is selected by comparing each patch's chip ID with the ROM
        version + 1 (the ROM version is taken as 0 when the driver entry has no
        ROM version). The payload is sent in fragments of `RTK_FRAGMENT_LENGTH`
        bytes, the last one flagged with bit 7 of the index. The method returns
        without downloading if the ROM version cannot be read or no patch
        matches.
        """
        if self.driver_info.has_rom_version:
            response = await self.host.send_command(
                HCI_RTK_Read_ROM_Version_Command(), check_result=True
            )
            if response.return_parameters.status != HCI_SUCCESS:
                logger.warning("can't get ROM version")
                return
            rom_version = response.return_parameters.version
            logger.debug(f"ROM version before download: {rom_version:04X}")
        else:
            rom_version = 0

        firmware = Firmware(self.firmware)
        logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
        for patch in firmware.patches:
            if patch[0] == rom_version + 1:
                logger.debug(f"using patch {patch[0]}")
                break
        else:
            logger.warning(f"no valid patch found for rom version {rom_version}")
            return

        # Append the config if there is one.
        if self.config:
            payload = patch[1] + self.config
        else:
            payload = patch[1]

        # Download the payload, one fragment at a time.
        fragment_count = math.ceil(len(payload) / RTK_FRAGMENT_LENGTH)
        for fragment_index in range(fragment_count):
            # NOTE: the Linux driver restarts the index at 1 (not 0) after it
            # wraps around. That's odd, but we'll do the same here.
            download_index = self._download_index(fragment_index)
            if fragment_index == fragment_count - 1:
                download_index |= 0x80  # End marker.
            fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH
            fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
            logger.debug(f"downloading fragment {fragment_index}")
            # `check_result` is an argument of `send_command`, not of the command.
            await self.host.send_command(
                HCI_RTK_Download_Command(index=download_index, payload=fragment),
                check_result=True,
            )

        logger.debug("download complete!")

        # Read the version again
        response = await self.host.send_command(
            HCI_RTK_Read_ROM_Version_Command(), check_result=True
        )
        if response.return_parameters.status != HCI_SUCCESS:
            logger.warning("can't get ROM version")
        else:
            rom_version = response.return_parameters.version
            logger.debug(f"ROM version after download: {rom_version:04X}")

    async def download_firmware(self):
        """
        Run the download procedure that corresponds to the driver entry's ROM.

        Raises:
            RtkFirmwareError: if the ROM identifier is not a supported one.
        """
        if self.driver_info.rom == RTK_ROM_LMP_8723A:
            return await self.download_for_rtl8723a()

        if self.driver_info.rom in (
            RTK_ROM_LMP_8723B,
            RTK_ROM_LMP_8821A,
            RTK_ROM_LMP_8761A,
            RTK_ROM_LMP_8822B,
            RTK_ROM_LMP_8852A,
        ):
            return await self.download_for_rtl8723b()

        raise RtkFirmwareError("ROM not supported")

    async def init_controller(self):
        """Download the firmware, then reset the controller."""
        await self.download_firmware()
        await self.host.send_command(HCI_Reset_Command(), check_result=True)
        logger.info(f"loaded FW image {self.driver_info.fw_name}")


def rtk_firmware_dir() -> pathlib.Path:
    """
    Returns:
        A path to a subdir of the project data dir for Realtek firmware.
        The directory is created if it doesn't exist.
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import with
    # `bumble.drivers` - confirm.
    from bumble.drivers import project_data_dir

    p = project_data_dir() / "firmware" / "realtek"
    p.mkdir(parents=True, exist_ok=True)
    return p