1#!/usr/bin/env python3
2#
3#   Copyright 2016 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import os
19import shutil
20import time
21
22from collections import namedtuple
23from enum import IntEnum
24from queue import Empty
25
26from acts import asserts
27from acts import context
28from acts import signals
29from acts import utils
30from acts.controllers import attenuator
31from acts.controllers.ap_lib import hostapd_security
32from acts.controllers.ap_lib import hostapd_ap_preset
33from acts.controllers.ap_lib.hostapd_constants import BAND_2G
34from acts.controllers.ap_lib.hostapd_constants import BAND_5G
35from acts.test_utils.wifi import wifi_constants
36from acts.test_utils.tel import tel_defines
37
# Settle time in seconds (passed to time.sleep() in this file) used after
# reboot, toggling WiFi and toggling Airplane mode, so the system can settle
# down after the operation.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly,
# such as onSuccess for starting a background scan or the confirmation of a
# wifi state change.
SHORT_TIMEOUT = 30
# NOTE(review): the two timeouts below are not referenced in this part of the
# file; presumably they are in seconds like the ones above -- confirm.
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458

# NOTE(review): presumably the default URL fetched by connectivity checks
# elsewhere in this file -- confirm against its users.
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"
51
# Named attenuation presets with four values each.
# NOTE(review): presumably one value per attenuator port, in dB, where 95
# takes an AP "off" and 0 leaves it "on" -- confirm against the testbed setup.
roaming_attn = {
    "AP1_on_AP2_off": [0, 0, 95, 95],
    "AP1_off_AP2_on": [95, 95, 0, 0],
    "default": [0, 0, 0, 0],
}
72
73
class WifiEnums:
    """Namespace of constants used by the Wi-Fi test utilities.

    Groups the dictionary keys used in network / soft-AP configuration dicts,
    soft-AP band and WPS values, EAP / enterprise settings, P2P and RTT
    values, WifiScanner band and report settings, and the
    frequency <-> channel lookup tables.
    """

    # Keys of the dicts that describe a Wi-Fi network or configuration.
    SSID_KEY = "SSID"
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"
    # NOTE: lower-case name kept as-is; callers outside this block may use it.
    frequency_key = "frequency"
    APBAND_KEY = "apBand"
    HIDDEN_KEY = "hiddenSSID"
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"

    # Soft-AP band selection. Note that the 2G value is 0 (falsy), so code
    # must compare against None rather than test truthiness.
    WIFI_CONFIG_APBAND_2G = 0
    WIFI_CONFIG_APBAND_5G = 1
    WIFI_CONFIG_APBAND_AUTO = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class CountryCode():
        """Country code strings accepted by set_wifi_country_code()."""
        CHINA = "CN"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        """EAP method identifiers."""
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        """EAP phase-2 (inner) method identifiers."""
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        """Keys of an enterprise (EAP) network configuration dict."""
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        """Channel width values reported in scan results."""
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    class RttPreamble(IntEnum):
        # Single-bit values, so they can be combined as a bitmask.
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    class RttBW(IntEnum):
        # Single-bit values, so they can be combined as a bitmask.
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        """RTT status codes (>= 0) and failure reasons (< 0)."""
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        """Keys of an RTT request parameter dict."""
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Allowed RTT margin of error per bandwidth.
    # NOTE(review): units are not stated in this file -- presumably meters.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452,
                          2457, 2462]
    DFS_5G_FREQUENCIES = [5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580,
                          5600, 5620, 5640, 5660, 5680, 5700, 5720]
    NONE_DFS_5G_FREQUENCIES = [5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805,
                               5825]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # Frequencies covered by each WifiScanner band value.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    # Channel numbers are not unique across bands (e.g. 7 is both 2442 MHz
    # and 5035 MHz), so this map cannot simply be inverted.
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        # 5720 MHz is listed in DFS_5G_FREQUENCIES, so it needs a channel
        # entry too; otherwise looking it up raises KeyError.
        5720: 144,
        5745: 149,
        # 5755 / 5775 mirror channels 151 / 155 in channel_5G_to_freq.
        5755: 151,
        5765: 153,
        5775: 155,
        5785: 157,
        5795: 159,
        5805: 161,
        5825: 165,
    }

    # All Wifi channels to frequencies lookup.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        # Counterpart of the 5720 MHz entry in DFS_5G_FREQUENCIES.
        144: 5720,
        149: 5745,
        151: 5755,
        153: 5765,
        155: 5775,
        157: 5785,
        159: 5795,
        161: 5805,
        165: 5825
    }
400
401
class WifiChannelBase:
    """Base holder for per-region Wi-Fi frequency lists.

    The class-level lists are empty placeholders that subclasses replace.
    ALL_5G_FREQUENCIES is computed once here from the (empty) base lists, so
    a subclass that overrides the DFS / non-DFS lists must also set
    ALL_5G_FREQUENCIES itself.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the frequency list that belongs to a WifiScanner band.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* values handled below.

        Returns:
            The list of frequencies for the band. For single-list bands this
            is the attribute's own list object, not a copy.

        Raises:
            KeyError: If band is not one of the handled values.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES

        frequencies_by_band = {}
        frequencies_by_band[WifiEnums.WIFI_BAND_24_GHZ] = freqs_2g
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ] = freqs_5g_no_dfs
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY] = freqs_5g_dfs
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS] = freqs_5g_all
        frequencies_by_band[WifiEnums.WIFI_BAND_BOTH] = (
            freqs_2g + freqs_5g_no_dfs)
        frequencies_by_band[WifiEnums.WIFI_BAND_BOTH_WITH_DFS] = (
            freqs_5g_all + freqs_2g)
        return frequencies_by_band[band]
421
422
class WifiChannelUS(WifiChannelBase):
    """Wi-Fi frequency lists for the US.

    The 2.4 GHz, non-DFS 5 GHz and mixed-scan lists are class attributes.
    The DFS list and the combined 5 GHz list are set per instance in
    __init__.
    """
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462,
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825,
    ]
    # Order is kept exactly as originally written.
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520,
        5560, 5700, 5745, 5805,
    ]

    def __init__(self, model=None):
        """Sets the instance-level DFS and combined 5 GHz frequency lists.

        Args:
            model: Accepted for interface compatibility; not used here.
        """
        dfs_frequencies = [
            5260, 5280, 5300, 5320,
            5500, 5520, 5540, 5560, 5580, 5600,
            5620, 5640, 5660, 5680, 5700, 5720,
        ]
        self.DFS_5G_FREQUENCIES = dfs_frequencies
        self.ALL_5G_FREQUENCIES = (
            dfs_frequencies + self.NONE_DFS_5G_FREQUENCIES)
437
438
class WifiReferenceNetworks:
    """Sorts reference networks into buckets by band and by security.

    Each entry of the input is a dict keyed by band name whose values are
    network dicts. A network dict containing a "password" key counts as
    secure, otherwise as open. The "2g" key goes to the 2G buckets; every
    other key is placed in the 5G buckets.
    """

    def __init__(self, obj):
        """Stores the reference networks and classifies them immediately.

        Args:
            obj: An iterable of dicts mapping a band key to a network dict.
        """
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Appends every network dict to the bucket matching band/security."""
        for entry in self.reference_networks:
            for band in entry:
                config = entry[band]
                has_password = "password" in config
                if band == self.WIFI_2G:
                    bucket = (self.secure_networks_2g
                              if has_password else self.open_networks_2g)
                else:
                    # Any key other than "2g" lands here, not only "5g".
                    bucket = (self.secure_networks_5g
                              if has_password else self.open_networks_5g)
                bucket.append(config)

    def return_2g_secure_networks(self):
        """Returns the list of 2G networks that have a password."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the list of 5G networks that have a password."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the list of 2G networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the list of 5G networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns a new list: secure 2G networks followed by secure 5G."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns a new list: open 2G networks followed by open 5G."""
        return self.open_networks_2g + self.open_networks_5g
