1# Lint as: python2, python3 2# Copyright 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Server side bluetooth tests on Advertisement Monitor API""" 7 8import time 9import logging 10import array 11 12from autotest_lib.client.bin import utils 13from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests 14from autotest_lib.client.common_lib import error 15 16 17# List of the controllers that does not support Adv Monitor HW offloading. 18ADVMON_UNSUPPORTED_CHIPSETS = [ 19 'BCM-43540', 'BCM-43560', 20 'Intel-AC7260', 'Intel-AC7265', 21 'MVL-8797', 'MVL-8887', 'MVL-8897', 'MVL-8997', 22 'Realtek-RTL8822C-USB', 'Realtek-RTL8822C-UART', 'Realtek-RTL8852A-USB', 23 'QCA-6174A-3-UART', 'QCA-6174A-5-USB' 24] 25 26 27class TestMonitor(): 28 """Local object hosting the test values for Advertisement Monitor object. 29 30 This class holds the values of parameters for creating an Advertisement 31 Monitor object. 32 33 """ 34 35 # Index of the pattern data in the patterns filter. 36 PATTERN_DATA_IDX = 2 37 38 def __init__(self, app_id): 39 """Construction of a local monitor object. 40 41 @param app_id: the app id associated with the monitor. 42 43 """ 44 self.type = None 45 self.rssi = [] 46 self.sampling_period = 256 # unset Sampling Period 47 self.patterns = [] 48 self.monitor_id = None 49 self.app_id = app_id 50 51 52 def _bytes(self, str_data): 53 """Convert string data to byte array. 54 55 @param str_data: the string data. 56 57 @returns: the byte array. 58 59 """ 60 return [b for b in array.array('B', str_data.encode())] 61 62 63 def update_type(self, monitor_type): 64 """Update the monitor type. 65 66 @param monitor_type: type of the monitor. 67 68 """ 69 self.type = monitor_type 70 71 72 def update_rssi(self, monitor_rssi): 73 """Update the RSSI filter values. 74 75 @param rssi: the list of rssi threshold and timeout values. 76 77 """ 78 self.rssi = monitor_rssi 79 80 81 def update_sampling_period(self, monitor_sampling_period): 82 """Update the sampling period value. 83 84 @param monitor_sampling_period: sampling period value. 85 86 """ 87 self.sampling_period = monitor_sampling_period 88 89 90 def update_patterns(self, monitor_patterns): 91 """Update the content filter patterns. 92 93 @param patterns: the list of start position, ad type and patterns. 94 95 """ 96 # Convert string patterns to byte array, if any. 97 for pattern in monitor_patterns: 98 if isinstance(pattern[self.PATTERN_DATA_IDX], str): 99 pattern[self.PATTERN_DATA_IDX] = self._bytes( 100 pattern[self.PATTERN_DATA_IDX]) 101 102 self.patterns = monitor_patterns 103 104 105 def update_monitor_id(self, monitor_id): 106 """Store the monitor id returned by add_monitor(). 107 108 @param monitor_id: the monitor id. 109 110 """ 111 self.monitor_id = monitor_id 112 113 114 def get_monitor_data(self): 115 """Return the monitor parameters. 116 117 @returns: List containing the monitor data. 118 119 """ 120 return [self.type, self.rssi + [self.sampling_period], self.patterns] 121 122 123 def get_monitor_id(self): 124 """Return the monitor id. 125 126 @returns: monitor id if monitor is already added, None otherwise. 127 128 """ 129 return self.monitor_id 130 131 132 def get_app_id(self): 133 """Return the application id. 134 135 @returns: app id associated to the monitor object. 136 137 """ 138 return self.app_id 139 140 141class BluetoothAdapterAdvMonitorTests( 142 bluetooth_adapter_tests.BluetoothAdapterTests): 143 """Server side bluetooth adapter advertising Test. 144 145 This class comprises a number of test cases to verify bluetooth 146 Advertisement Monitor API. 147 148 Refer to the test plan doc for more details: go/bt-advmon-api-test-plan 149 150 """ 151 152 ADD_MONITOR_POLLING_TIMEOUT_SECS = 3 153 ADD_MONITOR_POLLING_SLEEP_SECS = 1 154 PAIR_TEST_SLEEP_SECS = 5 155 156 # Refer doc/advertisement-monitor-api.txt for more info about unset values. 157 UNSET_RSSI = 127 158 UNSET_TIMEOUT = 0 159 UNSET_SAMPLING_PERIOD = 256 160 161 # Non-zero count value is used to indicate the case where multiple 162 # DeviceFound/DeviceLost events are expected to occur. 163 MULTIPLE_EVENTS = -1 164 165 # Number of cycle to observe during a test 166 INTERLEAVE_SCAN_TEST_CYCLE = 10 167 # Acceptable extra delay of interleave scan duration, in sec 168 INTERLEAVE_SCAN_DURATION_TOLERANCE = 0.1 169 # Acceptable delay of cancelling interleave scan, in sec 170 INTERLEAVE_SCAN_CANCEL_TOLERANCE = 2 171 # Acceptable extra/missing cycles in interleave scan 172 INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE = 2 173 174 # Duration of kernel perform 'start discovery', in sec 175 DISCOVERY_DURATION = 10.24 176 177 # Acceptable difference between the first RSSI sample and following one. 178 # LOW_RSSI_THRESHOLD_TOLERANCE must be larger than 179 # HIGH_RSSI_THRESHOLD_TOLERANCE 180 HIGH_RSSI_THRESHOLD_TOLERANCE = 20 181 LOW_RSSI_THRESHOLD_TOLERANCE = 40 182 183 test_case_log = bluetooth_adapter_tests.test_case_log 184 test_retry_and_log = bluetooth_adapter_tests.test_retry_and_log 185 186 187 def advmon_check_manager_interface_exist(self): 188 """Check if AdvertisementMonitorManager1 interface is available. 189 190 @returns: True if Manager interface is available, False otherwise. 191 192 """ 193 return self.bluetooth_facade.advmon_check_manager_interface_exist() 194 195 196 def read_supported_types(self): 197 """Read the Advertisement Monitor supported monitor types. 198 199 @returns: List of supported advertisement monitor types. 200 201 """ 202 return self.bluetooth_facade.advmon_read_supported_types() 203 204 205 def read_supported_features(self): 206 """Read the Advertisement Monitor supported features. 207 208 @returns: List of supported advertisement monitor features. 209 210 """ 211 return self.bluetooth_facade.advmon_read_supported_features() 212 213 214 def create_app(self): 215 """Create an advertisement monitor app. 216 217 @returns: app id, once the app is created. 218 219 """ 220 return self.bluetooth_facade.advmon_create_app() 221 222 223 def exit_app(self, app_id): 224 """Exit an advertisement monitor app. 225 226 @param app_id: the app id. 227 228 @returns: True on success, False otherwise. 229 230 """ 231 return self.bluetooth_facade.advmon_exit_app(app_id) 232 233 234 def kill_app(self, app_id): 235 """Kill an advertisement monitor app by sending SIGKILL. 236 237 @param app_id: the app id. 238 239 @returns: True on success, False otherwise. 240 241 """ 242 return self.bluetooth_facade.advmon_kill_app(app_id) 243 244 245 def register_app(self, app_id): 246 """Register an advertisement monitor app. 247 248 @param app_id: the app id. 249 250 @returns: True on success, False otherwise. 251 252 """ 253 return self.bluetooth_facade.advmon_register_app(app_id) 254 255 256 def unregister_app(self, app_id): 257 """Unregister an advertisement monitor app. 258 259 @param app_id: the app id. 260 261 @returns: True on success, False otherwise. 262 263 """ 264 return self.bluetooth_facade.advmon_unregister_app(app_id) 265 266 267 def add_monitor(self, app_id, monitor_data): 268 """Create an Advertisement Monitor object. 269 270 @param app_id: the app id. 271 @param monitor_data: the list containing monitor type, RSSI filter 272 values and patterns. 273 274 @returns: monitor id, once the monitor is created, None otherwise. 275 276 """ 277 return self.bluetooth_facade.advmon_add_monitor(app_id, monitor_data) 278 279 280 def remove_monitor(self, app_id, monitor_id): 281 """Remove the Advertisement Monitor object. 282 283 @param app_id: the app id. 284 @param monitor_id: the monitor id. 285 286 @returns: True on success, False otherwise. 287 288 """ 289 return self.bluetooth_facade.advmon_remove_monitor(app_id, monitor_id) 290 291 292 def get_event_count(self, app_id, monitor_id, event='All'): 293 """Read the count of a particular event on the given monitor. 294 295 @param app_id: the app id. 296 @param monitor_id: the monitor id. 297 @param event: name of the specific event or 'All' for all events. 298 299 @returns: count of the specific event or dict of counts of all events. 300 301 """ 302 return self.bluetooth_facade.advmon_get_event_count(app_id, 303 monitor_id, 304 event) 305 306 307 def reset_event_count(self, app_id, monitor_id, event='All'): 308 """Reset the count of a particular event on the given monitor. 309 310 @param app_id: the app id. 311 @param monitor_id: the monitor id. 312 @param event: name of the specific event or 'All' for all events. 313 314 @returns: True on success, False otherwise. 315 316 """ 317 return self.bluetooth_facade.advmon_reset_event_count(app_id, 318 monitor_id, 319 event) 320 321 def set_target_devices(self, app_id, monitor_id, devices): 322 """Set the target devices to the given monitor. 323 324 DeviceFound and DeviceLost will only be counted if it is triggered by a 325 target device. 326 327 @param app_id: the app id. 328 @param monitor_id: the monitor id. 329 @param devices: a list of devices in MAC address 330 331 @returns: True on success, False otherwise. 332 333 """ 334 return self.bluetooth_facade.advmon_set_target_devices( 335 app_id, monitor_id, devices) 336 337 def interleave_logger_start(self): 338 """ Start interleave logger recording 339 """ 340 self.bluetooth_facade.advmon_interleave_scan_logger_start() 341 342 def interleave_logger_stop(self): 343 """ Stop interleave logger recording 344 345 @returns: True if logs were successfully collected, 346 False otherwise. 347 348 """ 349 return self.bluetooth_facade.advmon_interleave_scan_logger_stop() 350 351 def interleave_logger_get_records(self): 352 """ Get records in previous log collections 353 354 @returns: a list of records, where each item is a record of 355 interleave |state| and the |time| the state starts. 356 |state| could be {'no filter', 'allowlist'} 357 |time| is system time in sec 358 359 """ 360 return self.bluetooth_facade.\ 361 advmon_interleave_scan_logger_get_records() 362 363 def interleave_logger_get_cancel_event(self): 364 """ Get cancel event in previous log collections 365 366 @returns: the first cancel event in the collections, 367 None if no cancel event was found 368 369 """ 370 events = self.bluetooth_facade.\ 371 advmon_interleave_scan_logger_get_cancel_events() 372 if len(events) == 0: 373 event = None 374 else: 375 event = events[0] 376 if len(events) > 1: 377 logging.warning('More than one cancel events found %s', events) 378 return event 379 380 def interleave_scan_get_durations(self): 381 """Get durations of allowlist scan and no filter scan 382 383 @returns: a dict of {'allowlist': allowlist_duration, 384 'no filter': no_filter_duration}, 385 or None if something went wrong 386 """ 387 return self.bluetooth_facade.advmon_interleave_scan_get_durations() 388 389 @test_retry_and_log(False) 390 def test_supported_types(self): 391 """Test supported monitor types. 392 393 @returns: True on success, False otherwise. 394 395 """ 396 supported_types = self.read_supported_types() 397 for supported_type in supported_types: 398 logging.info('type: %s', supported_type) 399 400 # TODO(b/169658213) - add check for supported types. 401 return True 402 403 404 @test_retry_and_log(False) 405 def test_supported_features(self): 406 """Test supported features. 407 408 @returns: True on success, False otherwise. 409 410 """ 411 supported_features = self.read_supported_features() 412 for supported_feature in supported_features: 413 logging.info('feature: %s', supported_feature) 414 415 # TODO(b/169658213) - add check for supported features. 416 return True 417 418 419 def test_is_controller_offloading_supported(self): 420 """Check if the controller supports HW offloading. 421 422 @raises: TestFail if the controller is expected to support Monitor 423 Offloading but the support is missing. 424 425 """ 426 chipset = self.bluetooth_facade.get_chipset_name() 427 if chipset in ADVMON_UNSUPPORTED_CHIPSETS: 428 logging.warning('Controller support check skipped for %s', chipset) 429 else: 430 supported_features = self.read_supported_features() 431 if not supported_features: 432 logging.error('Controller support missing on %s', chipset) 433 raise error.TestFail('Controller offloading not supported') 434 logging.info('Controller offloading supported on %s', chipset) 435 436 437 def test_is_adv_monitoring_supported(self): 438 """Check if Adv Monitor API is supported. 439 440 If AdvMonitor API is not supported by the platform, 441 AdvertisementMonitorManager1 interface won't be exposed by 442 bluetoothd. In such case, skip the test and raise TestNA. 443 444 @raises: TestNA if Adv Monitor API is not supported. 445 446 """ 447 if not self.advmon_check_manager_interface_exist(): 448 logging.info('Advertisement Monitor API not supported') 449 raise error.TestNAError('Advertisement Monitor API not supported') 450 451 self.test_is_controller_offloading_supported() 452 453 454 @test_retry_and_log(False) 455 def test_exit_app(self, app_id): 456 """Test exit application. 457 458 @param app_id: the app id. 459 460 @returns: True on success, False otherwise. 461 462 """ 463 return self.exit_app(app_id) 464 465 466 @test_retry_and_log(False) 467 def test_kill_app(self, app_id): 468 """Test kill application. 469 470 @param app_id: the app id. 471 472 @returns: True on success, False otherwise. 473 474 """ 475 return self.kill_app(app_id) 476 477 478 @test_retry_and_log(False) 479 def test_register_app(self, app_id, expected=True): 480 """Test register application. 481 482 @param app_id: the app id. 483 @param expected: expected result of the RegisterMonitor method. 484 485 @returns: True on success, False otherwise. 486 487 """ 488 return self.register_app(app_id) == expected 489 490 491 @test_retry_and_log(False) 492 def test_unregister_app(self, app_id, expected=True): 493 """Test unregister application. 494 495 @param app_id: the app id. 496 @param expected: expected result of the UnregisterMonitor method. 497 498 @returns: True on success, False otherwise. 499 500 """ 501 return self.unregister_app(app_id) == expected 502 503 504 @test_retry_and_log(False) 505 def test_monitor_activate(self, monitor, expected): 506 """Test if the Activate method on the monitor has been invoked or not. 507 508 @param monitor: the local monitor object. 509 @param expected: expected state of the Activate event. 510 511 @returns: True on success, False otherwise. 512 513 """ 514 app_id = monitor.get_app_id() 515 monitor_id = monitor.get_monitor_id() 516 if monitor_id is None: 517 return False 518 519 def _check_activate(): 520 """Handler for the activate event.""" 521 return self.get_event_count(app_id, monitor_id, 'Activate') == 1 522 523 activated = False 524 try: 525 utils.poll_for_condition( 526 condition=_check_activate, 527 timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, 528 sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, 529 desc='Waiting for activate') 530 activated = True 531 except utils.TimeoutError as e: 532 logging.error('activate: %s', e) 533 except: 534 logging.error('activate: unexpected error') 535 536 return expected == activated 537 538 539 @test_retry_and_log(False) 540 def test_monitor_release(self, monitor, expected): 541 """Test if the Release method on the monitor has been invoked or not. 542 543 @param monitor: the local monitor object. 544 @param expected: expected state of the Release event. 545 546 @returns: True on success, False otherwise. 547 548 """ 549 app_id = monitor.get_app_id() 550 monitor_id = monitor.get_monitor_id() 551 if monitor_id is None: 552 return False 553 554 def _check_release(): 555 """Handler for the release event.""" 556 return self.get_event_count(app_id, monitor_id, 'Release') == 1 557 558 released = False 559 try: 560 utils.poll_for_condition( 561 condition=_check_release, 562 timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, 563 sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, 564 desc='Waiting for release') 565 released = True 566 except utils.TimeoutError as e: 567 logging.error('release: %s', e) 568 except Exception as e: 569 logging.error('release: %s', e) 570 except: 571 logging.error('release: unexpected error') 572 573 return expected == released 574 575 576 @test_retry_and_log(True) 577 def test_device_found(self, monitor, count, delay=0): 578 """Test if the DeviceFound method on a monitor has been invoked or not. 579 580 @param monitor: the local monitor object. 581 @param count: expected count of the DeviceFound events. 582 @param delay: wait until 'delay' seconds before reading the event count. 583 584 @returns: True on success, False otherwise. 585 586 """ 587 app_id = monitor.get_app_id() 588 monitor_id = monitor.get_monitor_id() 589 if monitor_id is None: 590 return False 591 592 if delay: 593 time.sleep(delay) 594 595 checked_count = self.get_event_count(app_id, monitor_id, 'DeviceFound') 596 597 if count == self.MULTIPLE_EVENTS: 598 self.results = { 599 'Found events': checked_count, 600 'Expected events': 'multiple' 601 } 602 603 return checked_count > 0 604 605 self.results = { 606 'Found events': checked_count, 607 'Expected events': count 608 } 609 610 return checked_count == count 611 612 613 @test_retry_and_log(False) 614 def test_device_lost(self, monitor, count, delay=0): 615 """Test if the DeviceLost method on a monitor has been invoked or not. 616 617 @param monitor: the local monitor object. 618 @param count: expected count of the DeviceLost events. 619 @param delay: wait until 'delay' seconds before reading the event count. 620 621 @returns: True on success, False otherwise. 622 623 """ 624 app_id = monitor.get_app_id() 625 monitor_id = monitor.get_monitor_id() 626 if monitor_id is None: 627 return False 628 629 if delay: 630 time.sleep(delay) 631 632 checked_count = self.get_event_count(app_id, monitor_id, 'DeviceLost') 633 634 if count == self.MULTIPLE_EVENTS: 635 self.results = { 636 'Found events': checked_count, 637 'Expected events': 'multiple' 638 } 639 640 return checked_count > 1 641 642 self.results = { 643 'Found events': checked_count, 644 'Expected events': count 645 } 646 647 return checked_count == count 648 649 650 @test_retry_and_log(False) 651 def test_reset_event_count(self, monitor, event='All'): 652 """Test resetting count of a particular event on the given monitor. 653 654 @param monitor: the local monitor object. 655 @param event: name of the specific event or 'All' for all events. 656 657 @returns: True on success, False otherwise. 658 659 """ 660 return self.reset_event_count(monitor.get_app_id(), 661 monitor.get_monitor_id(), 662 event) 663 664 665 @test_retry_and_log(False) 666 def test_add_monitor(self, monitor, expected_activate=None, 667 expected_release=None): 668 """Test adding a monitor. 669 670 @param monitor: the local monitor object. 671 @param expected_activate: expected state of the Activate event. 672 @param expected_release: expected state of the Release event. 673 674 @returns: True on success, False otherwise. 675 676 """ 677 app_id = monitor.get_app_id() 678 monitor_id = self.add_monitor(app_id, monitor.get_monitor_data()) 679 if monitor_id is None: 680 return False 681 monitor.update_monitor_id(monitor_id) 682 683 checked_activate = True 684 if expected_activate is not None: 685 checked_activate = self.test_monitor_activate( 686 monitor, expected_activate) 687 688 checked_release = True 689 if expected_release is not None: 690 checked_release = self.test_monitor_release( 691 monitor, expected_release) 692 693 if self.get_event_count(app_id, monitor_id, 'Release') != 0: 694 self.remove_monitor(app_id, monitor_id) 695 monitor.update_monitor_id(None) 696 697 # Set the target devices so that AdvMon ignores Adv from other devices 698 target_devices = [] 699 700 if hasattr(self, 'peer_mouse'): 701 target_devices.append(self.peer_mouse.address) 702 703 if hasattr(self, 'peer_keybd'): 704 target_devices.append(self.peer_keybd.address) 705 706 self.set_target_devices(app_id, monitor_id, target_devices) 707 708 self.results = { 709 'activated': checked_activate, 710 'released': checked_release 711 } 712 return all(self.results.values()) 713 714 715 @test_retry_and_log(False) 716 def test_remove_monitor(self, monitor): 717 """Test removing a monitor. 718 719 @param monitor: the local monitor object. 720 721 @returns: True on success, False otherwise. 722 723 """ 724 app_id = monitor.get_app_id() 725 monitor_id = monitor.get_monitor_id() 726 if monitor_id is None: 727 return False 728 729 ret = self.remove_monitor(app_id, monitor_id) 730 monitor.update_monitor_id(None) 731 732 if ret is None: 733 return False 734 735 return True 736 737 738 @test_retry_and_log(False) 739 def test_setup_peer_devices(self): 740 """Test availability of the peer devices. 741 742 @returns: True on success, False otherwise. 743 744 """ 745 self.peer_keybd = None 746 self.peer_mouse = None 747 748 self.LOW_RSSI = None 749 self.HIGH_RSSI = None 750 751 for device_type, device_list in self.devices.items(): 752 for device in device_list: 753 if device_type is 'BLE_KEYBOARD': 754 self.peer_keybd = device 755 elif device_type is 'BLE_MOUSE': 756 self.peer_mouse = device 757 758 if self.peer_keybd is None or self.peer_mouse is None: 759 raise error.TestNAError('some peer device is not found') 760 761 # Setup default RSSI threshold based on real RSSI range 762 keybd_rssi = self.get_device_sample_rssi(self.peer_keybd) 763 mouse_rssi = self.get_device_sample_rssi(self.peer_mouse) 764 765 if mouse_rssi is None or keybd_rssi is None: 766 raise error.TestNAError('failed to examine peer RSSI') 767 768 min_rssi = min(mouse_rssi, keybd_rssi) 769 770 # Make RSSI threshold tolerable. 771 self.HIGH_RSSI = max(min_rssi - self.HIGH_RSSI_THRESHOLD_TOLERANCE, 772 -126) 773 self.LOW_RSSI = max(min_rssi - self.LOW_RSSI_THRESHOLD_TOLERANCE, -127) 774 775 self.test_stop_peer_device_adv(self.peer_keybd) 776 self.test_stop_peer_device_adv(self.peer_mouse) 777 778 return True 779 780 781 @test_retry_and_log(False) 782 def test_start_peer_device_adv(self, device, duration=0): 783 """Test enabling the peer device advertisements. 784 785 @param device: the device object. 786 @param duration: the duration of the advertisement. 787 788 @returns: True on success, False otherwise. 789 790 """ 791 ret = self.test_device_set_discoverable(device, True) 792 793 if duration: 794 time.sleep(duration) 795 796 return ret 797 798 799 @test_retry_and_log(False) 800 def test_stop_peer_device_adv(self, device, duration=0): 801 """Test disabling the peer device advertisements. 802 803 @param device: the device object. 804 @param duration: the duration of the advertisement disable. 805 806 @returns: True on success, False otherwise. 807 808 """ 809 ret = self.test_device_set_discoverable(device, False) 810 811 if duration: 812 time.sleep(duration) 813 814 return ret 815 816 def check_records_interleaving(self, durations, records, expect_cycles): 817 """ Check the state of records is interleaving and also the duration is 818 as expected. 819 820 @param durations: a dict of {'allowlist': allowlist_duration, 821 'no filter': no_filter_duration} 822 @param records: a list of records 823 824 @returns: a dict of {'Interleaved': record_state_is_interleaved, 825 'Span within range': duration_is_expected} 826 827 """ 828 829 actual_cycle = len(records) // len(list(durations.keys())) 830 offset = self.INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE 831 expect_cycle_lowerbound = max(1, expect_cycles - offset) 832 expect_cycle_upperbound = expect_cycles + offset 833 enough_cycle_num = (actual_cycle >= expect_cycle_lowerbound 834 and actual_cycle <= expect_cycle_upperbound) 835 interleaved = True 836 span_within_range = True 837 expected_state = None 838 839 def _next_state(state): 840 if state == 'allowlist': 841 return 'no filter' 842 elif state == 'no filter': 843 return 'allowlist' 844 else: 845 logging.warning('Unexpected state %s', state) 846 return None 847 848 for i, record in enumerate(records): 849 state = record['state'] 850 nstate = _next_state(state) 851 852 # We can't count span on single data point and expected_state 853 # hasn't set 854 if i != 0: 855 span = (record['time'] - records[i - 1]['time']) 856 857 if state != expected_state: 858 interleaved = False 859 860 if span < durations[nstate] -\ 861 self.INTERLEAVE_SCAN_DURATION_TOLERANCE: 862 span_within_range = False 863 864 if span > durations[nstate] +\ 865 self.INTERLEAVE_SCAN_DURATION_TOLERANCE: 866 span_within_range = False 867 868 expected_state = nstate 869 870 return { 871 'Enough cycle number': enough_cycle_num, 872 'Interleaved': interleaved, 873 'Span within range': span_within_range 874 } 875 876 def check_records_paused(self, records, cancel_event, expect_paused_time, 877 expect_resume): 878 """ Check if the interleave scan is paused 879 880 @param records: a list of records 881 @param cancel_event: the timestamp interleave was canceled 882 @param expect_paused_time: minimum duration of interleave scan paused 883 @param expect_resume: True if interleave scan should restart, 884 False if ***we don't care*** 885 886 @returns: a dict of {'Cancel event': (bool), 887 'Non-empty records before paused': (bool), 888 'Non-empty records after paused': (bool), 889 'Paused enough time': (bool) 890 } 891 Note: some entries might not exist if it doesn't make sense 892 in that case. 893 894 """ 895 896 result = {} 897 898 result.update({'Cancel event': cancel_event is not None}) 899 if cancel_event is None: 900 return result 901 902 canceled_time = cancel_event + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE 903 904 before_paused_rec = [r for r in records if r['time'] < canceled_time] 905 after_paused_rec = [r for r in records if r['time'] >= canceled_time] 906 907 result.update({ 908 'Non-empty records before paused': 909 len(before_paused_rec) != 0 910 }) 911 912 if expect_resume: 913 result.update({ 914 'Non-empty records after paused': 915 len(after_paused_rec) != 0 916 }) 917 918 if len(before_paused_rec) > 0 and len(after_paused_rec) > 0: 919 # Records are stored chronologically. 920 last_time_before_paused = before_paused_rec[-1]['time'] 921 first_time_after_paused = after_paused_rec[0]['time'] 922 paused_time = first_time_after_paused - last_time_before_paused 923 result.update( 924 {'Paused enough time': paused_time >= expect_paused_time}) 925 926 return result 927 928 def get_interleave_scan_durations(self): 929 """ Get interleave scan duration. 930 931 @returns: a dict of {'allowlist': allowlist_duration, 932 'no filter': no_filter_duration} 933 934 """ 935 936 durations = self.interleave_scan_get_durations() 937 if durations is None: 938 raise error.TestFail( 939 'Unexpected error: failed to get interleave durations') 940 941 # Change the unit from msec to second for future convenience. 942 durations = {key: value * 0.001 for key, value in durations.items()} 943 944 return durations 945 946 @test_retry_and_log(False) 947 def test_interleaving_state(self, 948 expect_true, 949 cycles=INTERLEAVE_SCAN_TEST_CYCLE): 950 """ Test for checking if kernel is doing interleave scan or not. 951 952 @params expect_true: True if kernel should be running interleave scan 953 False if kernel shouldn't. 954 @params cycles: number of cycles to collect logs 955 956 @returns: True on success, False otherwise. 957 958 """ 959 durations = self.get_interleave_scan_durations() 960 interleave_period = sum(durations.values()) 961 log_time = interleave_period * cycles 962 self.interleave_logger_start() 963 time.sleep(log_time) 964 self.interleave_logger_stop() 965 records = self.interleave_logger_get_records() 966 967 logging.debug(records) 968 969 if not expect_true: 970 self.results = {'No records': len(records) == 0} 971 else: 972 self.results = self.check_records_interleaving( 973 durations, records, cycles) 974 975 return all(self.results.values()) 976 977 @test_retry_and_log(False) 978 def test_interleaving_suspend_resume(self, expect_true): 979 """ Test for checking if kernel paused interleave scan during system 980 suspended. 981 982 @returns: True on success, False otherwise. 983 984 """ 985 durations = self.get_interleave_scan_durations() 986 interleave_period = sum(durations.values()) 987 988 # make sure suspend time is long enough to verify there is no 989 # interleaving during suspended 990 expect_suspend_time = max(self.SUSPEND_TIME_SECS, 991 2 * interleave_period) 992 993 # make sure we'll get some records before/after system suspended 994 extra_sleep_time = 2 * interleave_period 995 996 self.interleave_logger_start() 997 time.sleep(extra_sleep_time) 998 self.suspend_resume(suspend_time=expect_suspend_time) 999 time.sleep(extra_sleep_time) 1000 self.interleave_logger_stop() 1001 records = self.interleave_logger_get_records() 1002 cancel_event = self.interleave_logger_get_cancel_event() 1003 1004 logging.debug(records) 1005 logging.debug(cancel_event) 1006 1007 if not expect_true: 1008 self.results = {'No records': len(records) == 0} 1009 else: 1010 # Currently resume time is not very reliable. It is likely the 1011 # actual time in sleeping is less than expect_suspend_time. 1012 # Check the interleave scan paused for at least one cycle long 1013 # instead. 1014 self.results = self.check_records_paused(records, cancel_event, 1015 interleave_period, True) 1016 return all(self.results.values()) 1017 1018 @test_retry_and_log(False) 1019 def test_interleaving_active_scan_cycle(self, expect_true): 1020 """ Test for checking if kernel paused interleave scan during active 1021 scan. 1022 1023 @returns: True on success, False otherwise. 1024 1025 """ 1026 durations = self.get_interleave_scan_durations() 1027 interleave_period = sum(durations.values()) 1028 1029 # make sure we'll get some records before/after active scan 1030 extra_sleep_time = 2 * interleave_period 1031 1032 self.interleave_logger_start() 1033 time.sleep(extra_sleep_time) 1034 self.test_start_discovery() 1035 time.sleep(extra_sleep_time + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE) 1036 self.interleave_logger_stop() 1037 records = self.interleave_logger_get_records() 1038 cancel_event = self.interleave_logger_get_cancel_event() 1039 1040 logging.debug(records) 1041 logging.debug(cancel_event) 1042 1043 if not expect_true: 1044 self.results = {'No records': len(records) == 0} 1045 else: 1046 # BlueZ pauses discovery for every DISCOVERY_DURATION then restarts 1047 # it 5 seconds later. Interleave scan also get restarted during the 1048 # paused time. 1049 self.results = self.check_records_paused(records, cancel_event, 1050 self.DISCOVERY_DURATION, 1051 False) 1052 self.test_stop_discovery() 1053 return all(self.results.values()) 1054 1055 def advmon_test_monitor_creation(self): 1056 """Test case: MONITOR_CREATION 1057 1058 Validate register/unregister app and create/remove monitor. 1059 1060 """ 1061 self.test_is_adv_monitoring_supported() 1062 1063 # Create a test app instance. 1064 app1 = self.create_app() 1065 1066 monitor1 = TestMonitor(app1) 1067 monitor1.update_type('or_patterns') 1068 monitor1.update_rssi([-40, 5, -60, 5]) 1069 monitor1.update_patterns([ 1070 [0, 0x19, [0xc2, 0x03]], 1071 ]) 1072 1073 monitor2 = TestMonitor(app1) 1074 monitor2.update_type('or_patterns') 1075 monitor2.update_rssi([-40, 10, -60, 10]) 1076 monitor2.update_patterns([ 1077 [0, 0x03, [0x12, 0x18]], 1078 ]) 1079 1080 # Read supported types and features, should not fail. 1081 self.test_supported_types() 1082 self.test_supported_features() 1083 1084 # Activate/Release should not get called. 1085 self.test_add_monitor(monitor1, 1086 expected_activate=False, 1087 expected_release=False) 1088 1089 # Register the app, should not fail. 1090 self.test_register_app(app1) 1091 1092 # Already registered app path, should fail with AlreadyExists. 1093 self.test_register_app(app1, expected=False) 1094 1095 # Activate should get called for the monitor added before register app. 1096 self.test_monitor_activate(monitor1, expected=True) 1097 1098 # Correct monitor parameters, activate should get called. 1099 self.test_add_monitor(monitor2, expected_activate=True) 1100 1101 # Remove a monitor, should not fail. 1102 self.test_remove_monitor(monitor1) 1103 1104 # Unregister the app, should not fail. 1105 self.test_unregister_app(app1) 1106 1107 # Already unregistered app path, should fail with DoesNotExists. 1108 self.test_unregister_app(app1, expected=False) 1109 1110 # Release should get called for a monitor not removed before unregister. 1111 self.test_monitor_release(monitor2, expected=True) 1112 1113 # Remove another monitor, should not fail. 1114 self.test_remove_monitor(monitor2) 1115 1116 # Terminate the test app instance. 1117 self.test_exit_app(app1) 1118 1119 1120 def advmon_test_monitor_validity(self): 1121 """Test case: MONITOR_VALIDITY 1122 1123 Validate monitor parameters - monitor type, patterns, RSSI filter 1124 values. 1125 1126 """ 1127 self.test_is_adv_monitoring_supported() 1128 1129 # Create a test app instance. 1130 app1 = self.create_app() 1131 1132 monitor1 = TestMonitor(app1) 1133 monitor1.update_type('incorrect_pattern') 1134 monitor1.update_rssi([-40, 5, -60, 5]) 1135 monitor1.update_patterns([ 1136 [0, 0x19, [0xc2, 0x03]], 1137 ]) 1138 1139 monitor2 = TestMonitor(app1) 1140 monitor2.update_type('or_patterns') 1141 monitor2.update_rssi([-40, 10, -60, 10]) 1142 monitor2.update_patterns([ 1143 [0, 0x03, [0x12, 0x18]], 1144 ]) 1145 1146 # Register the app, should not fail. 1147 self.test_register_app(app1) 1148 1149 # Incorrect monitor type, release should get called. 1150 self.test_add_monitor(monitor1, expected_release=True) 1151 1152 # Incorrect rssi parameters, release should get called. 1153 monitor2.update_rssi([40, 10, -60, 10]) 1154 self.test_add_monitor(monitor2, expected_release=True) 1155 1156 monitor2.update_rssi([-140, 10, -60, 10]) 1157 self.test_add_monitor(monitor2, expected_release=True) 1158 1159 monitor2.update_rssi([-40, 10, 60, 10]) 1160 self.test_add_monitor(monitor2, expected_release=True) 1161 1162 monitor2.update_rssi([-40, 10, -160, 10]) 1163 self.test_add_monitor(monitor2, expected_release=True) 1164 1165 monitor2.update_rssi([-60, 10, -40, 10]) 1166 self.test_add_monitor(monitor2, expected_release=True) 1167 1168 # Correct rssi parameters, activate should get called. 1169 monitor2.update_rssi([self.UNSET_RSSI, 10, -60, 10]) 1170 self.test_add_monitor(monitor2, expected_activate=True) 1171 self.test_remove_monitor(monitor2) 1172 1173 monitor2.update_rssi([-40, self.UNSET_TIMEOUT, -60, 10]) 1174 self.test_add_monitor(monitor2, expected_activate=True) 1175 self.test_remove_monitor(monitor2) 1176 1177 monitor2.update_rssi([-40, 10, self.UNSET_RSSI, 10]) 1178 self.test_add_monitor(monitor2, expected_activate=True) 1179 self.test_remove_monitor(monitor2) 1180 1181 monitor2.update_rssi([-40, 10, -60, self.UNSET_TIMEOUT]) 1182 self.test_add_monitor(monitor2, expected_activate=True) 1183 self.test_remove_monitor(monitor2) 1184 1185 # Incorrect sampling period, release should get called. 1186 monitor2.update_sampling_period(257) 1187 self.test_add_monitor(monitor2, expected_release=True) 1188 1189 # Partial RSSI filter and sampling period, activate should get called. 1190 monitor2.update_rssi([-40, 10, self.UNSET_RSSI, self.UNSET_TIMEOUT]) 1191 monitor2.update_sampling_period(self.UNSET_SAMPLING_PERIOD) 1192 self.test_add_monitor(monitor2, expected_activate=True) 1193 self.test_remove_monitor(monitor2) 1194 1195 monitor2.update_rssi([-40, 10, self.UNSET_RSSI, self.UNSET_TIMEOUT]) 1196 monitor2.update_sampling_period(5) 1197 self.test_add_monitor(monitor2, expected_activate=True) 1198 self.test_remove_monitor(monitor2) 1199 1200 monitor2.update_rssi([self.UNSET_RSSI, self.UNSET_TIMEOUT, -60, 10]) 1201 monitor2.update_sampling_period(self.UNSET_SAMPLING_PERIOD) 1202 self.test_add_monitor(monitor2, expected_activate=True) 1203 self.test_remove_monitor(monitor2) 1204 1205 monitor2.update_rssi([self.UNSET_RSSI, self.UNSET_TIMEOUT, -60, 10]) 1206 monitor2.update_sampling_period(10) 1207 self.test_add_monitor(monitor2, expected_activate=True) 1208 self.test_remove_monitor(monitor2) 1209 1210 monitor2.update_rssi([ 1211 self.UNSET_RSSI, 1212 self.UNSET_TIMEOUT, 1213 self.UNSET_RSSI, 1214 self.UNSET_TIMEOUT 1215 ]) 1216 monitor2.update_sampling_period(self.UNSET_SAMPLING_PERIOD) 1217 self.test_add_monitor(monitor2, expected_activate=True) 1218 self.test_remove_monitor(monitor2) 1219 1220 # Incorrect pattern parameters, release should get called. 1221 monitor2.update_patterns([ 1222 [32, 0x09, 'MOUSE'], 1223 ]) 1224 self.test_add_monitor(monitor2, expected_release=True) 1225 1226 monitor2.update_patterns([ 1227 [0, 0x00, 'MOUSE'], 1228 ]) 1229 self.test_add_monitor(monitor2, expected_release=True) 1230 1231 monitor2.update_patterns([ 1232 [0, 0x40, 'MOUSE'], 1233 ]) 1234 self.test_add_monitor(monitor2, expected_release=True) 1235 1236 monitor2.update_patterns([ 1237 [0, 0x09, '0123456789ABCDEF0123456789ABCDEF0'], 1238 ]) 1239 self.test_add_monitor(monitor2, expected_release=True) 1240 1241 monitor2.update_patterns([ 1242 [32, 0x09, [0xc2, 0x03]], 1243 [0, 3, [0x12, 0x18]], 1244 ]) 1245 self.test_add_monitor(monitor2, expected_release=True) 1246 1247 monitor2.update_patterns([ 1248 [0, 0x19, [0xc2, 0x03]], 1249 [0, 0x00, [0x12, 0x18]], 1250 ]) 1251 self.test_add_monitor(monitor2, expected_release=True) 1252 1253 # Correct pattern parameters, activate should get called. 1254 monitor2.update_patterns([ 1255 [0, 0x09, 'MOUSE'], 1256 ]) 1257 self.test_add_monitor(monitor2, expected_activate=True) 1258 self.test_remove_monitor(monitor2) 1259 1260 monitor2.update_rssi([-40, 10, -60, 10]) 1261 monitor2.update_patterns([ 1262 [0, 0x19, [0xc2, 0x03]], 1263 [0, 0x03, [0x12, 0x18]], 1264 ]) 1265 self.test_add_monitor(monitor2, expected_activate=True) 1266 self.test_remove_monitor(monitor2) 1267 1268 # Unregister the app, should not fail. 1269 self.test_unregister_app(app1) 1270 1271 # Terminate the test app instance. 1272 self.test_exit_app(app1) 1273 1274 1275 def advmon_test_pattern_filter(self): 1276 """Test case: PATTERN_FILTER 1277 1278 Verify matching of advertisements w.r.t. various pattern values and 1279 different AD Data Types - Local Name Service UUID and Device Type. 1280 1281 """ 1282 self.test_is_adv_monitoring_supported() 1283 self.test_setup_peer_devices() 1284 1285 # Create a test app instance. 1286 app1 = self.create_app() 1287 1288 monitor1 = TestMonitor(app1) 1289 monitor1.update_type('or_patterns') 1290 monitor1.update_rssi([self.HIGH_RSSI, 3, self.LOW_RSSI, 3]) 1291 1292 # Register the app, should not fail. 1293 self.test_register_app(app1) 1294 1295 monitor1.update_patterns([ 1296 [5, 0x09, '_REF'], 1297 ]) 1298 self.test_add_monitor(monitor1, expected_activate=True) 1299 1300 # Local name 'KEYBD_REF' should match. 1301 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1302 self.test_device_found(monitor1, count=1) 1303 1304 # Local name 'MOUSE_REF' should match. 1305 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1306 self.test_device_found(monitor1, count=2) 1307 1308 self.test_stop_peer_device_adv(self.peer_keybd) 1309 self.test_stop_peer_device_adv(self.peer_mouse) 1310 self.test_remove_monitor(monitor1) 1311 1312 monitor1.update_patterns([ 1313 [0, 0x03, [0x12, 0x18]], 1314 ]) 1315 self.test_add_monitor(monitor1, expected_activate=True) 1316 1317 # Service UUID 0x1812 should match. 1318 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1319 self.test_device_found(monitor1, count=1) 1320 1321 # Service UUID 0x1812 should match. 1322 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1323 self.test_device_found(monitor1, count=2) 1324 1325 self.test_stop_peer_device_adv(self.peer_keybd) 1326 self.test_stop_peer_device_adv(self.peer_mouse) 1327 self.test_remove_monitor(monitor1) 1328 1329 monitor1.update_patterns([ 1330 [0, 0x19, [0xc1, 0x03]], 1331 [0, 0x09, 'MOUSE'], 1332 ]) 1333 self.test_add_monitor(monitor1, expected_activate=True) 1334 1335 # Device type 0xc103 should match. 1336 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1337 self.test_device_found(monitor1, count=1) 1338 1339 # Local name 'MOUSE_REF' should match. 1340 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1341 self.test_device_found(monitor1, count=2) 1342 1343 self.test_stop_peer_device_adv(self.peer_keybd) 1344 self.test_stop_peer_device_adv(self.peer_mouse) 1345 self.test_remove_monitor(monitor1) 1346 1347 monitor1.update_patterns([ 1348 [0, 0x19, [0xc1, 0x03]], 1349 [0, 0x19, [0xc3, 0x03]], 1350 ]) 1351 self.test_add_monitor(monitor1, expected_activate=True) 1352 1353 # Device type 0xc103 should match. 1354 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1355 self.test_device_found(monitor1, count=1) 1356 1357 # Device type 0xc203 should not match. 1358 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1359 self.test_device_found(monitor1, count=1) 1360 1361 self.test_stop_peer_device_adv(self.peer_keybd) 1362 self.test_stop_peer_device_adv(self.peer_mouse) 1363 self.test_remove_monitor(monitor1) 1364 1365 # Unregister the app, should not fail. 1366 self.test_unregister_app(app1) 1367 1368 # Terminate the test app instance. 1369 self.test_exit_app(app1) 1370 1371 1372 def advmon_test_rssi_filter_range(self): 1373 """Test case: RSSI_FILTER_RANGE 1374 1375 Verify unset RSSI filter and filter with no matching RSSI values. 1376 1377 """ 1378 self.test_is_adv_monitoring_supported() 1379 self.test_setup_peer_devices() 1380 1381 # Create a test app instance. 1382 app1 = self.create_app() 1383 1384 monitor1 = TestMonitor(app1) 1385 monitor1.update_type('or_patterns') 1386 monitor1.update_patterns([ 1387 [0, 0x03, [0x12, 0x18]], 1388 ]) 1389 1390 # Register the app, should not fail. 1391 self.test_register_app(app1) 1392 1393 monitor1.update_rssi([ 1394 self.UNSET_RSSI, 1395 self.UNSET_TIMEOUT, 1396 self.UNSET_RSSI, 1397 self.UNSET_TIMEOUT 1398 ]) 1399 self.test_add_monitor(monitor1, expected_activate=True) 1400 1401 # Unset RSSI filter, adv should match multiple times. 1402 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1403 self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) 1404 1405 # Unset RSSI filter, DeviceLost should not get triggered. 1406 self.test_stop_peer_device_adv(self.peer_keybd, duration=5) 1407 self.test_device_lost(monitor1, count=0) 1408 1409 self.test_remove_monitor(monitor1) 1410 1411 monitor1.update_rssi([-10, 5, -20, 5]) 1412 self.test_add_monitor(monitor1, expected_activate=True) 1413 1414 # Adv RSSI lower than RSSI filter, DeviceFound should not get triggered. 1415 self.test_start_peer_device_adv(self.peer_keybd, duration=10) 1416 self.test_device_found(monitor1, count=0) 1417 1418 # No device was found earlier, so DeviceLost should not get triggered. 1419 self.test_stop_peer_device_adv(self.peer_keybd, duration=10) 1420 self.test_device_lost(monitor1, count=0) 1421 1422 self.test_remove_monitor(monitor1) 1423 1424 # Unregister the app, should not fail. 1425 self.test_unregister_app(app1) 1426 1427 # Terminate the test app instance. 1428 self.test_exit_app(app1) 1429 1430 1431 def advmon_test_rssi_filter_multi_peers(self): 1432 """Test case: RSSI_FILTER_MULTI_PEERS 1433 1434 Verify RSSI filter matching with multiple peer devices. 1435 1436 """ 1437 self.test_is_adv_monitoring_supported() 1438 self.test_setup_peer_devices() 1439 1440 # Create a test app instance. 1441 app1 = self.create_app() 1442 1443 monitor1 = TestMonitor(app1) 1444 monitor1.update_type('or_patterns') 1445 monitor1.update_patterns([ 1446 [0, 0x03, [0x12, 0x18]], 1447 ]) 1448 1449 # Register the app, should not fail. 1450 self.test_register_app(app1) 1451 1452 monitor1.update_rssi([ 1453 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1454 ]) 1455 self.test_add_monitor(monitor1, expected_activate=True) 1456 1457 # DeviceFound should get triggered only once per device. 1458 self.test_start_peer_device_adv(self.peer_keybd, duration=10) 1459 self.test_device_found(monitor1, count=1) 1460 1461 # DeviceFound should get triggered for another device. 1462 self.test_start_peer_device_adv(self.peer_mouse, duration=10) 1463 self.test_device_found(monitor1, count=2) 1464 1465 # DeviceLost should get triggered only once per device. 1466 self.test_stop_peer_device_adv(self.peer_keybd, duration=10) 1467 self.test_device_lost(monitor1, count=1) 1468 1469 # DeviceLost should get triggered for another device. 1470 self.test_stop_peer_device_adv(self.peer_mouse, duration=10) 1471 self.test_device_lost(monitor1, count=2) 1472 1473 self.test_remove_monitor(monitor1) 1474 1475 # Unregister the app, should not fail. 1476 self.test_unregister_app(app1) 1477 1478 # Terminate the test app instance. 1479 self.test_exit_app(app1) 1480 1481 1482 def advmon_test_rssi_filter_reset(self): 1483 """Test case: RSSI_FILTER_RESET 1484 1485 Verify reset of RSSI timers based on advertisements. 1486 1487 """ 1488 self.test_is_adv_monitoring_supported() 1489 self.test_setup_peer_devices() 1490 1491 # Create a test app instance. 1492 app1 = self.create_app() 1493 1494 monitor1 = TestMonitor(app1) 1495 monitor1.update_type('or_patterns') 1496 monitor1.update_patterns([ 1497 [0, 0x03, [0x12, 0x18]], 1498 ]) 1499 1500 # Register the app, should not fail. 1501 self.test_register_app(app1) 1502 1503 monitor1.update_rssi([ 1504 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 10, 1505 ]) 1506 self.test_add_monitor(monitor1, expected_activate=True) 1507 1508 # DeviceFound should get triggered once the peer starts advertising. 1509 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1510 self.test_device_found(monitor1, count=1) 1511 1512 # DeviceLost should not get triggered before timeout. 1513 self.test_stop_peer_device_adv(self.peer_keybd, duration=5) 1514 self.test_device_lost(monitor1, count=0) 1515 1516 # Timer should get reset, so DeviceLost should not get triggered. 1517 # DeviceFound should not get triggered as device is not lost yet. 1518 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1519 self.test_device_lost(monitor1, count=0) 1520 self.test_device_found(monitor1, count=1) 1521 1522 # Timer should get reset, so DeviceLost should not get triggered. 1523 self.test_stop_peer_device_adv(self.peer_keybd, duration=5) 1524 self.test_device_lost(monitor1, count=0) 1525 1526 # DeviceLost should get triggered once timer completes. 1527 self.test_device_lost(monitor1, count=1, delay=10) 1528 1529 self.test_remove_monitor(monitor1) 1530 1531 # Unregister the app, should not fail. 1532 self.test_unregister_app(app1) 1533 1534 # Terminate the test app instance. 1535 self.test_exit_app(app1) 1536 1537 1538 def advmon_test_multi_client(self): 1539 """Test case: MULTI_CLIENT 1540 1541 Verify working of patterns filter and RSSI filters with multiple 1542 clients and multiple monitors. 1543 1544 """ 1545 self.test_is_adv_monitoring_supported() 1546 self.test_setup_peer_devices() 1547 1548 # Create two test app instances. 1549 app1 = self.create_app() 1550 app2 = self.create_app() 1551 1552 # Register both apps, should not fail. 1553 self.test_register_app(app1) 1554 self.test_register_app(app2) 1555 1556 # Monitors with same pattern and RSSI filter values in both apps. 1557 monitor1 = TestMonitor(app1) 1558 monitor1.update_type('or_patterns') 1559 monitor1.update_patterns([ 1560 [0, 0x03, [0x12, 0x18]], 1561 [0, 0x19, [0xc1, 0x03]], 1562 ]) 1563 monitor1.update_rssi([ 1564 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1565 ]) 1566 1567 monitor2 = TestMonitor(app2) 1568 monitor2.update_type('or_patterns') 1569 monitor2.update_patterns([ 1570 [0, 0x03, [0x12, 0x18]], 1571 [0, 0x19, [0xc1, 0x03]], 1572 ]) 1573 monitor2.update_rssi([ 1574 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1575 ]) 1576 1577 # Activate should get invoked. 1578 self.test_add_monitor(monitor1, expected_activate=True) 1579 self.test_add_monitor(monitor2, expected_activate=True) 1580 1581 # DeviceFound should get triggered for keyboard. 1582 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1583 self.test_device_found(monitor1, count=1) 1584 self.test_device_found(monitor2, count=1) 1585 self.test_stop_peer_device_adv(self.peer_keybd) 1586 1587 # Remove a monitor from one app. 1588 self.test_remove_monitor(monitor1) 1589 1590 # Monitors with same pattern but different RSSI filter values. 1591 monitor3 = TestMonitor(app1) 1592 monitor3.update_type('or_patterns') 1593 monitor3.update_patterns([ 1594 [0, 0x19, [0xc2, 0x03]], 1595 ]) 1596 monitor3.update_rssi([ 1597 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1598 ]) 1599 1600 monitor4 = TestMonitor(app2) 1601 monitor4.update_type('or_patterns') 1602 monitor4.update_patterns([ 1603 [0, 0x19, [0xc2, 0x03]], 1604 ]) 1605 monitor4.update_rssi([ 1606 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 10, 1607 ]) 1608 1609 # Activate should get invoked. 1610 self.test_add_monitor(monitor3, expected_activate=True) 1611 self.test_add_monitor(monitor4, expected_activate=True) 1612 1613 # DeviceFound should get triggered for mouse. 1614 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1615 self.test_device_found(monitor2, count=2) 1616 self.test_device_found(monitor3, count=1) 1617 self.test_device_found(monitor4, count=1) 1618 self.test_stop_peer_device_adv(self.peer_mouse) 1619 1620 # Unregister both apps, should not fail. 1621 self.test_unregister_app(app1) 1622 self.test_unregister_app(app2) 1623 1624 # Terminate the both test app instances. 1625 self.test_exit_app(app1) 1626 self.test_exit_app(app2) 1627 1628 1629 def advmon_test_fg_bg_combination(self): 1630 """Test case: FG_BG_COMBINATION 1631 1632 Verify background scanning and foreground scanning do not interfere 1633 working of each other. 1634 1635 """ 1636 self.test_is_adv_monitoring_supported() 1637 self.test_setup_peer_devices() 1638 1639 # Create a test app instance. 1640 app1 = self.create_app() 1641 1642 monitor1 = TestMonitor(app1) 1643 monitor1.update_type('or_patterns') 1644 monitor1.update_patterns([ 1645 [0, 0x03, [0x12, 0x18]], 1646 ]) 1647 monitor1.update_rssi([ 1648 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1649 ]) 1650 1651 # Register the app, should not fail. 1652 self.test_register_app(app1) 1653 1654 # Activate should get invoked. 1655 self.test_add_monitor(monitor1, expected_activate=True) 1656 1657 # Pair/connect LE Mouse. 1658 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1659 time.sleep(self.PAIR_TEST_SLEEP_SECS) 1660 self.test_discover_device(self.peer_mouse.address) 1661 time.sleep(self.PAIR_TEST_SLEEP_SECS) 1662 self.test_pairing(self.peer_mouse.address, self.peer_mouse.pin) 1663 time.sleep(self.PAIR_TEST_SLEEP_SECS) 1664 self.test_connection_by_adapter(self.peer_mouse.address) 1665 self.test_connection_by_device(self.peer_mouse) 1666 1667 # DeviceFound should get triggered for keyboard. 1668 self.test_reset_event_count(monitor1) 1669 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1670 self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) 1671 self.test_stop_peer_device_adv(self.peer_keybd, duration=5) 1672 1673 # Start foreground scanning. 1674 self.test_start_discovery() 1675 1676 # Disconnect LE mouse. 1677 self.test_disconnection_by_device(self.peer_mouse) 1678 1679 # Remove the monitor. 1680 self.test_remove_monitor(monitor1) 1681 1682 # Activate should get invoked. 1683 self.test_add_monitor(monitor1, expected_activate=True) 1684 1685 # Connect LE mouse. 1686 self.test_connection_by_device(self.peer_mouse) 1687 1688 # DeviceFound should get triggered for keyboard. 1689 self.test_reset_event_count(monitor1) 1690 self.test_start_peer_device_adv(self.peer_keybd, duration=10) 1691 self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) 1692 self.test_stop_peer_device_adv(self.peer_keybd, duration=5) 1693 1694 # Stop foreground scanning. 1695 self.test_stop_discovery() 1696 1697 # Disconnect LE mouse. 1698 self.test_disconnection_by_device(self.peer_mouse) 1699 1700 # DeviceFound should get triggered for keyboard. 1701 self.test_reset_event_count(monitor1) 1702 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1703 self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) 1704 self.test_stop_peer_device_adv(self.peer_keybd) 1705 1706 # Remove the monitor. 1707 self.test_remove_monitor(monitor1) 1708 1709 # Connect LE mouse. 1710 self.test_connection_by_device(self.peer_mouse) 1711 1712 # Unregister the app, should not fail. 1713 self.test_unregister_app(app1) 1714 1715 # Terminate the test app instance. 1716 self.test_exit_app(app1) 1717 1718 1719 def advmon_test_suspend_resume(self): 1720 """Test case: SUSPEND_RESUME 1721 1722 Verify working of background scanning with suspend/resume. 1723 1724 """ 1725 self.test_is_adv_monitoring_supported() 1726 self.test_setup_peer_devices() 1727 1728 # Create two test app instances. 1729 app1 = self.create_app() 1730 app2 = self.create_app() 1731 1732 # Register both apps, should not fail. 1733 self.test_register_app(app1) 1734 self.test_register_app(app2) 1735 1736 # Add monitors in both apps. 1737 monitor1 = TestMonitor(app1) 1738 monitor1.update_type('or_patterns') 1739 monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) 1740 monitor1.update_rssi([ 1741 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1742 ]) 1743 1744 monitor2 = TestMonitor(app1) 1745 monitor2.update_type('or_patterns') 1746 monitor2.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) 1747 monitor2.update_rssi([ 1748 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 10, 1749 ]) 1750 1751 monitor3 = TestMonitor(app2) 1752 monitor3.update_type('or_patterns') 1753 monitor3.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) 1754 monitor3.update_rssi([ 1755 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 3, 1756 ]) 1757 1758 monitor4 = TestMonitor(app2) 1759 monitor4.update_type('or_patterns') 1760 monitor4.update_patterns([ [0, 0x19, [0xc1, 0x03]], ]) 1761 monitor4.update_rssi([ 1762 self.HIGH_RSSI, self.UNSET_TIMEOUT, self.LOW_RSSI, 15, 1763 ]) 1764 1765 # Activate should get invoked. 1766 self.test_add_monitor(monitor1, expected_activate=True) 1767 self.test_add_monitor(monitor2, expected_activate=True) 1768 self.test_add_monitor(monitor3, expected_activate=True) 1769 self.test_add_monitor(monitor4, expected_activate=True) 1770 1771 # DeviceFound for mouse should get triggered only for matched monitors 1772 self.test_start_peer_device_adv(self.peer_mouse, duration=5) 1773 self.test_device_found(monitor1, count=1) 1774 self.test_device_found(monitor2, count=1) 1775 self.test_device_found(monitor3, count=1) 1776 self.test_device_found(monitor4, count=0) 1777 1778 # Initiate suspend/resume. 1779 self.suspend_resume() 1780 1781 # DeviceLost should get triggered for tracked devices on resume. 1782 self.test_device_lost(monitor1, count=1) 1783 self.test_device_lost(monitor2, count=1) 1784 self.test_device_lost(monitor3, count=1) 1785 self.test_device_lost(monitor4, count=0) 1786 1787 # DeviceFound should get triggered again for matched monitors on resume. 1788 self.test_device_found(monitor1, count=2) 1789 self.test_device_found(monitor2, count=2) 1790 self.test_device_found(monitor3, count=2) 1791 self.test_device_found(monitor4, count=0) 1792 self.test_stop_peer_device_adv(self.peer_mouse) 1793 1794 # Remove a monitor from one app, shouldn't affect working of other 1795 # monitors or apps. 1796 self.test_remove_monitor(monitor1) 1797 1798 # Terminate an app, shouldn't affect working of monitors in other apps. 1799 self.test_exit_app(app1) 1800 1801 # DeviceFound should get triggered for keyboard. 1802 self.test_start_peer_device_adv(self.peer_keybd, duration=5) 1803 self.test_device_found(monitor3, count=3) 1804 self.test_device_found(monitor4, count=1) 1805 self.test_stop_peer_device_adv(self.peer_keybd) 1806 1807 # Unregister the running app, should not fail. 1808 self.test_unregister_app(app2) 1809 1810 # Terminate the running test app instance. 1811 self.test_exit_app(app2) 1812 1813 1814 def advmon_test_interleaved_scan(self): 1815 """ Test cases for verifying interleave scan """ 1816 1817 self.test_is_adv_monitoring_supported() 1818 1819 # cycles to collect logs for tests expect no interleave scan 1820 EXPECT_FALSE_TEST_CYCLE = 3 1821 1822 supported_features = self.read_supported_features() 1823 1824 if 'controller-patterns' in supported_features: 1825 # For device supporting hardware filtering, software interleave 1826 # scan shall not be used. 1827 sw_interleave_scan = False 1828 else: 1829 sw_interleave_scan = True 1830 1831 # Create a test app instance. 1832 app1 = self.create_app() 1833 1834 monitor1 = TestMonitor(app1) 1835 monitor1.update_type('or_patterns') 1836 monitor1.update_patterns([ 1837 [0, 0x03, [0x12, 0x18]], 1838 ]) 1839 monitor1.update_rssi([ 1840 self.UNSET_RSSI, 1841 self.UNSET_TIMEOUT, 1842 self.UNSET_RSSI, 1843 self.UNSET_TIMEOUT 1844 ]) 1845 1846 # Register the app, should not fail. 1847 self.test_register_app(app1) 1848 1849 # Activate should get invoked. 1850 self.test_add_monitor(monitor1, expected_activate=True) 1851 1852 # No device in allowlist, interleave with idle 1853 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1854 1855 # No device in allowlist, interleave with idle, interrupted by active 1856 # scan 1857 self.test_start_discovery() 1858 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1859 self.test_stop_discovery() 1860 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1861 1862 # No device in allowlist, interleave with idle, interrupted by suspend 1863 # resume 1864 self.suspend_resume() 1865 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1866 1867 # Pair/connect LE Mouse. 1868 device = self.devices['BLE_MOUSE'][0] 1869 self.test_discover_device(device.address) 1870 time.sleep(self.PAIR_TEST_SLEEP_SECS) 1871 self.test_pairing(device.address, device.pin, trusted=True) 1872 time.sleep(self.PAIR_TEST_SLEEP_SECS) 1873 1874 # BLE_MOUSE in allowlist, interleave with allowlist passive scan 1875 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1876 device.AdapterPowerOff() 1877 # Make sure the peer is disconnected 1878 self.test_device_is_not_connected(device.address) 1879 self.test_interleaving_state(sw_interleave_scan) 1880 1881 # Interleaving with allowlist should get paused during active scan 1882 self.test_interleaving_active_scan_cycle(sw_interleave_scan) 1883 1884 # Interleaving with allowlist should get resumed after stopping scan 1885 self.test_interleaving_state(sw_interleave_scan) 1886 1887 # Interleaving with allowlist should get paused during system suspend, 1888 # get resumed after system awake 1889 self.test_interleaving_suspend_resume(sw_interleave_scan) 1890 self.test_interleaving_state(sw_interleave_scan) 1891 1892 self.test_remove_monitor(monitor1) 1893 self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) 1894 1895 # Unregister the app, should not fail. 1896 self.test_unregister_app(app1) 1897 1898 # Terminate the test app instance. 1899 self.test_exit_app(app1) 1900 1901 device.AdapterPowerOn() 1902