1#
2#   Copyright 2018 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from acts import asserts
17from acts.test_decorators import test_tracker_info
18from acts.test_utils.net.net_test_utils import start_tcpdump
19from acts.test_utils.net.net_test_utils import stop_tcpdump
20from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
21from acts.test_utils.wifi import wifi_test_utils as wutils
22
23from scapy.all import ICMPv6ND_RA
24from scapy.all import rdpcap
25from scapy.all import Scapy_Exception
26
27import acts.base_test
28import acts.test_utils.wifi.wifi_test_utils as wutils
29
30import copy
31import os
32import random
33import time
34
# Shorthand for the enum container exposed by wifi_test_utils.
WifiEnums = wutils.WifiEnums

# Artifacts located in the same directory as this module; setup_class joins
# both names with that directory and passes the paths to install_scapy().
SCAPY = 'scapy-2.2.0.tar.gz'
RA_SCRIPT = 'sendra.py'
# Install command for the scapy tarball (not referenced by the code visible
# in this module).
SCAPY_INSTALL_COMMAND = 'sudo python setup.py install'

# File on the DUT that is grepped for the Icmp6InType134 counter.
PROC_NET_SNMP6 = '/proc/net/snmp6'

# Parameters of the duplicate-RA burst in test_IPv6_RA_packets: duplicates
# are sent every INTERVAL for LIFETIME / LIFETIME_FRACTION (180 / 6 = 30,
# i.e. the 30 s window the test describes, so presumably all in seconds).
LIFETIME = 180
LIFETIME_FRACTION = 6
INTERVAL = 2
44
45
46class ApfCountersTest(WifiBaseTest):
47
48    def __init__(self, controllers):
49        WifiBaseTest.__init__(self, controllers)
50        self.tests = ("test_IPv6_RA_packets",
51                      "test_IPv6_RA_with_RTT", )
52
53    def setup_class(self):
54        self.dut = self.android_devices[0]
55        wutils.wifi_test_device_init(self.dut)
56        req_params = []
57        opt_param = ["reference_networks", ]
58
59        self.unpack_userparams(
60            req_param_names=req_params, opt_param_names=opt_param)
61
62        if "AccessPoint" in self.user_params:
63            self.legacy_configure_ap_and_start()
64
65        asserts.assert_true(
66            len(self.reference_networks) > 0,
67            "Need at least one reference network with psk.")
68        wutils.wifi_toggle_state(self.dut, True)
69
70        self.wpapsk_2g = self.reference_networks[0]["2g"]
71        self.wpapsk_5g = self.reference_networks[0]["5g"]
72
73        # install scapy
74        current_dir = os.path.dirname(os.path.realpath(__file__))
75        send_ra = os.path.join(current_dir, RA_SCRIPT)
76        send_scapy = os.path.join(current_dir, SCAPY)
77        self.access_points[0].install_scapy(send_scapy, send_ra)
78        self.tcpdump_pid = None
79
80    def setup_test(self):
81        if 'RTT' not in self.test_name:
82            self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
83
84    def teardown_test(self):
85        if 'RTT' not in self.test_name:
86            stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
87
88    def on_fail(self, test_name, begin_time):
89        self.dut.take_bug_report(test_name, begin_time)
90        self.dut.cat_adb_log(test_name, begin_time)
91
92    def teardown_class(self):
93        if "AccessPoint" in self.user_params:
94            del self.user_params["reference_networks"]
95        self.access_points[0].cleanup_scapy()
96        wutils.reset_wifi(self.dut)
97
98    """ Helper methods """
99
100    def _get_icmp6intype134(self):
101        """ Get ICMP6 Type 134 packet count on the DUT
102
103        Returns:
104            Number of ICMP6 type 134 packets
105        """
106        cmd = "grep Icmp6InType134 %s || true" % PROC_NET_SNMP6
107        ra_count = self.dut.adb.shell(cmd)
108        if not ra_count:
109            return 0
110        ra_count = int(ra_count.split()[-1].rstrip())
111        self.dut.log.info("RA Count %s:" % ra_count)
112        return ra_count
113
114    def _get_rtt_list_from_tcpdump(self, pcap_file):
115        """ Get RTT of each RA pkt in a list
116
117        Args:
118            pcap_file: tcpdump file from the DUT
119
120        Returns:
121            List of RTT of 400 pkts
122        """
123        rtt = []
124        try:
125            packets = rdpcap(pcap_file)
126        except Scapy_Exception:
127            self.log.error("Not a valid pcap file")
128            return rtt
129
130        for pkt in packets:
131            if ICMPv6ND_RA in pkt:
132                rtt.append(int(pkt[ICMPv6ND_RA].retranstimer))
133        return rtt
134
135    """ Tests """
136
137    @test_tracker_info(uuid="bc8d3f27-582a-464a-be30-556f07b77ee1")
138    def test_IPv6_RA_packets(self):
139        """ Test if the device filters the IPv6 packets
140
141        Steps:
142          1. Send a RA packet to DUT. DUT should accept this
143          2. Send duplicate RA packets. The RA packets should be filtered
144             for the next 30 seconds (1/6th of RA lifetime)
145          3. The next RA packets should be accepted
146        """
147        # get mac address of the dut
148        ap = self.access_points[0]
149        wifi_network = copy.deepcopy(self.wpapsk_5g)
150        wifi_network['meteredOverride'] = 1
151        wutils.connect_to_wifi_network(self.dut, wifi_network)
152        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
153        self.log.info("mac_addr %s" % mac_addr)
154        time.sleep(30) # wait 30 sec before sending RAs
155
156        # get the current ra count
157        ra_count = self._get_icmp6intype134()
158
159        # Start scapy to send RA to the phone's MAC
160        ap.send_ra('wlan1', mac_addr, 0, 1)
161
162        # get the latest ra count
163        ra_count_latest = self._get_icmp6intype134()
164        asserts.assert_true(ra_count_latest == ra_count + 1,
165                            "Device dropped the first RA in sequence")
166
167        # Generate and send 'x' number of duplicate RAs, for 1/6th of the the
168        # lifetime of the original RA. Test assumes that the original RA has a
169        # lifetime of 180s. Hence, all RAs received within the next 30s of the
170        # original RA should be filtered.
171        ra_count = ra_count_latest
172        count = LIFETIME / LIFETIME_FRACTION / INTERVAL
173        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=count)
174
175        # Fail test if at least 90% of RAs were not dropped.
176        ra_count_latest = self._get_icmp6intype134()
177        pkt_loss = count - (ra_count_latest - ra_count)
178        percentage_loss = float(pkt_loss) / count * 100
179        asserts.assert_true(percentage_loss >= 90, "Device did not filter "
180                            "duplicate RAs correctly. %d Percent of duplicate"
181                            " RAs were accepted" % (100 - percentage_loss))
182
183        # Any new RA after this should be accepted.
184        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=1)
185        ra_count_latest = self._get_icmp6intype134()
186        asserts.assert_true(ra_count_latest == ra_count + 1,
187                            "Device did not accept new RA after 1/6th time "
188                            "interval. Device dropped a valid RA in sequence.")
189
190    @test_tracker_info(uuid="d2a0aff0-048c-475f-9bba-d90d8d9ebae3")
191    def test_IPv6_RA_with_RTT(self):
192        """Test if the device filters IPv6 RA packets with different re-trans time
193
194        Steps:
195          1. Get the current RA count
196          2. Send 400 packets with different re-trans time
197          3. Verify that RA count increased by 400
198          4. Verify internet connectivity
199        """
200        pkt_num = 400
201        rtt_list = random.sample(range(10, 10000), pkt_num)
202        self.log.info("RTT List: %s" % rtt_list)
203
204        # get mac address of the dut
205        ap = self.access_points[0]
206        wutils.connect_to_wifi_network(self.dut, self.wpapsk_5g)
207        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
208        self.log.info("mac_addr %s" % mac_addr)
209        time.sleep(30) # wait 30 sec before sending RAs
210
211        # get the current ra count
212        ra_count = self._get_icmp6intype134()
213
214        # start tcpdump on the device
215        tcpdump_pid = start_tcpdump(self.dut, self.test_name)
216
217        # send RA with differnt re-trans time
218        for rtt in rtt_list:
219            ap.send_ra('wlan1', mac_addr, 0, 1, rtt=rtt)
220
221        # stop tcpdump and pull file
222        time.sleep(60)
223        pcap_file = stop_tcpdump(self.dut, tcpdump_pid, self.test_name)
224
225        # get the new RA count
226        new_ra_count = self._get_icmp6intype134()
227        asserts.assert_true(new_ra_count >= ra_count + pkt_num,
228                            "Device did not accept all RAs")
229
230        # verify the RA pkts RTT match
231        tcpdump_rtt_list = self._get_rtt_list_from_tcpdump(pcap_file)
232        asserts.assert_true(set(rtt_list).issubset(set(tcpdump_rtt_list)),
233                            "RA packets didn't match with tcpdump")
234
235        # verify if internet connectivity works after sending RA packets
236        wutils.validate_connection(self.dut)
237