485
486
487def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
488    """Wrapper function that handles the bahevior of assert_on_fail.
489
490    When assert_on_fail is True, let all test signals through, which can
491    terminate test cases directly. When assert_on_fail is False, the wrapper
492    raises no test signals and reports operation status by returning True or
493    False.
494
495    Args:
496        func: The function to wrap. This function reports operation status by
497              raising test signals.
498        assert_on_fail: A boolean that specifies if the output of the wrapper
499                        is test signal based or return value based.
500        args: Positional args for func.
501        kwargs: Name args for func.
502
503    Returns:
504        If assert_on_fail is True, returns True/False to signal operation
505        status, otherwise return nothing.
506    """
507    try:
508        func(*args, **kwargs)
509        if not assert_on_fail:
510            return True
511    except signals.TestSignal:
512        if assert_on_fail:
513            raise
514        return False
515
516
def assert_network_in_list(target, network_list):
    """Asserts that at least one network in a list matches the target.

    Matching is delegated to match_networks(), i.e. a network matches when
    it contains every key-value pair of target.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    found = match_networks(target, network_list)
    not_found_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    asserts.assert_true(found, not_found_msg)
530
531
def match_networks(target_params, networks):
    """Filters a list of WiFi networks down to those matching all parameters.

    A network matches when it has every key of target_params and the value
    stored under each of those keys equals the target value.

    Args:
        target_params: A non-empty dict of key-value pairs describing the
                       wanted network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list with the matching networks, in their original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _differs(candidate):
        # True as soon as one wanted key is missing or holds another value.
        return any(key not in candidate or candidate[key] != wanted
                   for key, wanted in target_params.items())

    return [candidate for candidate in networks if not _differs(candidate)]
562
563
def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits until the device reports the requested wifi state.

    Thin wrapper that runs _wait_for_wifi_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the device reached the state and
        False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome
579
580
def _wait_for_wifi_state(ad, state):
    """Blocks until the device is in the given wifi state.

    This only observes the state; it does not change it. A test failure
    signal is raised when no matching state change event arrives within
    SHORT_TIMEOUT and the device still is not in the requested state.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    if ad.droid.wifiCheckState() == state:
        # Already there: waiting for a change event would only time out.
        return

    def _is_target_state(event):
        return event["data"]["enabled"] == state

    ad.droid.wifiStartTrackingStateChange()
    fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
        state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _is_target_state, SHORT_TIMEOUT)
    except Empty:
        # No event seen; fall back to polling the state once.
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
605
606
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets wifi to the requested state, or flips it when none is given.

    Thin wrapper that runs _wifi_toggle_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the toggle succeeded and False
        otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome
622
623
def _wifi_toggle_state(ad, new_state=None):
    """Switches wifi on or off and waits for the change to be confirmed.

    A test failure signal is raised when no matching state change event
    arrives within SHORT_TIMEOUT and the device is not in the requested
    state.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    current_state = ad.droid.wifiCheckState()
    if new_state is None:
        new_state = not current_state
    elif new_state == current_state:
        # Nothing to do; waiting for a change event would only time out.
        return

    def _is_new_state(event):
        return event["data"]["enabled"] == new_state

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    # Drop stale events so that only the upcoming change is observed.
    ad.ed.clear_all_events()
    # Setting wifi state.
    ad.droid.wifiToggleState(new_state)
    fail_msg = "Failed to set Wi-Fi state to %s on %s." % (
        new_state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _is_new_state, SHORT_TIMEOUT)
    except Empty:
        # No event seen; fall back to polling the state once.
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
655
656
def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    This function does not change the Wi-Fi on/off state itself; callers
    such as wifi_test_device_init() enable Wi-Fi before calling it.

    Each network is forgotten individually. A missing confirmation event is
    only logged, because the final check below decides success.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal if networks are still configured afterwards.
    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    for network in networks:
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.",
                            network)
    # Check again to see if there's any network left, and report the ones
    # that actually remain rather than the list we started with.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)
680
681
def toggle_airplane_mode_on_and_off(ad):
    """Turns Airplane mode ON and then OFF again.

    After each switch the function sleeps DEFAULT_TIMEOUT seconds to let the
    system settle.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal if turning Airplane mode on or off fails.
    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(
        utils.force_airplane_mode(ad, True),
        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This step turns the mode OFF, so the message must say so.
    asserts.assert_true(
        utils.force_airplane_mode(ad, False),
        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
699
700
def toggle_wifi_off_and_on(ad):
    """Turns WiFi OFF and then ON again.

    After each switch the function sleeps DEFAULT_TIMEOUT seconds to let the
    system settle.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal if turning WiFi off or on fails.
    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for log_line, target_state in steps:
        ad.log.debug(log_line)
        wifi_toggle_state(ad, target_state)
        time.sleep(DEFAULT_TIMEOUT)
714
715
def wifi_forget_network(ad, net_ssid):
    """Forgets the configured networks whose SSID contains net_ssid.

    Note that the SSID comparison is a containment test (`in`), not an
    equality test, so every configured network whose SSID contains net_ssid
    is removed.

    Args:
        ad: android_device object to forget the network on.
        net_ssid: ssid of the network to forget.

    Raises:
        A test failure signal if a removal is not confirmed within
        SHORT_TIMEOUT.
    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for network in configured:
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
735
736
def wifi_test_device_init(ad):
    """Prepares an android device for wifi testing.

    Steps, in the order they are executed:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan and verify it is off.
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging and verify the level.
    5. Set the wpa_supplicant log level to EXCESSIVE (not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set country code to US.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    log_level_cmd = ("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 "
                     "IFNAME=wlan0 log_level EXCESSIVE")
    wpa_cli_status = ad.adb.shell(log_level_cmd)
    ad.log.info("wpa_supplicant log change status: %s", wpa_cli_status)

    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, WifiEnums.CountryCode.US)
    utils.set_ambient_display(ad, False)
