1#!/usr/bin/env python3
2#
3#   Copyright 2016 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import ipaddress
18import logging
19import os
20import re
21import shutil
22import random
23import subprocess
24import time
25
26from retry import retry
27
28from collections import namedtuple
29from enum import IntEnum
30from queue import Empty
31
32from acts import asserts
33from acts import context
34from acts import signals
35from acts import utils
36from acts.controllers import attenuator
37from acts.controllers.adb_lib.error import AdbCommandError
38from acts.controllers.android_device import AndroidDevice
39from acts.controllers.ap_lib import hostapd_security
40from acts.controllers.ap_lib import hostapd_ap_preset
41from acts.controllers.ap_lib.hostapd_constants import BAND_2G
42from acts.controllers.ap_lib.hostapd_constants import BAND_5G
43from acts_contrib.test_utils.net import connectivity_const as cconsts
44from acts_contrib.test_utils.tel import tel_defines
45from acts_contrib.test_utils.wifi import wifi_constants
46from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
47
# Seconds to let the system settle after a reboot or after toggling WiFi or
# Airplane mode.
DEFAULT_TIMEOUT = 10
# Seconds to wait for events that are expected to arrive quickly, e.g.
# onSuccess for starting a background scan, or the confirmation of a WiFi
# state change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
DEFAULT_SCAN_TRIES = 3
DEFAULT_CONNECT_TRIES = 3
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458
# Seconds to wait before retrying after a failed WiFi scan.
WIFI_SCAN_RETRY_INTERVAL_SEC = 5

DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

# Attenuation presets keyed by roaming scenario name.
# NOTE(review): presumably one value per attenuator, in attenuator order --
# confirm against the callers.
ROAMING_ATTN = dict(
    AP1_on_AP2_off=[0, 0, 95, 95],
    AP1_off_AP2_on=[95, 95, 0, 0],
    default=[0, 0, 0, 0],
)
71
72
class WifiEnums:
    """Namespace of constants shared by the Wi-Fi test utilities.

    Groups the dictionary keys used in network / SoftAp configurations, the
    integer codes for SoftAp bands, WPS, EAP, P2P, RTT and scanning, and the
    frequency <-> channel lookup tables.
    """

    # Keys of a network configuration dictionary.
    SSID_KEY = "SSID"  # Used for Wifi & SoftAp
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"  # Used for Wifi & SoftAp
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"  # Used for Wifi & SoftAp
    frequency_key = "frequency"
    HIDDEN_KEY = "hiddenSSID"  # Used for Wifi & SoftAp
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_SUGGESTION_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"  # Used for Wifi & SoftAp

    # Keys used only for SoftAp configurations.
    AP_BAND_KEY = "apBand"
    AP_CHANNEL_KEY = "apChannel"
    AP_BANDS_KEY = "apBands"
    AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
    AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
    AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
    AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
    AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
    AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
    AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
    AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
    AP_ALLOWEDLIST_KEY = "AllowedClientList"
    AP_BLOCKEDLIST_KEY = "BlockedClientList"

    # SoftAp band codes. The combined codes equal the bitwise OR of the
    # single-band codes (2G = 1, 5G = 2, 6G = 4).
    WIFI_CONFIG_SOFTAP_BAND_2G = 1
    WIFI_CONFIG_SOFTAP_BAND_5G = 2
    WIFI_CONFIG_SOFTAP_BAND_2G_5G = (WIFI_CONFIG_SOFTAP_BAND_2G
                                     | WIFI_CONFIG_SOFTAP_BAND_5G)
    WIFI_CONFIG_SOFTAP_BAND_6G = 4
    WIFI_CONFIG_SOFTAP_BAND_2G_6G = (WIFI_CONFIG_SOFTAP_BAND_2G
                                     | WIFI_CONFIG_SOFTAP_BAND_6G)
    WIFI_CONFIG_SOFTAP_BAND_5G_6G = (WIFI_CONFIG_SOFTAP_BAND_5G
                                     | WIFI_CONFIG_SOFTAP_BAND_6G)
    WIFI_CONFIG_SOFTAP_BAND_ANY = (WIFI_CONFIG_SOFTAP_BAND_2G
                                   | WIFI_CONFIG_SOFTAP_BAND_5G
                                   | WIFI_CONFIG_SOFTAP_BAND_6G)

    # DO NOT USE IT for new test case! Replaced by WIFI_CONFIG_SOFTAP_BAND_
    WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
    WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
    WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G

    WIFI_CONFIG_APBAND_2G_OLD = 0
    WIFI_CONFIG_APBAND_5G_OLD = 1
    WIFI_CONFIG_APBAND_AUTO_OLD = -1

    # WPS setup method codes.
    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class SoftApSecurityType:
        OPEN = "NONE"
        WPA2 = "WPA2_PSK"
        WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
        WPA3_SAE = "WPA3_SAE"

    class CountryCode:
        AUSTRALIA = "AU"
        CHINA = "CN"
        GERMANY = "DE"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP

    class Eap(IntEnum):
        """EAP method types."""
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    class EapPhase2(IntEnum):
        """EAP phase-2 (inner authentication) types."""
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Keys of an enterprise network configuration.
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
        OCSP = "ocsp"

    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        # Channel width codes.
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    class RttPreamble(IntEnum):
        # One bit per preamble type.
        PREAMBLE_LEGACY = 1 << 0
        PREAMBLE_HT = 1 << 1
        PREAMBLE_VHT = 1 << 2

    class RttBW(IntEnum):
        # One bit per supported bandwidth.
        BW_5_SUPPORT = 1 << 0
        BW_10_SUPPORT = 1 << 1
        BW_20_SUPPORT = 1 << 2
        BW_40_SUPPORT = 1 << 3
        BW_80_SUPPORT = 1 << 4
        BW_160_SUPPORT = 1 << 5

    class Rtt(IntEnum):
        # Non-negative members are status codes.
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        # Negative members are reason codes.
        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        # Keys of an RTT request parameter dictionary.
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Margin of error per RTT bandwidth.
    # NOTE(review): the unit is not stated anywhere in this file -- confirm.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5,
    }

    # Macros as specified in the WifiScanner code. Combined bands equal the
    # bitwise OR of their parts (2.4 GHz = 1, 5 GHz non-DFS = 2, DFS = 4).
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = (WIFI_BAND_5_GHZ
                                | WIFI_BAND_5_GHZ_DFS_ONLY)  # 5 GHz incl. DFS
    WIFI_BAND_BOTH = (WIFI_BAND_24_GHZ
                      | WIFI_BAND_5_GHZ)  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = (WIFI_BAND_BOTH
                               | WIFI_BAND_5_GHZ_DFS_ONLY)  # both bands with DFS

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462,
    ]
    DFS_5G_FREQUENCIES = [
        5260, 5280, 5300, 5320,
        5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640, 5660, 5680, 5700, 5720,
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240,
        5745, 5765, 5785, 5805, 5825,
    ]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # Scan band code -> list of frequencies in that band.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES,
    }

    # TODO: add all of the band mapping.
    softap_band_frequencies = {
        WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
        WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES,
    }

    # All Wifi frequencies to channels lookup.
    # NOTE(review): this table and channel_5G_to_freq are not exact inverses:
    # 5250/5755/5775 MHz (channels 50/151/155) appear only in
    # channel_5G_to_freq, and 5720 MHz (channel 144) appears only here.
    freq_to_channel = {
        # 2.4 GHz
        2412: 1, 2417: 2, 2422: 3, 2427: 4, 2432: 5, 2437: 6, 2442: 7,
        2447: 8, 2452: 9, 2457: 10, 2462: 11, 2467: 12, 2472: 13, 2484: 14,
        # 4.9 / 5.0 GHz
        4915: 183, 4920: 184, 4925: 185, 4935: 187, 4940: 188, 4945: 189,
        4960: 192, 4980: 196,
        5035: 7, 5040: 8, 5045: 9, 5055: 11, 5060: 12, 5080: 16,
        # 5 GHz
        5170: 34, 5180: 36, 5190: 38, 5200: 40, 5210: 42, 5220: 44, 5230: 46,
        5240: 48,
        5260: 52, 5280: 56, 5300: 60, 5320: 64,
        5500: 100, 5520: 104, 5540: 108, 5560: 112, 5580: 116, 5600: 120,
        5620: 124, 5640: 128, 5660: 132, 5680: 136, 5700: 140, 5720: 144,
        5745: 149, 5765: 153, 5785: 157, 5795: 159, 5805: 161, 5825: 165,
        5845: 169, 5865: 173, 5885: 177,
    }

    # All Wifi channels to frequencies lookup.
    # Channels 1-13 sit 5 MHz apart starting at 2412 MHz; channel 14 is the
    # exception at 2484 MHz.
    channel_2G_to_freq = {ch: 2407 + 5 * ch for ch in range(1, 14)}
    channel_2G_to_freq[14] = 2484

    channel_5G_to_freq = {
        # 4.9 / 5.0 GHz
        183: 4915, 184: 4920, 185: 4925, 187: 4935, 188: 4940, 189: 4945,
        192: 4960, 196: 4980,
        7: 5035, 8: 5040, 9: 5045, 11: 5055, 12: 5060, 16: 5080,
        # 5 GHz
        34: 5170, 36: 5180, 38: 5190, 40: 5200, 42: 5210, 44: 5220, 46: 5230,
        48: 5240, 50: 5250,
        52: 5260, 56: 5280, 60: 5300, 64: 5320,
        100: 5500, 104: 5520, 108: 5540, 112: 5560, 116: 5580, 120: 5600,
        124: 5620, 128: 5640, 132: 5660, 136: 5680, 140: 5700,
        149: 5745, 151: 5755, 153: 5765, 155: 5775, 157: 5785, 159: 5795,
        161: 5805, 165: 5825, 169: 5845, 173: 5865, 177: 5885,
    }

    # 59 channels (1, 5, ..., 233), 20 MHz apart starting at 5955 MHz.
    channel_6G_to_freq = {ch: 5950 + 5 * ch for ch in range(1, 234, 4)}

    # Band label -> channel-to-frequency table.
    channel_to_freq = {
        '2G': channel_2G_to_freq,
        '5G': channel_5G_to_freq,
        '6G': channel_6G_to_freq,
    }
