1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder.
16 
17 use crate::L2cap;
18 use anyhow::anyhow;
19 use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport};
20 use owo_colors::{colors::css::Orange, OwoColorize};
21 use pyo3::{PyResult, Python};
22 use std::{future::Future, path::PathBuf, sync::Arc};
23 use tokio::{
24     io::{AsyncReadExt, AsyncWriteExt},
25     net::tcp::{OwnedReadHalf, OwnedWriteHalf},
26     sync::{mpsc::Receiver, Mutex},
27 };
28 
29 mod client_bridge;
30 mod server_bridge;
31 
run( command: L2cap, device_config: PathBuf, transport: String, psm: u16, max_credits: Option<u16>, mtu: Option<u16>, mps: Option<u16>, ) -> PyResult<()>32 pub(crate) async fn run(
33     command: L2cap,
34     device_config: PathBuf,
35     transport: String,
36     psm: u16,
37     max_credits: Option<u16>,
38     mtu: Option<u16>,
39     mps: Option<u16>,
40 ) -> PyResult<()> {
41     println!("<<< connecting to HCI...");
42     let transport = Transport::open(transport).await?;
43     println!("<<< connected");
44 
45     let mut device =
46         Device::from_config_file_with_hci(&device_config, transport.source()?, transport.sink()?)?;
47 
48     device.power_on().await?;
49 
50     match command {
51         L2cap::Server { tcp_host, tcp_port } => {
52             let args = server_bridge::Args {
53                 psm,
54                 max_credits,
55                 mtu,
56                 mps,
57                 tcp_host,
58                 tcp_port,
59             };
60 
61             server_bridge::start(&args, &mut device).await?
62         }
63         L2cap::Client {
64             bluetooth_address,
65             tcp_host,
66             tcp_port,
67         } => {
68             let args = client_bridge::Args {
69                 psm,
70                 max_credits,
71                 mtu,
72                 mps,
73                 bluetooth_address,
74                 tcp_host,
75                 tcp_port,
76             };
77 
78             client_bridge::start(&args, &mut device).await?
79         }
80     };
81 
82     // wait until user kills the process
83     tokio::signal::ctrl_c().await?;
84 
85     Ok(())
86 }
87 
/// Used for channeling data from Python callbacks to a Rust consumer.
enum BridgeData {
    /// A payload to forward. The consumer in this file (`proxy_l2cap_rx_to_tcp_tx`)
    /// treats it as an L2CAP SDU and writes the bytes to the TCP stream.
    Data(Vec<u8>),
    /// Tells the consumer to stop. The consumer in this file clears the shared
    /// L2CAP channel slot, shuts down the TCP write half and returns.
    CloseSignal,
}
93 
proxy_l2cap_rx_to_tcp_tx( mut l2cap_data_receiver: Receiver<BridgeData>, mut tcp_writer: OwnedWriteHalf, l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, ) -> anyhow::Result<()>94 async fn proxy_l2cap_rx_to_tcp_tx(
95     mut l2cap_data_receiver: Receiver<BridgeData>,
96     mut tcp_writer: OwnedWriteHalf,
97     l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>,
98 ) -> anyhow::Result<()> {
99     while let Some(bridge_data) = l2cap_data_receiver.recv().await {
100         match bridge_data {
101             BridgeData::Data(sdu) => {
102                 println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan());
103                 tcp_writer
104                     .write_all(sdu.as_ref())
105                     .await
106                     .map_err(|_| anyhow!("Failed to write to tcp stream"))?;
107                 tcp_writer
108                     .flush()
109                     .await
110                     .map_err(|_| anyhow!("Failed to flush tcp stream"))?;
111             }
112             BridgeData::CloseSignal => {
113                 l2cap_channel.lock().await.take();
114                 tcp_writer
115                     .shutdown()
116                     .await
117                     .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?;
118                 return Ok(());
119             }
120         }
121     }
122     Ok(())
123 }
124 
proxy_tcp_rx_to_l2cap_tx( mut tcp_reader: OwnedReadHalf, l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, drain_l2cap_after_write: bool, ) -> PyResult<()>125 async fn proxy_tcp_rx_to_l2cap_tx(
126     mut tcp_reader: OwnedReadHalf,
127     l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>,
128     drain_l2cap_after_write: bool,
129 ) -> PyResult<()> {
130     let mut buf = [0; 4096];
131     loop {
132         match tcp_reader.read(&mut buf).await {
133             Ok(len) => {
134                 if len == 0 {
135                     println!("{}", "!!! End of stream".fg::<Orange>());
136 
137                     if let Some(mut channel) = l2cap_channel.lock().await.take() {
138                         channel.disconnect().await.map_err(|e| {
139                             eprintln!("Failed to call disconnect on l2cap channel: {e}");
140                             e
141                         })?;
142                     }
143                     return Ok(());
144                 }
145 
146                 println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue());
147                 match l2cap_channel.lock().await.as_mut() {
148                     None => {
149                         println!("{}", "!!! L2CAP channel not connected, dropping".red());
150                         return Ok(());
151                     }
152                     Some(channel) => {
153                         channel.write(&buf[..len])?;
154                         if drain_l2cap_after_write {
155                             channel.drain().await?;
156                         }
157                     }
158                 }
159             }
160             Err(e) => {
161                 println!("{}", format!("!!! TCP connection lost: {}", e).red());
162                 if let Some(mut channel) = l2cap_channel.lock().await.take() {
163                     let _ = channel.disconnect().await.map_err(|e| {
164                         eprintln!("Failed to call disconnect on l2cap channel: {e}");
165                     });
166                 }
167                 return Err(e.into());
168             }
169         }
170     }
171 }
172 
173 /// Copies the current thread's Python even loop (contained in `TaskLocals`) into the given future.
174 /// Useful when sending work to another thread that calls Python code which calls `get_running_loop()`.
inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>> where F: Future<Output = R> + Send + 'static,175 pub fn inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>>
176 where
177     F: Future<Output = R> + Send + 'static,
178 {
179     let locals = Python::with_gil(pyo3_asyncio::tokio::get_current_locals)?;
180     Ok(pyo3_asyncio::tokio::scope(locals, fut))
181 }
182