1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 use bytes::Bytes;
16 use futures::{channel::mpsc::UnboundedSender, sink::SinkExt, StreamExt};
17 use pica::{Handle, Pica};
18
19 use netsim_proto::model::chip::Radio as ProtoRadio;
20 use netsim_proto::model::Chip as ProtoChip;
21 use netsim_proto::stats::{netsim_radio_stats, NetsimRadioStats as ProtoRadioStats};
22
23 use crate::devices::chip::ChipIdentifier;
24 use crate::uwb::ranging_estimator::{SharedState, UwbRangingEstimator};
25 use crate::wireless::packet::handle_response;
26
27 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
28 use std::sync::{Arc, Mutex, OnceLock};
29 use std::thread;
30
31 use super::{WirelessAdaptor, WirelessAdaptorImpl};
32
33 // TODO(b/331267949): Construct Manager struct for each wireless_adaptor module
34 static PICA_HANDLE_TO_STATE: OnceLock<SharedState> = OnceLock::new();
35
get_pica_handle_to_state() -> &'static SharedState36 fn get_pica_handle_to_state() -> &'static SharedState {
37 PICA_HANDLE_TO_STATE.get_or_init(SharedState::new)
38 }
39
40 static PICA: OnceLock<Arc<Mutex<Pica>>> = OnceLock::new();
41
get_pica() -> Arc<Mutex<Pica>>42 fn get_pica() -> Arc<Mutex<Pica>> {
43 PICA.get_or_init(|| {
44 Arc::new(Mutex::new(Pica::new(
45 Box::new(UwbRangingEstimator::new(get_pica_handle_to_state().clone())),
46 None,
47 )))
48 })
49 .clone()
50 }
51
52 static PICA_RUNTIME: OnceLock<Arc<tokio::runtime::Runtime>> = OnceLock::new();
53
get_pica_runtime() -> Arc<tokio::runtime::Runtime>54 fn get_pica_runtime() -> Arc<tokio::runtime::Runtime> {
55 PICA_RUNTIME.get_or_init(|| Arc::new(tokio::runtime::Runtime::new().unwrap())).clone()
56 }
57
/// Parameters supplied when a UWB chip is created.
pub struct CreateParams {
    /// Address string for the chip.
    // Not read anywhere in this module yet (`new` ignores its `_create_params` argument),
    // hence the lint exemption.
    #[allow(dead_code)]
    pub address: String,
}
63
64 /// UWB struct will keep track of pica_id
65 pub struct Uwb {
66 pica_id: Handle,
67 uci_stream_writer: UnboundedSender<Vec<u8>>,
68 state: AtomicBool,
69 tx_count: AtomicI32,
70 rx_count: Arc<AtomicI32>,
71 }
72
73 impl Drop for Uwb {
drop(&mut self)74 fn drop(&mut self) {
75 get_pica_handle_to_state().remove(&self.pica_id);
76 }
77 }
78
79 impl WirelessAdaptor for Uwb {
handle_request(&self, packet: &Bytes)80 fn handle_request(&self, packet: &Bytes) {
81 // TODO(b/330788870): Increment tx_count
82 self.uci_stream_writer
83 .unbounded_send(packet.clone().into())
84 .expect("UciStream Receiver Disconnected");
85 let _ = self.tx_count.fetch_add(1, Ordering::SeqCst);
86 }
87
reset(&self)88 fn reset(&self) {
89 self.state.store(true, Ordering::SeqCst);
90 self.tx_count.store(0, Ordering::SeqCst);
91 self.rx_count.store(0, Ordering::SeqCst);
92 }
93
get(&self) -> ProtoChip94 fn get(&self) -> ProtoChip {
95 let mut chip_proto = ProtoChip::new();
96 let uwb_proto = ProtoRadio {
97 state: self.state.load(Ordering::SeqCst).into(),
98 tx_count: self.tx_count.load(Ordering::SeqCst),
99 rx_count: self.rx_count.load(Ordering::SeqCst),
100 ..Default::default()
101 };
102 chip_proto.mut_uwb().clone_from(&uwb_proto);
103 chip_proto
104 }
105
patch(&self, chip: &ProtoChip)106 fn patch(&self, chip: &ProtoChip) {
107 if !chip.has_uwb() {
108 return;
109 }
110 if let Some(patch_state) = chip.uwb().state {
111 self.state.store(patch_state, Ordering::SeqCst);
112 }
113 }
114
get_stats(&self, duration_secs: u64) -> Vec<ProtoRadioStats>115 fn get_stats(&self, duration_secs: u64) -> Vec<ProtoRadioStats> {
116 let mut stats_proto = ProtoRadioStats::new();
117 stats_proto.set_duration_secs(duration_secs);
118 stats_proto.set_kind(netsim_radio_stats::Kind::UWB);
119 let chip_proto = self.get();
120 if chip_proto.has_uwb() {
121 stats_proto.set_tx_count(chip_proto.uwb().tx_count);
122 stats_proto.set_rx_count(chip_proto.uwb().rx_count);
123 }
124 vec![stats_proto]
125 }
126 }
127
uwb_start()128 pub fn uwb_start() {
129 // TODO: Provide TcpStream as UWB connector
130 let _ = thread::Builder::new().name("pica_service".to_string()).spawn(move || {
131 log::info!("PICA STARTED");
132 let _guard = get_pica_runtime().enter();
133 futures::executor::block_on(pica::run(&get_pica()))
134 });
135 }
136
new(_create_params: &CreateParams, chip_id: ChipIdentifier) -> WirelessAdaptorImpl137 pub fn new(_create_params: &CreateParams, chip_id: ChipIdentifier) -> WirelessAdaptorImpl {
138 let (uci_stream_sender, uci_stream_receiver) = futures::channel::mpsc::unbounded();
139 let (uci_sink_sender, uci_sink_receiver) = futures::channel::mpsc::unbounded();
140 let _guard = get_pica_runtime().enter();
141 let pica_id = get_pica()
142 .lock()
143 .unwrap()
144 .add_device(Box::pin(uci_stream_receiver), Box::pin(uci_sink_sender.sink_err_into()))
145 .unwrap();
146 get_pica_handle_to_state().insert(pica_id, chip_id);
147
148 let rx_count = Arc::new(AtomicI32::new(0));
149 let uwb = Uwb {
150 pica_id,
151 uci_stream_writer: uci_stream_sender,
152 state: AtomicBool::new(true),
153 tx_count: AtomicI32::new(0),
154 rx_count: rx_count.clone(),
155 };
156
157 // Spawn a future for obtaining packet from pica and invoking handle_response_rust
158 get_pica_runtime().spawn(async move {
159 let mut uci_sink_receiver = uci_sink_receiver;
160 while let Some(packet) = uci_sink_receiver.next().await {
161 handle_response(chip_id, &Bytes::from(packet));
162 rx_count.fetch_add(1, Ordering::SeqCst);
163 }
164 });
165 Box::new(uwb)
166 }
167
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a UWB adaptor for chip id 0 with a placeholder address.
    fn new_uwb_wireless_adaptor() -> WirelessAdaptorImpl {
        let params = CreateParams { address: "test".to_string() };
        new(&params, ChipIdentifier(0))
    }

    /// Builds a chip proto whose UWB radio has its state set to `false`.
    fn patch_chip_proto() -> ProtoChip {
        let mut chip_proto = ProtoChip::new();
        *chip_proto.mut_uwb() = ProtoRadio { state: false.into(), ..Default::default() };
        chip_proto
    }

    #[test]
    fn test_uwb_get() {
        let adaptor = new_uwb_wireless_adaptor();
        let chip = adaptor.get();
        assert!(chip.has_uwb());
    }

    #[test]
    fn test_uwb_patch_and_reset() {
        let adaptor = new_uwb_wireless_adaptor();

        // Patching with a disabled radio must be reflected in the next `get`.
        adaptor.patch(&patch_chip_proto());
        let patched = adaptor.get();
        assert_eq!(patched.uwb().state, Some(false));

        // Resetting restores the enabled state and zeroed counters.
        adaptor.reset();
        let after_reset = adaptor.get();
        let radio = after_reset.uwb();
        assert_eq!(radio.rx_count, 0);
        assert_eq!(radio.tx_count, 0);
        assert_eq!(radio.state, Some(true));
    }

    #[test]
    fn test_get_stats() {
        let adaptor = new_uwb_wireless_adaptor();
        let stats = adaptor.get_stats(0);
        let radio_stat = stats.first().unwrap();
        assert_eq!(radio_stat.kind(), netsim_radio_stats::Kind::UWB);
        assert_eq!(radio_stat.duration_secs(), 0);
    }
}
214