1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import queue
19import statistics
20import time
21
22from acts import asserts
23from acts.test_utils.wifi import wifi_test_utils as wutils
24from acts.test_utils.wifi.rtt import rtt_const as rconsts
25
# Default timeout (in seconds) applied when waiting for events; the value
# itself is arbitrary.
EVENT_TIMEOUT = 15
28
29
def decorate_event(event_name, id):
    """Build an event name that is specific to one request ID.

    Args:
        event_name: Base name of the event.
        id: Numeric ID to append; it is formatted with '%d'.

    Returns:
        A string of the form '<event_name>_<id>'.
    """
    decorated_name = '%s_%d' % (event_name, id)
    return decorated_name
32
33
def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT):
    """Pop the named event from the device's event dispatcher.

    If the event does not show up before the timeout expires, the timeout is
    logged and asserts.fail() is called with the event name.

    Args:
        ad: The android device.
        event_name: The event to wait on.
        timeout: Number of seconds to wait.

    Returns:
        The event (if available).
    """
    # Tag log lines with the device's pretty name when it has one.
    prefix = ('[%s] ' % ad.pretty_name) if hasattr(ad, 'pretty_name') else ''
    try:
        event = ad.ed.pop_event(event_name, timeout)
    except queue.Empty:
        ad.log.info('%sTimed out while waiting for %s', prefix, event_name)
        asserts.fail(event_name)
    else:
        ad.log.info('%s%s: %s', prefix, event_name, event['data'])
        return event
54
55
def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT):
    """Wait for the timeout period and fail if the named event is observed.

    Args:
        ad: The android device.
        event_name: The event to wait for (and fail on its appearance).
        timeout: Number of seconds to wait for the event.
    """
    # Tag log lines with the device's pretty name when it has one.
    prefix = ('[%s] ' % ad.pretty_name) if hasattr(ad, 'pretty_name') else ''
    try:
        event = ad.ed.pop_event(event_name, timeout)
    except queue.Empty:
        # Not seeing the event is the success path here.
        ad.log.info('%s%s not seen (as expected)', prefix, event_name)
        return

    ad.log.info('%sReceived unwanted %s: %s', prefix, event_name,
                event['data'])
    asserts.fail(event_name, extras=event)
75
76
def get_rtt_capabilities(ad):
    """Query the Wi-Fi RTT capabilities of the specified device.

    Runs 'cmd wifirtt get_capabilities' through adb and parses the output as
    JSON. The capabilities are a dictionary keyed by rtt_const.CAP_* keys.

    Args:
        ad: The Android device.

    Returns:
        The capability dictionary.
    """
    raw_capabilities = ad.adb.shell('cmd wifirtt get_capabilities')
    return json.loads(raw_capabilities)
86
87
def config_privilege_override(dut, override_to_no_privilege):
    """Configure whether the device overrides the RTT permission check.

    When the override is enabled, privileged RTT operations are disallowed,
    e.g. one-sided RTT to Responders (APs) which do not support IEEE
    802.11mc.

    Args:
        dut: Device to configure.
        override_to_no_privilege: True to indicate no privileged ops, False
            for default (which will allow privileged ops).
    """
    # The shell command takes the flag as 1 (override) or 0 (default).
    if override_to_no_privilege:
        flag = 1
    else:
        flag = 0
    command = "cmd wifirtt set override_assume_no_privilege %d" % flag
    dut.adb.shell(command)
100
101
def get_rtt_constrained_results(scanned_networks, support_rtt):
    """Keep only the networks whose RTT (IEEE 802.11mc) support matches.

    Args:
        scanned_networks: A list of networks from scan results.
        support_rtt: True - only return those APs which support RTT, False -
            only return those APs which do not support RTT.

    Returns:
        A sub-set of the scanned_networks per the support_rtt constraint.
    """

    def is_rtt_responder(network):
        # A missing key is treated the same as a falsy value.
        responder_key = rconsts.SCAN_RESULT_KEY_RTT_RESPONDER
        return bool(responder_key in network and network[responder_key])

    want_responders = bool(support_rtt)
    return [
        network for network in scanned_networks
        if is_rtt_responder(network) == want_responders
    ]
125
126
def scan_networks(dut, max_tries=3):
    """Perform a scan and return the scan results.

    The scan is repeated until it produces a non-empty result or max_tries
    scans have been performed.

    Args:
        dut: Device under test.
        max_tries: Maximum number of scans to perform.

    Returns:
        An array of scan results: the first non-empty result, otherwise
        whatever the last scan returned (an empty list if no scan was run).
    """
    latest_results = []
    for _ in range(max_tries):
        wutils.start_wifi_connection_scan(dut)
        latest_results = dut.droid.wifiGetScanResults()
        if latest_results:
            return latest_results
    return latest_results
