1#!/usr/bin/python3.4
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from acts import asserts
18from acts.test_decorators import test_tracker_info
19from acts.test_utils.net import connectivity_const as cconsts
20from acts.test_utils.wifi.aware import aware_const as aconsts
21from acts.controllers.ap_lib.hostapd_constants import BAND_2G
22from acts.controllers.ap_lib.hostapd_constants import BAND_5G
23from acts.test_utils.wifi.aware import aware_test_utils as autils
24from acts.test_utils.wifi import wifi_test_utils as wutils
25from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
26from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
27from scapy.all import *
28
29
30class MacRandomNoLeakageTest(AwareBaseTest, WifiBaseTest):
31    """Set of tests for Wi-Fi Aware MAC address randomization of NMI (NAN
32    management interface) and NDI (NAN data interface)."""
33
34    SERVICE_NAME = "GoogleTestServiceXYZ"
35    ping_msg = 'PING'
36
37    AWARE_DEFAULT_CHANNEL_5_BAND = 149
38    AWARE_DEFAULT_CHANNEL_24_BAND = 6
39
40    ENCR_TYPE_OPEN = 0
41    ENCR_TYPE_PASSPHRASE = 1
42    ENCR_TYPE_PMK = 2
43
44    PASSPHRASE = "This is some random passphrase - very very secure!!"
45    PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU="
46
47    def setup_class(self):
48        super().setup_class()
49
50        asserts.assert_true(hasattr(self, 'packet_capture'),
51                            "Needs packet_capture attribute to support sniffing.")
52        self.configure_packet_capture(channel_5g=self.AWARE_DEFAULT_CHANNEL_5_BAND,
53                                      channel_2g=self.AWARE_DEFAULT_CHANNEL_24_BAND)
54
55    def setup_test(self):
56        WifiBaseTest.setup_test(self)
57        AwareBaseTest.setup_test(self)
58
59    def teardown_test(self):
60        WifiBaseTest.teardown_test(self)
61        AwareBaseTest.teardown_test(self)
62
63    def verify_mac_no_leakage(self, pcap_procs, factory_mac_addresses, mac_addresses):
64        # Get 2G and 5G pcaps
65        pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_5G][1], BAND_5G.upper())
66        pcap_5g = rdpcap(pcap_fname)
67
68        pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_2G][1], BAND_2G.upper())
69        pcap_2g = rdpcap(pcap_fname)
70        pcaps = pcap_5g + pcap_2g
71
72        # Verify factory MAC is not leaked in both 2G and 5G pcaps
73        for mac in factory_mac_addresses:
74            wutils.verify_mac_not_found_in_pcap(mac, pcaps)
75
76        # Verify random MACs are being used and in pcaps
77        for mac in mac_addresses:
78            wutils.verify_mac_is_found_in_pcap(mac, pcaps)
79
80    def transfer_mac_format(self, mac):
81        """add ':' to mac String, and transfer to lower case
82
83    Args:
84        mac: String of mac without ':'
85        @return: Lower case String of mac like "xx:xx:xx:xx:xx:xx"
86    """
87        return re.sub(r"(?<=\w)(?=(?:\w\w)+$)", ":", mac.lower())
88
89    def start_aware(self, dut, is_publish):
90        """Start Aware attach, then start Publish/Subscribe based on role
91
92     Args:
93         dut: Aware device
94         is_publish: True for Publisher, False for subscriber
95         @:return: dict with Aware discovery session info
96    """
97        aware_id = dut.droid.wifiAwareAttach(True)
98        autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
99        event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
100        mac = self.transfer_mac_format(event["data"]["mac"])
101        dut.log.info("NMI=%s", mac)
102
103        if is_publish:
104            config = autils.create_discovery_config(self.SERVICE_NAME,
105                                                    aconsts.PUBLISH_TYPE_UNSOLICITED)
106            disc_id = dut.droid.wifiAwarePublish(aware_id, config)
107            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
108        else:
109            config = autils.create_discovery_config(self.SERVICE_NAME,
110                                                    aconsts.SUBSCRIBE_TYPE_PASSIVE)
111            disc_id = dut.droid.wifiAwareSubscribe(aware_id, config)
112            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED)
113        aware_session = {"awareId": aware_id, "discId": disc_id, "mac": mac}
114        return aware_session
115
116    def create_date_path(self, p_dut, pub_session, s_dut, sub_session, sec_type):
117        """Create NDP based on the security type(open, PMK, PASSPHRASE), run socket connect
118
119    Args:
120        p_dut: Publish device
121        p_disc_id: Publisher discovery id
122        peer_id_on_pub: peer id on publisher
123        s_dut: Subscribe device
124        s_disc_id: Subscriber discovery id
125        peer_id_on_sub: peer id on subscriber
126        sec_type: NDP security type(open, PMK or PASSPHRASE)
127        @:return: dict with NDP info
128    """
129
130        passphrase = None
131        pmk = None
132
133        if sec_type == self.ENCR_TYPE_PASSPHRASE:
134            passphrase = self.PASSPHRASE
135        if sec_type == self.ENCR_TYPE_PMK:
136            pmk = self.PMK
137
138        p_req_key = autils.request_network(
139            p_dut,
140            p_dut.droid.wifiAwareCreateNetworkSpecifier(pub_session["discId"],
141                                                        None,
142                                                        passphrase, pmk))
143        s_req_key = autils.request_network(
144            s_dut,
145            s_dut.droid.wifiAwareCreateNetworkSpecifier(sub_session["discId"],
146                                                        sub_session["peerId"],
147                                                        passphrase, pmk))
148
149        p_net_event_nc = autils.wait_for_event_with_keys(
150            p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
151            (cconsts.NETWORK_CB_KEY_EVENT,
152             cconsts.NETWORK_CB_CAPABILITIES_CHANGED),
153            (cconsts.NETWORK_CB_KEY_ID, p_req_key))
154        s_net_event_nc = autils.wait_for_event_with_keys(
155            s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
156            (cconsts.NETWORK_CB_KEY_EVENT,
157             cconsts.NETWORK_CB_CAPABILITIES_CHANGED),
158            (cconsts.NETWORK_CB_KEY_ID, s_req_key))
159        p_net_event_lp = autils.wait_for_event_with_keys(
160            p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
161            (cconsts.NETWORK_CB_KEY_EVENT,
162             cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
163            (cconsts.NETWORK_CB_KEY_ID, p_req_key))
164        s_net_event_lp = autils.wait_for_event_with_keys(
165            s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
166            (cconsts.NETWORK_CB_KEY_EVENT,
167             cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED),
168            (cconsts.NETWORK_CB_KEY_ID, s_req_key))
169
170        p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
171        s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
172        p_if_mac = self.transfer_mac_format(autils.get_mac_addr(p_dut, p_aware_if))
173        p_dut.log.info("NDI %s=%s", p_aware_if, p_if_mac)
174        s_if_mac = self.transfer_mac_format(autils.get_mac_addr(s_dut, s_aware_if))
175        s_dut.log.info("NDI %s=%s", s_aware_if, s_if_mac)
176
177        s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6]
178        p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6]
179        asserts.assert_true(
180            autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0),
181            "Failed socket link with Pub as Server")
182        asserts.assert_true(
183            autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0),
184            "Failed socket link with Sub as Server")
185
186        ndp_info = {"pubReqKey": p_req_key, "pubIfMac": p_if_mac,
187                    "subReqKey": s_req_key, "subIfMac": s_if_mac}
188        return ndp_info
189
190    @test_tracker_info(uuid="c9c66873-a8e0-4830-8baa-ada03223bcef")
191    def test_ib_multi_data_path_mac_random_test(self):
192        """Verify there is no factory MAC Address leakage during the Aware discovery, NDP creation,
193        socket setup and IP service connection."""
194
195        p_dut = self.android_devices[0]
196        s_dut = self.android_devices[1]
197        mac_addresses = []
198        factory_mac_addresses = []
199        sec_types = [self.ENCR_TYPE_PMK, self.ENCR_TYPE_PASSPHRASE]
200
201        self.log.info("Starting packet capture")
202        pcap_procs = wutils.start_pcap(
203            self.packet_capture, 'dual', self.test_name)
204
205        factory_mac_1 = p_dut.droid.wifigetFactorymacAddresses()[0]
206        p_dut.log.info("Factory Address: %s", factory_mac_1)
207        factory_mac_2 = s_dut.droid.wifigetFactorymacAddresses()[0]
208        s_dut.log.info("Factory Address: %s", factory_mac_2)
209        factory_mac_addresses.append(factory_mac_1)
210        factory_mac_addresses.append(factory_mac_2)
211
212        # Start Aware and exchange messages
213        publish_session = self.start_aware(p_dut, True)
214        subscribe_session = self.start_aware(s_dut, False)
215        mac_addresses.append(publish_session["mac"])
216        mac_addresses.append(subscribe_session["mac"])
217        discovery_event = autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED)
218        subscribe_session["peerId"] = discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]
219
220        msg_id = self.get_next_msg_id()
221        s_dut.droid.wifiAwareSendMessage(subscribe_session["discId"], subscribe_session["peerId"],
222                                         msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
223        autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
224        pub_rx_msg_event = autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)
225        publish_session["peerId"] = pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]
226
227        msg_id = self.get_next_msg_id()
228        p_dut.droid.wifiAwareSendMessage(publish_session["discId"], publish_session["peerId"],
229                                         msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
230        autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
231        autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)
232
233        # Create Aware NDP
234        p_req_keys = []
235        s_req_keys = []
236        for sec in sec_types:
237            ndp_info = self.create_date_path(p_dut, publish_session, s_dut, subscribe_session, sec)
238            p_req_keys.append(ndp_info["pubReqKey"])
239            s_req_keys.append(ndp_info["subReqKey"])
240            mac_addresses.append(ndp_info["pubIfMac"])
241            mac_addresses.append(ndp_info["subIfMac"])
242
243        # clean-up
244        for p_req_key in p_req_keys:
245            p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
246        for s_req_key in s_req_keys:
247            s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
248        p_dut.droid.wifiAwareDestroyAll()
249        s_dut.droid.wifiAwareDestroyAll()
250
251        self.log.info("Stopping packet capture")
252        wutils.stop_pcap(self.packet_capture, pcap_procs, False)
253
254        self.verify_mac_no_leakage(pcap_procs, factory_mac_addresses, mac_addresses)
255