768
def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    The code is first forced through the `cmd wifi` shell command. If that
    raises ad.adb.AdbError, the same country code is set through the SL4A
    wifiSetCountryCode call instead.

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        Whatever the SL4A fallback call raises if it is unable to set the
        country code (presumably an RpcException -- confirm).
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except ad.adb.AdbError:
        # Fall back to SL4A with the requested code, not a hard-coded one.
        ad.droid.wifiSetCountryCode(country_code)
783
784
def start_wifi_connection_scan(ad):
    """Triggers a wifi connection scan and blocks until results arrive.

    Pending events are cleared first so that only the results of this scan
    are waited for.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal if no results become available within 60s.
    """
    results_event = "WifiManagerScanResultsAvailable"
    wait_seconds = 60
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event, wait_seconds)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")
797
798
def start_wifi_connection_scan_and_return_status(ad):
    """
    Triggers a wifi connection scan and reports whether it succeeded.

    Waits up to 60s for either a results-available or a scan-failure event.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if at least one results-available event was received
        False: if only failure events were received
    Raises:
        A test failure signal if neither kind of event arrives within 60s.
    """
    event_pattern = "WifiManagerScan(ResultsAvailable|Failure)"
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        events = ad.ed.pop_events(event_pattern, 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # If there are multiple matches, we check for atleast one success.
    for event in events:
        event_name = event["name"]
        if event_name == "WifiManagerScanResultsAvailable":
            return True
        if event_name == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False
825
826
def start_wifi_connection_scan_and_check_for_network(ad, network_ssid,
                                                     max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and reports whether
    |network_ssid| shows up in the scan results. Scanning stops at the first
    scan whose results contain the network. Scans that report failure are
    skipped without looking at the results.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    Returns:
        True: if network_ssid is found in scan results.
        False: if network_ssid is not found in scan results.
    """
    for _ in range(max_tries):
        if not start_wifi_connection_scan_and_return_status(ad):
            continue
        found = match_networks({WifiEnums.SSID_KEY: network_ssid},
                               ad.droid.wifiGetScanResults())
        if found:
            return True
    return False
850
851
def start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid,
                                                        max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| is seen in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    assert_msg = "".join([
        "Failed to find ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    network_seen = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_true(network_seen, assert_msg)
870
871
def start_wifi_connection_scan_and_ensure_network_not_found(ad, network_ssid,
                                                            max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| is NOT seen in any of the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    assert_msg = "".join([
        "Found ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    network_seen = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_false(network_seen, assert_msg)
890
891
def start_wifi_background_scan(ad, scan_setting):
    """Starts a wifi background scan and waits for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the onSuccess event for the started scan. The
        event is awaited for up to SHORT_TIMEOUT seconds.
    """
    scan_index = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
906
907
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None):
    """Starts wifi tethering on an android_device.

    Builds a soft AP configuration from the arguments, applies it, starts
    tethering and waits for the tethering-started confirmation events.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use. Omitted from the
            configuration when empty or None.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
            Omitted from the configuration only when None.
        hidden: boolean to indicate if the AP needs to be hidden or not.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    config = {WifiEnums.SSID_KEY: ssid}
    if password:
        config[WifiEnums.PWD_KEY] = password
    # Compare against None: WifiEnums.WIFI_CONFIG_APBAND_2G is 0, which a
    # plain truthiness test would silently drop.
    if band is not None:
        config[WifiEnums.APBAND_KEY] = band
    if hidden:
        config[WifiEnums.HIDDEN_KEY] = hidden
    asserts.assert_true(
        ad.droid.wifiSetWifiApConfiguration(config),
        "Failed to update WifiAp Configuration")
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        msg = "Failed to receive confirmation of wifi tethering starting"
        asserts.fail(msg)
    finally:
        # Always stop tracking, whether or not tethering came up.
        ad.droid.wifiStopTrackingTetherStateChange()
944
945
def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None):
    """Save a soft ap configuration and verify the saved SSID.

    Args:
        ad: android_device to save the soft AP configuration on.
        wifi_config: dict holding the soft AP configuration; it must contain
            the SSID key. Note that this dict is updated in place with the
            band / hidden values when they are provided.
        band: Optional band for the soft AP. Written to the configuration
            whenever it is not None.
        hidden: Optional flag; written to the configuration when truthy.

    Returns:
        No return value. Error checks in this function will raise test failure
        signals.
    """
    # Compare against None instead of relying on truthiness so that a band
    # constant whose numeric value is 0 is not silently dropped.
    if band is not None:
        wifi_config[WifiEnums.APBAND_KEY] = band
    if hidden:
        wifi_config[WifiEnums.HIDDEN_KEY] = hidden
    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    # Read the configuration back and make sure the SSID was really stored.
    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match with expected SSID")
959
960
def start_wifi_tethering_saved_config(ad):
    """Turn on wifi hotspot with a config that is already saved.

    Args:
        ad: android_device to start wifi tethering on.

    Returns:
        No return value. The test is failed if the tethering confirmation
        events are not received.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event means "no confirmation"; a bare except would
        # also swallow KeyboardInterrupt/SystemExit and mask unrelated bugs.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
973
974
def stop_wifi_tethering(ad):
    """Turn off wifi tethering and wait for confirmation that it stopped.

    Args:
        ad: android_device to stop wifi tethering on.

    Returns:
        No return value. The test is failed if the confirmation events are
        not received.
    """

    def _tether_inactive(event):
        # Matches a tether state change reporting no active tether.
        return not event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", _tether_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
992
993
def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Bounce wifi and wait for the device to rejoin the given network.

    The device is expected to already be connected to the provided network.
    The work is delegated to _toggle_wifi_and_wait_for_reconnection through
    _assert_on_fail_handler:
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is
        the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
                 The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise
                        test failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return
        value.
    """
    outcome = _assert_on_fail_handler(
        _toggle_wifi_and_wait_for_reconnection, assert_on_fail, ad, network,
        num_of_tries=num_of_tries)
    return outcome
1030
1031
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=1):
    """Bounce wifi and wait for the device to rejoin the given network.

    The device is expected to already be connected to the provided network.

    Steps:
     1. Verify the device is currently connected to the network.
     2. Turn wifi off.
     3. Sleep for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event and confirm its SSID is the expected
        one.

    Any failed check directly fails the test.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
                 The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to wait for the
                      connected event before declaring failure. Default is 1.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # Precondition: we must already be on the requested network.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})

    # Bounce wifi, then listen for the reconnection event.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = None
        for _ in range(num_of_tries):
            try:
                connect_result = ad.ed.pop_event(
                    wifi_constants.WIFI_CONNECTED, 30)
            except Empty:
                # Nothing arrived within this window; use the next attempt.
                continue
            break

        no_connection_msg = ("Failed to connect to Wi-Fi network %s on %s" %
                             (network, ad.serial))
        asserts.assert_true(connect_result, no_connection_msg)
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_result)

        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected %s, but got %s." %
                             (ad.serial, expected_ssid, actual_ssid))
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1087
1088
def wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2,
                     assert_on_fail=True):
    """Wait for a wifi connect event on the device.

    Delegates to _wait_for_connect through _assert_on_fail_handler.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    outcome = _assert_on_fail_handler(_wait_for_connect, assert_on_fail, ad,
                                      expected_ssid, expected_id, tries)
    return outcome