143
144
def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0):
    """Scan and return only the APs matching the RTT support constraint.

    Only APs that support, or do not support, RTT (IEEE 802.11mc) are
    returned - per the support_rtt parameter.

    Args:
        dut: Device under test.
        support_rtt: True - only return those APs which support RTT, False -
            only return those APs which do not support RTT.
        repeat: Re-scan this many times (after the initial scan) to find a
            matching network.

    Returns:
        An array of scan results; empty if no scan produced a match.
    """
    total_scans = repeat + 1
    for _ in range(total_scans):
        matching_aps = get_rtt_constrained_results(
            scan_networks(dut), support_rtt)
        if matching_aps:
            return matching_aps

    return []
164
165
def select_best_scan_results(scans, select_count, lowest_rssi=-80):
    """Select the strongest 'select_count' scans based on highest RSSI.

    Very weak signals are excluded, even if that results in a shorter list.
    The input list is left untouched: ranking is done on a sorted copy.

    Args:
        scans: List of scan results; each entry must have a 'level' key
            holding its RSSI.
        select_count: An integer specifying how many scans to return at most.
        lowest_rssi: The lowest RSSI to accept into the output.

    Returns:
        A list of the strongest 'select_count' scan results from the scans
        list, ordered from strongest to weakest.
    """
    # sorted() rather than list.sort() so the caller's list is not reordered.
    ranked_scans = sorted(
        scans, key=lambda scan: scan['level'], reverse=True)

    result = []
    for scan in ranked_scans:
        if len(result) == select_count:
            break
        if scan['level'] < lowest_rssi:
            break  # rest are lower since we're sorted
        result.append(scan)

    return result
192
193
def validate_ap_result(scan_result, range_result):
    """Validate a single range result against its scan result.

    Checks that:
    - The MAC of the range result matches the BSSID of the scan result.
    - Ranging was successful if the AP (per scan result) supports 802.11mc
      (it is allowed to fail otherwise).

    Args:
        scan_result: Scan result for the AP.
        range_result: Range result returned by the RTT API.
    """
    expected_bssid = scan_result[wutils.WifiEnums.BSSID_KEY]
    reported_bssid = range_result[
        rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]
    asserts.assert_equal(expected_bssid, reported_bssid, 'MAC/BSSID mismatch')

    # Only APs flagged as RTT responders are required to range successfully.
    responder_key = rconsts.SCAN_RESULT_KEY_RTT_RESPONDER
    if responder_key not in scan_result or not scan_result[responder_key]:
        return

    ranging_succeeded = (range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
                         rconsts.EVENT_CB_RANGING_STATUS_SUCCESS)
    asserts.assert_true(ranging_succeeded,
                        'Ranging failed for an AP which supports 802.11mc!')
214
215
def validate_ap_results(scan_results, range_results):
    """Validate an array of ranging results against the scan results.

    The scan results are the ones used to trigger the range request. Both
    lists are sorted in place by BSSID/MAC and then validated pair-wise.

    Args:
        scan_results: Scan results used to trigger the range request.
        range_results: Range results returned by the RTT API.
    """
    asserts.assert_equal(
        len(scan_results), len(range_results),
        'Mismatch in length of scan results and range results')

    def scan_bssid(scan_result):
        return scan_result[wutils.WifiEnums.BSSID_KEY]

    def range_bssid(range_result):
        return range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]

    # Align the two lists by sorting both on BSSID/MAC.
    scan_results.sort(key=scan_bssid)
    range_results.sort(key=range_bssid)

    for scan_result, range_result in zip(scan_results, range_results):
        validate_ap_result(scan_result, range_result)
236
237
def validate_aware_mac_result(range_result, mac, description):
    """Validate the range result for an Aware peer specified by MAC address.

    Checks that the MAC address in the result matches the expected one. Any
    ":" separators and the letter case are ignored for the comparison.

    Args:
        range_result: Range result returned by the RTT API.
        mac: MAC address of the peer.
        description: Additional content to print on failure.
    """

    def normalize(address):
        # Drop ':' separators and fold case so that formats compare equal.
        return address.replace(':', '').lower()

    expected_mac = normalize(mac)
    reported_mac = normalize(
        range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING])
    asserts.assert_equal(expected_mac, reported_mac,
                         '%s: MAC mismatch' % description)
