1# 2# Copyright 2018 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from acts import asserts 17from acts.test_decorators import test_tracker_info 18from acts.test_utils.net.net_test_utils import start_tcpdump 19from acts.test_utils.net.net_test_utils import stop_tcpdump 20from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest 21from acts.test_utils.wifi import wifi_test_utils as wutils 22 23from scapy.all import ICMPv6ND_RA 24from scapy.all import rdpcap 25from scapy.all import Scapy_Exception 26 27import acts.base_test 28import acts.test_utils.wifi.wifi_test_utils as wutils 29 30import copy 31import os 32import random 33import time 34 35WifiEnums = wutils.WifiEnums 36 37RA_SCRIPT = 'sendra.py' 38SCAPY = 'scapy-2.2.0.tar.gz' 39SCAPY_INSTALL_COMMAND = 'sudo python setup.py install' 40PROC_NET_SNMP6 = '/proc/net/snmp6' 41LIFETIME_FRACTION = 6 42LIFETIME = 180 43INTERVAL = 2 44 45 46class ApfCountersTest(WifiBaseTest): 47 48 def __init__(self, controllers): 49 WifiBaseTest.__init__(self, controllers) 50 self.tests = ("test_IPv6_RA_packets", 51 "test_IPv6_RA_with_RTT", ) 52 53 def setup_class(self): 54 self.dut = self.android_devices[0] 55 wutils.wifi_test_device_init(self.dut) 56 req_params = [] 57 opt_param = ["reference_networks", ] 58 59 self.unpack_userparams( 60 req_param_names=req_params, opt_param_names=opt_param) 61 62 if "AccessPoint" in self.user_params: 63 self.legacy_configure_ap_and_start() 64 65 asserts.assert_true( 66 len(self.reference_networks) > 0, 67 "Need at least one reference network with psk.") 68 wutils.wifi_toggle_state(self.dut, True) 69 70 self.wpapsk_2g = self.reference_networks[0]["2g"] 71 self.wpapsk_5g = self.reference_networks[0]["5g"] 72 73 # install scapy 74 current_dir = os.path.dirname(os.path.realpath(__file__)) 75 send_ra = os.path.join(current_dir, RA_SCRIPT) 76 send_scapy = os.path.join(current_dir, SCAPY) 77 self.access_points[0].install_scapy(send_scapy, send_ra) 78 self.tcpdump_pid = None 79 80 def setup_test(self): 81 if 'RTT' not in self.test_name: 82 self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) 83 84 def teardown_test(self): 85 if 'RTT' not in self.test_name: 86 stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) 87 88 def on_fail(self, test_name, begin_time): 89 self.dut.take_bug_report(test_name, begin_time) 90 self.dut.cat_adb_log(test_name, begin_time) 91 92 def teardown_class(self): 93 if "AccessPoint" in self.user_params: 94 del self.user_params["reference_networks"] 95 self.access_points[0].cleanup_scapy() 96 wutils.reset_wifi(self.dut) 97 98 """ Helper methods """ 99 100 def _get_icmp6intype134(self): 101 """ Get ICMP6 Type 134 packet count on the DUT 102 103 Returns: 104 Number of ICMP6 type 134 packets 105 """ 106 cmd = "grep Icmp6InType134 %s || true" % PROC_NET_SNMP6 107 ra_count = self.dut.adb.shell(cmd) 108 if not ra_count: 109 return 0 110 ra_count = int(ra_count.split()[-1].rstrip()) 111 self.dut.log.info("RA Count %s:" % ra_count) 112 return ra_count 113 114 def _get_rtt_list_from_tcpdump(self, pcap_file): 115 """ Get RTT of each RA pkt in a list 116 117 Args: 118 pcap_file: tcpdump file from the DUT 119 120 Returns: 121 List of RTT of 400 pkts 122 """ 123 rtt = [] 124 try: 125 packets = rdpcap(pcap_file) 126 except Scapy_Exception: 127 self.log.error("Not a valid pcap file") 128 return rtt 129 130 for pkt in packets: 131 if ICMPv6ND_RA in pkt: 132 rtt.append(int(pkt[ICMPv6ND_RA].retranstimer)) 133 return rtt 134 135 """ Tests """ 136 137 @test_tracker_info(uuid="bc8d3f27-582a-464a-be30-556f07b77ee1") 138 def test_IPv6_RA_packets(self): 139 """ Test if the device filters the IPv6 packets 140 141 Steps: 142 1. Send a RA packet to DUT. DUT should accept this 143 2. Send duplicate RA packets. The RA packets should be filtered 144 for the next 30 seconds (1/6th of RA lifetime) 145 3. The next RA packets should be accepted 146 """ 147 # get mac address of the dut 148 ap = self.access_points[0] 149 wifi_network = copy.deepcopy(self.wpapsk_5g) 150 wifi_network['meteredOverride'] = 1 151 wutils.connect_to_wifi_network(self.dut, wifi_network) 152 mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address'] 153 self.log.info("mac_addr %s" % mac_addr) 154 time.sleep(30) # wait 30 sec before sending RAs 155 156 # get the current ra count 157 ra_count = self._get_icmp6intype134() 158 159 # Start scapy to send RA to the phone's MAC 160 ap.send_ra('wlan1', mac_addr, 0, 1) 161 162 # get the latest ra count 163 ra_count_latest = self._get_icmp6intype134() 164 asserts.assert_true(ra_count_latest == ra_count + 1, 165 "Device dropped the first RA in sequence") 166 167 # Generate and send 'x' number of duplicate RAs, for 1/6th of the the 168 # lifetime of the original RA. Test assumes that the original RA has a 169 # lifetime of 180s. Hence, all RAs received within the next 30s of the 170 # original RA should be filtered. 171 ra_count = ra_count_latest 172 count = LIFETIME / LIFETIME_FRACTION / INTERVAL 173 ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=count) 174 175 # Fail test if at least 90% of RAs were not dropped. 176 ra_count_latest = self._get_icmp6intype134() 177 pkt_loss = count - (ra_count_latest - ra_count) 178 percentage_loss = float(pkt_loss) / count * 100 179 asserts.assert_true(percentage_loss >= 90, "Device did not filter " 180 "duplicate RAs correctly. %d Percent of duplicate" 181 " RAs were accepted" % (100 - percentage_loss)) 182 183 # Any new RA after this should be accepted. 184 ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=1) 185 ra_count_latest = self._get_icmp6intype134() 186 asserts.assert_true(ra_count_latest == ra_count + 1, 187 "Device did not accept new RA after 1/6th time " 188 "interval. Device dropped a valid RA in sequence.") 189 190 @test_tracker_info(uuid="d2a0aff0-048c-475f-9bba-d90d8d9ebae3") 191 def test_IPv6_RA_with_RTT(self): 192 """Test if the device filters IPv6 RA packets with different re-trans time 193 194 Steps: 195 1. Get the current RA count 196 2. Send 400 packets with different re-trans time 197 3. Verify that RA count increased by 400 198 4. Verify internet connectivity 199 """ 200 pkt_num = 400 201 rtt_list = random.sample(range(10, 10000), pkt_num) 202 self.log.info("RTT List: %s" % rtt_list) 203 204 # get mac address of the dut 205 ap = self.access_points[0] 206 wutils.connect_to_wifi_network(self.dut, self.wpapsk_5g) 207 mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address'] 208 self.log.info("mac_addr %s" % mac_addr) 209 time.sleep(30) # wait 30 sec before sending RAs 210 211 # get the current ra count 212 ra_count = self._get_icmp6intype134() 213 214 # start tcpdump on the device 215 tcpdump_pid = start_tcpdump(self.dut, self.test_name) 216 217 # send RA with differnt re-trans time 218 for rtt in rtt_list: 219 ap.send_ra('wlan1', mac_addr, 0, 1, rtt=rtt) 220 221 # stop tcpdump and pull file 222 time.sleep(60) 223 pcap_file = stop_tcpdump(self.dut, tcpdump_pid, self.test_name) 224 225 # get the new RA count 226 new_ra_count = self._get_icmp6intype134() 227 asserts.assert_true(new_ra_count >= ra_count + pkt_num, 228 "Device did not accept all RAs") 229 230 # verify the RA pkts RTT match 231 tcpdump_rtt_list = self._get_rtt_list_from_tcpdump(pcap_file) 232 asserts.assert_true(set(rtt_list).issubset(set(tcpdump_rtt_list)), 233 "RA packets didn't match with tcpdump") 234 235 # verify if internet connectivity works after sending RA packets 236 wutils.validate_connection(self.dut) 237