// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Devices and connections to them #[cfg(feature = "unstable_extended_adv")] use crate::wrapper::{ hci::packets::{ self, AdvertisingEventProperties, AdvertisingFilterPolicy, Enable, EnabledSet, FragmentPreference, LeSetAdvertisingSetRandomAddressBuilder, LeSetExtendedAdvertisingDataBuilder, LeSetExtendedAdvertisingEnableBuilder, LeSetExtendedAdvertisingParametersBuilder, Operation, OwnAddressType, PeerAddressType, PrimaryPhyType, SecondaryPhyType, }, ConversionError, }; use crate::{ adv::AdvertisementDataBuilder, wrapper::{ core::AdvertisingData, gatt_client::{ProfileServiceProxy, ServiceProxy}, hci::{ packets::{Command, ErrorCode, Event}, Address, HciCommand, WithPacketType, }, host::Host, l2cap::LeConnectionOrientedChannel, transport::{Sink, Source}, ClosureCallback, PyDictExt, PyObjectExt, }, }; use pyo3::{ exceptions::PyException, intern, types::{PyDict, PyModule}, IntoPy, PyErr, PyObject, PyResult, Python, ToPyObject, }; use pyo3_asyncio::tokio::into_future; use std::path; #[cfg(test)] mod tests; /// Represents the various properties of some device pub struct DeviceConfiguration(PyObject); impl DeviceConfiguration { /// Creates a new configuration, letting the internal Python object set all the defaults pub fn new() -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "DeviceConfiguration"))? .call0() .map(|any| Self(any.into())) }) } /// Creates a new configuration from the specified file pub fn load_from_file(&mut self, device_config: &path::Path) -> PyResult<()> { Python::with_gil(|py| { self.0 .call_method1(py, intern!(py, "load_from_file"), (device_config,)) }) .map(|_| ()) } } impl ToPyObject for DeviceConfiguration { fn to_object(&self, _py: Python<'_>) -> PyObject { self.0.clone() } } /// Used for tracking what advertising state a device might be in #[derive(PartialEq)] enum AdvertisingStatus { AdvertisingLegacy, AdvertisingExtended, NotAdvertising, } /// A device that can send/receive HCI frames. pub struct Device { obj: PyObject, advertising_status: AdvertisingStatus, } impl Device { #[cfg(feature = "unstable_extended_adv")] const ADVERTISING_HANDLE_EXTENDED: u8 = 0x00; /// Creates a Device. When optional arguments are not specified, the Python object specifies the /// defaults. pub fn new( name: Option<&str>, address: Option
, config: Option, host: Option, generic_access_service: Option, ) -> PyResult { Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_opt_item("name", name)?; kwargs.set_opt_item("address", address)?; kwargs.set_opt_item("config", config)?; kwargs.set_opt_item("host", host)?; kwargs.set_opt_item("generic_access_service", generic_access_service)?; PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "Device"))? .call((), Some(kwargs)) .map(|any| Self { obj: any.into(), advertising_status: AdvertisingStatus::NotAdvertising, }) }) } /// Create a Device per the provided file configured to communicate with a controller through an HCI source/sink pub fn from_config_file_with_hci( device_config: &path::Path, source: Source, sink: Sink, ) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "Device"))? .call_method1( intern!(py, "from_config_file_with_hci"), (device_config, source.0, sink.0), ) .map(|any| Self { obj: any.into(), advertising_status: AdvertisingStatus::NotAdvertising, }) }) } /// Create a Device configured to communicate with a controller through an HCI source/sink pub fn with_hci(name: &str, address: Address, source: Source, sink: Sink) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "Device"))? .call_method1(intern!(py, "with_hci"), (name, address.0, source.0, sink.0)) .map(|any| Self { obj: any.into(), advertising_status: AdvertisingStatus::NotAdvertising, }) }) } /// Sends an HCI command on this Device, returning the command's event result. /// /// When `check_result` is `true`, then an `Err` will be returned if the controller's response /// did not have an event code of "success". pub async fn send_command(&self, command: Command, check_result: bool) -> PyResult { let bumble_hci_command = HciCommand::try_from(command)?; Python::with_gil(|py| { self.obj .call_method1( py, intern!(py, "send_command"), (bumble_hci_command, check_result), ) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .and_then(|event| { Python::with_gil(|py| { let py_bytes = event.call_method0(py, intern!(py, "__bytes__"))?; let bytes: &[u8] = py_bytes.extract(py)?; let event = Event::parse_with_packet_type(bytes) .map_err(|e| PyErr::new::(e.to_string()))?; Ok(event) }) }) } /// Turn the device on pub async fn power_on(&self) -> PyResult<()> { Python::with_gil(|py| { self.obj .call_method0(py, intern!(py, "power_on")) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ()) } /// Connect to a peer pub async fn connect(&self, peer_addr: &str) -> PyResult { Python::with_gil(|py| { self.obj .call_method1(py, intern!(py, "connect"), (peer_addr,)) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(Connection) } /// Register a callback to be called for each incoming connection. pub fn on_connection( &mut self, callback: impl Fn(Python, Connection) -> PyResult<()> + Send + 'static, ) -> PyResult<()> { let boxed = ClosureCallback::new(move |py, args, _kwargs| { callback(py, Connection(args.get_item(0)?.into())) }); Python::with_gil(|py| { self.obj .call_method1(py, intern!(py, "add_listener"), ("connection", boxed)) }) .map(|_| ()) } /// Start scanning pub async fn start_scanning(&self, filter_duplicates: bool) -> PyResult<()> { Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_item("filter_duplicates", filter_duplicates)?; self.obj .call_method(py, intern!(py, "start_scanning"), (), Some(kwargs)) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ()) } /// Register a callback to be called for each advertisement pub fn on_advertisement( &mut self, callback: impl Fn(Python, Advertisement) -> PyResult<()> + Send + 'static, ) -> PyResult<()> { let boxed = ClosureCallback::new(move |py, args, _kwargs| { callback(py, Advertisement(args.get_item(0)?.into())) }); Python::with_gil(|py| { self.obj .call_method1(py, intern!(py, "add_listener"), ("advertisement", boxed)) }) .map(|_| ()) } /// Set the advertisement data to be used when [Device::start_advertising] is called. pub fn set_advertising_data(&mut self, adv_data: AdvertisementDataBuilder) -> PyResult<()> { Python::with_gil(|py| { self.obj.setattr( py, intern!(py, "advertising_data"), adv_data.into_bytes().as_slice(), ) }) .map(|_| ()) } /// Returns the host used by the device, if any pub fn host(&mut self) -> PyResult> { Python::with_gil(|py| { self.obj .getattr(py, intern!(py, "host")) .map(|obj| obj.into_option(Host::from)) }) } /// Start advertising the data set with [Device.set_advertisement]. /// /// When `auto_restart` is set to `true`, then the device will automatically restart advertising /// when a connected device is disconnected. pub async fn start_advertising(&mut self, auto_restart: bool) -> PyResult<()> { if self.advertising_status == AdvertisingStatus::AdvertisingExtended { return Err(PyErr::new::("Already advertising in extended mode. Stop the existing extended advertisement to start a legacy advertisement.")); } // Bumble allows (and currently ignores) calling `start_advertising` when already // advertising. Because that behavior may change in the future, we continue to delegate the // handling to bumble. Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_item("auto_restart", auto_restart)?; self.obj .call_method(py, intern!(py, "start_advertising"), (), Some(kwargs)) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ())?; self.advertising_status = AdvertisingStatus::AdvertisingLegacy; Ok(()) } /// Start advertising the data set in extended mode, replacing any existing extended adv. The /// advertisement will be non-connectable. /// /// Fails if the device is already advertising in legacy mode. #[cfg(feature = "unstable_extended_adv")] pub async fn start_advertising_extended( &mut self, adv_data: AdvertisementDataBuilder, ) -> PyResult<()> { // TODO: add tests when local controller object supports extended advertisement commands (github.com/google/bumble/pull/238) match self.advertising_status { AdvertisingStatus::AdvertisingLegacy => return Err(PyErr::new::("Already advertising in legacy mode. Stop the existing legacy advertisement to start an extended advertisement.")), // Stop the current extended advertisement before advertising with new data. // We could just issue an LeSetExtendedAdvertisingData command, but this approach // allows better future flexibility if `start_advertising_extended` were to change. AdvertisingStatus::AdvertisingExtended => self.stop_advertising_extended().await?, _ => {} } // set extended params let properties = AdvertisingEventProperties { connectable: 0, scannable: 0, directed: 0, high_duty_cycle: 0, legacy: 0, anonymous: 0, tx_power: 0, }; let extended_advertising_params_cmd = LeSetExtendedAdvertisingParametersBuilder { advertising_event_properties: properties, advertising_filter_policy: AdvertisingFilterPolicy::AllDevices, advertising_handle: Self::ADVERTISING_HANDLE_EXTENDED, advertising_sid: 0, advertising_tx_power: 0, own_address_type: OwnAddressType::RandomDeviceAddress, peer_address: default_ignored_peer_address(), peer_address_type: PeerAddressType::PublicDeviceOrIdentityAddress, primary_advertising_channel_map: 7, primary_advertising_interval_max: 200, primary_advertising_interval_min: 100, primary_advertising_phy: PrimaryPhyType::Le1m, scan_request_notification_enable: Enable::Disabled, secondary_advertising_max_skip: 0, secondary_advertising_phy: SecondaryPhyType::Le1m, }; self.send_command(extended_advertising_params_cmd.into(), true) .await?; // set random address let random_address: packets::Address = self.random_address()?.try_into().map_err(|e| match e { ConversionError::Python(pyerr) => pyerr, ConversionError::Native(e) => PyErr::new::(format!("{e:?}")), })?; let random_address_cmd = LeSetAdvertisingSetRandomAddressBuilder { advertising_handle: Self::ADVERTISING_HANDLE_EXTENDED, random_address, }; self.send_command(random_address_cmd.into(), true).await?; // set adv data let advertising_data_cmd = LeSetExtendedAdvertisingDataBuilder { advertising_data: adv_data.into_bytes(), advertising_handle: Self::ADVERTISING_HANDLE_EXTENDED, fragment_preference: FragmentPreference::ControllerMayFragment, operation: Operation::CompleteAdvertisement, }; self.send_command(advertising_data_cmd.into(), true).await?; // enable adv let extended_advertising_enable_cmd = LeSetExtendedAdvertisingEnableBuilder { enable: Enable::Enabled, enabled_sets: vec![EnabledSet { advertising_handle: Self::ADVERTISING_HANDLE_EXTENDED, duration: 0, max_extended_advertising_events: 0, }], }; self.send_command(extended_advertising_enable_cmd.into(), true) .await?; self.advertising_status = AdvertisingStatus::AdvertisingExtended; Ok(()) } /// Stop advertising. pub async fn stop_advertising(&mut self) -> PyResult<()> { Python::with_gil(|py| { self.obj .call_method0(py, intern!(py, "stop_advertising")) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ())?; if self.advertising_status == AdvertisingStatus::AdvertisingLegacy { self.advertising_status = AdvertisingStatus::NotAdvertising; } Ok(()) } /// Stop advertising extended. #[cfg(feature = "unstable_extended_adv")] pub async fn stop_advertising_extended(&mut self) -> PyResult<()> { if AdvertisingStatus::AdvertisingExtended != self.advertising_status { return Ok(()); } // disable adv let extended_advertising_enable_cmd = LeSetExtendedAdvertisingEnableBuilder { enable: Enable::Disabled, enabled_sets: vec![EnabledSet { advertising_handle: Self::ADVERTISING_HANDLE_EXTENDED, duration: 0, max_extended_advertising_events: 0, }], }; self.send_command(extended_advertising_enable_cmd.into(), true) .await?; self.advertising_status = AdvertisingStatus::NotAdvertising; Ok(()) } /// Registers an L2CAP connection oriented channel server. When a client connects to the server, /// the `server` callback is passed a handle to the established channel. When optional arguments /// are not specified, the Python module specifies the defaults. pub fn register_l2cap_channel_server( &mut self, psm: u16, server: impl Fn(Python, LeConnectionOrientedChannel) -> PyResult<()> + Send + 'static, max_credits: Option, mtu: Option, mps: Option, ) -> PyResult<()> { Python::with_gil(|py| { let boxed = ClosureCallback::new(move |py, args, _kwargs| { server( py, LeConnectionOrientedChannel::from(args.get_item(0)?.into()), ) }); let kwargs = PyDict::new(py); kwargs.set_item("psm", psm)?; kwargs.set_item("server", boxed.into_py(py))?; kwargs.set_opt_item("max_credits", max_credits)?; kwargs.set_opt_item("mtu", mtu)?; kwargs.set_opt_item("mps", mps)?; self.obj.call_method( py, intern!(py, "register_l2cap_channel_server"), (), Some(kwargs), ) })?; Ok(()) } /// Gets the Device's `random_address` property pub fn random_address(&self) -> PyResult
{ Python::with_gil(|py| { self.obj .getattr(py, intern!(py, "random_address")) .map(Address) }) } } /// A connection to a remote device. pub struct Connection(PyObject); impl Connection { /// Open an L2CAP channel using this connection. When optional arguments are not specified, the /// Python module specifies the defaults. pub async fn open_l2cap_channel( &mut self, psm: u16, max_credits: Option, mtu: Option, mps: Option, ) -> PyResult { Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_item("psm", psm)?; kwargs.set_opt_item("max_credits", max_credits)?; kwargs.set_opt_item("mtu", mtu)?; kwargs.set_opt_item("mps", mps)?; self.0 .call_method(py, intern!(py, "open_l2cap_channel"), (), Some(kwargs)) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(LeConnectionOrientedChannel::from) } /// Disconnect from device with provided reason. When optional arguments are not specified, the /// Python module specifies the defaults. pub async fn disconnect(&mut self, reason: Option) -> PyResult<()> { Python::with_gil(|py| { let kwargs = PyDict::new(py); kwargs.set_opt_item("reason", reason)?; self.0 .call_method(py, intern!(py, "disconnect"), (), Some(kwargs)) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .map(|_| ()) } /// Register a callback to be called on disconnection. pub fn on_disconnection( &mut self, callback: impl Fn(Python, ErrorCode) -> PyResult<()> + Send + 'static, ) -> PyResult<()> { let boxed = ClosureCallback::new(move |py, args, _kwargs| { callback(py, args.get_item(0)?.extract()?) }); Python::with_gil(|py| { self.0 .call_method1(py, intern!(py, "add_listener"), ("disconnection", boxed)) }) .map(|_| ()) } /// Returns some information about the connection as a [String]. pub fn debug_string(&self) -> PyResult { Python::with_gil(|py| { let str_obj = self.0.call_method0(py, intern!(py, "__str__"))?; str_obj.gil_ref(py).extract() }) } } /// The other end of a connection pub struct Peer(PyObject); impl Peer { /// Wrap a [Connection] in a Peer pub fn new(conn: Connection) -> PyResult { Python::with_gil(|py| { PyModule::import(py, intern!(py, "bumble.device"))? .getattr(intern!(py, "Peer"))? .call1((conn.0,)) .map(|obj| Self(obj.into())) }) } /// Populates the peer's cache of services. /// /// Returns the discovered services. pub async fn discover_services(&mut self) -> PyResult> { Python::with_gil(|py| { self.0 .call_method0(py, intern!(py, "discover_services")) .and_then(|coroutine| into_future(coroutine.as_ref(py))) })? .await .and_then(|list| { Python::with_gil(|py| { list.as_ref(py) .iter()? .map(|r| r.map(|h| ServiceProxy(h.to_object(py)))) .collect() }) }) } /// Returns a snapshot of the Services currently in the peer's cache pub fn services(&self) -> PyResult> { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "services"))? .as_ref(py) .iter()? .map(|r| r.map(|h| ServiceProxy(h.to_object(py)))) .collect() }) } /// Build a [ProfileServiceProxy] for the specified type. /// [Peer::discover_services] or some other means of populating the Peer's service cache must be /// called first, or the required service won't be found. pub fn create_service_proxy(&self) -> PyResult> { Python::with_gil(|py| { let module = py.import(P::PROXY_CLASS_MODULE)?; let class = module.getattr(P::PROXY_CLASS_NAME)?; self.0 .call_method1(py, intern!(py, "create_service_proxy"), (class,)) .map(|obj| obj.into_option(P::wrap)) }) } } /// A BLE advertisement pub struct Advertisement(PyObject); impl Advertisement { /// Address that sent the advertisement pub fn address(&self) -> PyResult
{ Python::with_gil(|py| self.0.getattr(py, intern!(py, "address")).map(Address)) } /// Returns true if the advertisement is connectable pub fn is_connectable(&self) -> PyResult { Python::with_gil(|py| { self.0 .getattr(py, intern!(py, "is_connectable"))? .extract::(py) }) } /// RSSI of the advertisement pub fn rssi(&self) -> PyResult { Python::with_gil(|py| self.0.getattr(py, intern!(py, "rssi"))?.extract::(py)) } /// Data in the advertisement pub fn data(&self) -> PyResult { Python::with_gil(|py| self.0.getattr(py, intern!(py, "data")).map(AdvertisingData)) } } /// Use this address when sending an HCI command that requires providing a peer address, but the /// command is such that the peer address will be ignored. /// /// Internal to bumble, this address might mean "any", but a packets::Address typically gets sent /// directly to a controller, so we don't have to worry about it. #[cfg(feature = "unstable_extended_adv")] fn default_ignored_peer_address() -> packets::Address { packets::Address::try_from(0x0000_0000_0000_u64).unwrap() }