1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! HCI
16 
// Re-exported here: code inside this crate should import these items from this module rather
// than from `crate::internal::hci` directly.
19 pub(crate) use crate::internal::hci::WithPacketType;
20 pub use crate::internal::hci::{packets, Error, Packet};
21 
22 use crate::wrapper::{
23     hci::packets::{AddressType, Command, ErrorCode},
24     ConversionError,
25 };
26 use itertools::Itertools as _;
27 use pyo3::{
28     exceptions::PyException, intern, types::PyModule, FromPyObject, IntoPy, PyAny, PyErr, PyObject,
29     PyResult, Python, ToPyObject,
30 };
31 
/// Provides helpers for interacting with HCI.
///
/// This is a unit type: it carries no data and only serves as a namespace for associated
/// functions.
#[derive(Debug)]
pub struct HciConstant;
34 
35 impl HciConstant {
36     /// Human-readable error name
error_name(status: ErrorCode) -> PyResult<String>37     pub fn error_name(status: ErrorCode) -> PyResult<String> {
38         Python::with_gil(|py| {
39             PyModule::import(py, intern!(py, "bumble.hci"))?
40                 .getattr(intern!(py, "HCI_Constant"))?
41                 .call_method1(intern!(py, "error_name"), (status.to_object(py),))?
42                 .extract()
43         })
44     }
45 }
46 
47 /// Bumble's representation of an HCI command.
48 pub(crate) struct HciCommand(pub(crate) PyObject);
49 
50 impl HciCommand {
from_bytes(bytes: &[u8]) -> PyResult<Self>51     fn from_bytes(bytes: &[u8]) -> PyResult<Self> {
52         Python::with_gil(|py| {
53             PyModule::import(py, intern!(py, "bumble.hci"))?
54                 .getattr(intern!(py, "HCI_Command"))?
55                 .call_method1(intern!(py, "from_bytes"), (bytes,))
56                 .map(|obj| Self(obj.to_object(py)))
57         })
58     }
59 }
60 
61 impl TryFrom<Command> for HciCommand {
62     type Error = PyErr;
63 
try_from(value: Command) -> Result<Self, Self::Error>64     fn try_from(value: Command) -> Result<Self, Self::Error> {
65         HciCommand::from_bytes(&value.to_vec_with_packet_type())
66     }
67 }
68 
69 impl IntoPy<PyObject> for HciCommand {
into_py(self, _py: Python<'_>) -> PyObject70     fn into_py(self, _py: Python<'_>) -> PyObject {
71         self.0
72     }
73 }
74 
75 /// A Bluetooth address
76 #[derive(Clone)]
77 pub struct Address(pub(crate) PyObject);
78 
79 impl Address {
80     /// Creates a new [Address] object.
new(address: &str, address_type: AddressType) -> PyResult<Self>81     pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
82         Python::with_gil(|py| {
83             PyModule::import(py, intern!(py, "bumble.device"))?
84                 .getattr(intern!(py, "Address"))?
85                 .call1((address, address_type))
86                 .map(|any| Self(any.into()))
87         })
88     }
89 
90     /// The type of address
address_type(&self) -> PyResult<AddressType>91     pub fn address_type(&self) -> PyResult<AddressType> {
92         Python::with_gil(|py| {
93             self.0
94                 .getattr(py, intern!(py, "address_type"))?
95                 .extract::<u8>(py)?
96                 .try_into()
97                 .map_err(|addr_type| {
98                     PyErr::new::<PyException, _>(format!(
99                         "Failed to convert {addr_type} to AddressType"
100                     ))
101                 })
102         })
103     }
104 
105     /// True if the address is static
is_static(&self) -> PyResult<bool>106     pub fn is_static(&self) -> PyResult<bool> {
107         Python::with_gil(|py| {
108             self.0
109                 .getattr(py, intern!(py, "is_static"))?
110                 .extract::<bool>(py)
111         })
112     }
113 
114     /// True if the address is resolvable
is_resolvable(&self) -> PyResult<bool>115     pub fn is_resolvable(&self) -> PyResult<bool> {
116         Python::with_gil(|py| {
117             self.0
118                 .getattr(py, intern!(py, "is_resolvable"))?
119                 .extract::<bool>(py)
120         })
121     }
122 
123     /// Address bytes in _little-endian_ format
as_le_bytes(&self) -> PyResult<Vec<u8>>124     pub fn as_le_bytes(&self) -> PyResult<Vec<u8>> {
125         Python::with_gil(|py| {
126             self.0
127                 .call_method0(py, intern!(py, "to_bytes"))?
128                 .extract::<Vec<u8>>(py)
129         })
130     }
131 
132     /// Address bytes as big-endian colon-separated hex
as_hex(&self) -> PyResult<String>133     pub fn as_hex(&self) -> PyResult<String> {
134         self.as_le_bytes().map(|bytes| {
135             bytes
136                 .into_iter()
137                 .rev()
138                 .map(|byte| hex::encode_upper([byte]))
139                 .join(":")
140         })
141     }
142 }
143 
144 impl ToPyObject for Address {
to_object(&self, _py: Python<'_>) -> PyObject145     fn to_object(&self, _py: Python<'_>) -> PyObject {
146         self.0.clone()
147     }
148 }
149 
/// An error meaning that the u64 value did not represent a valid BT address.
///
/// The wrapped value is shown in the [`Display`](std::fmt::Display) output.
#[derive(Debug)]
pub struct InvalidAddress(u64);

impl std::fmt::Display for InvalidAddress {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:#x} is not a valid Bluetooth address", self.0)
    }
}

impl std::error::Error for InvalidAddress {}
153 
154 impl TryInto<packets::Address> for Address {
155     type Error = ConversionError<InvalidAddress>;
156 
try_into(self) -> Result<packets::Address, Self::Error>157     fn try_into(self) -> Result<packets::Address, Self::Error> {
158         let addr_le_bytes = self.as_le_bytes().map_err(ConversionError::Python)?;
159 
160         // packets::Address only supports converting from a u64 (TODO: update if/when it supports converting from [u8; 6] -- https://github.com/google/pdl/issues/75)
161         // So first we take the python `Address` little-endian bytes (6 bytes), copy them into a
162         // [u8; 8] in little-endian format, and finally convert it into a u64.
163         let mut buf = [0_u8; 8];
164         buf[0..6].copy_from_slice(&addr_le_bytes);
165         let address_u64 = u64::from_le_bytes(buf);
166 
167         packets::Address::try_from(address_u64)
168             .map_err(InvalidAddress)
169             .map_err(ConversionError::Native)
170     }
171 }
172 
173 impl IntoPy<PyObject> for AddressType {
into_py(self, py: Python<'_>) -> PyObject174     fn into_py(self, py: Python<'_>) -> PyObject {
175         u8::from(self).to_object(py)
176     }
177 }
178 
179 impl<'source> FromPyObject<'source> for ErrorCode {
extract(ob: &'source PyAny) -> PyResult<Self>180     fn extract(ob: &'source PyAny) -> PyResult<Self> {
181         ob.extract()
182     }
183 }
184 
185 impl ToPyObject for ErrorCode {
to_object(&self, py: Python<'_>) -> PyObject186     fn to_object(&self, py: Python<'_>) -> PyObject {
187         u8::from(self).to_object(py)
188     }
189 }
190