254
255
def validate_aware_peer_id_result(range_result, peer_id, description):
    """Validate the range result for an Aware peer specified by Peer ID.

    Checks that:
    - The Peer ID in the result matches the expected one.
    - The result carries no MAC address entry.

    Args:
        range_result: Range result returned by the RTT API.
        peer_id: Peer ID of the peer.
        description: Additional content to print on failure.
    """
    reported_peer_id = range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID]
    asserts.assert_equal(peer_id, reported_peer_id,
                         '%s: Peer Id mismatch' % description)

    has_mac_entry = rconsts.EVENT_CB_RANGING_KEY_MAC in range_result
    asserts.assert_false(has_mac_entry,
                         '%s: MAC Address not empty!' % description)
271
272
def extract_stats(results,
                  range_reference_mm,
                  range_margin_mm,
                  min_rssi,
                  reference_lci=[],
                  reference_lcr=[],
                  summary_only=False):
    """Extract statistics from a list of RTT results.

    The returned dictionary always contains the summary keys:
      - num_results (success or fails)
      - num_success_results
      - num_no_results (e.g. timeout)
      - num_failures
      - num_range_out_of_margin (only for successes)
      - num_invalid_rssi (only for successes)
      - any_lci_mismatch: True if any successful result has an LCI which
                          differs from the reference LCI.
      - any_lcr_mismatch: True if any successful result has an LCR which
                          differs from the reference LCR.
      - invalid_num_attempted: True if any successful result reports zero
                               attempted measurements.
      - invalid_num_successful: True if any successful result reports zero
                                successful measurements.

    These keys are present only when enough successful results exist:
      - distance_mean, rssi_mean (at least one successful result)
      - distance_std_dev, rssi_std_dev (at least two successful results);
        distance_std_dev is based on the distances, ignoring the individual
        std-devs.

    Unless summary_only is set, the per-result lists are included as well:
      - distances: extracted list of distances
      - distance_std_devs: extracted list of distance standard-deviations
      - rssis: extracted list of RSSI
      - num_attempted_measurements: per-result number of attempted
                                    measurements
      - num_successful_measurements: per-result number of successful
                                     measurements
      - status_codes: status of every received result (success or failure)
      - lcis: extracted list of all of the individual LCI
      - lcrs: extracted list of all of the individual LCR

    Args:
        results: List of RTT results; a None entry means that no result was
            received (e.g. timeout).
        range_reference_mm: Reference value for the distance (in mm).
        range_margin_mm: Acceptable absolute margin for distance (in mm).
        min_rssi: Acceptable minimum RSSI value.
        reference_lci, reference_lcr: Reference values for LCI and LCR.
        summary_only: Only include summary keys (reduce size).

    Returns:
        A dictionary of stats.
    """
    # NOTE: the list defaults of reference_lci/reference_lcr are only compared
    # against below - they are never modified.
    lower_bound_mm = range_reference_mm - range_margin_mm
    upper_bound_mm = range_reference_mm + range_margin_mm

    received_count = 0
    success_count = 0
    timeout_count = 0
    failure_count = 0
    out_of_margin_count = 0
    invalid_rssi_count = 0
    lci_mismatch = False
    lcr_mismatch = False
    zero_attempted = False
    zero_successful = False

    distances = []
    distance_std_devs = []
    rssis = []
    num_attempted_measurements = []
    num_successful_measurements = []
    status_codes = []
    lcis = []
    lcrs = []

    for result in results:
        if result is None:  # None -> timeout waiting for RTT result
            timeout_count += 1
            continue
        received_count += 1

        status = result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
        status_codes.append(status)
        if status != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS:
            failure_count += 1
            continue
        success_count += 1

        # Everything below applies to successful results only.
        distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM]
        distances.append(distance_mm)
        if not lower_bound_mm <= distance_mm <= upper_bound_mm:
            out_of_margin_count += 1
        distance_std_devs.append(
            result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM])

        rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI]
        rssis.append(rssi)
        if not min_rssi <= rssi <= 0:
            invalid_rssi_count += 1

        attempted = result[
            rconsts.EVENT_CB_RANGING_KEY_NUM_ATTEMPTED_MEASUREMENTS]
        num_attempted_measurements.append(attempted)
        if attempted == 0:
            zero_attempted = True

        successful = result[
            rconsts.EVENT_CB_RANGING_KEY_NUM_SUCCESSFUL_MEASUREMENTS]
        num_successful_measurements.append(successful)
        if successful == 0:
            zero_successful = True

        lci = result[rconsts.EVENT_CB_RANGING_KEY_LCI]
        lcis.append(lci)
        if lci != reference_lci:
            lci_mismatch = True

        lcr = result[rconsts.EVENT_CB_RANGING_KEY_LCR]
        lcrs.append(lcr)
        if lcr != reference_lcr:
            lcr_mismatch = True

    stats = {
        'num_results': received_count,
        'num_success_results': success_count,
        'num_no_results': timeout_count,
        'num_failures': failure_count,
        'num_range_out_of_margin': out_of_margin_count,
        'num_invalid_rssi': invalid_rssi_count,
        'any_lci_mismatch': lci_mismatch,
        'any_lcr_mismatch': lcr_mismatch,
        'invalid_num_attempted': zero_attempted,
        'invalid_num_successful': zero_successful,
    }

    # A mean needs at least one sample, a standard deviation at least two.
    if distances:
        stats['distance_mean'] = statistics.mean(distances)
    if len(distances) > 1:
        stats['distance_std_dev'] = statistics.stdev(distances)
    if rssis:
        stats['rssi_mean'] = statistics.mean(rssis)
    if len(rssis) > 1:
        stats['rssi_std_dev'] = statistics.stdev(rssis)

    if summary_only:
        return stats

    stats['distances'] = distances
    stats['distance_std_devs'] = distance_std_devs
    stats['rssis'] = rssis
    stats['num_attempted_measurements'] = num_attempted_measurements
    stats['num_successful_measurements'] = num_successful_measurements
    stats['status_codes'] = status_codes
    stats['lcis'] = lcis
    stats['lcrs'] = lcrs
    return stats