462
463
class WifiChannelBase:
    """Base holder for per-region frequency lists.

    All lists are empty here; subclasses are expected to supply the actual
    frequencies. ALL_5G_FREQUENCIES is computed once, at class creation time,
    from the lists defined in this class.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the frequency list for a WifiEnums.WIFI_BAND_* code.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* scan band codes.

        Returns:
            The list of frequencies associated with the band.

        Raises:
            KeyError: if band is not one of the mapped band codes (note that
                      WIFI_BAND_UNSPECIFIED is not mapped).
        """
        two_g = self.ALL_2G_FREQUENCIES
        five_g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        five_g_all = self.ALL_5G_FREQUENCIES
        # Built per call so that values assigned on the instance or overridden
        # by a subclass are honoured.
        frequencies_by_band = {
            WifiEnums.WIFI_BAND_24_GHZ: two_g,
            WifiEnums.WIFI_BAND_5_GHZ: five_g_no_dfs,
            WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY: self.DFS_5G_FREQUENCIES,
            WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS: five_g_all,
            WifiEnums.WIFI_BAND_BOTH: two_g + five_g_no_dfs,
            WifiEnums.WIFI_BAND_BOTH_WITH_DFS: five_g_all + two_g,
        }
        return frequencies_by_band[band]
487
488
class WifiChannelUS(WifiChannelBase):
    """US Wi-Fi frequency lists.

    The 2.4 GHz, non-DFS 5 GHz and mixed-scan lists are class attributes; the
    DFS and combined 5 GHz lists are set on each instance in __init__.
    """
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
        5700, 5745, 5805
    ]

    def __init__(self, model=None, support_addition_channel=None):
        """Builds the per-instance frequency lists.

        Args:
            model: Device model name, checked against
                   support_addition_channel.
            support_addition_channel: Container of models that additionally
                   support 2467 and 2472 MHz. None (the default) means no
                   model does.
        """
        # A None sentinel replaces the former mutable `[]` default so that no
        # list object is shared between calls.
        if support_addition_channel is None:
            support_addition_channel = ()
        if model in support_addition_channel:
            # Shadow the class-level list on this instance only.
            self.ALL_2G_FREQUENCIES = [
                2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
                2462, 2467, 2472
            ]
        self.DFS_5G_FREQUENCIES = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
            5640, 5660, 5680, 5700, 5720, 5845, 5865, 5885
        ]
        # Recomputed here because the class-level value inherited from the
        # base class was built from the base's empty lists.
        self.ALL_5G_FREQUENCIES = (self.DFS_5G_FREQUENCIES +
                                   self.NONE_DFS_5G_FREQUENCIES)
513
514
class WifiReferenceNetworks:
    """Sorts reference networks into buckets by band and by whether they
    carry a password.

    Each element of the input is a mapping from a band key to a network
    config. A config stored under the "2g" key goes to a 2g bucket; a config
    stored under any other key goes to a 5g bucket. A config containing
    "password" is considered secure, otherwise open.
    """
    def __init__(self, obj):
        """
        Args:
            obj: Iterable of mappings from band key to network config.
        """
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Distributes every network config into its band/security bucket."""
        for entry in self.reference_networks:
            for band in entry:
                config = entry[band]
                on_2g = band == self.WIFI_2G
                if "password" in config:
                    bucket = (self.secure_networks_2g
                              if on_2g else self.secure_networks_5g)
                else:
                    bucket = (self.open_networks_2g
                              if on_2g else self.open_networks_5g)
                bucket.append(config)

    def return_2g_secure_networks(self):
        """Returns the password-protected 2g networks."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the password-protected networks of the 5g bucket."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2g networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the networks without a password of the 5g bucket."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns a new list with all secure networks, 2g first."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns a new list with all open networks, 2g first."""
        return self.open_networks_2g + self.open_networks_5g
561
562
563def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
564    """Wrapper function that handles the bahevior of assert_on_fail.
565
566    When assert_on_fail is True, let all test signals through, which can
567    terminate test cases directly. When assert_on_fail is False, the wrapper
568    raises no test signals and reports operation status by returning True or
569    False.
570
571    Args:
572        func: The function to wrap. This function reports operation status by
573              raising test signals.
574        assert_on_fail: A boolean that specifies if the output of the wrapper
575                        is test signal based or return value based.
576        args: Positional args for func.
577        kwargs: Name args for func.
578
579    Returns:
580        If assert_on_fail is True, returns True/False to signal operation
581        status, otherwise return nothing.
582    """
583    try:
584        func(*args, **kwargs)
585        if not assert_on_fail:
586            return True
587    except signals.TestSignal:
588        if assert_on_fail:
589            raise
590        return False
591
592
def assert_network_in_list(target, network_list):
    """Asserts that at least one network in a list matches a target network.

    Matching is delegated to match_networks.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    matches = match_networks(target, network_list)
    failure_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    asserts.assert_true(matches, failure_msg)
606
607
def match_networks(target_params, networks):
    """Filters a list of WiFi networks down to those matching given parameters.

    A network matches when it contains every key of target_params with an
    equal value.

    Args:
        target_params: A non-empty dict with key-value pairs describing a
                       Wi-Fi network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list of the networks that match the target parameters, in their
        original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _is_match(network):
        # Reject on the first missing key or differing value.
        for key, expected in target_params.items():
            if key not in network or network[key] != expected:
                return False
        return True

    return [network for network in networks if _is_match(network)]
638
639
def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits until the device reports the given wifi state.

    Runs _wait_for_wifi_state through _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the device reached the specified
        state, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome
657
658
def _wait_for_wifi_state(ad, state):
    """Waits for the device to report the given wifi state.

    Returns immediately if the device is already in that state. Otherwise
    waits up to SHORT_TIMEOUT for a matching state change event; if none
    arrives, the current state is checked one last time and a mismatch is
    reported through asserts.assert_equal.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    # Already there: don't wait for a state change event that won't come.
    if state == ad.droid.wifiCheckState():
        return

    def _reached_state(event):
        return event["data"]["enabled"] == state

    ad.droid.wifiStartTrackingStateChange()
    timeout_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
        state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_state, SHORT_TIMEOUT)
    except Empty:
        asserts.assert_equal(state, ad.droid.wifiCheckState(), timeout_msg)
    finally:
        # Tracking is always stopped, whether or not the wait succeeded.
        ad.droid.wifiStopTrackingStateChange()
683
684
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets wifi to the requested state, or flips it when none is given.

    Runs _wifi_toggle_state through _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the toggle was successful, False
        otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome
702
703
def _wifi_toggle_state(ad, new_state=None):
    """Toggles the state of wifi and waits for the change to be confirmed.

    If no matching state change event arrives within SHORT_TIMEOUT, the
    current state is checked one last time and a mismatch is reported through
    asserts.assert_equal.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    current_state = ad.droid.wifiCheckState()
    if new_state is None:
        new_state = not current_state
    elif new_state == current_state:
        # Already in the requested state: don't wait for a state change event
        # that won't come.
        return

    def _reached_new_state(event):
        return event["data"]["enabled"] == new_state

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    # Drop stale events so that only events following the toggle are seen.
    ad.ed.clear_all_events()
    ad.droid.wifiToggleState(new_state)
    time.sleep(2)
    fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
                                                           ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_new_state, SHORT_TIMEOUT)
    except Empty:
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        # Tracking is always stopped, whether or not the wait succeeded.
        ad.droid.wifiStopTrackingStateChange()
736
737
def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Forgets every configured network once per network id and waits up to
    SHORT_TIMEOUT for each removal to be confirmed. A missing confirmation is
    only logged; the final check asserts that no configured network is left.

    Args:
        ad: An AndroidDevice object.

    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    forgotten_ids = set()
    for network in networks:
        network_id = network['networkId']
        # The same id may be listed more than once; forget it only once.
        if network_id in forgotten_ids:
            continue
        ad.droid.wifiForgetNetwork(network_id)
        forgotten_ids.add(network_id)
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.",
                            network)
    # Check again to see if there's any network left, and report the networks
    # that are actually still configured rather than the initial list.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)
766
767
768
def toggle_airplane_mode_on_and_off(ad):
    """Turn ON and OFF Airplane mode.

    Waits DEFAULT_TIMEOUT seconds after each change to let the system settle.

    Args:
        ad: An AndroidDevice object.

    Returns: Assert if turning on/off Airplane mode fails.

    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(utils.force_airplane_mode(ad, True),
                        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This step turns airplane mode OFF, so the failure message must say so.
    asserts.assert_true(utils.force_airplane_mode(ad, False),
                        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
784
785
def toggle_wifi_off_and_on(ad):
    """Turn OFF and ON WiFi.

    Waits DEFAULT_TIMEOUT seconds after each change to let the system settle.

    Args:
        ad: An AndroidDevice object.

    Returns: Assert if turning off/on WiFi fails.

    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for log_line, wifi_on in steps:
        ad.log.debug(log_line)
        wifi_toggle_state(ad, wifi_on)
        time.sleep(DEFAULT_TIMEOUT)
799
800
def wifi_forget_network(ad, net_ssid):
    """Remove configured Wifi network on an android device.

    Only the first configured network whose SSID contains net_ssid is
    forgotten. The test fails if the removal is not confirmed within
    SHORT_TIMEOUT.

    Args:
        ad: android_device object for forget network.
        net_ssid: ssid of network to be forget

    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for network in configured:
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
        # Stop after the first matching network.
        break
823
824
def wifi_test_device_init(ad, country_code=WifiEnums.CountryCode.US):
    """Initializes an android device for wifi testing.

    Steps, in execution order:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan.
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging.
    5. Set wpa_supplicant log level to EXCESSIVE (result not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set the WiFi country code to country_code.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
        country_code: Country code to set on the device; defaults to US.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    supplicant_status = ad.adb.shell(
        "wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
        "wlan0 log_level EXCESSIVE",
        ignore_status=True)
    ad.log.info("wpa_supplicant log change status: %s", supplicant_status)

    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, country_code)
    utils.set_ambient_display(ad, False)
858
859
def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    First tries the `cmd wifi force-country-code` shell command. If that
    fails for any reason, falls back to the SL4A wifiSetCountryCode call with
    the same country code.

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        Whatever the SL4A fallback call raises if it also fails
        (NOTE(review): presumably an RpcException -- confirm).
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except Exception as e:
        # Best-effort fallback. Use the requested code rather than a
        # hard-coded US, and leave a trace of why the shell command failed.
        ad.log.warning(
            "Failed to force country code %s via adb shell (%s); falling "
            "back to wifiSetCountryCode.", country_code, e)
        ad.droid.wifiSetCountryCode(country_code)
874
875
def start_wifi_connection_scan(ad):
    """Starts a wifi connection scan and waits for the results event.

    Fails the test if no scan results event is received within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    """
    results_event = "WifiManagerScanResultsAvailable"
    wait_sec = 60
    # Drop stale events so that only results of this scan are considered.
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event, wait_sec)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")
888
889
def start_wifi_connection_scan_and_return_status(ad):
    """
    Starts a wifi connection scan and waits for either a results-available
    event or a scan failure event.

    Fails the test if neither kind of event is received within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if scan succeeded & results are available
        False: if scan failed
    """
    # Drop stale events so that only events of this scan are considered.
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        scan_events = ad.ed.pop_events(
            "WifiManagerScan(ResultsAvailable|Failure)", 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # If there are multiple matches, a single success is enough.
    for scan_event in scan_events:
        if scan_event["name"] == "WifiManagerScanResultsAvailable":
            return True
        if scan_event["name"] == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False
916
917
def start_wifi_connection_scan_and_check_for_network(ad,
                                                     network_ssid,
                                                     max_tries=3,
                                                     found=True):
    """
    Start connectivity scans & checks if the presence of |network_ssid| in
    the scan results is as expected. The method performs a max of |max_tries|
    connectivity scans.

    When the network is expected, the first scan that shows it ends the check.
    When the network is expected to be absent, all |max_tries| scans are run
    and the outcome is decided by the last one.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
        found: True if expected a given SSID to be found; False otherwise.
    Returns:
        True: if network_ssid status is as expected in scan results.
        False: if network_ssid status is not as expected in scan results.
    """
    began_at = time.time()
    for attempt in range(max_tries):
        is_last_attempt = (attempt + 1) == max_tries

        if not start_wifi_connection_scan_and_return_status(ad):
            if is_last_attempt:
                break
            # wait for a while when a WiFi scan is failed, e.g. because of device busy.
            time.sleep(WIFI_SCAN_RETRY_INTERVAL_SEC)
            continue

        scan_results = ad.droid.wifiGetScanResults()
        matches = match_networks({WifiEnums.SSID_KEY: network_ssid},
                                 scan_results)
        if found != (len(matches) > 0):
            # Presence differs from the expectation; try again.
            continue

        if found:
            ad.log.debug("%s network found in %s seconds." %
                          (network_ssid, (time.time() - began_at)))
            return True
        # if found == False, we loop over till max_tries to make sure the ssid is
        # really no show.
        if is_last_attempt:
            ad.log.debug("%s network not found in %d tries in %s seconds." %
                         (network_ssid, max_tries, (time.time() - began_at)))
            return True
    return False
959
960
def start_wifi_connection_scan_and_ensure_network_found(
        ad, network_ssid, max_tries=3):
    """Scan up to |max_tries| times and require |network_ssid| to be seen.

    The scanning itself is delegated to
    start_wifi_connection_scan_and_check_for_network with found=True.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    failure_msg = ("Failed to find " + network_ssid +
                   " in scan results after " + str(max_tries) + " tries")
    network_seen = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries=max_tries, found=True)
    asserts.assert_true(network_seen, failure_msg)
980
981
def start_wifi_connection_scan_and_ensure_network_not_found(
        ad, network_ssid, max_tries=3):
    """Scan up to |max_tries| times and require |network_ssid| to be absent.

    The scanning itself is delegated to
    start_wifi_connection_scan_and_check_for_network with found=False.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    failure_msg = ("Found " + network_ssid +
                   " in scan results after " + str(max_tries) + " tries")
    network_absent = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries=max_tries, found=False)
    asserts.assert_true(network_absent, failure_msg)
1001
1002
def start_wifi_background_scan(ad, scan_setting):
    """Kick off a wifi background scan and wait for its success event.

    Args:
        ad: android_device object to start the background scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the "WifiScannerScan<idx>onSuccess" event, where
        <idx> is the value returned when the scan was started. The event is
        awaited with a timeout of SHORT_TIMEOUT.
    """
    scan_idx = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_idx)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
1017
1018
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
                         security=None):
    """Configure the soft AP and start wifi tethering on an android_device.

    Only the optional settings that carry a truthy value are written into
    the soft AP configuration; the SSID is always written.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
        hidden: boolean to indicate if the AP needs to be hidden or not.
        security: security type of softap.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    optional_settings = (
        (WifiEnums.PWD_KEY, password),
        (WifiEnums.AP_BAND_KEY, band),
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.SECURITY, security),
    )
    config = {WifiEnums.SSID_KEY: ssid}
    config.update(
        {key: value for key, value in optional_settings if value})

    config_applied = ad.droid.wifiSetWifiApConfiguration(config)
    asserts.assert_true(config_applied,
                        "Failed to update WifiAp Configuration")

    def _tether_is_active(event):
        return event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged", _tether_is_active, 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering starting")
    finally:
        # Tracking is switched off again whether or not tethering started.
        ad.droid.wifiStopTrackingTetherStateChange()
