1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import os 18import re 19import time 20import logging 21import pandas as pd 22 23from acts import asserts 24from acts.libs.proc import job 25from acts.base_test import BaseTestClass 26 27from acts.test_utils.bt.bt_power_test_utils import MediaControl 28from acts.test_utils.bt.ble_performance_test_utils import run_ble_throughput_and_read_rssi 29from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory 30 31import acts.test_utils.bt.bt_test_utils as bt_utils 32import acts.test_utils.wifi.wifi_performance_test_utils as wifi_utils 33 34PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' 35 36FORCE_SAR_ADB_COMMAND = ('am broadcast -n' 37 'com.google.android.apps.scone/.coex.TestReceiver -a ' 38 'com.google.android.apps.scone.coex.SIMULATE_STATE ') 39 40DEFAULT_DURATION = 5 41DEFAULT_MAX_ERROR_THRESHOLD = 2 42DEFAULT_AGG_MAX_ERROR_THRESHOLD = 2 43FIXED_ATTENUATION = 36 44 45 46class BtSarBaseTest(BaseTestClass): 47 """ Base class for all BT SAR Test classes. 48 49 This class implements functions common to BT SAR test Classes. 50 """ 51 BACKUP_BT_SAR_TABLE_NAME = 'backup_bt_sar_table.csv' 52 53 def __init__(self, controllers): 54 BaseTestClass.__init__(self, controllers) 55 self.power_file_paths = [ 56 '/vendor/etc/bluetooth_power_limits.csv', 57 '/data/vendor/radio/bluetooth_power_limits.csv' 58 ] 59 self.sar_file_name = os.path.basename(self.power_file_paths[0]) 60 self.power_column = 'BluetoothPower' 61 self.REG_DOMAIN_DICT = { 62 ('us', 'ca', 'in'): 'US', 63 ('uk', 'fr', 'es', 'de', 'it', 'ie', 'sg', 'au', 'tw'): 'EU', 64 ('jp', ): 'JP' 65 } 66 67 def setup_class(self): 68 """Initializes common test hardware and parameters. 69 70 This function initializes hardware and compiles parameters that are 71 common to all tests in this class and derived classes. 72 """ 73 super().setup_class() 74 75 self.test_params = self.user_params.get('bt_sar_test_params', {}) 76 if not self.test_params: 77 self.log.warning( 78 'bt_sar_test_params was not found in the config file.') 79 80 self.user_params.update(self.test_params) 81 req_params = ['bt_devices', 'calibration_params'] 82 83 self.unpack_userparams( 84 req_params, 85 country_code='us', 86 duration=DEFAULT_DURATION, 87 custom_sar_path=None, 88 music_files=None, 89 sort_order=None, 90 max_error_threshold=DEFAULT_MAX_ERROR_THRESHOLD, 91 agg_error_threshold=DEFAULT_AGG_MAX_ERROR_THRESHOLD, 92 tpc_threshold=[2, 8], 93 ) 94 95 self.attenuator = self.attenuators[0] 96 self.dut = self.android_devices[0] 97 for key in self.REG_DOMAIN_DICT.keys(): 98 if self.country_code.lower() in key: 99 self.reg_domain = self.REG_DOMAIN_DICT[key] 100 101 self.sar_version_2 = False 102 103 if 'Error' not in self.dut.adb.shell('bluetooth_sar_test -r'): 104 #Flag for SAR version 2 105 self.sar_version_2 = True 106 self.power_column = 'BluetoothEDRPower' 107 self.power_file_paths[0] = os.path.join( 108 os.path.dirname(self.power_file_paths[0]), 109 'bluetooth_power_limits_{}.csv'.format(self.reg_domain)) 110 self.sar_file_name = os.path.basename(self.power_file_paths[0]) 111 112 self.sar_file_path = self.power_file_paths[0] 113 self.atten_min = 0 114 self.atten_max = int(self.attenuator.get_max_atten()) 115 116 # Initializing media controller 117 if self.music_files: 118 music_src = self.music_files[0] 119 music_dest = PHONE_MUSIC_FILE_DIRECTORY 120 success = self.dut.push_system_file(music_src, music_dest) 121 if success: 122 self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, 123 os.path.basename(music_src)) 124 # Initialize media_control class 125 self.media = MediaControl(self.dut, self.music_file) 126 127 #Initializing BT device controller 128 if self.bt_devices: 129 attr, idx = self.bt_devices.split(':') 130 self.bt_device_controller = getattr(self, attr)[int(idx)] 131 self.bt_device = bt_factory().generate(self.bt_device_controller) 132 else: 133 self.log.error('No BT devices config is provided!') 134 135 bt_utils.enable_bqr(self.android_devices) 136 137 self.log_path = os.path.join(logging.log_path, 'results') 138 os.makedirs(self.log_path, exist_ok=True) 139 140 # Reading BT SAR table from the phone 141 self.bt_sar_df = self.read_sar_table(self.dut) 142 143 def setup_test(self): 144 super().setup_test() 145 146 # Starting BT on the master 147 self.dut.droid.bluetoothFactoryReset() 148 bt_utils.enable_bluetooth(self.dut.droid, self.dut.ed) 149 150 # Starting BT on the slave 151 self.bt_device.reset() 152 self.bt_device.power_on() 153 154 # Connect master and slave 155 bt_utils.connect_phone_to_headset(self.dut, self.bt_device, 60) 156 157 # Playing music 158 self.media.play() 159 160 # Find and set PL10 level for the DUT 161 self.pl10_atten = self.set_PL10_atten_level(self.dut) 162 self.attenuator.set_atten(self.pl10_atten) 163 164 def teardown_test(self): 165 #Stopping Music 166 if hasattr(self, 'media'): 167 self.media.stop() 168 169 # Stopping BT on slave 170 self.bt_device.reset() 171 self.bt_device.power_off() 172 173 #Stopping BT on master 174 bt_utils.disable_bluetooth(self.dut.droid) 175 176 #Resetting the atten to initial levels 177 self.attenuator.set_atten(self.atten_min) 178 self.log.info('Attenuation set to {} dB'.format(self.atten_min)) 179 180 def teardown_class(self): 181 182 super().teardown_class() 183 self.dut.droid.bluetoothFactoryReset() 184 185 # Stopping BT on slave 186 self.bt_device.reset() 187 self.bt_device.power_off() 188 189 #Stopping BT on master 190 bt_utils.disable_bluetooth(self.dut.droid) 191 192 def save_sar_plot(self, df): 193 """ Saves SAR plot to the path given. 194 195 Args: 196 df: Processed SAR table sweep results 197 """ 198 self.plot.add_line(df.index, 199 df['expected_tx_power'], 200 legend='expected', 201 marker='circle') 202 self.plot.add_line(df.index, 203 df['measured_tx_power'], 204 legend='measured', 205 marker='circle') 206 self.plot.add_line(df.index, 207 df['delta'], 208 legend='delta', 209 marker='circle') 210 211 results_file_path = os.path.join( 212 self.log_path, '{}.html'.format(self.current_test_name)) 213 self.plot.generate_figure() 214 wifi_utils.BokehFigure.save_figures([self.plot], results_file_path) 215 216 def sweep_table(self, 217 client_ad=None, 218 server_ad=None, 219 client_conn_id=None, 220 gatt_server=None, 221 gatt_callback=None, 222 isBLE=False): 223 """Iterates over the BT SAR table and forces signal states. 224 225 Iterates over BT SAR table and forces signal states, 226 measuring RSSI and power level for each state. 227 228 Args: 229 client_ad: the Android device performing the connection. 230 server_ad: the Android device accepting the connection. 231 client_conn_id: the client connection ID. 232 gatt_server: the gatt server 233 gatt_callback: Gatt callback objec 234 isBLE : boolean variable for BLE connection 235 Returns: 236 sar_df : SAR table sweep results in pandas dataframe 237 """ 238 239 sar_df = self.bt_sar_df.copy() 240 sar_df['power_cap'] = -128 241 sar_df['slave_rssi'] = -128 242 sar_df['master_rssi'] = -128 243 sar_df['ble_rssi'] = -128 244 sar_df['pwlv'] = -1 245 246 # Sorts the table 247 if self.sort_order: 248 if self.sort_order.lower() == 'ascending': 249 sar_df = sar_df.sort_values(by=[self.power_column], 250 ascending=True) 251 else: 252 sar_df = sar_df.sort_values(by=[self.power_column], 253 ascending=False) 254 sar_df = sar_df.reset_index(drop=True) 255 256 # Sweeping BT SAR table 257 for scenario in range(sar_df.shape[0]): 258 # Reading BT SAR Scenario from the table 259 read_scenario = sar_df.loc[scenario].to_dict() 260 261 start_time = self.dut.adb.shell('date +%s.%m') 262 time.sleep(1) 263 264 #Setting SAR State 265 self.set_sar_state(self.dut, read_scenario, self.country_code) 266 267 if isBLE: 268 sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap( 269 self.dut, start_time, type='BLE') 270 271 sar_df.loc[scenario, 272 'ble_rssi'] = run_ble_throughput_and_read_rssi( 273 client_ad, server_ad, client_conn_id, 274 gatt_server, gatt_callback) 275 276 self.log.info('scenario:{}, power_cap:{}, ble_rssi:{}'.format( 277 scenario, sar_df.loc[scenario, 'power_cap'], 278 sar_df.loc[scenario, 'ble_rssi'])) 279 else: 280 sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap( 281 self.dut, start_time) 282 283 processed_bqr_results = bt_utils.get_bt_metric( 284 self.android_devices, self.duration) 285 sar_df.loc[scenario, 286 'slave_rssi'] = processed_bqr_results['rssi'][ 287 self.bt_device_controller.serial] 288 sar_df.loc[scenario, 289 'master_rssi'] = processed_bqr_results['rssi'][ 290 self.dut.serial] 291 sar_df.loc[scenario, 'pwlv'] = processed_bqr_results['pwlv'][ 292 self.dut.serial] 293 self.log.info( 294 'scenario:{}, power_cap:{}, s_rssi:{}, m_rssi:{}, m_pwlv:{}' 295 .format(scenario, sar_df.loc[scenario, 'power_cap'], 296 sar_df.loc[scenario, 'slave_rssi'], 297 sar_df.loc[scenario, 298 'master_rssi'], sar_df.loc[scenario, 299 'pwlv'])) 300 301 self.log.info('BT SAR Table swept') 302 303 return sar_df 304 305 def process_table(self, sar_df): 306 """Processes the results of sweep_table and computes BT TX power. 307 308 Processes the results of sweep_table and computes BT TX power 309 after factoring in the path loss and FTM offsets. 310 311 Args: 312 sar_df: BT SAR table after the sweep 313 314 Returns: 315 sar_df: processed BT SAR table 316 """ 317 318 sar_df['pathloss'] = self.calibration_params['pathloss'] 319 320 if hasattr(self, 'pl10_atten'): 321 sar_df['atten'] = self.pl10_atten 322 else: 323 sar_df['atten'] = FIXED_ATTENUATION 324 325 # BT SAR Backoff for each scenario 326 if self.sar_version_2: 327 #Reads OTP values from the phone 328 self.otp = bt_utils.read_otp(self.dut) 329 330 #OTP backoff 331 edr_otp = min(0, float(self.otp['EDR']['10'])) 332 bdr_otp = min(0, float(self.otp['BR']['10'])) 333 ble_otp = min(0, float(self.otp['BLE']['10'])) 334 335 # EDR TX Power for PL10 336 edr_tx_power_pl10 = self.calibration_params['target_power']['EDR'][ 337 '10'] - edr_otp 338 339 # BDR TX Power for PL10 340 bdr_tx_power_pl10 = self.calibration_params['target_power']['BDR'][ 341 '10'] - bdr_otp 342 343 # RSSI being measured is BDR 344 offset = bdr_tx_power_pl10 - edr_tx_power_pl10 345 346 # BDR-EDR offset 347 sar_df['offset'] = offset 348 349 # Max TX power permissible 350 sar_df['max_power'] = self.calibration_params['max_power'] 351 352 # Adding a target power column 353 if 'ble_rssi' in sar_df.columns: 354 sar_df['target_power'] = self.calibration_params[ 355 'target_power']['BLE']['10'] - ble_otp 356 else: 357 sar_df['target_power'] = sar_df['pwlv'].astype(str).map( 358 self.calibration_params['target_power']['EDR']) - edr_otp 359 360 #Translates power_cap values to expected TX power level 361 sar_df['cap_tx_power'] = sar_df['power_cap'] / 4.0 362 363 sar_df['expected_tx_power'] = sar_df[[ 364 'cap_tx_power', 'target_power', 'max_power' 365 ]].min(axis=1) 366 367 if hasattr(self, 'pl10_atten'): 368 sar_df['measured_tx_power'] = sar_df['slave_rssi'] + sar_df[ 369 'pathloss'] + self.pl10_atten - offset 370 else: 371 sar_df['measured_tx_power'] = sar_df['ble_rssi'] + sar_df[ 372 'pathloss'] + FIXED_ATTENUATION 373 374 else: 375 376 # Adding a target power column 377 sar_df['target_power'] = sar_df['pwlv'].astype(str).map( 378 self.calibration_params['target_power']['EDR']['10']) 379 380 # Adding a ftm power column 381 sar_df['ftm_power'] = sar_df['pwlv'].astype(str).map( 382 self.calibration_params['ftm_power']['EDR']) 383 sar_df[ 384 'backoff'] = sar_df['target_power'] - sar_df['power_cap'] / 4.0 385 386 sar_df[ 387 'expected_tx_power'] = sar_df['ftm_power'] - sar_df['backoff'] 388 sar_df['measured_tx_power'] = sar_df['slave_rssi'] + sar_df[ 389 'pathloss'] + self.pl10_atten 390 391 sar_df['delta'] = sar_df['expected_tx_power'] - sar_df[ 392 'measured_tx_power'] 393 394 self.log.info('Sweep results processed') 395 396 results_file_path = os.path.join(self.log_path, self.current_test_name) 397 sar_df.to_csv('{}.csv'.format(results_file_path)) 398 self.save_sar_plot(sar_df) 399 400 return sar_df 401 402 def process_results(self, sar_df, type='EDR'): 403 """Determines the test results of the sweep. 404 405 Parses the processed table with computed BT TX power values 406 to return pass or fail. 407 408 Args: 409 sar_df: processed BT SAR table 410 """ 411 412 # checks for errors at particular points in the sweep 413 max_error_result = abs( 414 sar_df['delta']) > self.max_error_threshold[type] 415 if False in max_error_result: 416 asserts.fail('Maximum Error Threshold Exceeded') 417 418 # checks for error accumulation across the sweep 419 if sar_df['delta'].sum() > self.agg_error_threshold[type]: 420 asserts.fail( 421 'Aggregate Error Threshold Exceeded. Error: {} Threshold: {}'. 422 format(sar_df['delta'].sum(), self.agg_error_threshold)) 423 424 else: 425 asserts.explicit_pass('Measured and Expected Power Values in line') 426 427 def set_sar_state(self, ad, signal_dict, country_code='us'): 428 """Sets the SAR state corresponding to the BT SAR signal. 429 430 The SAR state is forced using an adb command that takes 431 device signals as input. 432 433 Args: 434 ad: android_device object. 435 signal_dict: dict of BT SAR signals read from the SAR file. 436 Returns: 437 enforced_state: dict of device signals. 438 """ 439 signal_dict = {k: max(int(v), 0) for (k, v) in signal_dict.items()} 440 signal_dict["Wifi"] = signal_dict['WIFI5Ghz'] 441 signal_dict['WIFI2Ghz'] = 0 if signal_dict['WIFI5Ghz'] else 1 442 443 device_state_dict = { 444 ('Earpiece', 'earpiece'): signal_dict['Head'], 445 ('Wifi', 'wifi'): signal_dict['WIFI5Ghz'], 446 ('Wifi 2.4G', 'wifi_24g'): signal_dict['WIFI2Ghz'], 447 ('Voice', 'voice'): 0, 448 ('Wifi AP', 'wifi_ap'): signal_dict['HotspotVoice'], 449 ('Bluetooth', 'bluetooth'): 1, 450 ('Bluetooth media', 'bt_media'): signal_dict['BTMedia'], 451 ('Radio', 'radio_power'): signal_dict['Cell'], 452 ('Motion', 'motion'): signal_dict['IMU'], 453 ('Bluetooth connected', 'bt_connected'): 1 454 } 455 456 if 'BTHotspot' in signal_dict.keys(): 457 device_state_dict[('Bluetooth tethering', 458 'bt_tethering')] = signal_dict['BTHotspot'] 459 460 enforced_state = {} 461 sar_state_command = FORCE_SAR_ADB_COMMAND 462 for key in device_state_dict: 463 enforced_state[key[0]] = device_state_dict[key] 464 sar_state_command = '{} --ei {} {}'.format(sar_state_command, 465 key[1], 466 device_state_dict[key]) 467 if self.sar_version_2: 468 sar_state_command = '{} --es country_iso "{}"'.format( 469 sar_state_command, country_code.lower()) 470 471 #Forcing the SAR state 472 adb_output = ad.adb.shell(sar_state_command) 473 474 # Checking if command was successfully enforced 475 if 'result=0' in adb_output: 476 self.log.info('Requested BT SAR state successfully enforced.') 477 return enforced_state 478 else: 479 self.log.error("Couldn't force BT SAR state.") 480 481 def parse_bt_logs(self, ad, begin_time, regex=''): 482 """Returns bt software stats by parsing logcat since begin_time. 483 484 The quantity to be fetched is dictated by the regex provided. 485 486 Args: 487 ad: android_device object. 488 begin_time: time stamp to start the logcat parsing. 489 regex: regex for fetching the required BT software stats. 490 491 Returns: 492 stat: the desired BT stat. 493 """ 494 # Waiting for logcat to update 495 time.sleep(1) 496 bt_adb_log = ad.adb.logcat('-b all -t %s' % begin_time) 497 for line in bt_adb_log.splitlines(): 498 if re.findall(regex, line): 499 stat = re.findall(regex, line)[0] 500 return stat 501 502 def set_country_code(self, ad, cc): 503 """Sets the SAR regulatory domain as per given country code 504 505 The SAR regulatory domain is forced using an adb command that takes 506 country code as input. 507 508 Args: 509 ad: android_device object. 510 cc: country code 511 """ 512 513 ad.adb.shell("{} --es country_iso {}".format(FORCE_SAR_ADB_COMMAND, 514 cc)) 515 self.log.info("Country Code set to {}".format(cc)) 516 517 def get_country_code(self, ad, begin_time): 518 """Returns the enforced regulatory domain since begin_time 519 520 Returns enforced regulatory domain since begin_time by parsing logcat. 521 Function should follow a function call to set a country code 522 523 Args: 524 ad : android_device obj 525 begin_time: time stamp to start 526 527 Returns: 528 read enforced regulatory domain 529 """ 530 531 reg_domain_regex = "updateRegulatoryDomain:\s+(\S+)" 532 reg_domain = self.parse_bt_logs(ad, begin_time, reg_domain_regex) 533 return reg_domain 534 535 def get_current_power_cap(self, ad, begin_time, type='EDR'): 536 """ Returns the enforced software EDR power cap since begin_time. 537 538 Returns the enforced EDR power cap since begin_time by parsing logcat. 539 Function should follow a function call that forces a SAR state 540 541 Args: 542 ad: android_device obj. 543 begin_time: time stamp to start. 544 545 Returns: 546 read enforced power cap 547 """ 548 power_cap_regex_dict = { 549 'BDR': [ 550 'Bluetooth powers: BR:\s+(\d+), EDR:\s+\d+', 551 'Bluetooth Tx Power Cap\s+(\d+)' 552 ], 553 'EDR': [ 554 'Bluetooth powers: BR:\s+\d+, EDR:\s+(\d+)', 555 'Bluetooth Tx Power Cap\s+(\d+)' 556 ], 557 'BLE': [ 558 'Bluetooth powers: BR:\s+\d+, EDR:\s+\d+, BLE:\s+(\d+)', 559 'Bluetooth Tx Power Cap\s+(\d+)' 560 ] 561 } 562 563 power_cap_regex_list = power_cap_regex_dict[type] 564 565 for power_cap_regex in power_cap_regex_list: 566 power_cap = self.parse_bt_logs(ad, begin_time, power_cap_regex) 567 if power_cap: 568 return int(power_cap) 569 570 raise ValueError('Failed to get TX power cap') 571 572 def get_current_device_state(self, ad, begin_time): 573 """ Returns the device state of the android dut since begin_time. 574 575 Returns the device state of the android dut by parsing logcat since 576 begin_time. Function should follow a function call that forces 577 a SAR state. 578 579 Args: 580 ad: android_device obj. 581 begin_time: time stamp to start. 582 583 Returns: 584 device_state: device state of the android device. 585 """ 586 587 device_state_regex = 'updateDeviceState: DeviceState: ([\s*\S+\s]+)' 588 time.sleep(2) 589 device_state = self.parse_bt_logs(ad, begin_time, device_state_regex) 590 if device_state: 591 return device_state 592 593 raise ValueError("Couldn't fetch device state") 594 595 def read_sar_table(self, ad): 596 """Extracts the BT SAR table from the phone. 597 598 Extracts the BT SAR table from the phone into the android device 599 log path directory. 600 601 Args: 602 ad: android_device object. 603 604 Returns: 605 df : BT SAR table (as pandas DataFrame). 606 """ 607 output_path = os.path.join(ad.device_log_path, self.sar_file_name) 608 ad.adb.pull('{} {}'.format(self.sar_file_path, output_path)) 609 df = pd.read_csv(os.path.join(ad.device_log_path, self.sar_file_name)) 610 self.log.info('BT SAR table read from the phone') 611 return df 612 613 def push_table(self, ad, src_path): 614 """Pushes a BT SAR table to the phone. 615 616 Pushes a BT SAR table to the android device and reboots the device. 617 Also creates a backup file if backup flag is True. 618 619 Args: 620 ad: android_device object. 621 src_path: path to the BT SAR table. 622 """ 623 #Copying the to-be-pushed file for logging 624 if os.path.dirname(src_path) != ad.device_log_path: 625 job.run('cp {} {}'.format(src_path, ad.device_log_path)) 626 627 #Pushing the file provided in the config 628 ad.push_system_file(src_path, self.sar_file_path) 629 self.log.info('BT SAR table pushed') 630 ad.reboot() 631 self.bt_sar_df = self.read_sar_table(self.dut) 632 633 def set_PL10_atten_level(self, ad): 634 """Finds the attenuation level at which the phone is at PL10 635 636 Finds PL10 attenuation level by sweeping the attenuation range. 637 If the power level is not achieved during sweep, 638 returns the max atten level 639 640 Args: 641 ad: android object class 642 Returns: 643 atten : attenuation level when the phone is at PL10 644 """ 645 BT_SAR_ATTEN_STEP = 3 646 647 for atten in range(self.atten_min, self.atten_max, BT_SAR_ATTEN_STEP): 648 self.attenuator.set_atten(atten) 649 # Sleep required for BQR to reflect the change in parameters 650 time.sleep(2) 651 metrics = bt_utils.get_bt_metric(ad) 652 if metrics['pwlv'][ad.serial] == 10: 653 self.log.info('PL10 located at {}'.format(atten + 654 BT_SAR_ATTEN_STEP)) 655 return atten + BT_SAR_ATTEN_STEP 656 657 self.log.warn( 658 "PL10 couldn't be located in the given attenuation range") 659