1110
1111
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it matches the expectation.

    State change tracking is started before waiting and always stopped
    afterwards.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. Verified only when
                       truthy.
        expected_id: Network Id of the network to connect to. Verified
                     whenever it is not None.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no connect event is received or the connected
            network does not match the expected SSID / network id.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, id=expected_id, tries=tries)
        asserts.assert_true(connect_result,
                            "Failed to connect to Wi-Fi network %s" %
                            expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        # Compare against None: a network id of 0 is falsy but still has to
        # be verified when the caller asked for it.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1148
1149
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    When neither ssid nor id is given, the first connect event popped is
    returned. Otherwise events are popped (up to `tries` attempts, 30 seconds
    each) until one matches the requested network id or SSID. If no popped
    event matches, the last event that was popped (or None when nothing was
    received) is returned, so callers must verify the result themselves.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.

    Returns:
        A dict with details of the connection data, which looks like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }
        None is returned if no connect event was received at all.

    """
    conn_result = None
    # If ssid and network id are both None, any connect event is accepted.
    accept_any = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            # Nothing arrived within this window; use the next attempt.
            continue
        if accept_any:
            break
        # Compare id against None: a network id of 0 is falsy but is still a
        # value the caller may be waiting for.
        if (id is not None
                and conn_result['data'][WifiEnums.NETID_KEY] == id):
            break
        if ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result
1202
1203
def wait_for_disconnect(ad, timeout=10):
    """Block until the device reports a wifi disconnect.

    State change tracking is started before waiting and is always stopped
    again before returning or raising.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: if no "WifiNetworkDisconnected" event is received
            within the timeout.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        # Only the arrival of the event matters; its payload is not used.
        ad.ed.pop_event("WifiNetworkDisconnected", timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
1219
1220
def ensure_no_disconnect(ad, duration=10):
    """Verify the device stays connected for the given duration.

    State change tracking is started before waiting and is always stopped
    again before returning or raising.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: if a "WifiNetworkDisconnected" event is received
            within the duration.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event("WifiNetworkDisconnected", duration)
    except Empty:
        # No disconnect event showed up: this is the expected outcome.
        return
    finally:
        ad.droid.wifiStopTrackingStateChange()
    # Reaching this point means a disconnect event was actually popped.
    raise signals.TestFailure("Device disconnected from the network")
1237
1238
def connect_to_wifi_network(ad, network, assert_on_fail=True,
        check_connectivity=True, hidden=False):
    """Connection logic for open and psk wifi networks.

    Runs a connection scan first: for a hidden network the scan must NOT
    find the SSID, otherwise the scan must find it. Then connects with
    wifi_connect using 3 tries.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.

    Returns:
        The value returned by wifi_connect, so that callers using
        assert_on_fail=False can tell whether the connection succeeded.
    """
    if hidden:
        start_wifi_connection_scan_and_ensure_network_not_found(
            ad, network[WifiEnums.SSID_KEY])
    else:
        start_wifi_connection_scan_and_ensure_network_found(
            ad, network[WifiEnums.SSID_KEY])
    # Propagate the result instead of discarding it; otherwise a failure is
    # invisible to the caller when assert_on_fail is False.
    return wifi_connect(ad,
                        network,
                        num_of_tries=3,
                        assert_on_fail=assert_on_fail,
                        check_connectivity=check_connectivity)
1261
1262
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id and check the resulting SSID.

    Scans to make sure the network is visible, connects with
    wifi_connect_by_id, then compares the SSID from the current connection
    info against the expected one.

    Args:
        ad: android_device object to initiate connection on.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if connect using network id was successful;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)
    connect_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    debug_msg = ("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, connect_ssid))
    ad.log.debug(debug_msg)
    return not (connect_ssid != network_ssid)
1283
1284
def wifi_connect(ad, network, num_of_tries=1, assert_on_fail=True,
        check_connectivity=True):
    """Connect an Android device to a wifi network.

    Delegates to _wifi_connect through _assert_on_fail_handler: initiate the
    connection, wait for the "connected" event, then confirm the connected
    ssid is the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect, which validates
                            the connection after connecting when this is set.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    outcome = _assert_on_fail_handler(
        _wifi_connect, assert_on_fail, ad, network,
        num_of_tries=num_of_tries, check_connectivity=check_connectivity)
    return outcome
1310
1311
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network.

    Requests the connection by config, waits for the request to be accepted
    and for the "connected" event, then confirms the connected ssid is the
    one requested. Optionally validates the connection afterwards.

    Any failed check directly fails the test.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection is run against
                            DEFAULT_PING_ADDR after connecting.
    """
    ssid_key = WifiEnums.SSID_KEY
    asserts.assert_true(
        ssid_key in network,
        "Key '%s' must be present in network definition." % ssid_key)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[ssid_key]
    ad.droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", expected_ssid)
    try:
        # Only the arrival of this event matters; its payload is not used.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, tries=num_of_tries)
        no_connection_msg = ("Failed to connect to Wi-Fi network %s on %s" %
                             (network, ad.serial))
        asserts.assert_true(connect_result, no_connection_msg)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)

        actual_ssid = connect_result['data'][ssid_key]
        wrong_network_msg = ("Connected to the wrong network on %s." %
                             ad.serial)
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)

        # Wait for data connection to stabilize.
        time.sleep(5)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR, 10):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as err:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     err)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1366
1367
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one
    requested. The work is delegated to _wifi_connect_by_id through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Return the handler's result; without it the documented True/False
    # status never reaches the caller.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)
1390
1391
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Clears pending events, requests the connection by network id, waits for
    the request to be accepted and for the "connected" event, verifies the
    connected network id, and finally validates the connection against
    DEFAULT_PING_ADDR.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of this event matters; its payload is not used.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connect_result = _wait_for_connect_event(
            ad, id=network_id, tries=num_of_tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connect_result)

        connection_data = connect_result['data']
        actual_id = connection_data[WifiEnums.NETID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected network id = %d, but got %d." %
                             (ad.serial, network_id, actual_id))
        asserts.assert_equal(actual_id, network_id, wrong_network_msg)
        connected_ssid = connection_data[WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    connected_ssid, network_id)

        # Wait for data connection to stabilize.
        time.sleep(5)

        if not validate_connection(ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      connected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as err:
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, err)
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1442
1443
def wifi_connect_using_network_request(ad, network, network_specifier,
                                       num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network request.

    Trigger a network request with the provided network specifier,
    wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    The work is delegated to _wifi_connect_using_network_request through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Return the handler's result; without it the documented True/False
    # status never reaches the caller.
    return _assert_on_fail_handler(_wifi_connect_using_network_request,
                                   assert_on_fail, ad, network,
                                   network_specifier, num_of_tries)
1473
1474
def _wifi_connect_using_network_request(ad, network, network_specifier,
                                        num_of_tries=3):
    """Issue a network request and drive the connection flow that follows.

    Sends a network request with the provided specifier, waits for the
    configured registration delay, then hands over to
    _wait_for_wifi_connect_after_network_request to match the network,
    send the user selection and wait for the connection.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
    """
    ad.droid.wifiRequestNetworkWithSpecifier(network_specifier)
    ad.log.info("Sent network request with %s", network_specifier)
    # UI interaction should only start once wifi has begun processing the
    # request, so pause before continuing.
    register_delay_sec = wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC
    time.sleep(register_delay_sec)
    _wait_for_wifi_connect_after_network_request(ad, network, num_of_tries)
1500
1501
def wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    The work is delegated to _wait_for_wifi_connect_after_network_request
    through _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Return the handler's result; without it the documented True/False
    # status never reaches the caller.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, num_of_tries)
1528
1529
def _wait_for_wifi_connect_after_network_request(ad, network, num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Up to num_of_tries "onMatch" events are examined (60 seconds each) while
    looking for the target network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.

    Raises:
        signals.TestFailure: if the network is never matched, the expected
            events are not received, or the device connects to a different
            network.
    """
    asserts.assert_true(WifiEnums.SSID_KEY in network,
                        "Key '%s' must be present in network definition." %
                        WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Honor num_of_tries; iterating over the literal [0, num_of_tries]
        # would always run exactly twice.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: network[WifiEnums.SSID_KEY]},
                matched_scan_results)
            if matched_network:
                break

        asserts.assert_true(
            matched_network, "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        on_available_event = ad.ed.pop_event(
            wifi_constants.WIFI_NETWORK_CB_ON_AVAILABLE, 60)
        asserts.assert_true(on_available_event,
                            "Network request on available not received.")
        connected_network = on_available_event["data"]
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(connected_network[WifiEnums.SSID_KEY],
                             expected_ssid,
                             "Connected to the wrong network. "
                             "Expected %s, but got %s." %
                             (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        # Pass the values as separate logging arguments; a single tuple
        # cannot fill two %s placeholders and breaks log formatting.
        ad.log.error("Failed to connect to %s with error %s",
                     expected_ssid, error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure(
            "Failed to connect to %s network" % network) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1598
1599
def wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a wifi passpoint network.

    Wait for the "connected" event, then confirm the connected ssid is the
    one requested. The work is delegated to _wifi_passpoint_connect through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Return the handler's result; without it no status ever reaches the
    # caller.
    return _assert_on_fail_handler(_wifi_passpoint_connect, assert_on_fail,
                                   ad, passpoint_network,
                                   num_of_tries=num_of_tries)
1623
1624
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Connect an Android device to a wifi passpoint network.

    Wait for the "connected" event, confirm the connected ssid is the one
    requested, then validate the connection against DEFAULT_PING_ADDR.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if the connection could not be established or
            verified.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Use keyword arguments: passed positionally, num_of_tries would bind
        # to the `id` parameter and be treated as a network id to match.
        connect_result = _wait_for_connect_event(
            ad, ssid=expected_ssid, tries=num_of_tries)
        asserts.assert_true(connect_result,
                            "Failed to connect to WiFi passpoint network %s on"
                            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(actual_ssid, expected_ssid,
                             "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        # Wait for data connection to stabilize.
        time.sleep(5)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        # Chain the original error so the root cause stays in the traceback.
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid) from error

    finally:
        ad.droid.wifiStopTrackingStateChange()
1670
1671
def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by an FQDN.

    Args:
        ad: android_device object whose droid performs the removal.
        fqdn: FQDN handed to removePasspointConfig.

    Returns:
        True if removePasspointConfig completed without raising; False if it
        raised, in which case the error is logged.
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as error:
        ad.log.error("Failed to remove passpoint configuration with FQDN=%s "
                     "and error=%s" , fqdn, error)
        return False
    return True
1681
1682
def start_wifi_single_scan(ad, scan_setting):
    """Start a single-shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the "WifiScannerScan<idx>onSuccess" event, where
        idx is the value returned by wifiScannerStartScan. The event is waited
        for with SHORT_TIMEOUT.
    """
    scan_idx = ad.droid.wifiScannerStartScan(scan_setting)
    success_event_name = "WifiScannerScan%sonSuccess" % scan_idx
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']
1697
1698
def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection events until the given network is seen.

    Pops up to check_connection_count "WifiNetworkConnected" events (each
    waited for with a timeout value of 120) and checks whether any of them
    reports network_ssid. State tracking is stopped on every exit path,
    including the successful one and when popping an event raises.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number network connection
                                check.
    Returns:
        True if connection to given network happen, else return False.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] ==
                    network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Previously skipped on the "return True" path and on exceptions,
        # which left state tracking running on the device.
        ad.droid.wifiStopTrackingStateChange()
1720
1721
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out the scan duration and channel list for a scan setting.

    The channel list comes from exactly one of the "band" or "channels" keys
    of scan_setting; if both or neither are present, no channels are used.
    Each channel costs stime_channel, and every channel listed in
    WifiEnums.DFS_5G_FREQUENCIES costs an additional 132 (passive scan time
    on DFS).

    Args:
        wifi_chs: Object of channels supported; its band_to_freq() is used to
                  translate a band into channels.
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting
    if has_band and not has_channels:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        channels = scan_setting["channels"]
    else:
        channels = []

    total_time = len(channels) * stime_channel
    dfs_channels = [
        ch for ch in channels if ch in WifiEnums.DFS_5G_FREQUENCIES
    ]
    for _ in dfs_channels:
        total_time += 132  # passive scan time on DFS
    return total_time, channels
1746
1747
def start_wifi_track_bssid(ad, track_setting):
    """Start BSSID tracking and wait for its success event.

    Args:
      ad: android_device object.
      track_setting: Dict with the keys "bssidInfos" and "apLostThreshold",
                     both handed to wifiScannerStartTrackingBssids.

    Returns:
      The 'data' field of the "WifiScannerBssid<idx>onSuccess" event, waited
      for with SHORT_TIMEOUT.
    """
    bssid_infos = track_setting["bssidInfos"]
    ap_lost_threshold = track_setting["apLostThreshold"]
    track_idx = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, ap_lost_threshold)
    success_event_name = "WifiScannerBssid{}onSuccess".format(track_idx)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
1763
1764
def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file to an unencrypted PKCS#8 DER file via openssl.

    Both file names are checked for their extension before the openssl
    command line is built and run through utils.exe_cmd.

    Args:
        in_file: The original key file; must end with ".pem".
        out_file: The full path to the converted key file, including
        filename; must end with ".der".
    """
    input_is_pem = in_file.endswith(".pem")
    asserts.assert_true(input_is_pem, "Input file has to be .pem.")
    output_is_der = out_file.endswith(".der")
    asserts.assert_true(output_is_der, "Output file has to be .der.")

    openssl_cmd = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8").format(in_file, out_file)
    utils.exe_cmd(openssl_cmd)
1783
1784
def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR, wait_time=2):
    """Check internet connectivity with an HTTP ping to the given address.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: seconds to sleep before pinging, to allow for DHCP to
                   complete.

    Returns:
        Whatever ad.droid.httpPing returns for ping_addr; callers in this file
        treat a falsy value as "no connectivity".
    """
    time.sleep(wait_time)
    ping_result = ad.droid.httpPing(ping_addr)
    ad.log.info("Http ping result: %s.", ping_result)
    return ping_result
1801
1802
# TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    Every key of expected_con except "password" must be present in the
    connection info reported by the device and hold an equal value. The
    "BSSID" and "supplicant_state" fields are compared case-insensitively.

    Args:
        ad: android_device object whose connection info is checked.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if a field is missing or its value differs.
    """
    connection_info = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", connection_info)
    lowercase_fields = ("BSSID", "supplicant_state")

    for field, wanted in expected_con.items():
        # Authentication related fields are not verified.
        if field == "password":
            continue
        if field not in connection_info:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." % (
                    field, connection_info))
        found = connection_info[field]
        if field in lowercase_fields:
            found = found.lower()
            wanted = wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." % (
                    field, wanted, field, found))