1058
1059
def save_wifi_soft_ap_config(ad,
                             wifi_config,
                             band=None,
                             hidden=None,
                             security=None,
                             password=None,
                             channel=None,
                             max_clients=None,
                             shutdown_timeout_enable=None,
                             shutdown_timeout_millis=None,
                             client_control_enable=None,
                             allowedList=None,
                             blockedList=None,
                             bands=None,
                             channel_frequencys=None,
                             mac_randomization_setting=None,
                             bridged_opportunistic_shutdown_enabled=None,
                             ieee80211ax_enabled=None):
    """Save a soft ap configuration on the device and verify it was applied.

    The optional arguments are merged into |wifi_config| (the caller's dict
    is modified in place), the result is handed to
    wifiSetWifiApConfiguration, and the configuration read back with
    wifiGetApConfiguration is compared against it field by field. The band
    and bands settings are written but not part of the read-back comparison.

    Args:
        ad: android_device to set soft ap configuration.
        wifi_config: a soft ap configuration object, at least include SSID.
        band: specifies the band for the soft ap.
        hidden: specifies the soft ap need to broadcast its SSID or not.
        security: specifies the security type for the soft ap. Only applied
                when |password| is given as well.
        password: specifies the password for the soft ap. Only applied when
                |security| is given as well.
        channel: specifies the channel for the soft ap. Only applied
                together with |band|.
        max_clients: specifies the maximum connected client number.
        shutdown_timeout_enable: specifies the auto shut down enable or not.
        shutdown_timeout_millis: specifies the shut down timeout value.
        client_control_enable: specifies the client control enable or not.
        allowedList: specifies allowed clients list.
        blockedList: specifies blocked clients list.
        bands: specifies the band list for the soft ap. Ignored when
                |channel_frequencys| is given.
        channel_frequencys: specifies the channel frequency list for soft ap.
                Takes precedence over |bands| and |band|.
        mac_randomization_setting: specifies the mac randomization setting.
        bridged_opportunistic_shutdown_enabled: specifies the opportunistic
                shutdown enable or not.
        ieee80211ax_enabled: specifies the ieee80211ax enable or not.

    Returns:
        No return value. Error checks in this function will raise test
        failure signals.
    """
    if security and password:
        wifi_config[WifiEnums.SECURITY] = security
        wifi_config[WifiEnums.PWD_KEY] = password

    # Settings that are copied verbatim whenever the caller supplied them.
    simple_overrides = (
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.AP_MAXCLIENTS_KEY, max_clients),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY, shutdown_timeout_enable),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY, shutdown_timeout_millis),
        (WifiEnums.AP_CLIENTCONTROL_KEY, client_control_enable),
        (WifiEnums.AP_ALLOWEDLIST_KEY, allowedList),
        (WifiEnums.AP_BLOCKEDLIST_KEY, blockedList),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         mac_randomization_setting),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         bridged_opportunistic_shutdown_enabled),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY, ieee80211ax_enabled),
    )
    for key, value in simple_overrides:
        if value is not None:
            wifi_config[key] = value

    # Only one way of selecting the radio is applied, in this precedence:
    # channel frequency list, then band list, then single band (+ channel).
    if channel_frequencys is not None:
        wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
    elif bands is not None:
        wifi_config[WifiEnums.AP_BANDS_KEY] = bands
    elif band is not None:
        wifi_config[WifiEnums.AP_BAND_KEY] = band
        if channel is not None:
            wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel

    # A channel of 0 is not sent to the device.
    # NOTE(review): presumably 0 means "no fixed channel" - confirm.
    if (WifiEnums.AP_CHANNEL_KEY in wifi_config
            and wifi_config[WifiEnums.AP_CHANNEL_KEY] == 0):
        del wifi_config[WifiEnums.AP_CHANNEL_KEY]

    # An open network carries neither a security type nor a password. The
    # password may legitimately be missing already, so it is removed with
    # pop() instead of del to avoid a KeyError.
    if (WifiEnums.SECURITY in wifi_config
            and wifi_config[WifiEnums.SECURITY] ==
            WifiEnums.SoftApSecurityType.OPEN):
        del wifi_config[WifiEnums.SECURITY]
        wifi_config.pop(WifiEnums.PWD_KEY, None)

    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    # Read the configuration back and make sure every requested field stuck.
    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match")

    # Fields that are only compared when present in the requested config.
    optional_checks = (
        (WifiEnums.SECURITY, "Hotspot Security doesn't match"),
        (WifiEnums.PWD_KEY, "Hotspot Password doesn't match"),
        (WifiEnums.HIDDEN_KEY, "Hotspot hidden setting doesn't match"),
        (WifiEnums.AP_CHANNEL_KEY, "Hotspot Channel doesn't match"),
        (WifiEnums.AP_MAXCLIENTS_KEY, "Hotspot Max Clients doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY,
         "Hotspot ShutDown feature flag doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY,
         "Hotspot ShutDown timeout setting doesn't match"),
        (WifiEnums.AP_CLIENTCONTROL_KEY,
         "Hotspot Client control flag doesn't match"),
        (WifiEnums.AP_ALLOWEDLIST_KEY, "Hotspot Allowed List doesn't match"),
        (WifiEnums.AP_BLOCKEDLIST_KEY, "Hotspot Blocked List doesn't match"),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         "Hotspot Mac randomization setting doesn't match"),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         "Hotspot bridged shutdown enable setting doesn't match"),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY,
         "Hotspot 80211 AX enable setting doesn't match"),
        (WifiEnums.AP_CHANNEL_FREQUENCYS_KEY,
         "Hotspot channels setting doesn't match"),
    )
    for key, mismatch_msg in optional_checks:
        if key in wifi_config:
            asserts.assert_true(wifi_ap[key] == wifi_config[key],
                                mismatch_msg)
