1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! Rust version of the Python `l2cap_bridge.py` found under the `apps` folder.
16
17 use crate::L2cap;
18 use anyhow::anyhow;
19 use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport};
20 use owo_colors::{colors::css::Orange, OwoColorize};
21 use pyo3::{PyResult, Python};
22 use std::{future::Future, path::PathBuf, sync::Arc};
23 use tokio::{
24 io::{AsyncReadExt, AsyncWriteExt},
25 net::tcp::{OwnedReadHalf, OwnedWriteHalf},
26 sync::{mpsc::Receiver, Mutex},
27 };
28
29 mod client_bridge;
30 mod server_bridge;
31
run( command: L2cap, device_config: PathBuf, transport: String, psm: u16, max_credits: Option<u16>, mtu: Option<u16>, mps: Option<u16>, ) -> PyResult<()>32 pub(crate) async fn run(
33 command: L2cap,
34 device_config: PathBuf,
35 transport: String,
36 psm: u16,
37 max_credits: Option<u16>,
38 mtu: Option<u16>,
39 mps: Option<u16>,
40 ) -> PyResult<()> {
41 println!("<<< connecting to HCI...");
42 let transport = Transport::open(transport).await?;
43 println!("<<< connected");
44
45 let mut device =
46 Device::from_config_file_with_hci(&device_config, transport.source()?, transport.sink()?)?;
47
48 device.power_on().await?;
49
50 match command {
51 L2cap::Server { tcp_host, tcp_port } => {
52 let args = server_bridge::Args {
53 psm,
54 max_credits,
55 mtu,
56 mps,
57 tcp_host,
58 tcp_port,
59 };
60
61 server_bridge::start(&args, &mut device).await?
62 }
63 L2cap::Client {
64 bluetooth_address,
65 tcp_host,
66 tcp_port,
67 } => {
68 let args = client_bridge::Args {
69 psm,
70 max_credits,
71 mtu,
72 mps,
73 bluetooth_address,
74 tcp_host,
75 tcp_port,
76 };
77
78 client_bridge::start(&args, &mut device).await?
79 }
80 };
81
82 // wait until user kills the process
83 tokio::signal::ctrl_c().await?;
84
85 Ok(())
86 }
87
88 /// Used for channeling data from Python callbacks to a Rust consumer.
89 enum BridgeData {
90 Data(Vec<u8>),
91 CloseSignal,
92 }
93
proxy_l2cap_rx_to_tcp_tx( mut l2cap_data_receiver: Receiver<BridgeData>, mut tcp_writer: OwnedWriteHalf, l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, ) -> anyhow::Result<()>94 async fn proxy_l2cap_rx_to_tcp_tx(
95 mut l2cap_data_receiver: Receiver<BridgeData>,
96 mut tcp_writer: OwnedWriteHalf,
97 l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>,
98 ) -> anyhow::Result<()> {
99 while let Some(bridge_data) = l2cap_data_receiver.recv().await {
100 match bridge_data {
101 BridgeData::Data(sdu) => {
102 println!("{}", format!("<<< [L2CAP SDU]: {} bytes", sdu.len()).cyan());
103 tcp_writer
104 .write_all(sdu.as_ref())
105 .await
106 .map_err(|_| anyhow!("Failed to write to tcp stream"))?;
107 tcp_writer
108 .flush()
109 .await
110 .map_err(|_| anyhow!("Failed to flush tcp stream"))?;
111 }
112 BridgeData::CloseSignal => {
113 l2cap_channel.lock().await.take();
114 tcp_writer
115 .shutdown()
116 .await
117 .map_err(|_| anyhow!("Failed to shut down write half of tcp stream"))?;
118 return Ok(());
119 }
120 }
121 }
122 Ok(())
123 }
124
proxy_tcp_rx_to_l2cap_tx( mut tcp_reader: OwnedReadHalf, l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>, drain_l2cap_after_write: bool, ) -> PyResult<()>125 async fn proxy_tcp_rx_to_l2cap_tx(
126 mut tcp_reader: OwnedReadHalf,
127 l2cap_channel: Arc<Mutex<Option<LeConnectionOrientedChannel>>>,
128 drain_l2cap_after_write: bool,
129 ) -> PyResult<()> {
130 let mut buf = [0; 4096];
131 loop {
132 match tcp_reader.read(&mut buf).await {
133 Ok(len) => {
134 if len == 0 {
135 println!("{}", "!!! End of stream".fg::<Orange>());
136
137 if let Some(mut channel) = l2cap_channel.lock().await.take() {
138 channel.disconnect().await.map_err(|e| {
139 eprintln!("Failed to call disconnect on l2cap channel: {e}");
140 e
141 })?;
142 }
143 return Ok(());
144 }
145
146 println!("{}", format!("<<< [TCP DATA]: {len} bytes").blue());
147 match l2cap_channel.lock().await.as_mut() {
148 None => {
149 println!("{}", "!!! L2CAP channel not connected, dropping".red());
150 return Ok(());
151 }
152 Some(channel) => {
153 channel.write(&buf[..len])?;
154 if drain_l2cap_after_write {
155 channel.drain().await?;
156 }
157 }
158 }
159 }
160 Err(e) => {
161 println!("{}", format!("!!! TCP connection lost: {}", e).red());
162 if let Some(mut channel) = l2cap_channel.lock().await.take() {
163 let _ = channel.disconnect().await.map_err(|e| {
164 eprintln!("Failed to call disconnect on l2cap channel: {e}");
165 });
166 }
167 return Err(e.into());
168 }
169 }
170 }
171 }
172
173 /// Copies the current thread's Python even loop (contained in `TaskLocals`) into the given future.
174 /// Useful when sending work to another thread that calls Python code which calls `get_running_loop()`.
inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>> where F: Future<Output = R> + Send + 'static,175 pub fn inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>>
176 where
177 F: Future<Output = R> + Send + 'static,
178 {
179 let locals = Python::with_gil(pyo3_asyncio::tokio::get_current_locals)?;
180 Ok(pyo3_asyncio::tokio::scope(locals, fut))
181 }
182