//! This module handles an individual connection on the ATT fixed channel.
//! It handles ATT transactions and unacknowledged operations, backed by an
//! AttDatabase (that may in turn be backed by an upper-layer protocol).
//!
//! Incoming packets are classified by opcode and dispatched to one of three
//! handlers: a command handler, a request handler (at most one request is
//! processed at a time) and a confirmation watcher used by outgoing
//! indications.

use pdl_runtime::EncodeError;
use std::{cell::Cell, future::Future};

use anyhow::Result;
use log::{error, trace, warn};
use tokio::task::spawn_local;

use crate::{
    core::{
        shared_box::{WeakBox, WeakBoxRef},
        shared_mutex::SharedMutex,
    },
    gatt::{
        ids::AttHandle,
        mtu::{AttMtu, MtuEvent},
        opcode_types::{classify_opcode, OperationType},
    },
    packets::att::{self, AttErrorCode},
    utils::owned_handle::OwnedHandle,
};

use super::{
    att_database::AttDatabase,
    command_handler::AttCommandHandler,
    indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler},
    request_handler::AttRequestHandler,
};

/// The state of the (single) request slot of a bearer.
///
/// The request handler itself is moved between the `Idle` variant and the
/// task spawned to process a request, so holding it here is what marks the
/// bearer as ready for a new transaction.
enum AttRequestState<T: AttDatabase> {
    /// No request is in flight; the handler is available for the next one.
    Idle(AttRequestHandler<T>),
    /// A request is being processed by a spawned local task, which currently
    /// owns the request handler.
    // NOTE(review): presumably dropping the `OwnedHandle` cancels the task -
    // confirm against `utils::owned_handle`.
    Pending { _task: OwnedHandle<()> },
    /// Placeholder stored in the `Cell` while `handle_request` computes the
    /// next state; observing it anywhere else is treated as a bug (panic).
    Replacing,
}

/// The errors that can occur while trying to send a packet
#[derive(Debug)]
pub enum SendError {
    /// The packet failed to serialize
    SerializeError(EncodeError),
    /// The connection no longer exists
    ConnectionDropped,
}

/// This represents a single ATT bearer (currently, always the unenhanced fixed
/// channel on LE). The AttRequestState ensures that only one transaction can
/// take place at a time.
pub struct AttServerBearer<T: AttDatabase> {
    // general
    /// Callback used to emit every outgoing packet on this bearer.
    send_packet: Box<dyn Fn(att::Att) -> Result<(), EncodeError>>,
    /// MTU state of this bearer, updated through `handle_mtu_event`.
    mtu: AttMtu,

    // request state
    /// The request slot; see `AttRequestState`.
    curr_request: Cell<AttRequestState<T>>,

    // indication state
    /// Guards the indication handler: `send_indication` awaits this lock
    /// before sending, so only one indication is outstanding at a time.
    indication_handler: SharedMutex<IndicationHandler<T>>,
    /// Notified when a confirmation packet is received from the peer.
    pending_confirmation: ConfirmationWatcher,

    // command handler (across all bearers)
    command_handler: AttCommandHandler<T>,
}

impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> {
    /// Constructor, wrapping an ATT channel (for outgoing packets) and an
    /// AttDatabase.
    ///
    /// `db` is cloned so that the indication handler, the request handler and
    /// the command handler each hold their own copy. `send_packet` is the
    /// callback invoked for every outgoing packet; it returns an
    /// `EncodeError` if the packet could not be serialized.
    pub fn new(db: T, send_packet: impl Fn(att::Att) -> Result<(), EncodeError> + 'static) -> Self {
        // the handler and its confirmation watcher are created as a pair
        let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone());
        Self {
            send_packet: Box::new(send_packet),
            mtu: AttMtu::new(),

            // a fresh bearer starts out ready to accept a request
            curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(),

            indication_handler: SharedMutex::new(indication_handler),
            pending_confirmation,

            command_handler: AttCommandHandler::new(db),
        }
    }

    /// Forward `packet` to the outgoing-packet callback supplied to `new`,
    /// returning whatever that callback returns.
    fn send_packet(&self, packet: att::Att) -> Result<(), EncodeError> {
        (self.send_packet)(packet)
    }
}

impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> {
    /// Handle an incoming packet, and send outgoing packets as appropriate
    /// using the owned ATT channel.
    ///
    /// The packet is dispatched on the operation type of its opcode:
    /// commands go to the command handler, requests start (or are rejected
    /// by) an ATT transaction, and confirmations wake the confirmation
    /// watcher.
    ///
    /// # Panics
    ///
    /// Panics if the packet is a response, notification or indication; the
    /// caller is expected to have filtered those out.
    pub fn handle_packet(&self, packet: att::Att) {
        match classify_opcode(packet.opcode) {
            OperationType::Command => {
                self.command_handler.process_packet(packet);
            }
            OperationType::Request => {
                self.handle_request(packet);
            }
            OperationType::Confirmation => self.pending_confirmation.on_confirmation(),
            OperationType::Response | OperationType::Notification | OperationType::Indication => {
                unreachable!("the arbiter should not let us receive these packet types")
            }
        }
    }

    /// Send an indication, wait for the peer confirmation, and return the
    /// appropriate status. If multiple calls are outstanding, they are
    /// executed in FIFO order.
    ///
    /// The lock request on the indication handler and the MTU snapshot are
    /// both created when this method is called, not when the returned future
    /// is first polled; the returned future only holds a weak reference to
    /// the bearer.
    ///
    /// # Errors
    ///
    /// Resolves to `IndicationError::SendError(SendError::ConnectionDropped)`
    /// if either the lock or the MTU snapshot resolves to `None` (logged as
    /// the connection having been dropped). Otherwise resolves to the result
    /// of `IndicationHandler::send`.
    pub fn send_indication(
        &self,
        handle: AttHandle,
        data: Vec<u8>,
    ) -> impl Future<Output = Result<(), IndicationError>> {
        trace!("sending indication for handle {handle:?}");

        // NOTE(review): presumably creating the lock future here (rather than
        // inside the async block) is what fixes this call's place in the FIFO
        // queue - confirm against `SharedMutex`.
        let locked_indication_handler = self.indication_handler.lock();
        let pending_mtu = self.mtu.snapshot();
        // only a weak reference is moved into the future, so a pending
        // indication does not keep the bearer alive
        let this = self.downgrade();

        async move {
            // first wait until we are at the head of the queue and are ready to send
            // indications
            let mut indication_handler = locked_indication_handler
                .await
                .ok_or_else(|| {
                    warn!("indication for handle {handle:?} cancelled while queued since the connection dropped");
                    IndicationError::SendError(SendError::ConnectionDropped)
                })?;
            // then, if MTU negotiation is taking place, wait for it to complete
            let mtu = pending_mtu
                .await
                .ok_or_else(|| {
                    warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped");
                    IndicationError::SendError(SendError::ConnectionDropped)
                })?;
            // finally, send, and wait for a response
            indication_handler.send(handle, &data, mtu, |packet| this.try_send_packet(packet)).await
        }
    }

    /// Handle a snooped MTU event, to update the MTU we use for our various
    /// operations.
    ///
    /// Returns the result of `AttMtu::handle_event` unchanged.
    pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> {
        self.mtu.handle_event(mtu_event)
    }

    /// Start an ATT transaction for an incoming request, if none is already
    /// in progress.
    ///
    /// When the bearer is idle, the request handler is moved into a locally
    /// spawned task that processes the packet, sends the reply and then puts
    /// the handler back into the `Idle` state. When a transaction is already
    /// pending, the new request is dropped with a warning and the pending
    /// state is left in place.
    ///
    /// # Panics
    ///
    /// Panics if the request slot is found in the `Replacing` state, or (from
    /// the spawned task) if the fallback error response cannot be built or
    /// sent.
    fn handle_request(&self, packet: att::Att) {
        // take the current state out of the cell, leaving the placeholder
        // behind until the match below has computed the next state
        let curr_request = self.curr_request.replace(AttRequestState::Replacing);
        self.curr_request.replace(match curr_request {
            AttRequestState::Idle(mut request_handler) => {
                // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the
                // request-time MTU should be used
                let mtu = self.mtu.snapshot_or_default();
                let this = self.downgrade();
                // remembered so a fallback error response can name the failing request
                let opcode = packet.opcode;
                let task = spawn_local(async move {
                    trace!("starting ATT transaction");
                    let reply = request_handler.process_packet(packet, mtu).await;
                    // NOTE(review): `with` appears to hand us `None` once the bearer
                    // has been dropped, in which case the reply is discarded -
                    // confirm against `WeakBox::with`.
                    this.with(|this| {
                        this.map(|this| {
                            match this.send_packet(reply) {
                                Ok(_) => {
                                    trace!("reply packet sent")
                                }
                                Err(err) => {
                                    error!("serializer failure {err:?}, dropping packet and sending failed reply");
                                    // if this also fails, we're stuck
                                    if let Err(err) = this.send_packet(att::AttErrorResponse {
                                        opcode_in_error: opcode,
                                        handle_in_error: AttHandle(0).into(),
                                        error_code: AttErrorCode::UnlikelyError,
                                    }.try_into().unwrap()) {
                                        panic!("unexpected serialize error for known-good packet {err:?}")
                                    }
                                }
                            };
                            // ready for next transaction
                            this.curr_request.replace(AttRequestState::Idle(request_handler));
                        })
                    });
                });
                AttRequestState::Pending { _task: task.into() }
            }
            AttRequestState::Pending { .. } => {
                warn!("multiple ATT operations cannot simultaneously take place, dropping one");
                // TODO(aryarahul) - disconnect connection here;
                // put the existing pending state back untouched
                curr_request
            }
            AttRequestState::Replacing => {
                panic!("Replacing is an ephemeral state");
            }
        });
    }
}

impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> {
    /// Send `packet` on the bearer if it still exists.
    ///
    /// # Errors
    ///
    /// Returns `SendError::ConnectionDropped` if the bearer is no longer
    /// available, or `SendError::SerializeError` if the outgoing-packet
    /// callback reports an encoding failure.
    fn try_send_packet(&self, packet: att::Att) -> Result<(), SendError> {
        self.with(|this| {
            this.ok_or_else(|| {
                warn!("connection dropped before packet sent");
                SendError::ConnectionDropped
            })?
            .send_packet(packet)
            .map_err(SendError::SerializeError)
        })
    }
}

#[cfg(test)]
mod test {
    use std::rc::Rc;

    use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver};

    use super::*;

    use crate::{
        core::{shared_box::SharedBox, uuid::Uuid},
        gatt::{
            ffi::AttributeBackingType,
            ids::TransportIndex,
            mocks::mock_datastore::{MockDatastore, MockDatastoreEvents},
            server::{
                att_database::{AttAttribute, AttPermissions},
                gatt_database::{
                    GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle,
                },
                test::test_att_db::TestAttDatabase,
            },
        },
        packets::att,
        utils::task::{block_on_locally, try_await},
    };

    // handles present in the databases built by the tests below
    const VALID_HANDLE: AttHandle = AttHandle(3);
    // a handle that none of the test databases contain
    const INVALID_HANDLE: AttHandle = AttHandle(4);
    const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10);

    const TCB_IDX: TransportIndex = TransportIndex(1);

    /// Build a bearer backed by a `TestAttDatabase` containing two readable,
    /// indicatable attributes, together with the receiving end of a channel
    /// onto which every outgoing packet is pushed.
    fn open_connection(
    ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<att::Att>) {
        let db = TestAttDatabase::new(vec![
            (
                AttAttribute {
                    handle: VALID_HANDLE,
                    type_: Uuid::new(0x1234),
                    permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
                },
                vec![5, 6],
            ),
            (
                AttAttribute {
                    handle: ANOTHER_VALID_HANDLE,
                    type_: Uuid::new(0x5678),
                    permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
                },
                vec![5, 6],
            ),
        ]);
        let (tx, rx) = unbounded_channel();
        // note: the callback panics if the receiver has been dropped
        let conn = AttServerBearer::new(db, move |packet| {
            tx.send(packet).unwrap();
            Ok(())
        })
        .into();
        (conn, rx)
    }

    /// A single read request yields exactly one read response.
    #[test]
    fn test_single_transaction() {
        block_on_locally(async {
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_packet(
                att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    /// Two requests sent one after the other each get their own reply: an
    /// error response for the unknown handle, then a read response.
    #[test]
    fn test_sequential_transactions() {
        block_on_locally(async {
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_packet(
                att::AttReadRequest { attribute_handle: INVALID_HANDLE.into() }.try_into().unwrap(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ErrorResponse);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

            conn.as_ref().handle_packet(
                att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    /// A second request arriving while the first is still pending is dropped:
    /// only the first request reaches the datastore and gets a reply.
    #[test]
    fn test_concurrent_transaction_failure() {
        // arrange: AttServerBearer linked to a backing datastore and packet queue, with
        // two characteristics in the database
        let (datastore, mut data_rx) = MockDatastore::new();
        let datastore = Rc::new(datastore);
        let db = SharedBox::new(GattDatabase::new());
        db.add_service_with_handles(
            GattServiceWithHandle {
                handle: AttHandle(1),
                type_: Uuid::new(1),
                characteristics: vec![
                    GattCharacteristicWithHandle {
                        handle: VALID_HANDLE,
                        type_: Uuid::new(2),
                        permissions: AttPermissions::READABLE,
                        descriptors: vec![],
                    },
                    GattCharacteristicWithHandle {
                        handle: ANOTHER_VALID_HANDLE,
                        type_: Uuid::new(2),
                        permissions: AttPermissions::READABLE,
                        descriptors: vec![],
                    },
                ],
            },
            datastore,
        )
        .unwrap();
        let (tx, mut rx) = unbounded_channel();
        let send_packet = move |packet| {
            tx.send(packet).unwrap();
            Ok(())
        };
        let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet));
        let data = [1, 2];

        // act: send two read requests before replying to either read
        // first request
        block_on_locally(async {
            let req1 = att::AttReadRequest { attribute_handle: VALID_HANDLE.into() };
            conn.as_ref().handle_packet(req1.try_into().unwrap());
            // second request
            let req2 = att::AttReadRequest { attribute_handle: ANOTHER_VALID_HANDLE.into() };
            conn.as_ref().handle_packet(req2.try_into().unwrap());
            // handle first reply
            let MockDatastoreEvents::Read(
                TCB_IDX,
                VALID_HANDLE,
                AttributeBackingType::Characteristic,
                data_resp,
            ) = data_rx.recv().await.unwrap()
            else {
                unreachable!();
            };
            data_resp.send(Ok(data.to_vec())).unwrap();
            trace!("reply sent from upper tester");

            // assert: that the first reply was made
            let resp = rx.recv().await.unwrap();
            assert_eq!(resp, att::AttReadResponse { value: data.to_vec() }.try_into().unwrap());
            // assert no other replies were made
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // assert no callbacks are pending
            assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty);
        });
    }

    /// An indication completes successfully once the peer confirms it.
    #[test]
    fn test_indication_confirmation() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send an indication
            let pending_send =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and the confirmation
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());

            // assert: the indication was correctly sent
            assert!(matches!(pending_send.await.unwrap(), Ok(())));
        });
    }

    /// Two indications sent back to back, each confirmed before the next is
    /// checked, both succeed and produce exactly two outgoing packets.
    #[test]
    fn test_sequential_indications() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send the first indication
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send the response
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
            // send the second indication
            let pending_send2 =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();
            // and the response
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());

            // assert: exactly two indications were sent
            assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and that both got successful responses
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(matches!(pending_send2.await.unwrap(), Ok(())));
        });
    }

    /// With two indications queued and no confirmation received, only the
    /// first is put on the wire and neither future has finished.
    #[test]
    fn test_queued_indications_only_one_sent() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            let pending_send2 =
                spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
            // assert: only one was initially sent
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and both are still pending
            assert!(!pending_send1.is_finished());
            assert!(!pending_send2.is_finished());
        });
    }

    /// Confirming the first of two queued indications completes it and lets
    /// the second one be sent, which then remains pending.
    #[test]
    fn test_queued_indications_dequeue_second() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            let pending_send2 =
                spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send response for the first one
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();

            // assert: the first future has completed successfully, the second one is
            // pending
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(!pending_send2.is_finished());
            // and that both indications have been sent
            assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    /// Two queued indications both complete once each has been confirmed in
    /// turn.
    #[test]
    fn test_queued_indications_complete_both() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));
            let pending_send2 =
                spawn_local(conn.as_ref().send_indication(ANOTHER_VALID_HANDLE, vec![1, 2, 3]));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send response for the first one
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();
            // and now the second
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());

            // assert: both futures have completed successfully
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(matches!(pending_send2.await.unwrap(), Ok(())));
            // and both indications have been sent
            assert_eq!(sent1.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(sent2.opcode, att::AttOpcode::HandleValueIndication);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    /// Dropping the bearer while an indication awaits its confirmation fails
    /// that indication with the dedicated connection-dropped error.
    #[test]
    fn test_indication_connection_drop() {
        block_on_locally(async {
            // arrange: a pending indication
            let (conn, mut rx) = open_connection();
            let pending_send =
                spawn_local(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3]));

            // act: drop the connection after the indication is sent
            rx.recv().await.unwrap();
            drop(conn);

            // assert: the pending indication fails with the appropriate error
            assert!(matches!(
                pending_send.await.unwrap(),
                Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
            ));
        });
    }

    /// An indication issued during MTU negotiation is sent once negotiation
    /// resolves with an MTU large enough for its payload.
    // NOTE(review): `try_await` appears to poll the future without blocking on
    // it - confirm against `utils::task`.
    #[test]
    fn test_single_indication_pending_mtu() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: try to send an indication with a large payload size
            let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await;
            // then resolve the MTU negotiation with a large MTU
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap();

            // assert: the indication was sent
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
        });
    }

    /// An indication issued during MTU negotiation fails with
    /// `DataExceedsMtu` when negotiation resolves with too small an MTU.
    #[test]
    fn test_single_indication_pending_mtu_fail() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            // (the packet receiver is discarded; no packet is expected to be sent)
            let (conn, _) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: try to send an indication with a large payload size
            let pending_mtu =
                try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect()))
                    .await
                    .unwrap_err();
            // then resolve the MTU negotiation with a small MTU
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap();

            // assert: the indication failed to send
            assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. })));
        });
    }

    /// Requests are answered even while an MTU exchange is outstanding.
    #[test]
    fn test_server_transaction_pending_mtu() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: send server packet
            conn.as_ref().handle_packet(
                att::AttReadRequest { attribute_handle: VALID_HANDLE.into() }.try_into().unwrap(),
            );

            // assert: that we reply even while the MTU req is outstanding
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::ReadResponse);
        });
    }

    /// A queued indication is sent with the MTU in effect once it reaches the
    /// head of the queue, not the MTU at the time it was enqueued.
    #[test]
    fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() {
        block_on_locally(async {
            // arrange: an outstanding indication
            let (conn, mut rx) = open_connection();
            let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, vec![1, 2, 3])).await;
            rx.recv().await.unwrap(); // flush rx_queue

            // act: enqueue an indication with a large payload
            let _ = try_await(conn.as_ref().send_indication(VALID_HANDLE, (1..50).collect())).await;
            // then perform MTU negotiation to upgrade to a large MTU
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap();
            // finally resolve the first indication, so the second indication can be sent
            conn.as_ref().handle_packet(att::AttHandleValueConfirmation {}.try_into().unwrap());

            // assert: the second indication successfully sent (so it used the new MTU)
            assert_eq!(rx.recv().await.unwrap().opcode, att::AttOpcode::HandleValueIndication);
        });
    }
}