1832
1833
def check_autoconnect_to_open_network(ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle WiFi and wait for the supplicant to report a completed connection.

    If wifiCheckState() is already truthy the function returns immediately;
    otherwise the WiFi state is toggled and the supplicant state is polled
    until it reads "completed" or the timeout elapses.

    Args:
        ad: android_device object.
        conn_timeout: timeout value in sec to wait for UE to connect to a
                      WiFi AP.
    Returns:
        True if wifiCheckState() was already truthy, or if the supplicant
        state reached "completed" before the timeout.
        False if the timeout elapsed first; a warning is logged.
    """
    if ad.droid.wifiCheckState():
        return True
    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()[
            'supplicant_state']
        # The deadline is checked before the state, so a poll that finishes
        # after the deadline counts as a failure even if it reads "completed".
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True
1854
1855
def expand_enterprise_config_by_phase2(config):
    """Expand one enterprise config into one config per phase2 auth type.

    For PEAP only the GTC and MSCHAPV2 phase2 types are generated; otherwise
    every member of WifiEnums.EapPhase2 is used. When the config carries an
    FQDN entry (passpoint), the GTC variant is left out.

    Args:
        config: A dict representing enterprise config.

    Returns
        A list of enterprise configs, each a copy of config with the PHASE2
        entry set to one phase2 type's value.
    """
    if config[WifiEnums.Enterprise.EAP] == WifiEnums.Eap.PEAP:
        # Only these phase2 types are supported for PEAP.
        candidates = [WifiEnums.EapPhase2.GTC, WifiEnums.EapPhase2.MSCHAPV2]
    else:
        candidates = WifiEnums.EapPhase2

    has_fqdn = WifiEnums.Enterprise.FQDN in config
    expanded = []
    for candidate in candidates:
        # Special case: passpoint configs do not get a GTC variant.
        if has_fqdn and candidate == WifiEnums.EapPhase2.GTC:
            continue
        variant = dict(config)
        variant[WifiEnums.Enterprise.PHASE2] = candidate.value
        expanded.append(variant)
    return expanded
1880
1881
def generate_eap_test_name(config, ad=None):
    """Build a test case name from an EAP configuration.

    The name is "test_connect-" followed by the name of the EAP type whose
    value matches the config (overridden by "PEAP0"/"PEAP1" when the SSID
    contains that marker, case-insensitively) and, if the config has a PHASE2
    entry with a known value, "-<phase2 name>".

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    enterprise = WifiEnums.Enterprise
    eap_label = next(
        (eap.name for eap in WifiEnums.Eap
         if eap.value == config[enterprise.EAP]), "")

    ssid_lower = config[WifiEnums.SSID_KEY].lower()
    # Checked in this order so that "peap1" wins when both markers appear.
    for marker, label in (("peap0", "PEAP0"), ("peap1", "PEAP1")):
        if marker in ssid_lower:
            eap_label = label

    pieces = ["test_connect-", eap_label]
    if enterprise.PHASE2 in config:
        phase2_label = next(
            (phase2.name for phase2 in WifiEnums.EapPhase2
             if phase2.value == config[enterprise.PHASE2]), None)
        if phase2_label is not None:
            pieces.append("-{}".format(phase2_label))
    return "".join(pieces)
1913
1914
def group_attenuators(attenuators):
    """Split a list of attenuators into one AttenuatorGroup per AP.

    Legacy Wi-Fi setups have two attenuators, each connected to a separate
    AP. Newer setups have four, each connected to one channel of an AP, so
    two attenuators belong to the same AP. Scripts that attenuate "an AP"
    must therefore drive every attenuator wired to it; grouping them here
    lets the same script work on both kinds of setup.

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A two-element list [group for "AP0", group for "AP1"]. The first half
        of attenuators goes to AP0 and the second half to AP1.

    Raises:
        asserts.fail is invoked if the attenuator list does not have two or
        four objects (presumably raising signals.TestFailure).
    """
    ap0_group = attenuator.AttenuatorGroup("AP0")
    ap1_group = attenuator.AttenuatorGroup("AP1")
    count = len(attenuators)
    if count not in (2, 4):
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % count)
    else:
        per_ap = count // 2
        for member in attenuators[:per_ap]:
            ap0_group.add(member)
        for member in attenuators[per_ap:]:
            ap1_group.add(member)
    return [ap0_group, ap1_group]
