1#!/usr/bin/python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import time
19
20from acts import asserts
21from acts.test_decorators import test_tracker_info
22from acts.test_utils.wifi.aware import aware_const as aconsts
23from acts.test_utils.wifi.aware import aware_test_utils as autils
24from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
25from acts.test_utils.wifi.rtt import rtt_const as rconsts
26from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
27from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
28
29
30class RangeAwareTest(AwareBaseTest, RttBaseTest):
31    """Test class for RTT ranging to Wi-Fi Aware peers"""
32    SERVICE_NAME = "GoogleTestServiceXY"
33
34    # Number of RTT iterations
35    NUM_ITER = 10
36
37    # Time gap (in seconds) between iterations
38    TIME_BETWEEN_ITERATIONS = 0
39
40    # Time gap (in seconds) when switching between Initiator and Responder
41    TIME_BETWEEN_ROLES = 4
42
43    def setup_test(self):
44        """Manual setup here due to multiple inheritance: explicitly execute the
45        setup method from both parents.
46        """
47        AwareBaseTest.setup_test(self)
48        RttBaseTest.setup_test(self)
49
50    def teardown_test(self):
51        """Manual teardown here due to multiple inheritance: explicitly execute the
52        teardown method from both parents.
53        """
54        AwareBaseTest.teardown_test(self)
55        RttBaseTest.teardown_test(self)
56
57    #############################################################################
58
59    def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
60        """Perform single RTT measurement, using Aware, from the Initiator DUT to
61        a Responder. The RTT Responder can be specified using its MAC address
62        (obtained using out- of-band discovery) or its Peer ID (using Aware
63        discovery).
64
65        Args:
66              init_dut: RTT Initiator device
67              resp_mac: MAC address of the RTT Responder device
68              resp_peer_id: Peer ID of the RTT Responder device
69        """
70        asserts.assert_true(
71            resp_mac is not None or resp_peer_id is not None,
72            "One of the Responder specifications (MAC or Peer ID)"
73            " must be provided!")
74        if resp_mac is not None:
75            id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
76        else:
77            id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
78        event_name = rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
79                                           id)
80        try:
81            event = init_dut.ed.pop_event(event_name, rutils.EVENT_TIMEOUT)
82            result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
83            if resp_mac is not None:
84                rutils.validate_aware_mac_result(result, resp_mac, "DUT")
85            else:
86                rutils.validate_aware_peer_id_result(result, resp_peer_id,
87                                                     "DUT")
88            return result
89        except queue.Empty:
90            self.log.warning("Timed-out waiting for %s", event_name)
91            return None
92
93    def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
94                                 time_between_iterations, time_between_roles):
95        """Perform a set of RTT measurements, using in-band (Aware) discovery.
96
97        Args:
98              do_both_directions: False - perform all measurements in one direction,
99                                  True - perform 2 measurements one in both directions.
100              iter_count: Number of measurements to perform.
101              time_between_iterations: Number of seconds to wait between iterations.
102              time_between_roles: Number of seconds to wait when switching between
103                                  Initiator and Responder roles (only matters if
104                                  do_both_directions=True).
105
106        Returns: a list of the events containing the RTT results (or None for a
107        failed measurement). If both directions are tested then returns a list of
108        2 elements: one set for each direction.
109        """
110        p_dut = self.android_devices[0]
111        s_dut = self.android_devices[1]
112
113        (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
114         peer_id_on_pub) = autils.create_discovery_pair(
115             p_dut,
116             s_dut,
117             p_config=autils.add_ranging_to_pub(
118                 autils.create_discovery_config(
119                     self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
120                 True),
121             s_config=autils.add_ranging_to_pub(
122                 autils.create_discovery_config(
123                     self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE), True),
124             device_startup_offset=self.device_startup_offset,
125             msg_id=self.get_next_msg_id())
126
127        resultsPS = []
128        resultsSP = []
129        for i in range(iter_count):
130            if i != 0 and time_between_iterations != 0:
131                time.sleep(time_between_iterations)
132
133            # perform RTT from pub -> sub
134            resultsPS.append(
135                self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
136
137            if do_both_directions:
138                if time_between_roles != 0:
139                    time.sleep(time_between_roles)
140
141                # perform RTT from sub -> pub
142                resultsSP.append(
143                    self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))
144
145        return resultsPS if not do_both_directions else [resultsPS, resultsSP]
146
147    def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
148                                  time_between_iterations, time_between_roles):
149        """Perform a set of RTT measurements, using out-of-band discovery.
150
151        Args:
152              do_both_directions: False - perform all measurements in one direction,
153                                  True - perform 2 measurements one in both directions.
154              iter_count: Number of measurements to perform.
155              time_between_iterations: Number of seconds to wait between iterations.
156              time_between_roles: Number of seconds to wait when switching between
157                                  Initiator and Responder roles (only matters if
158                                  do_both_directions=True).
159              enable_ranging: True to enable Ranging, False to disable.
160
161        Returns: a list of the events containing the RTT results (or None for a
162        failed measurement). If both directions are tested then returns a list of
163        2 elements: one set for each direction.
164        """
165        dut0 = self.android_devices[0]
166        dut1 = self.android_devices[1]
167
168        id0, mac0 = autils.attach_with_identity(dut0)
169        id1, mac1 = autils.attach_with_identity(dut1)
170
171        # wait for for devices to synchronize with each other - there are no other
172        # mechanisms to make sure this happens for OOB discovery (except retrying
173        # to execute the data-path request)
174        time.sleep(autils.WAIT_FOR_CLUSTER)
175
176        # start publisher(s) on the Responder(s) with ranging enabled
177        p_config = autils.add_ranging_to_pub(
178            autils.create_discovery_config(self.SERVICE_NAME,
179                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
180            enable_ranging=True)
181        dut1.droid.wifiAwarePublish(id1, p_config)
182        autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
183        if do_both_directions:
184            dut0.droid.wifiAwarePublish(id0, p_config)
185            autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
186
187        results01 = []
188        results10 = []
189        for i in range(iter_count):
190            if i != 0 and time_between_iterations != 0:
191                time.sleep(time_between_iterations)
192
193            # perform RTT from dut0 -> dut1
194            results01.append(self.run_rtt_discovery(dut0, resp_mac=mac1))
195
196            if do_both_directions:
197                if time_between_roles != 0:
198                    time.sleep(time_between_roles)
199
200                # perform RTT from dut1 -> dut0
201                results10.append(self.run_rtt_discovery(dut1, resp_mac=mac0))
202
203        return results01 if not do_both_directions else [results01, results10]
204
205    def verify_results(self, results, results_reverse_direction=None, accuracy_evaluation=False):
206        """Verifies the results of the RTT experiment.
207
208        Args:
209              results: List of RTT results.
210              results_reverse_direction: List of RTT results executed in the
211                                        reverse direction. Optional.
212              accuracy_evaluation: False - only evaluate success rate.
213                                   True - evaluate both success rate and accuracy
214                                   default is False.
215        """
216        stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
217                                     self.rtt_reference_distance_margin_mm,
218                                     self.rtt_min_expected_rssi_dbm)
219        stats_reverse_direction = None
220        if results_reverse_direction is not None:
221            stats_reverse_direction = rutils.extract_stats(
222                results_reverse_direction, self.rtt_reference_distance_mm,
223                self.rtt_reference_distance_margin_mm,
224                self.rtt_min_expected_rssi_dbm)
225        self.log.debug("Stats: %s", stats)
226        if stats_reverse_direction is not None:
227            self.log.debug("Stats in reverse direction: %s",
228                           stats_reverse_direction)
229
230        extras = stats if stats_reverse_direction is None else {
231            "forward": stats,
232            "reverse": stats_reverse_direction
233        }
234
235        asserts.assert_true(
236            stats['num_no_results'] == 0,
237            "Missing (timed-out) results",
238            extras=extras)
239        asserts.assert_false(
240            stats['any_lci_mismatch'], "LCI mismatch", extras=extras)
241        asserts.assert_false(
242            stats['any_lcr_mismatch'], "LCR mismatch", extras=extras)
243        asserts.assert_false(
244            stats['invalid_num_attempted'],
245            "Invalid (0) number of attempts",
246            extras=stats)
247        asserts.assert_false(
248            stats['invalid_num_successful'],
249            "Invalid (0) number of successes",
250            extras=stats)
251        asserts.assert_equal(
252            stats['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
253        asserts.assert_true(
254            stats['num_failures'] <=
255            self.rtt_max_failure_rate_two_sided_rtt_percentage *
256            stats['num_results'] / 100,
257            "Failure rate is too high",
258            extras=extras)
259        if accuracy_evaluation:
260            asserts.assert_true(
261                stats['num_range_out_of_margin'] <=
262                self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
263                stats['num_success_results'] / 100,
264                "Results exceeding error margin rate is too high",
265                extras=extras)
266
267        if stats_reverse_direction is not None:
268            asserts.assert_true(
269                stats_reverse_direction['num_no_results'] == 0,
270                "Missing (timed-out) results",
271                extras=extras)
272            asserts.assert_false(
273                stats_reverse_direction['any_lci_mismatch'], "LCI mismatch", extras=extras)
274            asserts.assert_false(
275                stats_reverse_direction['any_lcr_mismatch'], "LCR mismatch", extras=extras)
276            asserts.assert_equal(
277                stats_reverse_direction['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
278            asserts.assert_true(
279                stats_reverse_direction['num_failures'] <=
280                self.rtt_max_failure_rate_two_sided_rtt_percentage *
281                stats_reverse_direction['num_results'] / 100,
282                "Failure rate is too high",
283                extras=extras)
284            if accuracy_evaluation:
285                asserts.assert_true(
286                    stats_reverse_direction['num_range_out_of_margin'] <=
287                    self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
288                    stats_reverse_direction['num_success_results'] / 100,
289                    "Results exceeding error margin rate is too high",
290                    extras=extras)
291
292        asserts.explicit_pass("RTT Aware test done", extras=extras)
293
294    #############################################################################
295
296    @test_tracker_info(uuid="9e4e7ab4-2254-498c-9788-21e15ed9a370")
297    def test_rtt_oob_discovery_one_way(self):
298        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
299        to communicate the MAC addresses to the peer. Test one-direction RTT only.
300        Functionality test: Only evaluate success rate.
301        """
302        rtt_results = self.run_rtt_oob_discovery_set(
303            do_both_directions=False,
304            iter_count=self.NUM_ITER,
305            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
306            time_between_roles=self.TIME_BETWEEN_ROLES)
307        self.verify_results(rtt_results)
308
309    @test_tracker_info(uuid="22edba77-eeb2-43ee-875a-84437550ad84")
310    def test_rtt_oob_discovery_both_ways(self):
311        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
312        to communicate the MAC addresses to the peer. Test RTT both-ways:
313        switching rapidly between Initiator and Responder.
314        Functionality test: Only evaluate success rate.
315        """
316        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
317            do_both_directions=True,
318            iter_count=self.NUM_ITER,
319            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
320            time_between_roles=self.TIME_BETWEEN_ROLES)
321        self.verify_results(rtt_results1, rtt_results2)
322
323    @test_tracker_info(uuid="18cef4be-95b4-4f7d-a140-5165874e7d1c")
324    def test_rtt_ib_discovery_one_way(self):
325        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
326        to communicate the MAC addresses to the peer. Test one-direction RTT only.
327        Functionality test: Only evaluate success rate.
328        """
329        rtt_results = self.run_rtt_ib_discovery_set(
330            do_both_directions=False,
331            iter_count=self.NUM_ITER,
332            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
333            time_between_roles=self.TIME_BETWEEN_ROLES)
334        self.verify_results(rtt_results)
335
336    @test_tracker_info(uuid="c67c8e70-c417-42d9-9bca-af3a89f1ddd9")
337    def test_rtt_ib_discovery_both_ways(self):
338        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
339        to communicate the MAC addresses to the peer. Test RTT both-ways:
340        switching rapidly between Initiator and Responder.
341        Functionality test: Only evaluate success rate.
342        """
343        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
344            do_both_directions=True,
345            iter_count=self.NUM_ITER,
346            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
347            time_between_roles=self.TIME_BETWEEN_ROLES)
348        self.verify_results(rtt_results1, rtt_results2)
349
350    @test_tracker_info(uuid="3a1d19a2-7241-49e0-aaf2-0a1da4c29783")
351    def test_rtt_oob_discovery_one_way_with_accuracy_evaluation(self):
352        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
353        to communicate the MAC addresses to the peer. Test one-direction RTT only.
354        Performance test: evaluate success rate and accuracy.
355        """
356        rtt_results = self.run_rtt_oob_discovery_set(
357            do_both_directions=False,
358            iter_count=self.NUM_ITER,
359            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
360            time_between_roles=self.TIME_BETWEEN_ROLES)
361        self.verify_results(rtt_results, accuracy_evaluation=True)
362
363    @test_tracker_info(uuid="82f954a5-c0ec-4bc6-8940-3b72199328ac")
364    def test_rtt_oob_discovery_both_ways_with_accuracy_evaluation(self):
365        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
366        to communicate the MAC addresses to the peer. Test RTT both-ways:
367        switching rapidly between Initiator and Responder.
368        Performance test: evaluate success rate and accuracy.
369        """
370        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
371            do_both_directions=True,
372            iter_count=self.NUM_ITER,
373            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
374            time_between_roles=self.TIME_BETWEEN_ROLES)
375        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)
376
377    @test_tracker_info(uuid="4194e00c-ea49-488e-b67f-ad9360daa5fa")
378    def test_rtt_ib_discovery_one_way_with_accuracy_evaluation(self):
379        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
380        to communicate the MAC addresses to the peer. Test one-direction RTT only.
381        Performance test: evaluate success rate and accuracy.
382        """
383        rtt_results = self.run_rtt_ib_discovery_set(
384            do_both_directions=False,
385            iter_count=self.NUM_ITER,
386            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
387            time_between_roles=self.TIME_BETWEEN_ROLES)
388        self.verify_results(rtt_results, accuracy_evaluation=True)
389
390    @test_tracker_info(uuid="bea3ac8b-756d-4cd8-8936-b8bfe64676c9")
391    def test_rtt_ib_discovery_both_ways_with_accuracy_evaluation(self):
392        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
393        to communicate the MAC addresses to the peer. Test RTT both-ways:
394        switching rapidly between Initiator and Responder.
395        Performance test: evaluate success rate and accuracy.
396        """
397        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
398            do_both_directions=True,
399            iter_count=self.NUM_ITER,
400            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
401            time_between_roles=self.TIME_BETWEEN_ROLES)
402        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)
403
404    @test_tracker_info(uuid="54f9693d-45e5-4979-adbb-1b875d217c0c")
405    def test_rtt_without_initiator_aware(self):
406        """Try to perform RTT operation when there is no local Aware session (on the
407        Initiator). The Responder is configured normally: Aware on and a Publisher
408        with Ranging enable. Should FAIL.
409        """
410        init_dut = self.android_devices[0]
411        resp_dut = self.android_devices[1]
412
413        # Enable a Responder and start a Publisher
414        resp_id = resp_dut.droid.wifiAwareAttach(True)
415        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
416        resp_ident_event = autils.wait_for_event(
417            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
418        resp_mac = resp_ident_event['data']['mac']
419
420        resp_config = autils.add_ranging_to_pub(
421            autils.create_discovery_config(self.SERVICE_NAME,
422                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
423            enable_ranging=True)
424        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
425        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
426
427        # Initiate an RTT to Responder (no Aware started on Initiator!)
428        results = []
429        num_no_responses = 0
430        num_successes = 0
431        for i in range(self.NUM_ITER):
432            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
433            self.log.debug("result: %s", result)
434            results.append(result)
435            if result is None:
436                num_no_responses = num_no_responses + 1
437            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
438                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
439                num_successes = num_successes + 1
440
441        asserts.assert_equal(
442            num_no_responses, 0, "No RTT response?", extras={"data": results})
443        asserts.assert_equal(
444            num_successes,
445            0,
446            "Aware RTT w/o Aware should FAIL!",
447            extras={"data": results})
448        asserts.explicit_pass("RTT Aware test done", extras={"data": results})
449
450    @test_tracker_info(uuid="87a69053-8261-4928-8ec1-c93aac7f3a8d")
451    def test_rtt_without_responder_aware(self):
452        """Try to perform RTT operation when there is no peer Aware session (on the
453        Responder). Should FAIL."""
454        init_dut = self.android_devices[0]
455        resp_dut = self.android_devices[1]
456
457        # Enable a Responder and start a Publisher
458        resp_id = resp_dut.droid.wifiAwareAttach(True)
459        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
460        resp_ident_event = autils.wait_for_event(
461            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
462        resp_mac = resp_ident_event['data']['mac']
463
464        resp_config = autils.add_ranging_to_pub(
465            autils.create_discovery_config(self.SERVICE_NAME,
466                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
467            enable_ranging=True)
468        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
469        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
470
471        # Disable Responder
472        resp_dut.droid.wifiAwareDestroy(resp_id)
473
474        # Enable the Initiator
475        init_id = init_dut.droid.wifiAwareAttach()
476        autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)
477
478        # Initiate an RTT to Responder (no Aware started on Initiator!)
479        results = []
480        num_no_responses = 0
481        num_successes = 0
482        for i in range(self.NUM_ITER):
483            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
484            self.log.debug("result: %s", result)
485            results.append(result)
486            if result is None:
487                num_no_responses = num_no_responses + 1
488            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
489                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
490                num_successes = num_successes + 1
491
492        asserts.assert_equal(
493            num_no_responses, 0, "No RTT response?", extras={"data": results})
494        asserts.assert_equal(
495            num_successes,
496            0,
497            "Aware RTT w/o Aware should FAIL!",
498            extras={"data": results})
499        asserts.explicit_pass("RTT Aware test done", extras={"data": results})
500