1225
def start_wifi_tethering_saved_config(ad):
    """Turn on wifi hotspot with a config that is already saved.

    Args:
        ad: android_device to start wifi tethering on.

    Returns:
        No return value. A test failure signal is raised when the tethering
        confirmation events are not received.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event means "no confirmation". Anything else
        # (including KeyboardInterrupt) must propagate unchanged instead of
        # being reported as a tethering failure.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
1238
1239
def stop_wifi_tethering(ad):
    """Stop wifi tethering on an android_device and wait for confirmation.

    Confirmation consists of a "WifiManagerApDisabled" event followed by a
    "TetherStateChanged" event whose ACTIVE_TETHER data is falsy. A test
    failure is raised when either is not received.

    Args:
        ad: android_device to stop wifi tethering on.
    """

    def _tether_is_inactive(event):
        return not event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", _tether_is_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
1256
1257
def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Turn wifi off and on again, then wait for the device to reconnect.

    The device is expected to be connected to |network| already. The work is
    done by _toggle_wifi_and_wait_for_reconnection, invoked through
    _assert_on_fail_handler:
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    handler_kwargs = {"num_of_tries": num_of_tries}
    return _assert_on_fail_handler(
        _toggle_wifi_and_wait_for_reconnection, assert_on_fail, ad, network,
        **handler_kwargs)
1293
1294
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
    """Toggle wifi state and then wait for Android device to reconnect to
    the provided wifi network.

    This expects the device to be already connected to the provided network.

    Logic steps are
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # First ensure that we're already connected to the provided network.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})
    # Now toggle wifi state and wait for the connection event.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)
    ad.droid.wifiStartTrackingStateChange()
    try:
        # No ssid/id filter is passed on purpose: the first connect event is
        # taken and its SSID is verified explicitly below, so a reconnection
        # to a different network is reported as "wrong network".
        connect_result = _wait_for_connect_event(ad, tries=num_of_tries)
        asserts.assert_true(
            connect_result, "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s. "
            "Expected %s, but got %s." %
            (ad.serial, expected_ssid, actual_ssid))
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1350
1351
def wait_for_connect(ad,
                     expected_ssid=None,
                     expected_id=None,
                     tries=2,
                     assert_on_fail=True):
    """Wait for a connect event, optionally for a specific network.

    Runs _wait_for_connect through _assert_on_fail_handler, so whether a
    problem fails the test directly or is reported through the return value
    depends on |assert_on_fail|.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    handler_args = (ad, expected_ssid, expected_id, tries)
    return _assert_on_fail_handler(_wait_for_connect, assert_on_fail,
                                   *handler_args)
1375
1376
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it is for the expected network.

    State-change tracking is enabled for the duration of the wait and is
    always disabled again before returning or raising.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to. 0 is treated as
                     a real network id; only None disables the check.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no connect event arrives, or the event is
            for a different SSID / network id. The underlying error is
            attached as the cause.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 id=expected_id,
                                                 tries=tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network %s" % expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        connection_data = connect_result['data']
        actual_ssid = connection_data[WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        # Compare against None rather than truthiness: network id 0 is a
        # valid id and must be verified too.
        if expected_id is not None:
            asserts.assert_equal(connection_data[WifiEnums.NETID_KEY],
                                 expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        # Chain the original error so the specific assertion message is not
        # lost in the test report.
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1415
1416
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    Up to |tries| events are popped, each with a 30 second timeout. With
    neither |ssid| nor |id| given, the first connect event is returned.
    Otherwise popping stops at the first event matching |id| or |ssid|; when
    no event matches, the last event that was popped (if any) is returned so
    the caller can report which network was joined instead.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to. 0 is a valid id.
        tries: An integer that is the number of times to try before failing.

    Returns:
        None if no connect event was received at all, otherwise a dict with
        details of the connection data, which looks like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None
    accept_any_event = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            # Nothing arrived within the timeout; use the next try.
            continue
        if accept_any_event:
            break
        event_data = conn_result['data']
        # "is not None" instead of truthiness so that network id 0 can match.
        if id is not None and event_data[WifiEnums.NETID_KEY] == id:
            break
        if ssid and event_data[WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result
1471
1472
def wait_for_disconnect(ad, timeout=10):
    """Require a "WifiNetworkDisconnected" event within |timeout|.

    State-change tracking is switched on for the wait and switched off again
    afterwards, whatever the outcome. A test failure is raised when the
    event does not arrive in time.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    """
    disconnect_event_name = "WifiNetworkDisconnected"
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event(disconnect_event_name, timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
1488
1489
def ensure_no_disconnect(ad, duration=10):
    """Require that no "WifiNetworkDisconnected" event shows up.

    The event queue is watched for |duration|; a disconnect event arriving in
    that window raises a test failure, while a timeout is the success case.
    State-change tracking is switched on for the wait and always switched
    off again afterwards.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    """
    disconnect_event_name = "WifiNetworkDisconnected"
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event(disconnect_event_name, duration)
    except Empty:
        # Timing out is exactly what we want: nothing disconnected.
        pass
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
1506
1507
def connect_to_wifi_network(ad, network, assert_on_fail=True,
                            check_connectivity=True, hidden=False,
                            num_of_scan_tries=DEFAULT_SCAN_TRIES,
                            num_of_connect_tries=DEFAULT_CONNECT_TRIES):
    """Scan for a network, then connect to it (open and psk networks).

    Before connecting, the scan results are checked: a hidden network must
    be absent from them, any other network must be present. Either check
    asserts on failure.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.
        num_of_scan_tries: The number of times to try scan
                           interface before declaring failure.
        num_of_connect_tries: The number of times to try
                              connect wifi before declaring failure.
    """
    target_ssid = network[WifiEnums.SSID_KEY]
    if hidden:
        ensure_scan_state = (
            start_wifi_connection_scan_and_ensure_network_not_found)
    else:
        ensure_scan_state = (
            start_wifi_connection_scan_and_ensure_network_found)
    ensure_scan_state(ad, target_ssid, max_tries=num_of_scan_tries)

    wifi_connect(ad,
                 network,
                 num_of_tries=num_of_connect_tries,
                 assert_on_fail=assert_on_fail,
                 check_connectivity=check_connectivity)
1536
1537
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id, then compare the connected SSID.

    The network must first show up in a scan (this asserts on failure);
    afterwards the SSID reported by wifiGetConnectionInfo is compared with
    |network_ssid|.

    Args:
        ad: android_device object to initiate connection on.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if connect using network id was successful;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)
    connected_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    ad.log.debug("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, connected_ssid))
    return connected_ssid == network_ssid
1558
1559
def wifi_connect(ad,
                 network,
                 num_of_tries=1,
                 assert_on_fail=True,
                 check_connectivity=True):
    """Connect an Android device to a wifi network.

    Runs _wifi_connect through _assert_on_fail_handler: the connection is
    initiated, the "connected" event is awaited and the connected ssid is
    confirmed to be the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    connect_kwargs = {
        "num_of_tries": num_of_tries,
        "check_connectivity": check_connectivity,
    }
    return _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad,
                                   network, **connect_kwargs)
1591
1592
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network.

    Initiate connection to a wifi network, wait for the "connected" event, then
    confirm the connected ssid is the one requested.

    This will directly fail a test if anything goes wrong. State-change
    tracking is always switched off again, including when initiating the
    connection itself raises.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection against
                            DEFAULT_PING_ADDR must succeed after connecting.

    Raises:
        signals.TestFailure: if the connection could not be started or
            verified. The underlying error is attached as the cause.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    try:
        expected_ssid = network[WifiEnums.SSID_KEY]
        # Errors from initiating the connection propagate unchanged; only
        # the outer finally applies to them.
        ad.droid.wifiConnectByConfig(network)
        ad.log.info("Starting connection process to %s", expected_ssid)
        try:
            ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
            connect_result = _wait_for_connect_event(ad,
                                                     ssid=expected_ssid,
                                                     tries=num_of_tries)
            asserts.assert_true(
                connect_result,
                "Failed to connect to Wi-Fi network %s on %s" %
                (network, ad.serial))
            ad.log.debug("Wi-Fi connection result: %s.", connect_result)
            actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
            asserts.assert_equal(
                actual_ssid, expected_ssid,
                "Connected to the wrong network on %s." % ad.serial)
            ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)

            if check_connectivity:
                internet = validate_connection(ad, DEFAULT_PING_ADDR)
                if not internet:
                    raise signals.TestFailure(
                        "Failed to connect to internet on %s" % expected_ssid)
        except Empty:
            asserts.fail("Failed to start connection process to %s on %s" %
                         (network, ad.serial))
        except Exception as error:
            ad.log.error("Failed to connect to %s with error %s",
                         expected_ssid, error)
            # Chain the original error so its specific message survives.
            raise signals.TestFailure("Failed to connect to %s network" %
                                      network) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1645
1646
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one
    requested. The actual work is done by _wifi_connect_by_id, invoked through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Whatever _assert_on_fail_handler returns. Per this function's
        contract, when assert_on_fail is False that is True if the connection
        was successful and False otherwise.
    """
    # Hand the handler's result back to the caller; without the return the
    # documented True/False status was silently discarded.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)
1669
1670
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Start connection to the wifi network, with the given network id, wait for
    the "connected" event, then verify the connected network is the one
    requested and that validate_connection succeeds on it.

    State-change tracking is started on entry and always stopped on exit.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if any check after the connect request fails.
            A timeout (Empty) while waiting for the connect-by-id success
            event is reported through asserts.fail.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of this event matters; its payload is not used.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connect_result = _wait_for_connect_event(ad,
                                                 id=network_id,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connect_result)
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        asserts.assert_equal(
            actual_id, network_id, "Connected to the wrong network on %s."
            "Expected network id = %d, but got %d." %
            (ad.serial, network_id, actual_id))
        expected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    expected_ssid, network_id)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1720
1721
def wifi_connect_using_network_request(ad,
                                       network,
                                       network_specifier,
                                       num_of_tries=3):
    """Connect an Android device to a wifi network via a network request.

    Issues a network request built from the given specifier, pauses so the
    request callback can be registered, then runs the post-request
    connection flow (_wait_for_wifi_connect_after_network_request) for the
    given network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.

    Returns:
        key: Key corresponding to network request.
    """
    request_key = ad.droid.connectivityRequestWifiNetwork(
        network_specifier, 0)
    sent_message = "Sent network request %s with %s " % (request_key,
                                                         network_specifier)
    ad.log.info(sent_message)
    # UI interaction should only start once wifi has begun processing the
    # request, hence the fixed delay before continuing.
    time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
    _wait_for_wifi_connect_after_network_request(ad,
                                                 network,
                                                 request_key,
                                                 num_of_tries)
    return request_key
1753
1754
def wait_for_wifi_connect_after_network_request(ad,
                                                network,
                                                key,
                                                num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network. The work is
    done by _wait_for_wifi_connect_after_network_request, invoked through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Whatever _assert_on_fail_handler returns. Per this function's
        contract, when assert_on_fail is False that is True if the connection
        was successful and False otherwise.
    """
    # Return the handler's result so the documented status reaches callers;
    # previously it was computed and then dropped.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, key, num_of_tries)