1953
1954
def set_attns(attenuator, attn_val_name):
    """Apply a named set of attenuation values to the first four attenuators.

    Args:
        attenuator: Indexable collection of attenuator objects; indices 0-3
                    are used.
        attn_val_name: Key into roaming_attn selecting the four values to set.

    Raises:
        Re-raises whatever is raised while setting a value, after logging it.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except BaseException:
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise
1972
def set_attns_steps(attenuators, atten_val_name, steps=10, wait_time=12):
    """Ramp attenuators linearly from their current values to target values.

    The targets are roaming_attn[atten_val_name]. In each of the given number
    of steps every attenuator is set to the rounded linear interpolation
    between its starting value and its target, followed by a sleep.

    Args:
        attenuators: The list of attenuator objects that you want to change
                     their attenuation value.
        atten_val_name: Name of the attenuation value pair to use.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time for each change of attenuator.
    """
    targets = roaming_attn[atten_val_name]
    logging.info("Set attenuation values to %s in %d step(s)",
                 targets, steps)
    starts = [attn.get_atten() for attn in attenuators]
    for step_number in range(1, steps + 1):
        fraction = step_number / steps
        for idx, attn in enumerate(attenuators):
            offset = (targets[idx] - starts[idx]) * fraction
            attn.set_atten(round(starts[idx] + offset))
        time.sleep(wait_time)
1995
1996
def trigger_roaming_and_validate(dut, attenuator, attn_val_name, expected_con):
    """Set attenuators to trigger roaming, then validate the resulting link.

    After applying the attenuation values the function sleeps for
    ROAMING_TIMEOUT seconds, verifies that the DUT reports the expected SSID
    and BSSID, and finally checks internet connectivity.

    Args:
        dut: android_device object expected to roam.
        attenuator: The attenuator object.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network; its
                      SSID entry and "bssid" entry are used.

    Raises:
        signals.TestFailure: if validate_connection reports no connectivity.
            verify_wifi_connection_info may raise as well.
    """
    target_network = {
        WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
        WifiEnums.BSSID_KEY: expected_con["bssid"],
    }
    set_attns(attenuator, attn_val_name)
    logging.info("Wait %ss for roaming to finish.", ROAMING_TIMEOUT)
    time.sleep(ROAMING_TIMEOUT)

    verify_wifi_connection_info(dut, target_network)
    roamed_bssid = target_network[WifiEnums.BSSID_KEY]
    logging.info("Roamed to %s successfully", roamed_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              roamed_bssid)
2020
def create_softap_config():
    """Create a softap config with a random ssid and password.

    Returns:
        A dict with WifiEnums.SSID_KEY set to "softap_" plus 8 random
        characters and WifiEnums.PWD_KEY set to 8 random characters. Both
        values are also written to the log.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, password)
    return {
        WifiEnums.SSID_KEY: ssid,
        WifiEnums.PWD_KEY: password,
    }
