1#!/usr/bin/env python3.4 2# 3# Copyright 2017 - Google 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import json 18import queue 19import statistics 20import time 21 22from acts import asserts 23from acts.test_utils.wifi import wifi_test_utils as wutils 24from acts.test_utils.wifi.rtt import rtt_const as rconsts 25 26# arbitrary timeout for events 27EVENT_TIMEOUT = 15 28 29 30def decorate_event(event_name, id): 31 return '%s_%d' % (event_name, id) 32 33 34def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT): 35 """Wait for the specified event or timeout. 36 37 Args: 38 ad: The android device 39 event_name: The event to wait on 40 timeout: Number of seconds to wait 41 Returns: 42 The event (if available) 43 """ 44 prefix = '' 45 if hasattr(ad, 'pretty_name'): 46 prefix = '[%s] ' % ad.pretty_name 47 try: 48 event = ad.ed.pop_event(event_name, timeout) 49 ad.log.info('%s%s: %s', prefix, event_name, event['data']) 50 return event 51 except queue.Empty: 52 ad.log.info('%sTimed out while waiting for %s', prefix, event_name) 53 asserts.fail(event_name) 54 55 56def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT): 57 """Wait for a timeout period and looks for the specified event - fails if it 58 is observed. 59 60 Args: 61 ad: The android device 62 event_name: The event to wait for (and fail on its appearance) 63 """ 64 prefix = '' 65 if hasattr(ad, 'pretty_name'): 66 prefix = '[%s] ' % ad.pretty_name 67 try: 68 event = ad.ed.pop_event(event_name, timeout) 69 ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, 70 event['data']) 71 asserts.fail(event_name, extras=event) 72 except queue.Empty: 73 ad.log.info('%s%s not seen (as expected)', prefix, event_name) 74 return 75 76 77def get_rtt_capabilities(ad): 78 """Get the Wi-Fi RTT capabilities from the specified device. The 79 capabilities are a dictionary keyed by rtt_const.CAP_* keys. 80 81 Args: 82 ad: the Android device 83 Returns: the capability dictionary. 84 """ 85 return json.loads(ad.adb.shell('cmd wifirtt get_capabilities')) 86 87 88def config_privilege_override(dut, override_to_no_privilege): 89 """Configure the device to override the permission check and to disallow any 90 privileged RTT operations, e.g. disallow one-sided RTT to Responders (APs) 91 which do not support IEEE 802.11mc. 92 93 Args: 94 dut: Device to configure. 95 override_to_no_privilege: True to indicate no privileged ops, False for 96 default (which will allow privileged ops). 97 """ 98 dut.adb.shell("cmd wifirtt set override_assume_no_privilege %d" % 99 (1 if override_to_no_privilege else 0)) 100 101 102def get_rtt_constrained_results(scanned_networks, support_rtt): 103 """Filter the input list and only return those networks which either support 104 or do not support RTT (IEEE 802.11mc.) 105 106 Args: 107 scanned_networks: A list of networks from scan results. 108 support_rtt: True - only return those APs which support RTT, False - only 109 return those APs which do not support RTT. 110 111 Returns: a sub-set of the scanned_networks per support_rtt constraint. 112 """ 113 matching_networks = [] 114 for network in scanned_networks: 115 if support_rtt: 116 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in network 117 and network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 118 matching_networks.append(network) 119 else: 120 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER not in network 121 or not network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 122 matching_networks.append(network) 123 124 return matching_networks 125 126 127def scan_networks(dut, max_tries=3): 128 """Perform a scan and return scan results. 129 130 Args: 131 dut: Device under test. 132 max_retries: Retry scan to ensure network is found 133 134 Returns: an array of scan results. 135 """ 136 scan_results = [] 137 for num_tries in range(max_tries): 138 wutils.start_wifi_connection_scan(dut) 139 scan_results = dut.droid.wifiGetScanResults() 140 if scan_results: 141 break 142 return scan_results 143 144 145def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0): 146 """Perform a scan and return scan results of APs: only those that support or 147 do not support RTT (IEEE 802.11mc) - per the support_rtt parameter. 148 149 Args: 150 dut: Device under test. 151 support_rtt: True - only return those APs which support RTT, False - only 152 return those APs which do not support RTT. 153 repeat: Re-scan this many times to find an RTT supporting network. 154 155 Returns: an array of scan results. 156 """ 157 for i in range(repeat + 1): 158 scan_results = scan_networks(dut) 159 aps = get_rtt_constrained_results(scan_results, support_rtt) 160 if len(aps) != 0: 161 return aps 162 163 return [] 164 165 166def select_best_scan_results(scans, select_count, lowest_rssi=-80): 167 """Select the strongest 'select_count' scans in the input list based on 168 highest RSSI. Exclude all very weak signals, even if results in a shorter 169 list. 170 171 Args: 172 scans: List of scan results. 173 select_count: An integer specifying how many scans to return at most. 174 lowest_rssi: The lowest RSSI to accept into the output. 175 Returns: a list of the strongest 'select_count' scan results from the scans 176 list. 177 """ 178 179 def takeRssi(element): 180 return element['level'] 181 182 result = [] 183 scans.sort(key=takeRssi, reverse=True) 184 for scan in scans: 185 if len(result) == select_count: 186 break 187 if scan['level'] < lowest_rssi: 188 break # rest are lower since we're sorted 189 result.append(scan) 190 191 return result 192 193 194def validate_ap_result(scan_result, range_result): 195 """Validate the range results: 196 - Successful if AP (per scan result) support 802.11mc (allowed to fail 197 otherwise) 198 - MAC of result matches the BSSID 199 200 Args: 201 scan_result: Scan result for the AP 202 range_result: Range result returned by the RTT API 203 """ 204 asserts.assert_equal( 205 scan_result[wutils.WifiEnums.BSSID_KEY], 206 range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID], 207 'MAC/BSSID mismatch') 208 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in scan_result 209 and scan_result[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 210 asserts.assert_true( 211 range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS] == 212 rconsts.EVENT_CB_RANGING_STATUS_SUCCESS, 213 'Ranging failed for an AP which supports 802.11mc!') 214 215 216def validate_ap_results(scan_results, range_results): 217 """Validate an array of ranging results against the scan results used to 218 trigger the range. The assumption is that the results are returned in the 219 same order as the request (which were the scan results). 220 221 Args: 222 scan_results: Scans results used to trigger the range request 223 range_results: Range results returned by the RTT API 224 """ 225 asserts.assert_equal( 226 len(scan_results), len(range_results), 227 'Mismatch in length of scan results and range results') 228 229 # sort first based on BSSID/MAC 230 scan_results.sort(key=lambda x: x[wutils.WifiEnums.BSSID_KEY]) 231 range_results.sort( 232 key=lambda x: x[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]) 233 234 for i in range(len(scan_results)): 235 validate_ap_result(scan_results[i], range_results[i]) 236 237 238def validate_aware_mac_result(range_result, mac, description): 239 """Validate the range result for an Aware peer specified with a MAC address: 240 - Correct MAC address. 241 242 The MAC addresses may contain ":" (which are ignored for the comparison) and 243 may be in any case (which is ignored for the comparison). 244 245 Args: 246 range_result: Range result returned by the RTT API 247 mac: MAC address of the peer 248 description: Additional content to print on failure 249 """ 250 mac1 = mac.replace(':', '').lower() 251 mac2 = range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING].replace( 252 ':', '').lower() 253 asserts.assert_equal(mac1, mac2, '%s: MAC mismatch' % description) 254 255 256def validate_aware_peer_id_result(range_result, peer_id, description): 257 """Validate the range result for An Aware peer specified with a Peer ID: 258 - Correct Peer ID 259 - MAC address information not available 260 261 Args: 262 range_result: Range result returned by the RTT API 263 peer_id: Peer ID of the peer 264 description: Additional content to print on failure 265 """ 266 asserts.assert_equal(peer_id, 267 range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID], 268 '%s: Peer Id mismatch' % description) 269 asserts.assert_false(rconsts.EVENT_CB_RANGING_KEY_MAC in range_result, 270 '%s: MAC Address not empty!' % description) 271 272 273def extract_stats(results, 274 range_reference_mm, 275 range_margin_mm, 276 min_rssi, 277 reference_lci=[], 278 reference_lcr=[], 279 summary_only=False): 280 """Extract statistics from a list of RTT results. Returns a dictionary 281 with results: 282 - num_results (success or fails) 283 - num_success_results 284 - num_no_results (e.g. timeout) 285 - num_failures 286 - num_range_out_of_margin (only for successes) 287 - num_invalid_rssi (only for successes) 288 - distances: extracted list of distances 289 - distance_std_devs: extracted list of distance standard-deviations 290 - rssis: extracted list of RSSI 291 - distance_mean 292 - distance_std_dev (based on distance - ignoring the individual std-devs) 293 - rssi_mean 294 - rssi_std_dev 295 - status_codes 296 - lcis: extracted list of all of the individual LCI 297 - lcrs: extracted list of all of the individual LCR 298 - any_lci_mismatch: True/False - checks if all LCI results are identical to 299 the reference LCI. 300 - any_lcr_mismatch: True/False - checks if all LCR results are identical to 301 the reference LCR. 302 - num_attempted_measurements: extracted list of all of the individual 303 number of attempted measurements. 304 - num_successful_measurements: extracted list of all of the individual 305 number of successful measurements. 306 - invalid_num_attempted: True/False - checks if number of attempted 307 measurements is non-zero for successful results. 308 - invalid_num_successful: True/False - checks if number of successful 309 measurements is non-zero for successful results. 310 311 Args: 312 results: List of RTT results. 313 range_reference_mm: Reference value for the distance (in mm) 314 range_margin_mm: Acceptable absolute margin for distance (in mm) 315 min_rssi: Acceptable minimum RSSI value. 316 reference_lci, reference_lcr: Reference values for LCI and LCR. 317 summary_only: Only include summary keys (reduce size). 318 319 Returns: A dictionary of stats. 320 """ 321 stats = {} 322 stats['num_results'] = 0 323 stats['num_success_results'] = 0 324 stats['num_no_results'] = 0 325 stats['num_failures'] = 0 326 stats['num_range_out_of_margin'] = 0 327 stats['num_invalid_rssi'] = 0 328 stats['any_lci_mismatch'] = False 329 stats['any_lcr_mismatch'] = False 330 stats['invalid_num_attempted'] = False 331 stats['invalid_num_successful'] = False 332 333 range_max_mm = range_reference_mm + range_margin_mm 334 range_min_mm = range_reference_mm - range_margin_mm 335 336 distances = [] 337 distance_std_devs = [] 338 rssis = [] 339 num_attempted_measurements = [] 340 num_successful_measurements = [] 341 status_codes = [] 342 lcis = [] 343 lcrs = [] 344 345 for i in range(len(results)): 346 result = results[i] 347 348 if result is None: # None -> timeout waiting for RTT result 349 stats['num_no_results'] = stats['num_no_results'] + 1 350 continue 351 stats['num_results'] = stats['num_results'] + 1 352 353 status_codes.append(result[rconsts.EVENT_CB_RANGING_KEY_STATUS]) 354 if status_codes[-1] != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS: 355 stats['num_failures'] = stats['num_failures'] + 1 356 continue 357 stats['num_success_results'] = stats['num_success_results'] + 1 358 359 distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM] 360 distances.append(distance_mm) 361 if not range_min_mm <= distance_mm <= range_max_mm: 362 stats[ 363 'num_range_out_of_margin'] = stats['num_range_out_of_margin'] + 1 364 distance_std_devs.append( 365 result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM]) 366 367 rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI] 368 rssis.append(rssi) 369 if not min_rssi <= rssi <= 0: 370 stats['num_invalid_rssi'] = stats['num_invalid_rssi'] + 1 371 372 num_attempted = result[ 373 rconsts.EVENT_CB_RANGING_KEY_NUM_ATTEMPTED_MEASUREMENTS] 374 num_attempted_measurements.append(num_attempted) 375 if num_attempted == 0: 376 stats['invalid_num_attempted'] = True 377 378 num_successful = result[ 379 rconsts.EVENT_CB_RANGING_KEY_NUM_SUCCESSFUL_MEASUREMENTS] 380 num_successful_measurements.append(num_successful) 381 if num_successful == 0: 382 stats['invalid_num_successful'] = True 383 384 lcis.append(result[rconsts.EVENT_CB_RANGING_KEY_LCI]) 385 if (result[rconsts.EVENT_CB_RANGING_KEY_LCI] != reference_lci): 386 stats['any_lci_mismatch'] = True 387 lcrs.append(result[rconsts.EVENT_CB_RANGING_KEY_LCR]) 388 if (result[rconsts.EVENT_CB_RANGING_KEY_LCR] != reference_lcr): 389 stats['any_lcr_mismatch'] = True 390 391 if len(distances) > 0: 392 stats['distance_mean'] = statistics.mean(distances) 393 if len(distances) > 1: 394 stats['distance_std_dev'] = statistics.stdev(distances) 395 if len(rssis) > 0: 396 stats['rssi_mean'] = statistics.mean(rssis) 397 if len(rssis) > 1: 398 stats['rssi_std_dev'] = statistics.stdev(rssis) 399 if not summary_only: 400 stats['distances'] = distances 401 stats['distance_std_devs'] = distance_std_devs 402 stats['rssis'] = rssis 403 stats['num_attempted_measurements'] = num_attempted_measurements 404 stats['num_successful_measurements'] = num_successful_measurements 405 stats['status_codes'] = status_codes 406 stats['lcis'] = lcis 407 stats['lcrs'] = lcrs 408 409 return stats 410 411 412def run_ranging(dut, 413 aps, 414 iter_count, 415 time_between_iterations, 416 target_run_time_sec=0): 417 """Executing ranging to the set of APs. 418 419 Will execute a minimum of 'iter_count' iterations. Will continue to run 420 until execution time (just) exceeds 'target_run_time_sec'. 421 422 Args: 423 dut: Device under test 424 aps: A list of APs (Access Points) to range to. 425 iter_count: (Minimum) Number of measurements to perform. 426 time_between_iterations: Number of seconds to wait between iterations. 427 target_run_time_sec: The target run time in seconds. 428 429 Returns: a list of the events containing the RTT results (or None for a 430 failed measurement). 431 """ 432 max_peers = dut.droid.wifiRttMaxPeersInRequest() 433 434 asserts.assert_true(len(aps) > 0, "Need at least one AP!") 435 if len(aps) > max_peers: 436 aps = aps[0:max_peers] 437 438 events = {} # need to keep track per BSSID! 439 for ap in aps: 440 events[ap["BSSID"]] = [] 441 442 start_clock = time.time() 443 iterations_done = 0 444 run_time = 0 445 while iterations_done < iter_count or (target_run_time_sec != 0 446 and run_time < target_run_time_sec): 447 if iterations_done != 0 and time_between_iterations != 0: 448 time.sleep(time_between_iterations) 449 450 id = dut.droid.wifiRttStartRangingToAccessPoints(aps) 451 try: 452 event = dut.ed.pop_event( 453 decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT, id), 454 EVENT_TIMEOUT) 455 range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS] 456 asserts.assert_equal( 457 len(aps), len(range_results), 458 'Mismatch in length of scan results and range results') 459 for result in range_results: 460 bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING] 461 asserts.assert_true( 462 bssid in events, 463 "Result BSSID %s not in requested AP!?" % bssid) 464 asserts.assert_equal( 465 len(events[bssid]), iterations_done, 466 "Duplicate results for BSSID %s!?" % bssid) 467 events[bssid].append(result) 468 except queue.Empty: 469 for ap in aps: 470 events[ap["BSSID"]].append(None) 471 472 iterations_done = iterations_done + 1 473 run_time = time.time() - start_clock 474 475 return events 476 477 478def analyze_results(all_aps_events, 479 rtt_reference_distance_mm, 480 distance_margin_mm, 481 min_expected_rssi, 482 lci_reference, 483 lcr_reference, 484 summary_only=False): 485 """Verifies the results of the RTT experiment. 486 487 Args: 488 all_aps_events: Dictionary of APs, each a list of RTT result events. 489 rtt_reference_distance_mm: Expected distance to the AP (source of truth). 490 distance_margin_mm: Accepted error marging in distance measurement. 491 min_expected_rssi: Minimum acceptable RSSI value 492 lci_reference, lcr_reference: Expected LCI/LCR values (arrays of bytes). 493 summary_only: Only include summary keys (reduce size). 494 """ 495 all_stats = {} 496 for bssid, events in all_aps_events.items(): 497 stats = extract_stats(events, rtt_reference_distance_mm, 498 distance_margin_mm, min_expected_rssi, 499 lci_reference, lcr_reference, summary_only) 500 all_stats[bssid] = stats 501 return all_stats 502