1 // Copyright 2023 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 //! HCI 16 17 // re-export here, and internal usages of these imports should refer to this mod, not the internal 18 // mod 19 pub(crate) use crate::internal::hci::WithPacketType; 20 pub use crate::internal::hci::{packets, Error, Packet}; 21 22 use crate::wrapper::{ 23 hci::packets::{AddressType, Command, ErrorCode}, 24 ConversionError, 25 }; 26 use itertools::Itertools as _; 27 use pyo3::{ 28 exceptions::PyException, intern, types::PyModule, FromPyObject, IntoPy, PyAny, PyErr, PyObject, 29 PyResult, Python, ToPyObject, 30 }; 31 32 /// Provides helpers for interacting with HCI 33 pub struct HciConstant; 34 35 impl HciConstant { 36 /// Human-readable error name error_name(status: ErrorCode) -> PyResult<String>37 pub fn error_name(status: ErrorCode) -> PyResult<String> { 38 Python::with_gil(|py| { 39 PyModule::import(py, intern!(py, "bumble.hci"))? 40 .getattr(intern!(py, "HCI_Constant"))? 41 .call_method1(intern!(py, "error_name"), (status.to_object(py),))? 42 .extract() 43 }) 44 } 45 } 46 47 /// Bumble's representation of an HCI command. 48 pub(crate) struct HciCommand(pub(crate) PyObject); 49 50 impl HciCommand { from_bytes(bytes: &[u8]) -> PyResult<Self>51 fn from_bytes(bytes: &[u8]) -> PyResult<Self> { 52 Python::with_gil(|py| { 53 PyModule::import(py, intern!(py, "bumble.hci"))? 54 .getattr(intern!(py, "HCI_Command"))? 55 .call_method1(intern!(py, "from_bytes"), (bytes,)) 56 .map(|obj| Self(obj.to_object(py))) 57 }) 58 } 59 } 60 61 impl TryFrom<Command> for HciCommand { 62 type Error = PyErr; 63 try_from(value: Command) -> Result<Self, Self::Error>64 fn try_from(value: Command) -> Result<Self, Self::Error> { 65 HciCommand::from_bytes(&value.to_vec_with_packet_type()) 66 } 67 } 68 69 impl IntoPy<PyObject> for HciCommand { into_py(self, _py: Python<'_>) -> PyObject70 fn into_py(self, _py: Python<'_>) -> PyObject { 71 self.0 72 } 73 } 74 75 /// A Bluetooth address 76 #[derive(Clone)] 77 pub struct Address(pub(crate) PyObject); 78 79 impl Address { 80 /// Creates a new [Address] object. new(address: &str, address_type: AddressType) -> PyResult<Self>81 pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> { 82 Python::with_gil(|py| { 83 PyModule::import(py, intern!(py, "bumble.device"))? 84 .getattr(intern!(py, "Address"))? 85 .call1((address, address_type)) 86 .map(|any| Self(any.into())) 87 }) 88 } 89 90 /// The type of address address_type(&self) -> PyResult<AddressType>91 pub fn address_type(&self) -> PyResult<AddressType> { 92 Python::with_gil(|py| { 93 self.0 94 .getattr(py, intern!(py, "address_type"))? 95 .extract::<u8>(py)? 96 .try_into() 97 .map_err(|addr_type| { 98 PyErr::new::<PyException, _>(format!( 99 "Failed to convert {addr_type} to AddressType" 100 )) 101 }) 102 }) 103 } 104 105 /// True if the address is static is_static(&self) -> PyResult<bool>106 pub fn is_static(&self) -> PyResult<bool> { 107 Python::with_gil(|py| { 108 self.0 109 .getattr(py, intern!(py, "is_static"))? 110 .extract::<bool>(py) 111 }) 112 } 113 114 /// True if the address is resolvable is_resolvable(&self) -> PyResult<bool>115 pub fn is_resolvable(&self) -> PyResult<bool> { 116 Python::with_gil(|py| { 117 self.0 118 .getattr(py, intern!(py, "is_resolvable"))? 119 .extract::<bool>(py) 120 }) 121 } 122 123 /// Address bytes in _little-endian_ format as_le_bytes(&self) -> PyResult<Vec<u8>>124 pub fn as_le_bytes(&self) -> PyResult<Vec<u8>> { 125 Python::with_gil(|py| { 126 self.0 127 .call_method0(py, intern!(py, "to_bytes"))? 128 .extract::<Vec<u8>>(py) 129 }) 130 } 131 132 /// Address bytes as big-endian colon-separated hex as_hex(&self) -> PyResult<String>133 pub fn as_hex(&self) -> PyResult<String> { 134 self.as_le_bytes().map(|bytes| { 135 bytes 136 .into_iter() 137 .rev() 138 .map(|byte| hex::encode_upper([byte])) 139 .join(":") 140 }) 141 } 142 } 143 144 impl ToPyObject for Address { to_object(&self, _py: Python<'_>) -> PyObject145 fn to_object(&self, _py: Python<'_>) -> PyObject { 146 self.0.clone() 147 } 148 } 149 150 /// An error meaning that the u64 value did not represent a valid BT address. 151 #[derive(Debug)] 152 pub struct InvalidAddress(#[allow(unused)] u64); 153 154 impl TryInto<packets::Address> for Address { 155 type Error = ConversionError<InvalidAddress>; 156 try_into(self) -> Result<packets::Address, Self::Error>157 fn try_into(self) -> Result<packets::Address, Self::Error> { 158 let addr_le_bytes = self.as_le_bytes().map_err(ConversionError::Python)?; 159 160 // packets::Address only supports converting from a u64 (TODO: update if/when it supports converting from [u8; 6] -- https://github.com/google/pdl/issues/75) 161 // So first we take the python `Address` little-endian bytes (6 bytes), copy them into a 162 // [u8; 8] in little-endian format, and finally convert it into a u64. 163 let mut buf = [0_u8; 8]; 164 buf[0..6].copy_from_slice(&addr_le_bytes); 165 let address_u64 = u64::from_le_bytes(buf); 166 167 packets::Address::try_from(address_u64) 168 .map_err(InvalidAddress) 169 .map_err(ConversionError::Native) 170 } 171 } 172 173 impl IntoPy<PyObject> for AddressType { into_py(self, py: Python<'_>) -> PyObject174 fn into_py(self, py: Python<'_>) -> PyObject { 175 u8::from(self).to_object(py) 176 } 177 } 178 179 impl<'source> FromPyObject<'source> for ErrorCode { extract(ob: &'source PyAny) -> PyResult<Self>180 fn extract(ob: &'source PyAny) -> PyResult<Self> { 181 ob.extract() 182 } 183 } 184 185 impl ToPyObject for ErrorCode { to_object(&self, py: Python<'_>) -> PyObject186 fn to_object(&self, py: Python<'_>) -> PyObject { 187 u8::from(self).to_object(py) 188 } 189 } 190