1#!/usr/bin/env python3 2# 3# Copyright 2018 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import logging 17 18from acts.controllers import adb 19from acts import asserts 20from acts.controllers.adb_lib.error import AdbError 21from acts.logger import epoch_to_log_line_timestamp 22from acts.utils import get_current_epoch_time 23from acts.logger import normalize_log_line_timestamp 24from acts.utils import start_standing_subprocess 25from acts.utils import stop_standing_subprocess 26from acts.test_utils.net import connectivity_const as cconst 27from acts.test_utils.tel.tel_test_utils import get_operator_name 28from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection 29from acts.test_utils.tel.tel_test_utils import verify_http_connection 30from acts.test_utils.wifi import wifi_test_utils as wutils 31from scapy.all import get_if_list 32 33import os 34import re 35import time 36import urllib.request 37 38VPN_CONST = cconst.VpnProfile 39VPN_TYPE = cconst.VpnProfileType 40VPN_PARAMS = cconst.VpnReqParams 41TCPDUMP_PATH = "/data/local/tmp/" 42USB_CHARGE_MODE = "svc usb setFunctions" 43USB_TETHERING_MODE = "svc usb setFunctions rndis" 44DEVICE_IP_ADDRESS = "ip address" 45 46 47def verify_lte_data_and_tethering_supported(ad): 48 """Verify if LTE data is enabled and tethering supported""" 49 wutils.wifi_toggle_state(ad, False) 50 ad.droid.telephonyToggleDataConnection(True) 51 wait_for_cell_data_connection(ad.log, ad, True) 52 asserts.assert_true( 53 verify_http_connection(ad.log, ad), 54 "HTTP verification failed on cell data connection") 55 asserts.assert_true( 56 ad.droid.connectivityIsTetheringSupported(), 57 "Tethering is not supported for the provider") 58 wutils.wifi_toggle_state(ad, True) 59 60 61def set_chrome_browser_permissions(ad): 62 """Set chrome browser start with no-first-run verification. 63 Give permission to read from and write to storage 64 """ 65 commands = ["pm grant com.android.chrome " 66 "android.permission.READ_EXTERNAL_STORAGE", 67 "pm grant com.android.chrome " 68 "android.permission.WRITE_EXTERNAL_STORAGE", 69 "rm /data/local/chrome-command-line", 70 "am set-debug-app --persistent com.android.chrome", 71 'echo "chrome --no-default-browser-check --no-first-run ' 72 '--disable-fre" > /data/local/tmp/chrome-command-line'] 73 for cmd in commands: 74 try: 75 ad.adb.shell(cmd) 76 except AdbError: 77 logging.warning("adb command %s failed on %s" % (cmd, ad.serial)) 78 79 80def verify_ping_to_vpn_ip(ad, vpn_ping_addr): 81 """ Verify if IP behind VPN server is pingable. 82 Ping should pass, if VPN is connected. 83 Ping should fail, if VPN is disconnected. 84 85 Args: 86 ad: android device object 87 """ 88 ping_result = None 89 pkt_loss = "100% packet loss" 90 try: 91 ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr) 92 except AdbError: 93 pass 94 return ping_result and pkt_loss not in ping_result 95 96 97def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr): 98 """ Test logic for each legacy VPN connection 99 100 Steps: 101 1. Generate profile for the VPN type 102 2. Establish connection to the server 103 3. Verify that connection is established using LegacyVpnInfo 104 4. Verify the connection by pinging the IP behind VPN 105 5. Stop the VPN connection 106 6. Check the connection status 107 7. Verify that ping to IP behind VPN fails 108 109 Args: 110 1. ad: Android device object 111 2. VpnProfileType (1 of the 6 types supported by Android) 112 """ 113 # Wait for sometime so that VPN server flushes all interfaces and 114 # connections after graceful termination 115 time.sleep(10) 116 117 ad.adb.shell("ip xfrm state flush") 118 ad.log.info("Connecting to: %s", vpn_profile) 119 ad.droid.vpnStartLegacyVpn(vpn_profile) 120 time.sleep(cconst.VPN_TIMEOUT) 121 122 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 123 asserts.assert_equal(connected_vpn_info["state"], 124 cconst.VPN_STATE_CONNECTED, 125 "Unable to establish VPN connection for %s" 126 % vpn_profile) 127 128 ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr) 129 ip_xfrm_state = ad.adb.shell("ip xfrm state") 130 match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) 131 if match_obj: 132 ip_xfrm_state = format(match_obj.group(0)).split() 133 ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) 134 135 ad.droid.vpnStopLegacyVpn() 136 asserts.assert_true(ping_result, 137 "Ping to the internal IP failed. " 138 "Expected to pass as VPN is connected") 139 140 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 141 asserts.assert_true(not connected_vpn_info, 142 "Unable to terminate VPN connection for %s" 143 % vpn_profile) 144 145 146def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, 147 ipsec_server_type, log_path): 148 """ Download the certificates from VPN server and push to sdcard of DUT 149 150 Args: 151 ad: android device object 152 vpn_params: vpn params from config file 153 vpn_type: 1 of the 6 VPN types 154 vpn_server_addr: server addr to connect to 155 ipsec_server_type: ipsec version - strongswan or openswan 156 log_path: log path to download cert 157 158 Returns: 159 Client cert file name on DUT's sdcard 160 """ 161 url = "http://%s%s%s" % (vpn_server_addr, 162 vpn_params['cert_path_vpnserver'], 163 vpn_params['client_pkcs_file_name']) 164 local_cert_name = "%s_%s_%s" % (vpn_type.name, 165 ipsec_server_type, 166 vpn_params['client_pkcs_file_name']) 167 168 local_file_path = os.path.join(log_path, local_cert_name) 169 try: 170 ret = urllib.request.urlopen(url) 171 with open(local_file_path, "wb") as f: 172 f.write(ret.read()) 173 except Exception: 174 asserts.fail("Unable to download certificate from the server") 175 176 ad.adb.push("%s sdcard/" % local_file_path) 177 return local_cert_name 178 179 180def generate_legacy_vpn_profile(ad, 181 vpn_params, 182 vpn_type, 183 vpn_server_addr, 184 ipsec_server_type, 185 log_path): 186 """ Generate legacy VPN profile for a VPN 187 188 Args: 189 ad: android device object 190 vpn_params: vpn params from config file 191 vpn_type: 1 of the 6 VPN types 192 vpn_server_addr: server addr to connect to 193 ipsec_server_type: ipsec version - strongswan or openswan 194 log_path: log path to download cert 195 196 Returns: 197 Vpn profile 198 """ 199 vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], 200 VPN_CONST.PWD: vpn_params['vpn_password'], 201 VPN_CONST.TYPE: vpn_type.value, 202 VPN_CONST.SERVER: vpn_server_addr, } 203 vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, 204 ipsec_server_type) 205 if vpn_type.name == "PPTP": 206 vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name 207 208 psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) 209 rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) 210 211 if vpn_type.name in psk_set: 212 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] 213 elif vpn_type.name in rsa_set: 214 cert_name = download_load_certs(ad, 215 vpn_params, 216 vpn_type, 217 vpn_server_addr, 218 ipsec_server_type, 219 log_path) 220 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 221 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 222 ad.droid.installCertificate(vpn_profile, cert_name, 223 vpn_params['cert_password']) 224 else: 225 vpn_profile[VPN_CONST.MPPE] = "mppe" 226 227 return vpn_profile 228 229 230def start_tcpdump(ad, test_name): 231 """Start tcpdump on all interfaces 232 233 Args: 234 ad: android device object. 235 test_name: tcpdump file name will have this 236 """ 237 ad.log.info("Starting tcpdump on all interfaces") 238 try: 239 ad.adb.shell("killall -9 tcpdump") 240 except AdbError: 241 ad.log.warn("Killing existing tcpdump processes failed") 242 out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH) 243 if "No such file" in out or not out: 244 ad.adb.shell("mkdir %s" % TCPDUMP_PATH) 245 else: 246 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 247 248 begin_time = epoch_to_log_line_timestamp(get_current_epoch_time()) 249 begin_time = normalize_log_line_timestamp(begin_time) 250 251 file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) 252 ad.log.info("tcpdump file is %s", file_name) 253 cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial, 254 file_name) 255 try: 256 return start_standing_subprocess(cmd, 5) 257 except Exception: 258 ad.log.exception('Could not start standing process %s' % repr(cmd)) 259 260 return None 261 262def stop_tcpdump(ad, 263 proc, 264 test_name, 265 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT): 266 """Stops tcpdump on any iface 267 Pulls the tcpdump file in the tcpdump dir 268 269 Args: 270 ad: android device object. 271 proc: need to know which pid to stop 272 test_name: test name to save the tcpdump file 273 adb_pull_timeout: timeout for adb_pull 274 275 Returns: 276 log_path of the tcpdump file 277 """ 278 ad.log.info("Stopping and pulling tcpdump if any") 279 if proc is None: 280 return None 281 try: 282 stop_standing_subprocess(proc) 283 except Exception as e: 284 ad.log.warning(e) 285 log_path = os.path.join(ad.log_path, test_name) 286 os.makedirs(log_path, exist_ok=True) 287 ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), timeout=adb_pull_timeout) 288 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 289 file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) 290 return "%s/%s" % (log_path, file_name) 291 292def is_ipaddress_ipv6(ip_address): 293 """Verify if the given string is a valid IPv6 address. 294 295 Args: 296 ip_address: string containing the IP address 297 298 Returns: 299 True: if valid ipv6 address 300 False: if not 301 """ 302 try: 303 socket.inet_pton(socket.AF_INET6, ip_address) 304 return True 305 except socket.error: 306 return False 307 308def carrier_supports_ipv6(dut): 309 """Verify if carrier supports ipv6. 310 311 Args: 312 dut: Android device that is being checked 313 314 Returns: 315 True: if carrier supports ipv6 316 False: if not 317 """ 318 319 carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 320 operator = get_operator_name("log", dut) 321 return operator in carrier_supports_ipv6 322 323def supports_ipv6_tethering(self, dut): 324 """ Check if provider supports IPv6 tethering. 325 326 Returns: 327 True: if provider supports IPv6 tethering 328 False: if not 329 """ 330 carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 331 operator = get_operator_name(self.log, dut) 332 return operator in carrier_supports_tethering 333 334 335def start_usb_tethering(ad): 336 """Start USB tethering. 337 338 Args: 339 ad: android device object 340 """ 341 # TODO: test USB tethering by #startTethering API - b/149116235 342 ad.log.info("Starting USB Tethering") 343 ad.stop_services() 344 ad.adb.shell(USB_TETHERING_MODE, ignore_status=True) 345 ad.adb.wait_for_device() 346 ad.start_services() 347 if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS): 348 raise signals.TestFailure("Unable to enable USB tethering.") 349 350 351def stop_usb_tethering(ad): 352 """Stop USB tethering. 353 354 Args: 355 ad: android device object 356 """ 357 ad.log.info("Stopping USB Tethering") 358 ad.stop_services() 359 ad.adb.shell(USB_CHARGE_MODE) 360 ad.adb.wait_for_device() 361 ad.start_services() 362 363 364def wait_for_new_iface(old_ifaces): 365 """Wait for the new interface to come up. 366 367 Args: 368 old_ifaces: list of old interfaces 369 """ 370 old_set = set(old_ifaces) 371 # Try 10 times to find a new interface with a 1s sleep every time 372 # (equivalent to a 9s timeout) 373 for i in range(0, 10): 374 new_ifaces = set(get_if_list()) - old_set 375 asserts.assert_true(len(new_ifaces) < 2, 376 "Too many new interfaces after turning on " 377 "tethering") 378 if len(new_ifaces) == 1: 379 return new_ifaces.pop() 380 time.sleep(1) 381 asserts.fail("Timeout waiting for tethering interface on host") 382 383