1 // Copyright 2022, The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 use std::collections::HashMap; 16 use std::iter::FromIterator; 17 use std::time::Duration; 18 19 use log::{debug, error, warn}; 20 use tokio::sync::{mpsc, oneshot, watch}; 21 use tokio::time::timeout; 22 23 use crate::error::{Error, Result}; 24 use crate::params::app_config_params::AppConfigParams; 25 use crate::params::ccc_started_app_config_params::CccStartedAppConfigParams; 26 use crate::params::uci_packets::{ 27 Controlee, ControleeStatusList, Controlees, MulticastUpdateStatusCode, SessionId, SessionState, 28 SessionType, UpdateMulticastListAction, 29 }; 30 use crate::uci::error::status_code_to_result; 31 use crate::uci::uci_manager::UciManager; 32 33 const NOTIFICATION_TIMEOUT_MS: u64 = 1000; 34 35 #[derive(Debug)] 36 pub(super) enum Response { 37 Null, 38 AppConfigParams(AppConfigParams), 39 } 40 pub(super) type ResponseSender = oneshot::Sender<Result<Response>>; 41 42 pub(super) struct UwbSession { 43 cmd_sender: mpsc::UnboundedSender<(Command, ResponseSender)>, 44 state_sender: watch::Sender<SessionState>, 45 controlee_status_notf_sender: Option<oneshot::Sender<ControleeStatusList>>, 46 } 47 48 impl UwbSession { new<T: UciManager>( uci_manager: T, session_id: SessionId, session_type: SessionType, ) -> Self49 pub fn new<T: UciManager>( 50 uci_manager: T, 51 session_id: SessionId, 52 session_type: SessionType, 53 ) -> Self { 54 let (cmd_sender, cmd_receiver) = mpsc::unbounded_channel(); 55 let (state_sender, mut state_receiver) = watch::channel(SessionState::SessionStateDeinit); 56 // Mark the initial value of state as seen. 57 let _ = state_receiver.borrow_and_update(); 58 59 let mut actor = UwbSessionActor::new( 60 cmd_receiver, 61 state_receiver, 62 uci_manager, 63 session_id, 64 session_type, 65 ); 66 tokio::spawn(async move { actor.run().await }); 67 68 Self { cmd_sender, state_sender, controlee_status_notf_sender: None } 69 } 70 initialize(&mut self, params: AppConfigParams, result_sender: ResponseSender)71 pub fn initialize(&mut self, params: AppConfigParams, result_sender: ResponseSender) { 72 let _ = self.cmd_sender.send((Command::Initialize { params }, result_sender)); 73 } 74 deinitialize(&mut self, result_sender: ResponseSender)75 pub fn deinitialize(&mut self, result_sender: ResponseSender) { 76 let _ = self.cmd_sender.send((Command::Deinitialize, result_sender)); 77 } 78 start_ranging(&mut self, result_sender: ResponseSender)79 pub fn start_ranging(&mut self, result_sender: ResponseSender) { 80 let _ = self.cmd_sender.send((Command::StartRanging, result_sender)); 81 } 82 stop_ranging(&mut self, result_sender: ResponseSender)83 pub fn stop_ranging(&mut self, result_sender: ResponseSender) { 84 let _ = self.cmd_sender.send((Command::StopRanging, result_sender)); 85 } 86 reconfigure(&mut self, params: AppConfigParams, result_sender: ResponseSender)87 pub fn reconfigure(&mut self, params: AppConfigParams, result_sender: ResponseSender) { 88 let _ = self.cmd_sender.send((Command::Reconfigure { params }, result_sender)); 89 } 90 update_controller_multicast_list( &mut self, action: UpdateMulticastListAction, controlees: Vec<Controlee>, result_sender: ResponseSender, )91 pub fn update_controller_multicast_list( 92 &mut self, 93 action: UpdateMulticastListAction, 94 controlees: Vec<Controlee>, 95 result_sender: ResponseSender, 96 ) { 97 let (notf_sender, notf_receiver) = oneshot::channel(); 98 self.controlee_status_notf_sender = Some(notf_sender); 99 let _ = self.cmd_sender.send(( 100 Command::UpdateControllerMulticastList { action, controlees, notf_receiver }, 101 result_sender, 102 )); 103 } 104 params(&mut self, result_sender: ResponseSender)105 pub fn params(&mut self, result_sender: ResponseSender) { 106 let _ = self.cmd_sender.send((Command::GetParams, result_sender)); 107 } 108 on_session_status_changed(&mut self, state: SessionState)109 pub fn on_session_status_changed(&mut self, state: SessionState) { 110 let _ = self.state_sender.send(state); 111 } 112 on_controller_multicast_list_updated(&mut self, status_list: ControleeStatusList)113 pub fn on_controller_multicast_list_updated(&mut self, status_list: ControleeStatusList) { 114 if let Some(sender) = self.controlee_status_notf_sender.take() { 115 let _ = sender.send(status_list); 116 } 117 } 118 } 119 120 struct UwbSessionActor<T: UciManager> { 121 cmd_receiver: mpsc::UnboundedReceiver<(Command, ResponseSender)>, 122 state_receiver: watch::Receiver<SessionState>, 123 uci_manager: T, 124 session_id: SessionId, 125 session_type: SessionType, 126 params: Option<AppConfigParams>, 127 } 128 129 impl<T: UciManager> UwbSessionActor<T> { new( cmd_receiver: mpsc::UnboundedReceiver<(Command, ResponseSender)>, state_receiver: watch::Receiver<SessionState>, uci_manager: T, session_id: SessionId, session_type: SessionType, ) -> Self130 fn new( 131 cmd_receiver: mpsc::UnboundedReceiver<(Command, ResponseSender)>, 132 state_receiver: watch::Receiver<SessionState>, 133 uci_manager: T, 134 session_id: SessionId, 135 session_type: SessionType, 136 ) -> Self { 137 Self { cmd_receiver, state_receiver, uci_manager, session_id, session_type, params: None } 138 } 139 run(&mut self)140 async fn run(&mut self) { 141 loop { 142 tokio::select! { 143 cmd = self.cmd_receiver.recv() => { 144 match cmd { 145 None => { 146 debug!("UwbSession is about to drop."); 147 break; 148 } 149 Some((cmd, result_sender)) => { 150 let result = match cmd { 151 Command::Initialize { params } => self.initialize(params).await, 152 Command::Deinitialize => self.deinitialize().await, 153 Command::StartRanging => self.start_ranging().await, 154 Command::StopRanging => self.stop_ranging().await, 155 Command::Reconfigure { params } => self.reconfigure(params).await, 156 Command::UpdateControllerMulticastList { 157 action, 158 controlees, 159 notf_receiver, 160 } => { 161 self.update_controller_multicast_list( 162 action, 163 controlees, 164 notf_receiver, 165 ) 166 .await 167 }, 168 Command::GetParams => self.params().await, 169 }; 170 let _ = result_sender.send(result); 171 } 172 } 173 } 174 } 175 } 176 } 177 initialize(&mut self, params: AppConfigParams) -> Result<Response>178 async fn initialize(&mut self, params: AppConfigParams) -> Result<Response> { 179 debug_assert!(*self.state_receiver.borrow() == SessionState::SessionStateDeinit); 180 181 // TODO(b/279669973): Support CR-461 fully here. Need to wait for session init rsp. 182 // But, that does not seem to be fully plumbed up in session_manager yet. 183 self.uci_manager.session_init(self.session_id, self.session_type).await?; 184 self.wait_state(SessionState::SessionStateInit).await?; 185 186 self.reconfigure(params).await?; 187 self.wait_state(SessionState::SessionStateIdle).await?; 188 189 Ok(Response::Null) 190 } 191 deinitialize(&mut self) -> Result<Response>192 async fn deinitialize(&mut self) -> Result<Response> { 193 self.uci_manager.session_deinit(self.session_id).await?; 194 Ok(Response::Null) 195 } 196 start_ranging(&mut self) -> Result<Response>197 async fn start_ranging(&mut self) -> Result<Response> { 198 let state = *self.state_receiver.borrow(); 199 match state { 200 SessionState::SessionStateActive => { 201 warn!("Session {} is already running", self.session_id); 202 Err(Error::BadParameters) 203 } 204 SessionState::SessionStateIdle => { 205 self.uci_manager.range_start(self.session_id).await?; 206 self.wait_state(SessionState::SessionStateActive).await?; 207 208 let params = if self.session_type != SessionType::Ccc { 209 // self.params should be Some() in this state. 210 self.params.clone().unwrap() 211 } else { 212 // Get the CCC specific app config after ranging started. 213 let tlvs = self 214 .uci_manager 215 .session_get_app_config(self.session_id, vec![]) 216 .await 217 .map_err(|e| { 218 error!("Failed to get CCC app config after start ranging: {:?}", e); 219 e 220 })?; 221 let config_map = HashMap::from_iter(tlvs.into_iter().map(|tlv| { 222 let tlv = tlv.into_inner(); 223 (tlv.cfg_id, tlv.v.clone()) 224 })); 225 let params = CccStartedAppConfigParams::from_config_map(config_map) 226 .ok_or_else(|| { 227 error!("Failed to generate CccStartedAppConfigParams"); 228 Error::Unknown 229 })?; 230 AppConfigParams::CccStarted(params) 231 }; 232 Ok(Response::AppConfigParams(params)) 233 } 234 _ => { 235 error!("Session {} cannot start running at {:?}", self.session_id, state); 236 Err(Error::BadParameters) 237 } 238 } 239 } 240 stop_ranging(&mut self) -> Result<Response>241 async fn stop_ranging(&mut self) -> Result<Response> { 242 let state = *self.state_receiver.borrow(); 243 match state { 244 SessionState::SessionStateIdle => { 245 warn!("Session {} is already stopped", self.session_id); 246 Ok(Response::Null) 247 } 248 SessionState::SessionStateActive => { 249 self.uci_manager.range_stop(self.session_id).await?; 250 self.wait_state(SessionState::SessionStateIdle).await?; 251 252 Ok(Response::Null) 253 } 254 _ => { 255 error!("Session {} cannot stop running at {:?}", self.session_id, state); 256 Err(Error::BadParameters) 257 } 258 } 259 } 260 reconfigure(&mut self, params: AppConfigParams) -> Result<Response>261 async fn reconfigure(&mut self, params: AppConfigParams) -> Result<Response> { 262 debug_assert!(*self.state_receiver.borrow() != SessionState::SessionStateDeinit); 263 264 let state = *self.state_receiver.borrow(); 265 let tlvs = match self.params.as_ref() { 266 Some(prev_params) => { 267 if let Some(tlvs) = params.generate_updated_tlvs(prev_params, state) { 268 tlvs 269 } else { 270 error!("Cannot update the app config at state {:?}: {:?}", state, params); 271 return Err(Error::BadParameters); 272 } 273 } 274 None => params.generate_tlvs(), 275 }; 276 277 let result = self.uci_manager.session_set_app_config(self.session_id, tlvs).await?; 278 for config_status in result.config_status.iter() { 279 warn!( 280 "AppConfig {:?} is not applied: {:?}", 281 config_status.cfg_id, config_status.status 282 ); 283 } 284 if let Err(e) = status_code_to_result(result.status) { 285 error!("Failed to set app_config. StatusCode: {:?}", result.status); 286 return Err(e); 287 } 288 289 self.params = Some(params); 290 Ok(Response::Null) 291 } 292 update_controller_multicast_list( &mut self, action: UpdateMulticastListAction, controlees: Vec<Controlee>, notf_receiver: oneshot::Receiver<ControleeStatusList>, ) -> Result<Response>293 async fn update_controller_multicast_list( 294 &mut self, 295 action: UpdateMulticastListAction, 296 controlees: Vec<Controlee>, 297 notf_receiver: oneshot::Receiver<ControleeStatusList>, 298 ) -> Result<Response> { 299 if self.session_type == SessionType::Ccc { 300 error!("Cannot update multicast list for CCC session"); 301 return Err(Error::BadParameters); 302 } 303 304 let state = *self.state_receiver.borrow(); 305 if !matches!(state, SessionState::SessionStateIdle | SessionState::SessionStateActive) { 306 error!("Cannot update multicast list at state {:?}", state); 307 return Err(Error::BadParameters); 308 } 309 310 self.uci_manager 311 .session_update_controller_multicast_list( 312 self.session_id, 313 action, 314 Controlees::NoSessionKey(controlees), 315 false, 316 false, 317 ) 318 .await?; 319 320 // Wait for the notification of the update status. 321 let results = timeout(Duration::from_millis(NOTIFICATION_TIMEOUT_MS), notf_receiver) 322 .await 323 .map_err(|_| { 324 error!("Timeout waiting for the multicast list notification"); 325 Error::Timeout 326 })? 327 .map_err(|_| { 328 error!("oneshot sender is dropped."); 329 Error::Unknown 330 })?; 331 332 // Check the update status for adding new controlees. 333 if action == UpdateMulticastListAction::AddControlee { 334 match results { 335 ControleeStatusList::V1(res) => { 336 for result in res.iter() { 337 if result.status != MulticastUpdateStatusCode::StatusOkMulticastListUpdate { 338 error!("Failed to update multicast list: {:?}", result); 339 return Err(Error::Unknown); 340 } 341 } 342 } 343 ControleeStatusList::V2(res) => { 344 for result in res.iter() { 345 if result.status != MulticastUpdateStatusCode::StatusOkMulticastListUpdate { 346 error!("Failed to update multicast list: {:?}", result); 347 return Err(Error::Unknown); 348 } 349 } 350 } 351 } 352 } 353 354 Ok(Response::Null) 355 } 356 wait_state(&mut self, expected_state: SessionState) -> Result<()>357 async fn wait_state(&mut self, expected_state: SessionState) -> Result<()> { 358 // Wait for the notification of the session status. 359 timeout(Duration::from_millis(NOTIFICATION_TIMEOUT_MS), self.state_receiver.changed()) 360 .await 361 .map_err(|_| { 362 error!("Timeout waiting for the session status notification"); 363 Error::Timeout 364 })? 365 .map_err(|_| { 366 debug!("UwbSession is about to drop."); 367 Error::Unknown 368 })?; 369 370 // Check if the latest session status is expected or not. 371 let state = *self.state_receiver.borrow(); 372 if state != expected_state { 373 error!( 374 "Transit to wrong Session state {:?}. The expected state is {:?}", 375 state, expected_state 376 ); 377 return Err(Error::BadParameters); 378 } 379 380 Ok(()) 381 } 382 params(&mut self) -> Result<Response>383 async fn params(&mut self) -> Result<Response> { 384 match &self.params { 385 None => Err(Error::BadParameters), 386 Some(params) => Ok(Response::AppConfigParams(params.clone())), 387 } 388 } 389 } 390 391 enum Command { 392 Initialize { 393 params: AppConfigParams, 394 }, 395 Deinitialize, 396 StartRanging, 397 StopRanging, 398 Reconfigure { 399 params: AppConfigParams, 400 }, 401 UpdateControllerMulticastList { 402 action: UpdateMulticastListAction, 403 controlees: Vec<Controlee>, 404 notf_receiver: oneshot::Receiver<ControleeStatusList>, 405 }, 406 GetParams, 407 } 408