2031
def start_softap_and_verify(ad, band):
    """Bring up softap and verify AP mode and presence in scan results.

    Args:
        ad: Object exposing a `dut` (device hosting the softAP) and a
            `dut_client` (device that scans for it).
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    softap_config = create_softap_config()
    ssid = softap_config[WifiEnums.SSID_KEY]
    password = softap_config[WifiEnums.PWD_KEY]
    start_wifi_tethering(ad.dut, ssid, password, band=band)
    ap_enabled = ad.dut.droid.wifiIsApEnabled()
    asserts.assert_true(ap_enabled, "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(ad.dut_client, ssid)
    return softap_config
2050
def wait_for_expected_number_of_softap_clients(ad, callbackId,
        expected_num_of_softap_clients):
    """Wait for a softap client-count event and check the reported number.

    Args:
        ad: android_device object whose event dispatcher is polled.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_clients = event['data'][
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    asserts.assert_equal(
        reported_clients, expected_num_of_softap_clients,
        "Number of softap clients doesn't match with expected number")
2065
def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Wait for a softap state-change event and check the reported state.

    Args:
        ad: android_device object whose event dispatcher is polled.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_STATE_CHANGED)
    event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_state = event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(
        reported_state, expected_softap_state,
        "Softap state doesn't match with expected state")
2079
def get_current_number_of_softap_clients(ad, callbackId):
    """Drain all queued softap client-count events and report the latest.

    Args:
        ad: android_device object whose event dispatcher is drained.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count carried by the last matching event, or None when no
        matching event was queued.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    queued_events = ad.ed.pop_all(event_name)
    if len(queued_events) == 0:
        return None
    client_counts = [
        queued['data'][wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
        for queued in queued_events
    ]
    return client_counts[-1]
2098
def get_ssrdumps(ad, test_name=""):
    """Pull any files from the device's ssrdump dir, then delete them there.

    Files are pulled into <ad.log_path>/<test_name>/SSRDUMP_<serial>, which is
    created if needed. The delete command is issued whether or not any files
    were found.

    Args:
        ad: android device object.
        test_name: test case name
    """
    dump_files = ad.get_file_names("/data/vendor/ssrdump/")
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        serial_dir = "SSRDUMP_%s" % ad.serial
        destination = os.path.join(ad.log_path, test_name, serial_dir)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete")
2113
def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture in monitor mode.

    Captures are written under a 'PacketCapture' directory inside the current
    context's full output path; the directory is created if needed.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, <log directory>/<test_name> path) as the value
    """
    output_root = context.get_current_context().get_full_output_path()
    capture_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(capture_dir, exist_ok=True)

    # 'dual' expands to both bands; anything else is taken as a single band.
    capture_bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, capture_dir, test_name),
               os.path.join(capture_dir, test_name))
        for band in capture_bands
    }
