// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Drivers for Realtek controllers use crate::wrapper::{host::Host, PyObjectExt}; use pyo3::{intern, types::PyModule, PyObject, PyResult, Python, ToPyObject}; use pyo3_asyncio::tokio::into_future; pub use crate::internal::drivers::rtk::{Firmware, Patch}; /// Driver for a Realtek controller pub struct Driver(PyObject); impl Driver { /// Locate the driver for the provided host. pub async fn for_host(host: &Host, force: bool) -> PyResult> { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.drivers.rtk"))? .getattr(intern!(py, "Driver"))? .call_method1(intern!(py, "for_host"), (&host.obj, force)) .and_then(into_future) })? .await .map(|obj| obj.into_option(Self)) } /// Check if the host has a known driver. pub async fn check(host: &Host) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.drivers.rtk"))? .getattr(intern!(py, "Driver"))? .call_method1(intern!(py, "check"), (&host.obj,)) .and_then(|obj| obj.extract::()) }) } /// Find the [DriverInfo] for the host, if one matches pub async fn driver_info_for_host(host: &Host) -> PyResult> { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.drivers.rtk"))? .getattr(intern!(py, "Driver"))? .call_method1(intern!(py, "driver_info_for_host"), (&host.obj,)) .and_then(into_future) })? .await .map(|obj| obj.into_option(DriverInfo)) } /// Send a command to the device to drop firmware pub async fn drop_firmware(host: &mut Host) -> PyResult<()> { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.drivers.rtk"))? .getattr(intern!(py, "Driver"))? .call_method1(intern!(py, "drop_firmware"), (&host.obj,)) .and_then(into_future) })? .await .map(|_| ()) } /// Load firmware onto the device. pub async fn download_firmware(&mut self) -> PyResult<()> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "download_firmware")) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ()) } } /// Metadata about a known driver & applicable device pub struct DriverInfo(PyObject); impl DriverInfo { /// Returns a list of all drivers that Bumble knows how to handle. pub fn all_drivers() -> PyResult> { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.drivers.rtk"))? .getattr(intern!(py, "Driver"))? .getattr(intern!(py, "DRIVER_INFOS"))? .iter()? .map(|r| r.map(|h| DriverInfo(h.to_object(py)))) .collect::>>() }) } /// The firmware file name to load from the filesystem, e.g. `foo_fw.bin`. pub fn firmware_name(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "fw_name"))? .as_ref(py) .extract::() }) } /// The config file name, if any, to load from the filesystem, e.g. `foo_config.bin`. pub fn config_name(&self) -> PyResult> { Python::with_gil(|py| { let obj = self.0.getattr(py, intern!(py, "config_name"))?; let handle = obj.as_ref(py); if handle.is_none() { Ok(None) } else { handle .extract::() .map(|s| if s.is_empty() { None } else { Some(s) }) } }) } /// Whether or not config is required. pub fn config_needed(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "config_needed"))? .as_ref(py) .extract::() }) } /// ROM id pub fn rom(&self) -> PyResult { Python::with_gil(|py| self.0.getattr(py, intern!(py, "rom"))?.as_ref(py).extract()) } }