1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17
18from acts.controllers import adb
19from acts import asserts
20from acts.controllers.adb_lib.error import AdbError
21from acts.logger import epoch_to_log_line_timestamp
22from acts.utils import get_current_epoch_time
23from acts.logger import normalize_log_line_timestamp
24from acts.utils import start_standing_subprocess
25from acts.utils import stop_standing_subprocess
26from acts.test_utils.net import connectivity_const as cconst
27from acts.test_utils.tel.tel_test_utils import get_operator_name
28from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
29from acts.test_utils.tel.tel_test_utils import verify_http_connection
30from acts.test_utils.wifi import wifi_test_utils as wutils
31from scapy.all import get_if_list
32
33import os
34import re
35import time
36import urllib.request
37
38VPN_CONST = cconst.VpnProfile
39VPN_TYPE = cconst.VpnProfileType
40VPN_PARAMS = cconst.VpnReqParams
41TCPDUMP_PATH = "/data/local/tmp/"
42USB_CHARGE_MODE = "svc usb setFunctions"
43USB_TETHERING_MODE = "svc usb setFunctions rndis"
44DEVICE_IP_ADDRESS = "ip address"
45
46
47def verify_lte_data_and_tethering_supported(ad):
48    """Verify if LTE data is enabled and tethering supported"""
49    wutils.wifi_toggle_state(ad, False)
50    ad.droid.telephonyToggleDataConnection(True)
51    wait_for_cell_data_connection(ad.log, ad, True)
52    asserts.assert_true(
53        verify_http_connection(ad.log, ad),
54        "HTTP verification failed on cell data connection")
55    asserts.assert_true(
56        ad.droid.connectivityIsTetheringSupported(),
57        "Tethering is not supported for the provider")
58    wutils.wifi_toggle_state(ad, True)
59
60
61def set_chrome_browser_permissions(ad):
62    """Set chrome browser start with no-first-run verification.
63    Give permission to read from and write to storage
64    """
65    commands = ["pm grant com.android.chrome "
66                "android.permission.READ_EXTERNAL_STORAGE",
67                "pm grant com.android.chrome "
68                "android.permission.WRITE_EXTERNAL_STORAGE",
69                "rm /data/local/chrome-command-line",
70                "am set-debug-app --persistent com.android.chrome",
71                'echo "chrome --no-default-browser-check --no-first-run '
72                '--disable-fre" > /data/local/tmp/chrome-command-line']
73    for cmd in commands:
74        try:
75            ad.adb.shell(cmd)
76        except AdbError:
77            logging.warning("adb command %s failed on %s" % (cmd, ad.serial))
78
79
80def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
81    """ Verify if IP behind VPN server is pingable.
82    Ping should pass, if VPN is connected.
83    Ping should fail, if VPN is disconnected.
84
85    Args:
86      ad: android device object
87    """
88    ping_result = None
89    pkt_loss = "100% packet loss"
90    try:
91        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
92    except AdbError:
93        pass
94    return ping_result and pkt_loss not in ping_result
95
96
97def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
98    """ Test logic for each legacy VPN connection
99
100    Steps:
101      1. Generate profile for the VPN type
102      2. Establish connection to the server
103      3. Verify that connection is established using LegacyVpnInfo
104      4. Verify the connection by pinging the IP behind VPN
105      5. Stop the VPN connection
106      6. Check the connection status
107      7. Verify that ping to IP behind VPN fails
108
109    Args:
110      1. ad: Android device object
111      2. VpnProfileType (1 of the 6 types supported by Android)
112    """
113    # Wait for sometime so that VPN server flushes all interfaces and
114    # connections after graceful termination
115    time.sleep(10)
116
117    ad.adb.shell("ip xfrm state flush")
118    ad.log.info("Connecting to: %s", vpn_profile)
119    ad.droid.vpnStartLegacyVpn(vpn_profile)
120    time.sleep(cconst.VPN_TIMEOUT)
121
122    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
123    asserts.assert_equal(connected_vpn_info["state"],
124                         cconst.VPN_STATE_CONNECTED,
125                         "Unable to establish VPN connection for %s"
126                         % vpn_profile)
127
128    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
129    ip_xfrm_state = ad.adb.shell("ip xfrm state")
130    match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
131    if match_obj:
132        ip_xfrm_state = format(match_obj.group(0)).split()
133        ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0])
134
135    ad.droid.vpnStopLegacyVpn()
136    asserts.assert_true(ping_result,
137                        "Ping to the internal IP failed. "
138                        "Expected to pass as VPN is connected")
139
140    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
141    asserts.assert_true(not connected_vpn_info,
142                        "Unable to terminate VPN connection for %s"
143                        % vpn_profile)
144
145
146def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
147                        ipsec_server_type, log_path):
148    """ Download the certificates from VPN server and push to sdcard of DUT
149
150    Args:
151      ad: android device object
152      vpn_params: vpn params from config file
153      vpn_type: 1 of the 6 VPN types
154      vpn_server_addr: server addr to connect to
155      ipsec_server_type: ipsec version - strongswan or openswan
156      log_path: log path to download cert
157
158    Returns:
159      Client cert file name on DUT's sdcard
160    """
161    url = "http://%s%s%s" % (vpn_server_addr,
162                             vpn_params['cert_path_vpnserver'],
163                             vpn_params['client_pkcs_file_name'])
164    local_cert_name = "%s_%s_%s" % (vpn_type.name,
165                                    ipsec_server_type,
166                                    vpn_params['client_pkcs_file_name'])
167
168    local_file_path = os.path.join(log_path, local_cert_name)
169    try:
170        ret = urllib.request.urlopen(url)
171        with open(local_file_path, "wb") as f:
172            f.write(ret.read())
173    except Exception:
174        asserts.fail("Unable to download certificate from the server")
175
176    ad.adb.push("%s sdcard/" % local_file_path)
177    return local_cert_name
178
179
180def generate_legacy_vpn_profile(ad,
181                                vpn_params,
182                                vpn_type,
183                                vpn_server_addr,
184                                ipsec_server_type,
185                                log_path):
186    """ Generate legacy VPN profile for a VPN
187
188    Args:
189      ad: android device object
190      vpn_params: vpn params from config file
191      vpn_type: 1 of the 6 VPN types
192      vpn_server_addr: server addr to connect to
193      ipsec_server_type: ipsec version - strongswan or openswan
194      log_path: log path to download cert
195
196    Returns:
197      Vpn profile
198    """
199    vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'],
200                   VPN_CONST.PWD: vpn_params['vpn_password'],
201                   VPN_CONST.TYPE: vpn_type.value,
202                   VPN_CONST.SERVER: vpn_server_addr, }
203    vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
204                                                  ipsec_server_type)
205    if vpn_type.name == "PPTP":
206        vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
207
208    psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"])
209    rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"])
210
211    if vpn_type.name in psk_set:
212        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
213    elif vpn_type.name in rsa_set:
214        cert_name = download_load_certs(ad,
215                                        vpn_params,
216                                        vpn_type,
217                                        vpn_server_addr,
218                                        ipsec_server_type,
219                                        log_path)
220        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
221        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
222        ad.droid.installCertificate(vpn_profile, cert_name,
223                                    vpn_params['cert_password'])
224    else:
225        vpn_profile[VPN_CONST.MPPE] = "mppe"
226
227    return vpn_profile
228
229
230def start_tcpdump(ad, test_name):
231    """Start tcpdump on all interfaces
232
233    Args:
234        ad: android device object.
235        test_name: tcpdump file name will have this
236    """
237    ad.log.info("Starting tcpdump on all interfaces")
238    try:
239        ad.adb.shell("killall -9 tcpdump")
240    except AdbError:
241        ad.log.warn("Killing existing tcpdump processes failed")
242    out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH)
243    if "No such file" in out or not out:
244        ad.adb.shell("mkdir %s" % TCPDUMP_PATH)
245    else:
246        ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
247
248    begin_time = epoch_to_log_line_timestamp(get_current_epoch_time())
249    begin_time = normalize_log_line_timestamp(begin_time)
250
251    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
252    ad.log.info("tcpdump file is %s", file_name)
253    cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial,
254                                                            file_name)
255    try:
256        return start_standing_subprocess(cmd, 5)
257    except Exception:
258        ad.log.exception('Could not start standing process %s' % repr(cmd))
259
260    return None
261
262def stop_tcpdump(ad,
263                 proc,
264                 test_name,
265                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
266    """Stops tcpdump on any iface
267       Pulls the tcpdump file in the tcpdump dir
268
269    Args:
270        ad: android device object.
271        proc: need to know which pid to stop
272        test_name: test name to save the tcpdump file
273        adb_pull_timeout: timeout for adb_pull
274
275    Returns:
276      log_path of the tcpdump file
277    """
278    ad.log.info("Stopping and pulling tcpdump if any")
279    if proc is None:
280        return None
281    try:
282        stop_standing_subprocess(proc)
283    except Exception as e:
284        ad.log.warning(e)
285    log_path = os.path.join(ad.log_path, test_name)
286    os.makedirs(log_path, exist_ok=True)
287    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), timeout=adb_pull_timeout)
288    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
289    file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
290    return "%s/%s" % (log_path, file_name)
291
292def is_ipaddress_ipv6(ip_address):
293    """Verify if the given string is a valid IPv6 address.
294
295    Args:
296        ip_address: string containing the IP address
297
298    Returns:
299        True: if valid ipv6 address
300        False: if not
301    """
302    try:
303        socket.inet_pton(socket.AF_INET6, ip_address)
304        return True
305    except socket.error:
306        return False
307
308def carrier_supports_ipv6(dut):
309    """Verify if carrier supports ipv6.
310
311    Args:
312        dut: Android device that is being checked
313
314    Returns:
315        True: if carrier supports ipv6
316        False: if not
317    """
318
319    carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
320    operator = get_operator_name("log", dut)
321    return operator in carrier_supports_ipv6
322
323def supports_ipv6_tethering(self, dut):
324    """ Check if provider supports IPv6 tethering.
325
326    Returns:
327        True: if provider supports IPv6 tethering
328        False: if not
329    """
330    carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
331    operator = get_operator_name(self.log, dut)
332    return operator in carrier_supports_tethering
333
334
335def start_usb_tethering(ad):
336    """Start USB tethering.
337
338    Args:
339        ad: android device object
340    """
341    # TODO: test USB tethering by #startTethering API - b/149116235
342    ad.log.info("Starting USB Tethering")
343    ad.stop_services()
344    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
345    ad.adb.wait_for_device()
346    ad.start_services()
347    if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS):
348        raise signals.TestFailure("Unable to enable USB tethering.")
349
350
351def stop_usb_tethering(ad):
352    """Stop USB tethering.
353
354    Args:
355        ad: android device object
356    """
357    ad.log.info("Stopping USB Tethering")
358    ad.stop_services()
359    ad.adb.shell(USB_CHARGE_MODE)
360    ad.adb.wait_for_device()
361    ad.start_services()
362
363
364def wait_for_new_iface(old_ifaces):
365    """Wait for the new interface to come up.
366
367    Args:
368        old_ifaces: list of old interfaces
369    """
370    old_set = set(old_ifaces)
371    # Try 10 times to find a new interface with a 1s sleep every time
372    # (equivalent to a 9s timeout)
373    for i in range(0, 10):
374        new_ifaces = set(get_if_list()) - old_set
375        asserts.assert_true(len(new_ifaces) < 2,
376                            "Too many new interfaces after turning on "
377                            "tethering")
378        if len(new_ifaces) == 1:
379            return new_ifaces.pop()
380        time.sleep(1)
381    asserts.fail("Timeout waiting for tethering interface on host")
382
383