2138
2139
def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the pcap files. If False, we will keep them.

    Deletion removes the whole directory that contains the capture file of
    the last entry in procs. When procs is empty there is nothing to delete
    and the function simply returns.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap
        test_status: status of the test case
    """
    capture_dir = None
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)
        capture_dir = os.path.dirname(fname)

    # capture_dir stays None for an empty procs dict; guarding on it avoids
    # the UnboundLocalError the unguarded version raised in that case.
    if test_status and capture_dir is not None:
        shutil.rmtree(capture_dir)
2157
def verify_mac_not_found_in_pcap(mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Each packet's summary is logged at debug level. On the first packet whose
    summary contains mac, asserts.fail is invoked with a message that
    includes the packet's textual dump.

    Args:
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show() without dump=True prints the packet and returns None,
            # which left "Packet = None" in the failure message.
            asserts.fail("Caught Factory MAC: %s in packet sniffer."
                         "Packet = %s" % (mac, pkt.show(dump=True)))
2170
def verify_mac_is_found_in_pcap(mac, packets):
    """Verify that a mac address is found in the captured packets.

    Scanning stops at the first packet whose summary contains mac; if no
    packet matches, asserts.fail is invoked.

    Args:
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    if any(mac in pkt.summary() for pkt in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer." % mac)
2182
def start_cnss_diags(ads):
    """Start cnss_diag on every device in ads.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        start_cnss_diag(device)
2186
2187
def start_cnss_diag(ad):
    """Start cnss_diag to record extra wifi logs

    The controlling property depends on the device model. If it is not
    already 'true', the existing wlan_logs files are deleted on the device
    and the property is set to true.

    Args:
        ad: android device object.
    """
    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    if ad.adb.getprop(prop) == 'true':
        # Property already set; leave the existing logs untouched.
        return
    ad.adb.shell("find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete")
    ad.adb.shell("setprop %s true" % prop, ignore_status=True)
2201
2202
def stop_cnss_diags(ads):
    """Stop cnss_diag on every device in ads.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        stop_cnss_diag(device)
2206
2207
def stop_cnss_diag(ad):
    """Stops cnss_diag

    Sets the model-dependent cnss_diag property to false, ignoring the exit
    status of the shell command.

    Args:
        ad: android device object.
    """
    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    ad.adb.shell("setprop %s false" % prop, ignore_status=True)
2219
2220
def get_cnss_diag_log(ad, test_name=""):
    """Pulls the cnss_diag logs in the wlan_logs dir

    Files are pulled into <ad.device_log_path>/CNSS_DIAG_<serial>, which is
    created if needed. Nothing happens when the device reports no log files.

    Args:
        ad: android device object.
        test_name: test case name (not used by this function).
    """
    log_files = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/")
    if not log_files:
        return
    ad.log.info("Pulling cnss_diag logs %s", log_files)
    destination = os.path.join(ad.device_log_path,
                               "CNSS_DIAG_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(log_files, destination)
2233
2234
# Outcome of one link probe: success flag, raw command output, and the number
# parsed from the output as elapsed time (success) or failure reason (failure).
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    ['is_success', 'stdout', 'elapsed_time', 'failure_reason'])
2237
2238
def send_link_probe(ad):
    """Send one link probe to the currently connected AP and parse the result.

    Runs 'cmd wifi send-link-probe' over adb. Output containing 'Error' or
    'Exception' fails the test. Otherwise the first all-digit token of the
    output is reported as the elapsed time (when the output contains
    'succeeded') or as the failure reason (when it contains
    'failed with reason'); any other output fails the test.

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple
    """
    stdout = ad.adb.shell('cmd wifi send-link-probe')
    saw_error = 'Error' in stdout or 'Exception' in stdout
    asserts.assert_false(saw_error,
                         'Exception while sending link probe: ' + stdout)

    # Lazy: nothing is parsed until one of the branches asks for a value.
    numeric_tokens = (int(tok) for tok in stdout.split() if tok.isdigit())
    elapsed_time = failure_reason = None
    is_success = 'succeeded' in stdout
    if is_success:
        elapsed_time = next(numeric_tokens, None)
    elif 'failed with reason' in stdout:
        failure_reason = next(numeric_tokens, None)
    else:
        asserts.fail('Unexpected link probe result: ' + stdout)

    return LinkProbeResult(
        is_success=is_success,
        stdout=stdout,
        elapsed_time=elapsed_time,
        failure_reason=failure_reason)
2268
2269
def send_link_probes(ad, num_probes, delay_sec):
    """Send a sequence of link probes to the currently connected AP.

    Each probe result is logged, and the function sleeps delay_sec after
    every probe (including the last one).

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time between probes, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    def _probe_and_log():
        # send_link_probe() itself fails the test if the adb output shows an
        # exception, so no extra checking is needed here.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        return outcome

    logging.info('Sending link probes')
    collected = []
    for _ in range(num_probes):
        collected.append(_probe_and_log())
        time.sleep(delay_sec)
    return collected
2292
2293
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Set up the AP with provided network info.

    Closes test.access_points[index], waits 5 seconds, then starts `ap` with
    a 'whirlwind' preset. WPA security is used when the network dict has a
    "password" entry; otherwise the AP is configured without security.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
                 password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.
    Returns:
        None; the value returned by ap.start_ap is not passed on.
    """
    extra_bss_settings = []
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Pick the security settings from the presence of a password.
    if "password" in network:
        security = hostapd_security.Security(
            security_mode="wpa", password=network["password"])
    else:
        security = hostapd_security.Security(
            security_mode=None, password=None)

    ap_config = hostapd_ap_preset.create_ap_preset(
        channel=channel,
        ssid=ssid,
        security=security,
        bss_settings=extra_bss_settings,
        vht_bandwidth=bandwidth,
        profile_name='whirlwind',
        iface_wlan_2g=ap.wlan_2g,
        iface_wlan_5g=ap.wlan_5g)
    ap.start_ap(ap_config)
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
2331
2332
def turn_ap_off(test, AP):
    """Bring down hostapd on the Access Point.

    Both the 'wlan0' and 'wlan1' hostapd instances of the selected AP are
    stopped if they report being alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based; AP n maps to
            test.access_points[n-1]).
    """
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d off'),
        ('wlan1', 'Turned WLAN1 AP%d off'),
    )
    for iface, message in interfaces:
        hostapd = test.access_points[AP-1]._aps[iface].hostapd
        if hostapd.is_alive():
            hostapd.stop()
            logging.debug(message % AP)
2347
2348
def turn_ap_on(test, AP):
    """Bring up hostapd on the Access Point.

    Both the 'wlan0' and 'wlan1' hostapd instances of the selected AP are
    started with their own stored config if they report not being alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based; AP n maps to
            test.access_points[n-1]).
    """
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d on'),
        ('wlan1', 'Turned WLAN1 AP%d on'),
    )
    for iface, message in interfaces:
        hostapd = test.access_points[AP-1]._aps[iface].hostapd
        if hostapd.is_alive():
            continue
        hostapd.start(hostapd.config)
        logging.debug(message % AP)
2363
2364
def turn_location_off_and_scan_toggle_off(ad):
    """Turn off the location service and the always-available wifi scan.

    After switching both off, the function asserts that the device no longer
    reports wifi scanning as always available.

    Args:
        ad: android_device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    scan_still_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not scan_still_available,
                        "Failed to turn off location service's scan.")
2371