410
411
def run_ranging(dut,
                aps,
                iter_count,
                time_between_iterations,
                target_run_time_sec=0):
    """Execute ranging to the set of APs.

    Will execute a minimum of 'iter_count' iterations. Will continue to run
    until execution time (just) exceeds 'target_run_time_sec'.

    Args:
        dut: Device under test.
        aps: A list of APs (Access Points) to range to. If it holds more
            entries than the device reports as its per-request maximum, only
            the leading entries are used.
        iter_count: (Minimum) Number of measurements to perform.
        time_between_iterations: Number of seconds to wait between
            iterations.
        target_run_time_sec: The target run time in seconds (0 to run exactly
            'iter_count' iterations).

    Returns:
        A dictionary keyed by the BSSID of each AP that was ranged to. Each
        value is a list with one entry per iteration: the RTT result for that
        AP, or None if no result event arrived before the timeout.
    """
    max_peers = dut.droid.wifiRttMaxPeersInRequest()

    asserts.assert_true(len(aps) > 0, "Need at least one AP!")
    if len(aps) > max_peers:
        aps = aps[:max_peers]

    # need to keep track per BSSID!
    events = {ap["BSSID"]: [] for ap in aps}

    started_at = time.time()
    completed_iterations = 0
    elapsed_sec = 0
    while completed_iterations < iter_count or (
            target_run_time_sec != 0 and elapsed_sec < target_run_time_sec):
        # No pause is needed ahead of the very first measurement.
        if completed_iterations != 0 and time_between_iterations != 0:
            time.sleep(time_between_iterations)

        request_id = dut.droid.wifiRttStartRangingToAccessPoints(aps)
        result_event_name = decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                           request_id)
        try:
            event = dut.ed.pop_event(result_event_name, EVENT_TIMEOUT)
        except queue.Empty:
            # Timed out: record a missing result for every requested AP.
            for ap in aps:
                events[ap["BSSID"]].append(None)
        else:
            range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS]
            asserts.assert_equal(
                len(aps), len(range_results),
                'Mismatch in length of scan results and range results')
            for result in range_results:
                bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING]
                asserts.assert_true(
                    bssid in events,
                    "Result BSSID %s not in requested AP!?" % bssid)
                # Each AP must have exactly one entry per earlier iteration.
                asserts.assert_equal(
                    len(events[bssid]), completed_iterations,
                    "Duplicate results for BSSID %s!?" % bssid)
                events[bssid].append(result)

        completed_iterations += 1
        elapsed_sec = time.time() - started_at

    return events
476
477
def analyze_results(all_aps_events,
                    rtt_reference_distance_mm,
                    distance_margin_mm,
                    min_expected_rssi,
                    lci_reference,
                    lcr_reference,
                    summary_only=False):
    """Compute the statistics of an RTT experiment for every AP.

    Args:
        all_aps_events: Dictionary of APs, each a list of RTT result events.
        rtt_reference_distance_mm: Expected distance to the AP (source of
            truth).
        distance_margin_mm: Accepted error margin in distance measurement.
        min_expected_rssi: Minimum acceptable RSSI value.
        lci_reference, lcr_reference: Expected LCI/LCR values (arrays of
            bytes).
        summary_only: Only include summary keys (reduce size).

    Returns:
        A dictionary with the same keys as all_aps_events, mapping each AP to
        the stats dictionary produced by extract_stats() for its events.
    """
    return {
        bssid: extract_stats(events, rtt_reference_distance_mm,
                             distance_margin_mm, min_expected_rssi,
                             lci_reference, lcr_reference, summary_only)
        for bssid, events in all_aps_events.items()
    }
502