// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Realtek firmware tools use crate::{Download, Source}; use anyhow::anyhow; use bumble::wrapper::{ drivers::rtk::{Driver, DriverInfo, Firmware}, host::{DriverFactory, Host}, transport::Transport, }; use owo_colors::{colors::css, OwoColorize}; use pyo3::PyResult; use std::{fs, path}; pub(crate) async fn download(dl: Download) -> PyResult<()> { let data_dir = dl .output_dir .or_else(|| { directories::ProjectDirs::from("com", "google", "bumble") .map(|pd| pd.data_local_dir().join("firmware").join("realtek")) }) .unwrap_or_else(|| { eprintln!("Could not determine standard data directory"); path::PathBuf::from(".") }); fs::create_dir_all(&data_dir)?; let (base_url, uses_bin_suffix) = match dl.source { Source::LinuxKernel => ("https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/rtl_bt", true), Source::RealtekOpensource => ("https://github.com/Realtek-OpenSource/android_hardware_realtek/raw/rtk1395/bt/rtkbt/Firmware/BT", false), Source::LinuxFromScratch => ("https://anduin.linuxfromscratch.org/sources/linux-firmware/rtl_bt", true), }; println!("Downloading"); println!("{} {}", "FROM:".green(), base_url); println!("{} {}", "TO:".green(), data_dir.to_string_lossy()); let url_for_file = |file_name: &str| { let url_suffix = if uses_bin_suffix { file_name } else { file_name.trim_end_matches(".bin") }; let mut url = base_url.to_string(); url.push('/'); url.push_str(url_suffix); url }; let to_download = if let Some(single) = dl.single { vec![( format!("{single}_fw.bin"), Some(format!("{single}_config.bin")), false, )] } else { DriverInfo::all_drivers()? .iter() .map(|di| Ok((di.firmware_name()?, di.config_name()?, di.config_needed()?))) .collect::>>()? }; let client = SimpleClient::new(); for (fw_filename, config_filename, config_needed) in to_download { println!("{}", "---".yellow()); let fw_path = data_dir.join(&fw_filename); let config_path = config_filename.as_ref().map(|f| data_dir.join(f)); if fw_path.exists() && !dl.overwrite { println!( "{}", format!("{} already exists, skipping", fw_path.to_string_lossy()) .fg::() ); continue; } if let Some(cp) = config_path.as_ref() { if cp.exists() && !dl.overwrite { println!( "{}", format!("{} already exists, skipping", cp.to_string_lossy()) .fg::() ); continue; } } let fw_contents = match client.get(&url_for_file(&fw_filename)).await { Ok(data) => { println!("Downloaded {}: {} bytes", fw_filename, data.len()); data } Err(e) => { eprintln!( "{} {} {:?}", "Failed to download".red(), fw_filename.red(), e ); continue; } }; let config_contents = if let Some(cn) = &config_filename { match client.get(&url_for_file(cn)).await { Ok(data) => { println!("Downloaded {}: {} bytes", cn, data.len()); Some(data) } Err(e) => { if config_needed { eprintln!("{} {} {:?}", "Failed to download".red(), cn.red(), e); continue; } else { eprintln!( "{}", format!("No config available as {cn}").fg::() ); None } } } } else { None }; fs::write(&fw_path, &fw_contents)?; if !dl.no_parse && config_filename.is_some() { println!("{} {}", "Parsing:".cyan(), &fw_filename); match Firmware::parse(&fw_contents).map_err(|e| anyhow!("Parse error: {:?}", e)) { Ok(fw) => dump_firmware_desc(&fw), Err(e) => { eprintln!( "{} {:?}", "Could not parse firmware:".fg::(), e ); } } } if let Some((cp, cd)) = config_path .as_ref() .and_then(|p| config_contents.map(|c| (p, c))) { fs::write(cp, &cd)?; } } Ok(()) } pub(crate) fn parse(firmware_path: &path::Path) -> PyResult<()> { let contents = fs::read(firmware_path)?; let fw = Firmware::parse(&contents) // squish the error into a string to avoid the error type requiring that the input be // 'static .map_err(|e| anyhow!("Parse error: {:?}", e))?; dump_firmware_desc(&fw); Ok(()) } pub(crate) async fn info(transport: &str, force: bool) -> PyResult<()> { let transport = Transport::open(transport).await?; let mut host = Host::new(transport.source()?, transport.sink()?).await?; host.reset(DriverFactory::None).await?; if !force && !Driver::check(&host).await? { println!("USB device not supported by this RTK driver"); } else if let Some(driver_info) = Driver::driver_info_for_host(&host).await? { println!("Driver:"); println!(" {:10} {:04X}", "ROM:", driver_info.rom()?); println!(" {:10} {}", "Firmware:", driver_info.firmware_name()?); println!( " {:10} {}", "Config:", driver_info.config_name()?.unwrap_or_default() ); } else { println!("Firmware already loaded or no supported driver for this device.") } Ok(()) } pub(crate) async fn load(transport: &str, force: bool) -> PyResult<()> { let transport = Transport::open(transport).await?; let mut host = Host::new(transport.source()?, transport.sink()?).await?; host.reset(DriverFactory::None).await?; match Driver::for_host(&host, force).await? { None => { eprintln!("Firmware already loaded or no supported driver for this device."); } Some(mut d) => d.download_firmware().await?, }; Ok(()) } pub(crate) async fn drop(transport: &str) -> PyResult<()> { let transport = Transport::open(transport).await?; let mut host = Host::new(transport.source()?, transport.sink()?).await?; host.reset(DriverFactory::None).await?; Driver::drop_firmware(&mut host).await?; Ok(()) } fn dump_firmware_desc(fw: &Firmware) { println!( "Firmware: version=0x{:08X} project_id=0x{:04X}", fw.version(), fw.project_id() ); for p in fw.patches() { println!( " Patch: chip_id=0x{:04X}, {} bytes, SVN Version={:08X}", p.chip_id(), p.contents().len(), p.svn_version() ) } } struct SimpleClient { client: reqwest::Client, } impl SimpleClient { fn new() -> Self { Self { client: reqwest::Client::new(), } } async fn get(&self, url: &str) -> anyhow::Result> { let resp = self.client.get(url).send().await?; if !resp.status().is_success() { return Err(anyhow!("Bad status: {}", resp.status())); } let bytes = resp.bytes().await?; Ok(bytes.as_ref().to_vec()) } }