1785
1786
def _wait_for_wifi_connect_after_network_request(ad,
                                                 network,
                                                 key,
                                                 num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network, and check
    that the connected SSID is the requested one.

    State-change tracking is started on entry and always stopped on exit.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect before declaring failure.

    Raises:
        signals.TestFailure: if any step of the flow fails. A timeout (Empty)
            while waiting for an event is reported through asserts.fail.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # range() yields exactly num_of_tries attempts. The previous
        # `[0, num_of_tries]` was a two-element list, so the loop always ran
        # twice regardless of the requested number of tries.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: expected_ssid}, matched_scan_results)
            ad.log.debug("Network request on match %s", matched_network)
            if matched_network:
                break

        asserts.assert_true(matched_network,
                            "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 60,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
        on_capabilities_changed = autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 10,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT,
             cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
        # WifiInfo is attached to TransportInfo only in S.
        if ad.droid.isSdkAtLeastS():
            connected_network = (
                on_capabilities_changed["data"][
                    cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
            )
        else:
            connected_network = ad.droid.wifiGetConnectionInfo()
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(
            connected_network[WifiEnums.SSID_KEY], expected_ssid,
            "Connected to the wrong network."
            "Expected %s, but got %s." % (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure(
            "Failed to connect to %s network" % network) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1872
1873
def wifi_passpoint_connect(ad,
                           passpoint_network,
                           num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Wait for the "connected" event and confirm the connected ssid is the one
    requested. The work is done by _wifi_passpoint_connect, invoked through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Whatever _assert_on_fail_handler returns. Per this function's
        contract, when assert_on_fail is False the result reports whether the
        connect was successful (False on failure).
        NOTE(review): the exact success value depends on the handler, which
        is defined elsewhere in this file -- confirm against it.
    """
    # Return the handler's result so the documented status reaches callers;
    # previously it was computed and then dropped.
    return _assert_on_fail_handler(_wifi_passpoint_connect,
                                   assert_on_fail,
                                   ad,
                                   passpoint_network,
                                   num_of_tries=num_of_tries)
1902
1903
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Connect an Android device to a Passpoint wifi network.

    Wait for the "connected" event, confirm the connected ssid is the one
    requested, then check that validate_connection succeeds on it.

    State-change tracking is started on entry and always stopped on exit.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if anything goes wrong while connecting or
            validating the connection.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Pass by keyword, as the other callers in this file do, so that
        # num_of_tries is bound to `tries` and not to whichever parameter
        # happens to be second positionally.
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result, "Failed to connect to WiFi passpoint network %s on"
            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid) from error

    finally:
        ad.droid.wifiStopTrackingStateChange()
1947
1948
def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by the given FQDN.

    Args:
        ad: android_device object to remove the configuration from.
        fqdn: FQDN of the Passpoint configuration to remove.

    Returns:
        True if the removal call completed, False if it raised (the error is
        logged on the device logger).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as removal_error:
        ad.log.error(
            "Failed to remove passpoint configuration with FQDN=%s "
            "and error=%s", fqdn, removal_error)
        removed = False
    else:
        removed = True
    return removed
1959
1960
def start_wifi_single_scan(ad, scan_setting):
    """Start a single-shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the scan's onSuccess event. If the event does not
        arrive within SHORT_TIMEOUT seconds, the error from pop_event
        propagates to the caller.
    """
    scan_index = ad.droid.wifiScannerStartScan(scan_setting)
    # The success event name embeds the index returned by the scan call.
    success_event_name = "WifiScannerScan%sonSuccess" % scan_index
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']
1975
1976
def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection to network changes for given number of counts

    State-change tracking is started on entry and is always stopped before
    returning, including on the success path and when waiting for an event
    raises.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number network connection
                                check.
    Returns:
        True if connection to given network happen, else return False. If no
        "WifiNetworkConnected" event arrives within 120 seconds, the error
        raised by pop_event propagates to the caller.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Stop tracking on every exit path; previously the early return and
        # any exception from pop_event left tracking switched on.
        ad.droid.wifiStopTrackingStateChange()
1998
1999
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out which channels a scan covers and how long it should take.

    The channel list comes from the band (via wifi_chs.band_to_freq) when
    only "band" is present in the setting, from "channels" when only that key
    is present, and is empty otherwise.

    Args:
        wifi_chs: Object of channels supported
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting
    if has_band and not has_channels:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        channels = scan_setting["channels"]
    else:
        # Neither key, or both keys: no channels are selected.
        channels = []

    total_time = len(channels) * stime_channel
    for frequency in channels:
        if frequency in WifiEnums.DFS_5G_FREQUENCIES:
            # Extra passive scan time for each DFS frequency.
            total_time += 132
    return total_time, channels
2024
2025
def start_wifi_track_bssid(ad, track_setting):
    """Start BSSID tracking and wait for its success event.

    Args:
      ad: android_device object.
      track_setting: Dict with the keys "bssidInfos" and "apLostThreshold"
                     describing what to track.

    Returns:
      The 'data' field of the tracking onSuccess event. If the event does not
      arrive within SHORT_TIMEOUT seconds, the error from pop_event
      propagates to the caller.
    """
    bssid_infos = track_setting["bssidInfos"]
    lost_threshold = track_setting["apLostThreshold"]
    tracking_index = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, lost_threshold)
    # The success event name embeds the index returned by the tracking call.
    success_event_name = "WifiScannerBssid{}onSuccess".format(tracking_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
2041
2042
def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file to unencrypted PKCS#8 DER using openssl.

    Both file name extensions are checked before the openssl command is
    built and run through utils.exe_cmd.

    Args:
        in_file: The original key file; must end with ".pem".
        out_file: The full path to the converted key file, including
                  filename; must end with ".der".
    """
    input_is_pem = in_file.endswith(".pem")
    asserts.assert_true(input_is_pem, "Input file has to be .pem.")
    output_is_der = out_file.endswith(".der")
    asserts.assert_true(output_is_der, "Output file has to be .der.")
    command_template = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8")
    utils.exe_cmd(command_template.format(in_file, out_file))
2061
2062
def validate_connection(ad,
                        ping_addr=DEFAULT_PING_ADDR,
                        wait_time=15,
                        ping_gateway=True):
    """Validate internet connection by pinging the address provided.

    Waits up to wait_time seconds for the device to report a connected
    network (and, on Android versions above 10, an IPv4 default gateway),
    then does an HTTP ping. On Android versions above 10, if the HTTP ping
    did not succeed and ping_gateway is set, the default gateway is pinged
    instead.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: maximum number of seconds to wait for the connection
                   before pinging.
        ping_gateway: if True, fall back to pinging the IPv4 default gateway
                      when the HTTP ping fails (Android versions above 10
                      only).

    Returns:
        The HTTP ping result if it was obtained and truthy. Otherwise False,
        unless the gateway fallback ran, in which case True if the gateway
        ping did not report "100% packet loss" and False if it did.
    """
    android_version = int(
        ad.adb.shell("getprop ro.vendor.build.version.release"))
    # The gateway query is only used on Android versions above 10.
    gateway_check_supported = android_version > 10
    # wait_time to allow for DHCP to complete.
    for _ in range(wait_time):
        if ad.droid.connectivityNetworkIsConnected():
            if (not gateway_check_supported
                    or ad.droid.connectivityGetIPv4DefaultGateway()):
                break
        time.sleep(1)
    ping = False
    try:
        ping = ad.droid.httpPing(ping_addr)
        ad.log.info("Http ping result: %s.", ping)
    except Exception as error:
        # Best effort: a failing HTTP ping must not abort validation, but
        # record why it failed. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        ad.log.debug("Http ping to %s raised: %s", ping_addr, error)
    if gateway_check_supported and not ping and ping_gateway:
        ad.log.info("Http ping failed. Pinging default gateway")
        gw = ad.droid.connectivityGetIPv4DefaultGateway()
        result = ad.adb.shell("ping -c 6 {}".format(gw))
        ad.log.info("Default gateway ping result: %s", result)
        ping = "100% packet loss" not in result
    return ping
2097
2098
#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    Every key of expected_con except "password" must be present in the
    device's current connection info with an equal value. "BSSID" and
    "supplicant_state" are compared case-insensitively.

    Args:
        ad: android_device object whose connection info is checked.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if an expected field is missing or differs.
    """
    connection_info = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", connection_info)
    lowercased_fields = ("BSSID", "supplicant_state")
    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in connection_info:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." %
                (field, connection_info))
        found = connection_info[field]
        if field in lowercased_fields:
            found = found.lower()
            wanted = wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." %
                (field, wanted, field, found))
2128
2129
def check_autoconnect_to_open_network(
        ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle wifi and wait for the supplicant to report a completed connection.

    Returns immediately when ad.droid.wifiCheckState() is already truthy.
    Otherwise the wifi state is toggled and the connection info is polled
    until its supplicant_state is "completed" or the timeout expires.

     Args:
         ad: android_device object.
         conn_timeout: timeout value in sec to wait for UE to connect to a
                       WiFi AP
     Returns:
         True if UE connects to WiFi AP (supplicant_state = completed)
         False if UE fails to complete connection within conn_timeout.
    """
    if ad.droid.wifiCheckState():
        return True
    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()['supplicant_state']
        # The deadline is checked before the state, so a state read after the
        # deadline counts as a failure even if it is "completed".
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True
2151
2152
def expand_enterprise_config_by_phase2(config):
    """Produce one copy of an enterprise config per applicable phase2 type.

    For PEAP only GTC and MSCHAPV2 are considered; otherwise every member of
    WifiEnums.EapPhase2 is. GTC is left out when the config carries an FQDN
    (passpoint). The input config is not modified.

    Args:
        config: A dict representing enterprise config.

    Returns
        A list of enterprise configs.
    """
    if config[WifiEnums.Enterprise.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [WifiEnums.EapPhase2.GTC, WifiEnums.EapPhase2.MSCHAPV2]
    else:
        candidates = WifiEnums.EapPhase2

    expanded = []
    for candidate in candidates:
        # Skip a special case for passpoint TTLS.
        is_passpoint_gtc = (WifiEnums.Enterprise.FQDN in config
                            and candidate == WifiEnums.EapPhase2.GTC)
        if not is_passpoint_gtc:
            variant = dict(config)
            variant[WifiEnums.Enterprise.PHASE2] = candidate.value
            expanded.append(variant)
    return expanded
2177
2178
def generate_eap_test_name(config, ad=None):
    """Build a test case name from an EAP configuration.

    The name is "test_connect-" followed by the EAP method name (overridden
    by "PEAP0"/"PEAP1" when the SSID contains "peap0"/"peap1", ignoring
    case) and, when a phase2 value is configured and recognised, "-" plus
    the phase2 name.

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    Ent = WifiEnums.Enterprise
    method_name = next(
        (member.name
         for member in WifiEnums.Eap if member.value == config[Ent.EAP]), "")

    ssid_lower = config[WifiEnums.SSID_KEY].lower()
    if "peap0" in ssid_lower:
        method_name = "PEAP0"
    if "peap1" in ssid_lower:
        method_name = "PEAP1"

    test_name = "test_connect-" + method_name
    if Ent.PHASE2 in config:
        phase2_name = next(
            (member.name for member in WifiEnums.EapPhase2
             if member.value == config[Ent.PHASE2]), None)
        if phase2_name is not None:
            test_name += "-{}".format(phase2_name)
    return test_name
2210
2211
def group_attenuators(attenuators):
    """Bundle attenuators into two per-AP groups, "AP0" and "AP1".

    Legacy Wi-Fi setups have two attenuators, one per AP. Newer setups have
    four, one per channel, so two belong to each AP. Grouping them lets a
    script attenuate a whole AP in either setup by addressing one group.

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A two-element list [AP0 group, AP1 group]. With two attenuators each
        group holds one; with four, the first two go to AP0 and the last two
        to AP1.

    Raises:
        A test failure via asserts.fail if the attenuator list does not have
        two or four objects.
    """
    ap_groups = [
        attenuator.AttenuatorGroup("AP0"),
        attenuator.AttenuatorGroup("AP1"),
    ]
    # Attenuator indices assigned to (AP0, AP1) for each supported list size.
    members_by_count = {
        2: ((0, ), (1, )),
        4: ((0, 1), (2, 3)),
    }
    num_of_attns = len(attenuators)
    layout = members_by_count.get(num_of_attns)
    if layout is None:
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % num_of_attns)
    else:
        for group, member_indices in zip(ap_groups, layout):
            for index in member_indices:
                group.add(attenuators[index])
    return ap_groups
2250
2251
def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
    """Apply a named set of four attenuation values to four attenuators.

    Args:
        attenuator: Indexable collection of attenuator objects; entries 0-3
                    are set.
        attn_val_name: Name of the attenuation value set to use.
        roaming_attn: Dictionary mapping value-set names to the attenuation
                      values, indexed 0-3.

    Raises:
        Re-raises whatever error occurs while setting a value, after logging
        it.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except BaseException:
        # Log with traceback, then let the original error continue upwards.
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise
2270
2271
def set_attns_steps(attenuators,
                    atten_val_name,
                    roaming_attn=ROAMING_ATTN,
                    steps=10,
                    wait_time=12):
    """Ramp attenuators linearly from their current values to target values.

    The current value of every attenuator is read once up front. Each step
    then moves every attenuator an equal fraction closer to its target
    (rounded to the nearest integer) and sleeps for wait_time seconds.

    Args:
        attenuators: The list of attenuator objects that you want to change
                     their attenuation value.
        atten_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time for each change of attenuator.
    """
    logging.info("Set attenuation values to %s in %d step(s)",
                 roaming_attn[atten_val_name], steps)
    initial_values = [atten.get_atten() for atten in attenuators]
    final_values = roaming_attn[atten_val_name]
    for step_number in range(1, steps + 1):
        # Fraction of the way from the initial value to the final one.
        fraction = step_number / steps
        for index, atten in enumerate(attenuators):
            delta = (final_values[index] - initial_values[index]) * fraction
            atten.set_atten(round(initial_values[index] + delta))
        time.sleep(wait_time)
2299
2300
def trigger_roaming_and_validate(dut,
                                 attenuator,
                                 attn_val_name,
                                 expected_con,
                                 roaming_attn=ROAMING_ATTN):
    """Ramp the attenuators to trigger roaming, then check where the DUT is.

    After the attenuation change, the DUT's connection info must match the
    expected SSID and BSSID, and validate_connection must succeed.

    Args:
        dut: android_device object expected to roam.
        attenuator: The attenuator objects passed on to set_attns_steps.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network; its
                      SSID entry and "bssid" entry are used.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        signals.TestFailure: if the connection info does not match or the
            connection cannot be validated.
    """
    target_ssid = expected_con[WifiEnums.SSID_KEY]
    target_bssid = expected_con["bssid"]
    target_info = {
        WifiEnums.SSID_KEY: target_ssid,
        WifiEnums.BSSID_KEY: target_bssid,
    }
    set_attns_steps(attenuator, attn_val_name, roaming_attn)

    verify_wifi_connection_info(dut, target_info)
    logging.info("Roamed to %s successfully", target_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              target_bssid)
2327
2328
def create_softap_config():
    """Build a softap config dict with a random SSID and a random password.

    Returns:
        A dict holding the SSID ("softap_" plus a random suffix) under
        WifiEnums.SSID_KEY and the password under WifiEnums.PWD_KEY.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, password)
    return {
        WifiEnums.SSID_KEY: ssid,
        WifiEnums.PWD_KEY: password,
    }
2339
2340
def start_softap_and_verify(ad, band):
    """Start a softap on ad.dut and verify it is up and visible.

    Checks that the softap info reads as zero before start, starts
    tethering with a freshly generated config, confirms the AP is reported
    as enabled and found in a scan from ad.dut_client, then checks the
    softap info is positive.

    Args:
        ad: Object exposing the `dut` and `dut_client` devices.
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    dut = ad.dut
    # Register before start the test.
    callback_id = dut.droid.registerSoftApCallback()
    # Softap info is expected to be at its default before the AP is started.
    frequency, bandwidth = get_current_softap_info(dut, callback_id, True)
    asserts.assert_true(frequency == 0, "Softap frequency is not reset")
    asserts.assert_true(bandwidth == 0, "Softap bandwdith is not reset")

    softap_config = create_softap_config()
    start_wifi_tethering(dut,
                         softap_config[WifiEnums.SSID_KEY],
                         softap_config[WifiEnums.PWD_KEY],
                         band=band)
    asserts.assert_true(dut.droid.wifiIsApEnabled(),
                        "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(
        ad.dut_client, softap_config[WifiEnums.SSID_KEY])

    # Once running, the callback must report valid (positive) softap info.
    frequency, bandwidth = get_current_softap_info(dut, callback_id, True)
    asserts.assert_true(frequency > 0, "Softap frequency is not valid")
    asserts.assert_true(bandwidth > 0, "Softap bandwdith is not valid")
    # Unregister callback
    dut.droid.unregisterSoftApCallback(callback_id)

    return softap_config
2376
2377
def wait_for_expected_number_of_softap_clients(ad, callbackId,
                                               expected_num_of_softap_clients):
    """Pop the next client-count event and check it against the expectation.

    Both the reported client count and the length of the reported MAC
    address list must equal the expected number, and every reported MAC
    address must pass checkMacAddress().

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) holds the event.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    clients_event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT +
                          str(callbackId) +
                          wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    payload = ad.ed.pop_event(clients_event_name, SHORT_TIMEOUT)['data']
    reported_count = payload[
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    reported_macs = payload[wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]

    asserts.assert_equal(
        reported_count, expected_num_of_softap_clients,
        "The number of softap clients doesn't match the expected number")
    asserts.assert_equal(
        len(reported_macs), expected_num_of_softap_clients,
        "The number of mac addresses doesn't match the expected number")
    for mac in reported_macs:
        asserts.assert_true(checkMacAddress(mac),
                            "An invalid mac address was returned")
2400
2401
def checkMacAddress(input):
    """Validate whether a string is a valid mac address or not.

    A valid address is six pairs of hex digits (case-insensitive) separated
    consistently by ':' or '-', or written with no separator at all.

    Args:
        input: The string to validate. (The name shadows the builtin but is
            kept so keyword callers keep working.)

    Returns: True/False, returns true for a valid mac address and false otherwise.
    """
    # The first separator is captured and must be reused for all later pairs.
    mac_pattern = r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(?:\1[0-9a-f]{2}){4}"
    # fullmatch, unlike match + '$', also rejects a trailing newline.
    return re.fullmatch(mac_pattern, input.lower()) is not None
2414
2415
def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Pop the next softap state-changed event and check the reported state.

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) holds the event.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    state_event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT +
                        str(callbackId) +
                        wifi_constants.SOFTAP_STATE_CHANGED)
    state_event = ad.ed.pop_event(state_event_name, SHORT_TIMEOUT)
    reported_state = state_event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(reported_state, expected_softap_state,
                         "Softap state doesn't match with expected state")
2429
2430
def get_current_number_of_softap_clients(ad, callbackId):
    """Drain every queued softap client-count event and report the newest.

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) holds the events.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count carried by the last queued event, or None when no
        matching event is in the queue.
    """
    clients_event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT +
                          str(callbackId) +
                          wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    latest_count = None
    # Later events overwrite earlier ones, so the last one wins.
    for queued in ad.ed.pop_all(clients_event_name):
        latest_count = queued['data'][
            wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    return latest_count
2449
2450
def get_current_softap_info(ad, callbackId, need_to_wait):
    """Drain the queued softap info-changed events and report the newest.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and logger.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the info callback event before pop all.
    Returns:
        Tuple (frequency, bandwidth) taken from the last event seen. Both
        stay 0 when no event was consumed.
    """
    info_event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT +
                       str(callbackId) +
                       wifi_constants.SOFTAP_INFO_CHANGED)
    ad.log.debug("softap info dump from eventStr %s", info_event_name)

    frequency, bandwidth = 0, 0
    if need_to_wait:
        awaited = ad.ed.pop_event(info_event_name, SHORT_TIMEOUT)
        frequency = awaited['data'][
            wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = awaited['data'][
            wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
        ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
                    frequency, bandwidth)

    # Anything still queued is newer than what was read above.
    for queued in ad.ed.pop_all(info_event_name):
        frequency = queued['data'][
            wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = queued['data'][
            wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]

    ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
                bandwidth)
    return frequency, bandwidth
2482
def get_current_softap_infos(ad, callbackId, need_to_wait):
    """Pop all softap info-list changed events from the queue.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and logger.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the info callback event before pop all.
    Returns:
        The info list carried by the last event seen. An empty list is
        returned when need_to_wait is False and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
    ad.log.debug("softap info dump from eventStr %s", eventStr)

    # Default so the function does not fail with an unbound local when no
    # event is consumed at all.
    infos = []
    if need_to_wait:
        infos = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Events still in the queue are newer; the last one wins.
    for event in ad.ed.pop_all(eventStr):
        infos = event['data']

    for info in infos:
        frequency = info[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = info[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
        wifistandard = info[
            wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY]
        bssid = info[wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY]
        ad.log.info(
            "softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
            frequency, bandwidth, wifistandard, bssid)

    return infos
2517
def get_current_softap_capability(ad, callbackId, need_to_wait):
    """Pop all softap capability changed events from the queue.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and logger.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the capability callback event before pop all.
    Returns:
        The capability data carried by the last event seen. None is returned
        when need_to_wait is False and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
    ad.log.debug("softap capability dump from eventStr %s", eventStr)

    # Default so the function does not fail with an unbound local when no
    # event is consumed at all.
    capability = None
    if need_to_wait:
        capability = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Events still in the queue are newer; the last one wins.
    for event in ad.ed.pop_all(eventStr):
        capability = event['data']

    return capability
2538
def get_ssrdumps(ad):
    """Pull any files found in the device's ssrdump dir, then clear the dir.

    Pulled files go to an "SSRDUMPS_<serial>" directory under
    ad.device_log_path. The delete command is issued whether or not any file
    was found.

    Args:
        ad: android device object.
    """
    dump_files = ad.get_file_names("/data/vendor/ssrdump/")
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        destination = os.path.join(ad.device_log_path,
                                   "SSRDUMPS_%s" % ad.serial)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete",
                 ignore_status=True)
2552
2553
def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture in monitor mode.

    Captures are written below a 'PacketCapture' directory inside the
    current test context's output path; the directory is created on demand.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, <capture dir>/<test_name> path) as the value
    """
    output_root = context.get_current_context().get_full_output_path()
    capture_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(capture_dir, exist_ok=True)

    # 'dual' expands to one capture per band.
    selected_bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, capture_dir, test_name),
               os.path.join(capture_dir, test_name))
        for band in selected_bands
    }
2578
2579
def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the pcap files. If False, we will keep them.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap; each value is a
            (process, file path) tuple. May be empty.
        test_status: status of the test case; a truthy value removes the
            directory that contains each capture file.
    """
    # Collect the directories while stopping, so nothing depends on a loop
    # variable that is unbound when procs is empty.
    capture_dirs = set()
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)
        capture_dirs.add(os.path.dirname(fname))

    if test_status:
        for capture_dir in capture_dirs:
            shutil.rmtree(capture_dir)
2597
2598
def verify_mac_not_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Fails the test on the first packet whose summary contains the address.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show() prints and returns None unless dump=True, so ask for the
            # string to get the packet details into the failure message.
            asserts.fail("Device %s caught Factory MAC: %s in packet sniffer."
                         "Packet = %s" %
                         (ad.serial, mac, pkt.show(dump=True)))
2612
2613
def verify_mac_is_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is found in the captured packets.

    Returns as soon as one packet summary contains the address; fails the
    test when no packet does.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    if any(mac in captured.summary() for captured in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer."
                 "for device %s" % (mac, ad.serial))
2627
def start_all_wlan_logs(ads):
    """Call start_wlan_logs() on each device in ads.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        start_wlan_logs(device)
2631
def start_wlan_logs(ad):
    """Start Pixel Logger to record extra wifi logs

    Does nothing except log when the Pixel Logger package is not listed on
    the device. Otherwise files under the logger's wlan_logs directory are
    deleted and logging is started with the "cnss_diag" logger when
    /vendor/bin/cnss_diag exists, or the "wlan_logs" logger when it does not.

    Args:
        ad: android device object.
    """
    pixel_logger_listed = ad.adb.shell(
        "pm list package | grep com.android.pixellogger")
    if not pixel_logger_listed:
        ad.log.info("Device doesn't have Pixel Logger")
        return

    # Remove files left in the wlan_logs directory by earlier runs.
    ad.adb.shell(
        "find /sdcard/Android/data/com.android.pixellogger/files/logs"
        "/wlan_logs/ -type f -delete",
        ignore_status=True)

    if ad.file_exists("/vendor/bin/cnss_diag"):
        start_cmd = ("am startservice -a com.android.pixellogger.service"
                     ".logging.LoggingService.ACTION_START_LOGGING "
                     "-e intent_logger cnss_diag")
    else:
        start_cmd = ("am startservice -a com.android.pixellogger.service"
                     ".logging.LoggingService.ACTION_START_LOGGING "
                     "-e intent_logger wlan_logs")
    ad.adb.shell(start_cmd, ignore_status=True)
2654
def stop_all_wlan_logs(ads):
    """Stop Pixel Logger on every device, then wait for the log zip files.

    Returns immediately, without waiting, when ads is empty.

    Args:
        ads: iterable of android device objects.
    """
    stopped_any = False
    for ad in ads:
        stop_wlan_logs(ad)
        stopped_any = True
    if not stopped_any:
        # Nothing was stopped, so there is nothing to wait for, and no
        # device whose logger could carry the message below.
        return
    # As before, the message goes through the last device's logger.
    ad.log.info("Wait 30s for the createion of zip file for wlan logs")
    time.sleep(30)
2660
def stop_wlan_logs(ad):
    """Stops Pixel Logger

    The stop intent is only sent when the Pixel Logger package is listed on
    the device.

    Args:
        ad: android device object.
    """
    pixel_logger_listed = ad.adb.shell(
        "pm list package | grep com.android.pixellogger")
    if pixel_logger_listed:
        ad.adb.shell(
            "am startservice -a com.android.pixellogger.service.logging"
            ".LoggingService.ACTION_STOP_LOGGING",
            ignore_status=True)
2672
def get_wlan_logs(ad):
    """Pull logs from Pixel Logger folder

    Files found in the logger's wlan_logs directory are pulled into a
    "WLAN_LOGS_<serial>" directory under ad.device_log_path. Nothing happens
    when no file is found.

    Args:
        ad: android device object.
    """
    wlan_log_files = ad.get_file_names(
        "/sdcard/Android/data/com.android.pixellogger/files/logs/wlan_logs")
    if not wlan_log_files:
        return
    ad.log.info("Pulling Pixel Logger logs %s", wlan_log_files)
    destination = os.path.join(ad.device_log_path,
                               "WLAN_LOGS_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(wlan_log_files, destination)
2684
# Result record produced by send_link_probe() for a single link probe.
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    [
        'is_success',  # True when the command output contains 'succeeded'
        'stdout',  # raw output of the link probe command
        'elapsed_time',  # first all-digit token of stdout on success
        'failure_reason',  # first all-digit token of stdout on failure
    ])
2688
2689
def send_link_probe(ad):
    """Sends a link probe to the currently connected AP, and returns whether the
    probe succeeded or not.

    The test is failed when the command output mentions 'Error' or
    'Exception', or when it reports neither success nor failure.

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple. elapsed_time (on success) or
        failure_reason (on failure) holds the first all-digit token of the
        output, or None if there is no such token.
    """
    probe_output = ad.adb.shell('cmd wifi send-link-probe')
    saw_exception = 'Error' in probe_output or 'Exception' in probe_output
    asserts.assert_false(
        saw_exception, 'Exception while sending link probe: ' + probe_output)

    if 'succeeded' in probe_output:
        return LinkProbeResult(
            is_success=True,
            stdout=probe_output,
            elapsed_time=next((int(word) for word in probe_output.split()
                               if word.isdigit()), None),
            failure_reason=None)

    if 'failed with reason' in probe_output:
        return LinkProbeResult(
            is_success=False,
            stdout=probe_output,
            elapsed_time=None,
            failure_reason=next((int(word) for word in probe_output.split()
                                 if word.isdigit()), None))

    asserts.fail('Unexpected link probe result: ' + probe_output)
    return LinkProbeResult(is_success=False,
                           stdout=probe_output,
                           elapsed_time=None,
                           failure_reason=None)
2720
2721
def send_link_probes(ad, num_probes, delay_sec):
    """Sends a sequence of link probes to the currently connected AP, and
    returns whether the probes succeeded or not.

    The delay is applied after every probe, including the last one.

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time between probes, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    logging.info('Sending link probes')
    probe_results = []
    probes_sent = 0
    while probes_sent < num_probes:
        # send_link_probe() fails the test by itself when the adb shell
        # output shows an exception.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        probe_results.append(outcome)
        time.sleep(delay_sec)
        probes_sent += 1
    return probe_results
2744
2745
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Set up the AP with provided network info.

    test.access_points[index] is closed first; after a 5 second pause `ap`
    is started with a 'whirlwind' preset. The network is WPA-secured when
    `network` carries a "password" entry and open otherwise.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
                 password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.
    """
    extra_bss_settings = []
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Pick the security profile from the presence of a password.
    if "password" in network:
        security = hostapd_security.Security(security_mode="wpa",
                                             password=network["password"])
    else:
        security = hostapd_security.Security(security_mode=None, password=None)

    preset = hostapd_ap_preset.create_ap_preset(
        channel=channel,
        ssid=ssid,
        security=security,
        bss_settings=extra_bss_settings,
        vht_bandwidth=bandwidth,
        profile_name='whirlwind',
        iface_wlan_2g=ap.wlan_2g,
        iface_wlan_5g=ap.wlan_5g)
    ap.start_ap(preset)
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
2782
2783
def turn_ap_off(test, AP):
    """Bring down hostapd on the Access Point.

    Both the 'wlan0' and 'wlan1' hostapd instances are stopped, each only if
    it is currently alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based).
    """
    radios = (
        ('wlan0', 'Turned WLAN0 AP%d off'),
        ('wlan1', 'Turned WLAN1 AP%d off'),
    )
    for iface, done_message in radios:
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if daemon.is_alive():
            daemon.stop()
            logging.debug(done_message % AP)
2798
2799
def turn_ap_on(test, AP):
    """Bring up hostapd on the Access Point.

    Both the 'wlan0' and 'wlan1' hostapd instances are started with their
    own stored config, each only if it is not already alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based).
    """
    radios = (
        ('wlan0', 'Turned WLAN0 AP%d on'),
        ('wlan1', 'Turned WLAN1 AP%d on'),
    )
    for iface, done_message in radios:
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if daemon.is_alive():
            continue
        daemon.start(daemon.config)
        logging.debug(done_message % AP)
2814
2815
def turn_location_off_and_scan_toggle_off(ad):
    """Turns off wifi location scans.

    Disables the location service and the always-available scan toggle, then
    asserts that the toggle is reported as off.

    Args:
        ad: android device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    failure_message = "Failed to turn off location service's scan."
    scan_still_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not scan_still_available, failure_message)
2822
2823
def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
    """ Set SoftAP mode channel

    Runs `hostapd_cli ... chan_switch` on the device and fails the test
    unless the command output is exactly 'OK'.

    Args:
        dut: android device object
        ap_iface: interface of SoftAP mode.
        cs_count: how many beacon frames before switch channel, default = 10
        channel: a wifi channel.

    Returns:
        The command output ('OK') when the switch was accepted.
    """
    switch_cmd = 'hostapd_cli -i {} chan_switch {} {}'.format(
        ap_iface, cs_count, channel)
    dut.log.info('adb shell {}'.format(switch_cmd))
    switch_output = dut.adb.shell(switch_cmd)
    if switch_output == 'OK':
        dut.log.info('switch hotspot channel to {}'.format(channel))
        return switch_output

    asserts.fail("Failed to switch hotspot channel")
2843
def get_wlan0_link(dut):
    """Get the wlan0 interface status as reported by wpa_cli.

    Args:
        dut: android device object.

    Returns:
        dict built from the key=value pairs of the wpa_cli status output.
        The test is failed when the output has no "ssid" entry.
    """
    status_cmd = 'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status'
    raw_status = dut.adb.shell(status_cmd)
    status_fields = {
        key: value
        for key, value in re.findall(r'(\S+)=(".*?"|\S+)', raw_status)
    }
    asserts.assert_true("ssid" in status_fields,
                        "Client doesn't connect to any network")
    return status_fields
2852
def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
    """Verify 11ax for wifi connection.

    The check is skipped unless the AP supports 11ax and the device model is
    in wifi6_supported_models.

    Args:
      ad: android device object
      wifi6_supported_models: device supporting 11ax.
      wifi6_ap: if the AP supports 11ax.
    """
    if not wifi6_ap or ad.model not in wifi6_supported_models:
        return
    logging.info("Verifying 11ax. Model: %s" % ad.model)
    connection_standard = ad.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        connection_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT did not connect to 11ax.")
2866
def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
    """Verify 11ax SoftAp if devices support it.

    When both the DUT and the DUT client models are in
    wifi6_supported_models, assert that the client's connection standard is
    11ax. The check is skipped otherwise.

    Args:
      dut: Softap device.
      dut_client: Client connecting to softap.
      wifi6_supported_models: List of device models supporting 11ax.
    """
    if (dut.model not in wifi6_supported_models
            or dut_client.model not in wifi6_supported_models):
        return
    logging.info(
        "Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
        dut.model, dut_client.model)
    client_standard = dut_client.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        client_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT failed to start SoftAp in 11ax.")
2886
def check_available_channels_in_bands_2_5(dut, country_code):
    """Check if DUT is capable of enable BridgedAp.
    #TODO: Find a way to make this function flexible by taking an argument.

    Check points:
        1. The DUT reports bridged AP concurrency support through the
           Android API.
        2. After switching the DUT to the given country_code, the softAP
           capability lists supported channels for both 2.4GHz and 5GHz.

    Args:
        dut: android device object.
        country_code: country code, e.g., 'US', 'JP'.
    Returns:
        True: If DUT is capable of enable BridgedAp.
        False: If DUT is not capable of enable BridgedAp.
    """
    # Check point #1
    if not dut.droid.wifiIsBridgedApConcurrencySupported():
        logging.error("DUT %s doesn't support bridged AP.", dut.model)
        return False

    # Check point #2
    set_wifi_country_code(dut, country_code)
    active_country = dut.droid.wifiGetCountryCode()
    dut.log.info("DUT current country code : {}".format(active_country))
    # Toggle Wi-Fi on and then off so the country code takes effect.
    wifi_toggle_state(dut, True)
    wifi_toggle_state(dut, False)

    # The capability is delivered through the SoftAp callback.
    softap_callback = dut.droid.registerSoftApCallback()
    capability = get_current_softap_capability(dut, softap_callback, True)
    dut.droid.unregisterSoftApCallback(softap_callback)

    dual_band_channels = (
        capability[
            wifi_constants.SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST]
        and capability[
            wifi_constants.SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST])
    if dual_band_channels:
        return True

    logging.error("DUT in %s doesn't support dual SAP bands (2G and 5G).", country_code)
    return False
2928
2929
@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
    """Make 2 DUT ping each other.

    Each DUT's first wlan0 IPv4 address is looked up; the test is failed
    when either one is missing. dut1 then pings dut2 and dut2 pings dut1,
    and both pings must succeed.

    Args:
        dut1: An AndroidDevice object.
        dut2: An AndroidDevice object.
    """
    # Collect the first wlan0 IPv4 address of each DUT ("" when it has none).
    wlan_ips = []
    for dut in (dut1, dut2):
        try:
            wlan_ips.append(dut.droid.connectivityGetIPv4Addresses('wlan0')[0])
        except IndexError:
            dut.log.info(
                "{} has no Wi-Fi connection, cannot get IPv4 address."
                .format(dut.serial))
            wlan_ips.append("")
    dut1_ip, dut2_ip = wlan_ips

    # Both addresses are needed before any ping can be attempted.
    asserts.assert_true(dut1_ip and dut2_ip,
                        "Ping failed because no DUT's IPv4 address")

    dut1.log.info("{} IPv4 addresses : {}".format(dut1.serial, dut1_ip))
    dut2.log.info("{} IPv4 addresses : {}".format(dut2.serial, dut2_ip))

    # Ping in both directions: dut1 -> dut2, then dut2 -> dut1.
    ping_plan = ((dut1, dut1_ip, dut2_ip), (dut2, dut2_ip, dut1_ip))
    for source, source_ip, target_ip in ping_plan:
        source.log.info("{} ping {}".format(source_ip, target_ip))
        asserts.assert_true(
            utils.adb_shell_ping(source, count=10, dest_ip=target_ip,
                                 timeout=20),
            "%s ping %s failed" % (source.serial, target_ip))
2972
def get_wear_wifimediator_disable_status(ad):
    """Gets WearWifiMediator disable status.

    Reads the global setting cw_disable_wifimediator; only the exact value
    "1" counts as disabled.

    Args:
        ad: Android Device

    Returns:
        True if WearWifiMediator is disabled, False otherwise.
    """
    setting_value = ad.adb.shell("settings get global cw_disable_wifimediator")
    if setting_value != "1":
        ad.log.info("WearWifiMediator is ENABLED")
        return False
    ad.log.info("WearWifiMediator is DISABLED")
    return True
2990
def disable_wear_wifimediator(ad, state):
    """Disables or re-enables WearWifiMediator and verifies the result.

    Writes 1 (disable) or 0 (enable) to the global setting
    cw_disable_wifimediator, then reads the status back and asserts it
    matches.

    Args:
        ad: Android Device
        state: True to disable, False otherwise.
    """
    if not state:
        ad.log.info("Enabling WearWifiMediator.....")
        ad.adb.shell("settings put global cw_disable_wifimediator 0")
        asserts.assert_false(get_wear_wifimediator_disable_status(ad),
                             "WearWifiMediator should be enabled")
        return

    ad.log.info("Disabling WearWifiMediator.....")
    ad.adb.shell("settings put global cw_disable_wifimediator 1")
    asserts.assert_true(get_wear_wifimediator_disable_status(ad),
                        "WearWifiMediator should be disabled")
3008
def list_scan_results(ad, wait_time=15):
    """
    Initiates an Android Wi-Fi scan and logs the available Wi-Fi networks.

    The scan results are written to the device log only; nothing is returned.

    Args:
        ad (AndroidDevice): The Android device on which the scan is performed.
        wait_time (int, optional):
          The time in seconds to wait for the scan to complete before fetching results.
          Default is 15 seconds.
    """
    ad.log.info("Start scan for available Wi-Fi networks...")
    ad.adb.shell("cmd wifi start-scan")

    ad.log.info("Wait %ss for scan to complete.", wait_time)
    time.sleep(wait_time)

    networks = ad.adb.shell("cmd wifi list-scan-results")
    ad.log.info("Available Wi-Fi networks: " + "\n" + networks + "\n")
3025
def kill_iperf3_server_by_port(port: str):
    """
        Kill an iperf3 server process running on the specified port.

        The host's `ps aux` output is scanned and SIGTERM is sent to every
        process whose command line mentions "iperf3" and contains the port
        as a standalone number. Only the command column is inspected, so a
        PID or memory figure that happens to contain the port digits no
        longer selects an unrelated process.

        Args:
            port: The port number on which the iperf3 server is running.
    """
    # Digits on either side would make it a different number (5201 vs 15201).
    port_pattern = re.compile(r"(?<!\d)" + re.escape(str(port)) + r"(?!\d)")
    try:
        ps_output = subprocess.check_output(["ps", "aux"],
                                            universal_newlines=True)
        for line in ps_output.splitlines():
            columns = line.split()
            # `ps aux` prints ten columns (USER, PID, %CPU, %MEM, VSZ, RSS,
            # TTY, STAT, START, TIME) before the command line.
            if len(columns) < 11:
                continue
            command = " ".join(columns[10:])
            if "iperf3" not in command or not port_pattern.search(command):
                continue
            pid = columns[1]
            subprocess.run(["kill", "-15", pid])
            logging.warning(f"iperf3 server on port {port} already in use,"
                          f"kill it with PID {pid}")
    except (subprocess.CalledProcessError, OSError):
        # Best-effort cleanup: a failing or missing ps/kill is only logged.
        logging.info("Error executing shell command with subprocess.")
3045
def get_host_iperf_ipv4_address(dut: AndroidDevice) -> str | None:
    """Gets the host's iPerf IPv4 address.

    This function tries to get the host's iPerf IPv4 address by finding the
    first non-loopback IPv4 address in the host's `ifconfig` output that the
    device can ping.

    Args:
      dut: The Android device.

    Returns:
      The host's iPerf IPv4 address, if found; None, otherwise (including
      when `ifconfig` cannot be run).
    """
    try:
        # Run ifconfig command and get its output
        output = subprocess.check_output(["ifconfig"],
                                         universal_newlines=True)
    except subprocess.CalledProcessError:
        logging.info("Error executing ifconfig command.")
        return None
    except FileNotFoundError:
        logging.info("ifconfig command not found.")
        return None
    except Exception as e:
        logging.info("An unexpected error occurred: %s", e)
        return None

    # Regular expression to capture IPv4 addresses that come after the word
    # 'inet'
    pattern = r'inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'

    # Extract all matches
    matches = re.findall(pattern, output)

    # Return the first non-loopback address that answers the device's ping.
    for ip_str in matches:
        try:
            ip = ipaddress.ip_address(ip_str)
        except ValueError:
            # Log the raw text: `ip` is unbound (or left over from an earlier
            # iteration) when parsing fails.
            logging.warning("Invalid IP address: %s", ip_str)
            continue
        if ip.is_loopback:
            logging.info("Skip loopback IP address: %s", str(ip))
            continue
        try:
            ping_result = dut.adb.shell("ping -c 6 {}".format(str(ip)))
            dut.log.info("Host IP ping result: %s" % ping_result)
            if "100% packet loss" in ping_result:
                logging.warning("Ping host IP %s results: %s", str(ip),
                                ping_result)
                continue
            return ip_str
        except AdbCommandError as e:
            logging.warning("Failed to ping host IP %s: %s", str(ip), e)
            continue

    # Return None if no suitable host iPerf server IP found
    return None
3100
def get_iperf_server_port():
    """Gets a unique port number within the Dynamic port range (49152-65535).

    The ports currently in use are read from `netstat -tuln`; the dynamic
    range is then shuffled and its first port not reported as in use is
    returned. If netstat cannot be queried, no port is treated as in use.

    Returns:
      int: An unused port number.

    Raises:
      RuntimeError: If no available port is found in the Dynamic Ports range.
    """

    def _ports_in_use():
        """Return the set of port numbers parsed from `netstat -tuln`."""
        try:
            netstat_output = subprocess.check_output(
                ['netstat', '-tuln']).decode('utf-8')
            # Every run of digits that directly follows a ':' is a port.
            return {
                int(digits)
                for digits in re.findall(r'(?<=:)\d+', netstat_output)
            }
        except Exception as e:
            logging.error(f"Error: {e}")
            return set()

    candidates = list(range(49152, 65536))
    busy_ports = _ports_in_use()

    # Shuffling first makes the chosen free port random.
    random.shuffle(candidates)
    free_port = next(
        (candidate for candidate in candidates
         if candidate not in busy_ports), None)
    if free_port is None:
        raise RuntimeError(
            "No available port found in the Dynamic Ports range!")
    return free_port
3149