1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Support for Realtek USB dongles.
16Based on various online bits of information, including the Linux kernel.
17(see `drivers/bluetooth/btrtl.c`)
18"""
19
20# -----------------------------------------------------------------------------
21# Imports
22# -----------------------------------------------------------------------------
23from dataclasses import dataclass
24import asyncio
25import enum
26import logging
27import math
28import os
29import pathlib
30import platform
31import struct
32from typing import Tuple
33import weakref
34
35
36from bumble import core
37from bumble.hci import (
38    hci_vendor_command_op_code,
39    STATUS_SPEC,
40    HCI_SUCCESS,
41    HCI_Command,
42    HCI_Reset_Command,
43    HCI_Read_Local_Version_Information_Command,
44)
45from bumble.drivers import common
46
47# -----------------------------------------------------------------------------
48# Logging
49# -----------------------------------------------------------------------------
50logger = logging.getLogger(__name__)
51
52
53class RtkFirmwareError(core.BaseBumbleError):
54    """Error raised when RTK firmware initialization fails."""
55
56
57# -----------------------------------------------------------------------------
58# Constants
59# -----------------------------------------------------------------------------
60RTK_ROM_LMP_8723A = 0x1200
61RTK_ROM_LMP_8723B = 0x8723
62RTK_ROM_LMP_8821A = 0x8821
63RTK_ROM_LMP_8761A = 0x8761
64RTK_ROM_LMP_8822B = 0x8822
65RTK_ROM_LMP_8852A = 0x8852
66RTK_CONFIG_MAGIC = 0x8723AB55
67
68RTK_EPATCH_SIGNATURE = b"Realtech"
69
70RTK_FRAGMENT_LENGTH = 252
71
72RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR"
73RTK_LINUX_FIRMWARE_DIR = "/lib/firmware/rtl_bt"
74
75
76class RtlProjectId(enum.IntEnum):
77    PROJECT_ID_8723A = 0
78    PROJECT_ID_8723B = 1
79    PROJECT_ID_8821A = 2
80    PROJECT_ID_8761A = 3
81    PROJECT_ID_8822B = 8
82    PROJECT_ID_8723D = 9
83    PROJECT_ID_8821C = 10
84    PROJECT_ID_8822C = 13
85    PROJECT_ID_8761B = 14
86    PROJECT_ID_8852A = 18
87    PROJECT_ID_8852B = 20
88    PROJECT_ID_8852C = 25
89
90
91RTK_PROJECT_ID_TO_ROM = {
92    0: RTK_ROM_LMP_8723A,
93    1: RTK_ROM_LMP_8723B,
94    2: RTK_ROM_LMP_8821A,
95    3: RTK_ROM_LMP_8761A,
96    8: RTK_ROM_LMP_8822B,
97    9: RTK_ROM_LMP_8723B,
98    10: RTK_ROM_LMP_8821A,
99    13: RTK_ROM_LMP_8822B,
100    14: RTK_ROM_LMP_8761A,
101    18: RTK_ROM_LMP_8852A,
102    20: RTK_ROM_LMP_8852A,
103    25: RTK_ROM_LMP_8852A,
104}
105
106# List of USB (VendorID, ProductID) for Realtek-based devices.
107RTK_USB_PRODUCTS = {
108    # Realtek 8723AE
109    (0x0930, 0x021D),
110    (0x13D3, 0x3394),
111    # Realtek 8723BE
112    (0x0489, 0xE085),
113    (0x0489, 0xE08B),
114    (0x04F2, 0xB49F),
115    (0x13D3, 0x3410),
116    (0x13D3, 0x3416),
117    (0x13D3, 0x3459),
118    (0x13D3, 0x3494),
119    # Realtek 8723BU
120    (0x7392, 0xA611),
121    # Realtek 8723DE
122    (0x0BDA, 0xB009),
123    (0x2FF8, 0xB011),
124    # Realtek 8761BUV
125    (0x0B05, 0x190E),
126    (0x0BDA, 0x8771),
127    (0x2230, 0x0016),
128    (0x2357, 0x0604),
129    (0x2550, 0x8761),
130    (0x2B89, 0x8761),
131    (0x7392, 0xC611),
132    (0x0BDA, 0x877B),
133    # Realtek 8821AE
134    (0x0B05, 0x17DC),
135    (0x13D3, 0x3414),
136    (0x13D3, 0x3458),
137    (0x13D3, 0x3461),
138    (0x13D3, 0x3462),
139    # Realtek 8821CE
140    (0x0BDA, 0xB00C),
141    (0x0BDA, 0xC822),
142    (0x13D3, 0x3529),
143    # Realtek 8822BE
144    (0x0B05, 0x185C),
145    (0x13D3, 0x3526),
146    # Realtek 8822CE
147    (0x04C5, 0x161F),
148    (0x04CA, 0x4005),
149    (0x0B05, 0x18EF),
150    (0x0BDA, 0xB00C),
151    (0x0BDA, 0xC123),
152    (0x0BDA, 0xC822),
153    (0x0CB5, 0xC547),
154    (0x1358, 0xC123),
155    (0x13D3, 0x3548),
156    (0x13D3, 0x3549),
157    (0x13D3, 0x3553),
158    (0x13D3, 0x3555),
159    (0x2FF8, 0x3051),
160    # Realtek 8822CU
161    (0x13D3, 0x3549),
162    # Realtek 8852AE
163    (0x04C5, 0x165C),
164    (0x04CA, 0x4006),
165    (0x0BDA, 0x2852),
166    (0x0BDA, 0x385A),
167    (0x0BDA, 0x4852),
168    (0x0BDA, 0xC852),
169    (0x0CB8, 0xC549),
170    # Realtek 8852BE
171    (0x0BDA, 0x887B),
172    (0x0CB8, 0xC559),
173    (0x13D3, 0x3571),
174    # Realtek 8852CE
175    (0x04C5, 0x1675),
176    (0x04CA, 0x4007),
177    (0x0CB8, 0xC558),
178    (0x13D3, 0x3586),
179    (0x13D3, 0x3587),
180    (0x13D3, 0x3592),
181}
182
183# -----------------------------------------------------------------------------
184# HCI Commands
185# -----------------------------------------------------------------------------
186HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
187HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
188HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
189HCI_Command.register_commands(globals())
190
191
192@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)])
193class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
194    pass
195
196
197@HCI_Command.command(
198    fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
199    return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)],
200)
201class HCI_RTK_Download_Command(HCI_Command):
202    pass
203
204
205@HCI_Command.command()
206class HCI_RTK_Drop_Firmware_Command(HCI_Command):
207    pass
208
209
210# -----------------------------------------------------------------------------
211class Firmware:
212    def __init__(self, firmware):
213        extension_sig = bytes([0x51, 0x04, 0xFD, 0x77])
214
215        if not firmware.startswith(RTK_EPATCH_SIGNATURE):
216            raise RtkFirmwareError("Firmware does not start with epatch signature")
217
218        if not firmware.endswith(extension_sig):
219            raise RtkFirmwareError("Firmware does not end with extension sig")
220
221        # The firmware should start with a 14 byte header.
222        epatch_header_size = 14
223        if len(firmware) < epatch_header_size:
224            raise RtkFirmwareError("Firmware too short")
225
226        # Look for the "project ID", starting from the end.
227        offset = len(firmware) - len(extension_sig)
228        project_id = -1
229        while offset >= epatch_header_size:
230            length, opcode = firmware[offset - 2 : offset]
231            offset -= 2
232
233            if opcode == 0xFF:
234                # End
235                break
236
237            if length == 0:
238                raise RtkFirmwareError("Invalid 0-length instruction")
239
240            if opcode == 0 and length == 1:
241                project_id = firmware[offset - 1]
242                break
243
244            offset -= length
245
246        if project_id < 0:
247            raise RtkFirmwareError("Project ID not found")
248
249        self.project_id = project_id
250
251        # Read the patch tables info.
252        self.version, num_patches = struct.unpack("<IH", firmware[8:14])
253        self.patches = []
254
255        # The patches tables are laid out as:
256        # <ChipID_1><ChipID_2>...<ChipID_N>  (16 bits each)
257        # <PatchLength_1><PatchLength_2>...<PatchLength_N> (16 bits each)
258        # <PatchOffset_1><PatchOffset_2>...<PatchOffset_N> (32 bits each)
259        if epatch_header_size + 8 * num_patches > len(firmware):
260            raise RtkFirmwareError("Firmware too short")
261        chip_id_table_offset = epatch_header_size
262        patch_length_table_offset = chip_id_table_offset + 2 * num_patches
263        patch_offset_table_offset = chip_id_table_offset + 4 * num_patches
264        for patch_index in range(num_patches):
265            chip_id_offset = chip_id_table_offset + 2 * patch_index
266            (chip_id,) = struct.unpack_from("<H", firmware, chip_id_offset)
267            (patch_length,) = struct.unpack_from(
268                "<H", firmware, patch_length_table_offset + 2 * patch_index
269            )
270            (patch_offset,) = struct.unpack_from(
271                "<I", firmware, patch_offset_table_offset + 4 * patch_index
272            )
273            if patch_offset + patch_length > len(firmware):
274                raise RtkFirmwareError("Firmware too short")
275
276            # Get the SVN version for the patch
277            (svn_version,) = struct.unpack_from(
278                "<I", firmware, patch_offset + patch_length - 8
279            )
280
281            # Create a payload with the patch, replacing the last 4 bytes with
282            # the firmware version.
283            self.patches.append(
284                (
285                    chip_id,
286                    firmware[patch_offset : patch_offset + patch_length - 4]
287                    + struct.pack("<I", self.version),
288                    svn_version,
289                )
290            )
291
292
293class Driver(common.Driver):
294    @dataclass
295    class DriverInfo:
296        rom: int
297        hci: Tuple[int, int]
298        config_needed: bool
299        has_rom_version: bool
300        has_msft_ext: bool = False
301        fw_name: str = ""
302        config_name: str = ""
303
304    POST_RESET_DELAY: float = 0.2
305
306    DRIVER_INFOS = [
307        # 8723A
308        DriverInfo(
309            rom=RTK_ROM_LMP_8723A,
310            hci=(0x0B, 0x06),
311            config_needed=False,
312            has_rom_version=False,
313            fw_name="rtl8723a_fw.bin",
314            config_name="",
315        ),
316        # 8723B
317        DriverInfo(
318            rom=RTK_ROM_LMP_8723B,
319            hci=(0x0B, 0x06),
320            config_needed=False,
321            has_rom_version=True,
322            fw_name="rtl8723b_fw.bin",
323            config_name="rtl8723b_config.bin",
324        ),
325        # 8723D
326        DriverInfo(
327            rom=RTK_ROM_LMP_8723B,
328            hci=(0x0D, 0x08),
329            config_needed=True,
330            has_rom_version=True,
331            fw_name="rtl8723d_fw.bin",
332            config_name="rtl8723d_config.bin",
333        ),
334        # 8821A
335        DriverInfo(
336            rom=RTK_ROM_LMP_8821A,
337            hci=(0x0A, 0x06),
338            config_needed=False,
339            has_rom_version=True,
340            fw_name="rtl8821a_fw.bin",
341            config_name="rtl8821a_config.bin",
342        ),
343        # 8821C
344        DriverInfo(
345            rom=RTK_ROM_LMP_8821A,
346            hci=(0x0C, 0x08),
347            config_needed=False,
348            has_rom_version=True,
349            has_msft_ext=True,
350            fw_name="rtl8821c_fw.bin",
351            config_name="rtl8821c_config.bin",
352        ),
353        # 8761A
354        DriverInfo(
355            rom=RTK_ROM_LMP_8761A,
356            hci=(0x0A, 0x06),
357            config_needed=False,
358            has_rom_version=True,
359            fw_name="rtl8761a_fw.bin",
360            config_name="rtl8761a_config.bin",
361        ),
362        # 8761BU
363        DriverInfo(
364            rom=RTK_ROM_LMP_8761A,
365            hci=(0x0B, 0x0A),
366            config_needed=False,
367            has_rom_version=True,
368            fw_name="rtl8761bu_fw.bin",
369            config_name="rtl8761bu_config.bin",
370        ),
371        # 8822C
372        DriverInfo(
373            rom=RTK_ROM_LMP_8822B,
374            hci=(0x0C, 0x0A),
375            config_needed=False,
376            has_rom_version=True,
377            has_msft_ext=True,
378            fw_name="rtl8822cu_fw.bin",
379            config_name="rtl8822cu_config.bin",
380        ),
381        # 8822B
382        DriverInfo(
383            rom=RTK_ROM_LMP_8822B,
384            hci=(0x0B, 0x07),
385            config_needed=True,
386            has_rom_version=True,
387            has_msft_ext=True,
388            fw_name="rtl8822b_fw.bin",
389            config_name="rtl8822b_config.bin",
390        ),
391        # 8852A
392        DriverInfo(
393            rom=RTK_ROM_LMP_8852A,
394            hci=(0x0A, 0x0B),
395            config_needed=False,
396            has_rom_version=True,
397            has_msft_ext=True,
398            fw_name="rtl8852au_fw.bin",
399            config_name="rtl8852au_config.bin",
400        ),
401        # 8852B
402        DriverInfo(
403            rom=RTK_ROM_LMP_8852A,
404            hci=(0xB, 0xB),
405            config_needed=False,
406            has_rom_version=True,
407            has_msft_ext=True,
408            fw_name="rtl8852bu_fw.bin",
409            config_name="rtl8852bu_config.bin",
410        ),
411        # 8852C
412        DriverInfo(
413            rom=RTK_ROM_LMP_8852A,
414            hci=(0x0C, 0x0C),
415            config_needed=False,
416            has_rom_version=True,
417            has_msft_ext=True,
418            fw_name="rtl8852cu_fw.bin",
419            config_name="rtl8852cu_config.bin",
420        ),
421    ]
422
423    POST_DROP_DELAY = 0.2
424
425    @staticmethod
426    def find_driver_info(hci_version, hci_subversion, lmp_subversion):
427        for driver_info in Driver.DRIVER_INFOS:
428            if driver_info.rom == lmp_subversion and driver_info.hci == (
429                hci_subversion,
430                hci_version,
431            ):
432                return driver_info
433
434        return None
435
436    @staticmethod
437    def find_binary_path(file_name):
438        # First check if an environment variable is set
439        if RTK_FIRMWARE_DIR_ENV in os.environ:
440            if (
441                path := pathlib.Path(os.environ[RTK_FIRMWARE_DIR_ENV]) / file_name
442            ).is_file():
443                logger.debug(f"{file_name} found in env dir")
444                return path
445
446            # When the environment variable is set, don't look elsewhere
447            return None
448
449        # Then, look where the firmware download tool writes by default
450        if (path := rtk_firmware_dir() / file_name).is_file():
451            logger.debug(f"{file_name} found in project data dir")
452            return path
453
454        # Then, look in the package's driver directory
455        if (path := pathlib.Path(__file__).parent / "rtk_fw" / file_name).is_file():
456            logger.debug(f"{file_name} found in package dir")
457            return path
458
459        # On Linux, check the system's FW directory
460        if (
461            platform.system() == "Linux"
462            and (path := pathlib.Path(RTK_LINUX_FIRMWARE_DIR) / file_name).is_file()
463        ):
464            logger.debug(f"{file_name} found in Linux system FW dir")
465            return path
466
467        # Finally look in the current directory
468        if (path := pathlib.Path.cwd() / file_name).is_file():
469            logger.debug(f"{file_name} found in CWD")
470            return path
471
472        return None
473
474    @staticmethod
475    def check(host):
476        if not host.hci_metadata:
477            logger.debug("USB metadata not found")
478            return False
479
480        if host.hci_metadata.get('driver') == 'rtk':
481            # Forced driver
482            return True
483
484        vendor_id = host.hci_metadata.get("vendor_id")
485        product_id = host.hci_metadata.get("product_id")
486        if vendor_id is None or product_id is None:
487            logger.debug("USB metadata not sufficient")
488            return False
489
490        if (vendor_id, product_id) not in RTK_USB_PRODUCTS:
491            logger.debug(
492                f"USB device ({vendor_id:04X}, {product_id:04X}) " "not in known list"
493            )
494            return False
495
496        return True
497
498    @classmethod
499    async def driver_info_for_host(cls, host):
500        try:
501            await host.send_command(
502                HCI_Reset_Command(),
503                check_result=True,
504                response_timeout=cls.POST_RESET_DELAY,
505            )
506            host.ready = True  # Needed to let the host know the controller is ready.
507        except asyncio.exceptions.TimeoutError:
508            logger.warning("timeout waiting for hci reset, retrying")
509            await host.send_command(HCI_Reset_Command(), check_result=True)
510            host.ready = True
511
512        command = HCI_Read_Local_Version_Information_Command()
513        response = await host.send_command(command, check_result=True)
514        if response.command_opcode != command.op_code:
515            logger.error("failed to probe local version information")
516            return None
517
518        local_version = response.return_parameters
519
520        logger.debug(
521            f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
522            f"(0x{local_version.hci_version:02X}, "
523            f"0x{local_version.hci_subversion:04X})"
524        )
525
526        driver_info = cls.find_driver_info(
527            local_version.hci_version,
528            local_version.hci_subversion,
529            local_version.lmp_subversion,
530        )
531        if driver_info is None:
532            # TODO: it seems that the Linux driver will send command (0x3f, 0x66)
533            # in this case and then re-read the local version, then re-match.
534            logger.debug("firmware already loaded or no known driver for this device")
535
536        return driver_info
537
538    @classmethod
539    async def for_host(cls, host, force=False):
540        # Check that a driver is needed for this host
541        if not force and not cls.check(host):
542            return None
543
544        # Get the driver info
545        driver_info = await cls.driver_info_for_host(host)
546        if driver_info is None:
547            return None
548
549        # Load the firmware
550        firmware_path = cls.find_binary_path(driver_info.fw_name)
551        if not firmware_path:
552            logger.warning(f"Firmware file {driver_info.fw_name} not found")
553            logger.warning("See https://google.github.io/bumble/drivers/realtek.html")
554            return None
555        with open(firmware_path, "rb") as firmware_file:
556            firmware = firmware_file.read()
557
558        # Load the config
559        config = None
560        if driver_info.config_name:
561            config_path = cls.find_binary_path(driver_info.config_name)
562            if config_path:
563                with open(config_path, "rb") as config_file:
564                    config = config_file.read()
565        if driver_info.config_needed and not config:
566            logger.warning("Config needed, but no config file available")
567            return None
568
569        return cls(host, driver_info, firmware, config)
570
571    def __init__(self, host, driver_info, firmware, config):
572        self.host = weakref.proxy(host)
573        self.driver_info = driver_info
574        self.firmware = firmware
575        self.config = config
576
577    @staticmethod
578    async def drop_firmware(host):
579        host.send_hci_packet(HCI_RTK_Drop_Firmware_Command())
580
581        # Wait for the command to be effective (no response is sent)
582        await asyncio.sleep(Driver.POST_DROP_DELAY)
583
584    async def download_for_rtl8723a(self):
585        # Check that the firmware image does not include an epatch signature.
586        if RTK_EPATCH_SIGNATURE in self.firmware:
587            logger.warning(
588                "epatch signature found in firmware, it is probably the wrong firmware"
589            )
590            return
591
592        # TODO: load the firmware
593
594    async def download_for_rtl8723b(self):
595        if self.driver_info.has_rom_version:
596            response = await self.host.send_command(
597                HCI_RTK_Read_ROM_Version_Command(), check_result=True
598            )
599            if response.return_parameters.status != HCI_SUCCESS:
600                logger.warning("can't get ROM version")
601                return
602            rom_version = response.return_parameters.version
603            logger.debug(f"ROM version before download: {rom_version:04X}")
604        else:
605            rom_version = 0
606
607        firmware = Firmware(self.firmware)
608        logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
609        for patch in firmware.patches:
610            if patch[0] == rom_version + 1:
611                logger.debug(f"using patch {patch[0]}")
612                break
613        else:
614            logger.warning("no valid patch found for rom version {rom_version}")
615            return
616
617        # Append the config if there is one.
618        if self.config:
619            payload = patch[1] + self.config
620        else:
621            payload = patch[1]
622
623        # Download the payload, one fragment at a time.
624        fragment_count = math.ceil(len(payload) / RTK_FRAGMENT_LENGTH)
625        for fragment_index in range(fragment_count):
626            # NOTE: the Linux driver somehow adds 1 to the index after it wraps around.
627            # That's odd, but we"ll do the same here.
628            download_index = fragment_index & 0x7F
629            if download_index >= 0x80:
630                download_index += 1
631            if fragment_index == fragment_count - 1:
632                download_index |= 0x80  # End marker.
633            fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH
634            fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
635            logger.debug(f"downloading fragment {fragment_index}")
636            await self.host.send_command(
637                HCI_RTK_Download_Command(
638                    index=download_index, payload=fragment, check_result=True
639                )
640            )
641
642        logger.debug("download complete!")
643
644        # Read the version again
645        response = await self.host.send_command(
646            HCI_RTK_Read_ROM_Version_Command(), check_result=True
647        )
648        if response.return_parameters.status != HCI_SUCCESS:
649            logger.warning("can't get ROM version")
650        else:
651            rom_version = response.return_parameters.version
652            logger.debug(f"ROM version after download: {rom_version:04X}")
653
654    async def download_firmware(self):
655        if self.driver_info.rom == RTK_ROM_LMP_8723A:
656            return await self.download_for_rtl8723a()
657
658        if self.driver_info.rom in (
659            RTK_ROM_LMP_8723B,
660            RTK_ROM_LMP_8821A,
661            RTK_ROM_LMP_8761A,
662            RTK_ROM_LMP_8822B,
663            RTK_ROM_LMP_8852A,
664        ):
665            return await self.download_for_rtl8723b()
666
667        raise RtkFirmwareError("ROM not supported")
668
669    async def init_controller(self):
670        await self.download_firmware()
671        await self.host.send_command(HCI_Reset_Command(), check_result=True)
672        logger.info(f"loaded FW image {self.driver_info.fw_name}")
673
674
675def rtk_firmware_dir() -> pathlib.Path:
676    """
677    Returns:
678        A path to a subdir of the project data dir for Realtek firmware.
679         The directory is created if it doesn't exist.
680    """
681    from bumble.drivers import project_data_dir
682
683    p = project_data_dir() / "firmware" / "realtek"
684    p.mkdir(parents=True, exist_ok=True)
685    return p
686