1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import json 17import logging 18import math 19import os 20import re 21import time 22 23import acts.controllers.iperf_server as ipf 24from acts import asserts 25from acts import base_test 26from acts import utils 27from acts.controllers.monsoon_lib.api.common import MonsoonError 28from acts.controllers.monsoon_lib.api.common import PassthroughStates 29from acts.metrics.loggers.blackbox import BlackboxMetricLogger 30from acts.test_utils.power.loggers.power_metric_logger import PowerMetricLogger 31from acts.test_utils.power import plot_utils 32from acts.test_utils.wifi import wifi_test_utils as wutils 33 34RESET_BATTERY_STATS = 'dumpsys batterystats --reset' 35IPERF_TIMEOUT = 180 36THRESHOLD_TOLERANCE_DEFAULT = 0.2 37GET_FROM_PHONE = 'get_from_dut' 38GET_FROM_AP = 'get_from_ap' 39PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2 40MONSOON_MAX_CURRENT = 8.0 41MONSOON_RETRY_INTERVAL = 300 42DEFAULT_MONSOON_FREQUENCY = 500 43MEASUREMENT_RETRY_COUNT = 3 44RECOVER_MONSOON_RETRY_COUNT = 3 45MIN_PERCENT_SAMPLE = 95 46ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM=' 47MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM=' 48TEMP_FILE = '/sdcard/Download/tmp.log' 49 50 51class ObjNew(): 52 """Create a random obj with unknown attributes and value. 53 54 """ 55 def __init__(self, **kwargs): 56 self.__dict__.update(kwargs) 57 58 def __contains__(self, item): 59 """Function to check if one attribute is contained in the object. 60 61 Args: 62 item: the item to check 63 Return: 64 True/False 65 """ 66 return hasattr(self, item) 67 68 69class PowerBaseTest(base_test.BaseTestClass): 70 """Base class for all wireless power related tests. 71 72 """ 73 def __init__(self, controllers): 74 75 base_test.BaseTestClass.__init__(self, controllers) 76 self.power_result = BlackboxMetricLogger.for_test_case( 77 metric_name='avg_power') 78 self.start_meas_time = 0 79 self.rockbottom_script = None 80 self.img_name = '' 81 self.dut = None 82 self.power_logger = PowerMetricLogger.for_test_case() 83 self.avg_current = 0 84 85 @property 86 def final_test(self): 87 return len( 88 self.results.requested 89 ) > 0 and self.current_test_name == self.results.requested[-1] 90 91 @property 92 def display_name_test_suite(self): 93 return getattr(self, '_display_name_test_suite', 94 self.__class__.__name__) 95 96 @display_name_test_suite.setter 97 def display_name_test_suite(self, name): 98 self._display_name_test_suite = name 99 100 @property 101 def display_name_test_case(self): 102 default_test_name = getattr(self, 'test_name', None) 103 return getattr(self, '_display_name_test_case', default_test_name) 104 105 @display_name_test_case.setter 106 def display_name_test_case(self, name): 107 self._display_name_test_case = name 108 109 def setup_class(self): 110 111 self.log = logging.getLogger() 112 self.tests = self.get_existing_test_names() 113 114 # Obtain test parameters from user_params 115 TEST_PARAMS = self.TAG + '_params' 116 self.test_params = self.user_params.get(TEST_PARAMS, {}) 117 if not self.test_params: 118 self.log.warning(TEST_PARAMS + ' was not found in the user ' 119 'parameters defined in the config file.') 120 121 # Override user_param values with test parameters 122 self.user_params.update(self.test_params) 123 124 # Unpack user_params with default values. All the usages of user_params 125 # as self attributes need to be included either as a required parameter 126 # or as a parameter with a default value. 127 req_params = ['custom_files', 'mon_duration'] 128 self.unpack_userparams(req_params, 129 mon_freq=DEFAULT_MONSOON_FREQUENCY, 130 mon_offset=0, 131 bug_report=False, 132 extra_wait=None, 133 iperf_duration=None, 134 pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT, 135 mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT) 136 137 # Setup the must have controllers, phone and monsoon 138 self.dut = self.android_devices[0] 139 self.mon_data_path = os.path.join(self.log_path, 'Monsoon') 140 os.makedirs(self.mon_data_path, exist_ok=True) 141 self.mon = self.monsoons[0] 142 self.mon.set_max_current(8.0) 143 self.mon.set_voltage(self.mon_voltage) 144 self.mon.attach_device(self.dut) 145 146 # Unpack the thresholds file or fail class setup if it can't be found 147 for file in self.custom_files: 148 if 'pass_fail_threshold_' + self.dut.model in file: 149 self.threshold_file = file 150 break 151 else: 152 raise RuntimeError('Required test pass/fail threshold file is ' 153 'missing') 154 155 # Unpack the rockbottom script or fail class setup if it can't be found 156 for file in self.custom_files: 157 if 'rockbottom_' + self.dut.model in file: 158 self.rockbottom_script = file 159 break 160 else: 161 raise RuntimeError('Required rockbottom script is missing.') 162 163 # Unpack optional custom files 164 for file in self.custom_files: 165 if 'attenuator_setting' in file: 166 self.attenuation_file = file 167 elif 'network_config' in file: 168 self.network_file = file 169 170 if hasattr(self, 'attenuators'): 171 self.num_atten = self.attenuators[0].instrument.num_atten 172 self.atten_level = self.unpack_custom_file(self.attenuation_file) 173 self.threshold = self.unpack_custom_file(self.threshold_file) 174 self.mon_info = self.create_monsoon_info() 175 176 # Sync device time, timezone and country code 177 utils.require_sl4a((self.dut, )) 178 utils.sync_device_time(self.dut) 179 wutils.set_wifi_country_code(self.dut, 'US') 180 181 screen_on_img = self.user_params.get('screen_on_img', []) 182 if screen_on_img: 183 img_src = screen_on_img[0] 184 img_dest = '/sdcard/Pictures/' 185 success = self.dut.push_system_file(img_src, img_dest) 186 if success: 187 self.img_name = os.path.basename(img_src) 188 189 def setup_test(self): 190 """Set up test specific parameters or configs. 191 192 """ 193 # Reset the power consumption to 0 before each tests 194 self.power_result.metric_value = 0 195 # Set the device into rockbottom state 196 self.dut_rockbottom() 197 wutils.reset_wifi(self.dut) 198 wutils.wifi_toggle_state(self.dut, False) 199 200 # Wait for extra time if needed for the first test 201 if self.extra_wait: 202 self.more_wait_first_test() 203 204 def teardown_test(self): 205 """Tear down necessary objects after test case is finished. 206 207 """ 208 self.log.info('Tearing down the test case') 209 self.mon.usb('on') 210 self.power_logger.set_avg_power(self.power_result.metric_value) 211 self.power_logger.set_avg_current(self.avg_current) 212 self.power_logger.set_voltage(self.mon_voltage) 213 self.power_logger.set_testbed(self.testbed_name) 214 215 # If a threshold was provided, log it in the power proto 216 if self.threshold and self.test_name in self.threshold: 217 avg_current_threshold = self.threshold[self.test_name] 218 self.power_logger.set_avg_current_threshold(avg_current_threshold) 219 220 build_id = self.dut.build_info.get('build_id', '') 221 incr_build_id = self.dut.build_info.get('incremental_build_id', '') 222 branch = self.user_params.get('branch', '') 223 target = self.dut.device_info.get('flavor', '') 224 225 self.power_logger.set_branch(branch) 226 self.power_logger.set_build_id(build_id) 227 self.power_logger.set_incremental_build_id(incr_build_id) 228 self.power_logger.set_target(target) 229 230 # Log the display name of the test suite and test case 231 if self.display_name_test_suite: 232 name = self.display_name_test_suite 233 self.power_logger.set_test_suite_display_name(name) 234 235 if self.display_name_test_case: 236 name = self.display_name_test_case 237 self.power_logger.set_test_case_display_name(name) 238 239 # Take Bugreport 240 if self.bug_report: 241 begin_time = utils.get_current_epoch_time() 242 self.dut.take_bug_report(self.test_name, begin_time) 243 244 # Allow the device to cooldown before executing the next test 245 cooldown = self.test_params.get('cooldown', None) 246 if cooldown and not self.final_test: 247 time.sleep(cooldown) 248 249 def teardown_class(self): 250 """Clean up the test class after tests finish running 251 252 """ 253 self.log.info('Tearing down the test class') 254 if hasattr(self, 'monsoons'): 255 self.monsoons[0].usb('on') 256 257 def on_fail(self, test_name, begin_time): 258 self.power_logger.set_pass_fail_status('FAIL') 259 260 def on_pass(self, test_name, begin_time): 261 self.power_logger.set_pass_fail_status('PASS') 262 263 def dut_rockbottom(self): 264 """Set the dut to rockbottom state 265 266 """ 267 # The rockbottom script might include a device reboot, so it is 268 # necessary to stop SL4A during its execution. 269 self.dut.stop_services() 270 self.log.info('Executing rockbottom script for ' + self.dut.model) 271 os.chmod(self.rockbottom_script, 0o777) 272 os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial, 273 self.img_name)) 274 # Make sure the DUT is in root mode after coming back 275 self.dut.root_adb() 276 # Restart SL4A 277 self.dut.start_services() 278 279 def unpack_custom_file(self, file, test_specific=True): 280 """Unpack the pass_fail_thresholds from a common file. 281 282 Args: 283 file: the common file containing pass fail threshold. 284 test_specific: if True, returns the JSON element within the file 285 that starts with the test class name. 286 """ 287 with open(file, 'r') as f: 288 params = json.load(f) 289 if test_specific: 290 try: 291 return params[self.TAG] 292 except KeyError: 293 pass 294 else: 295 return params 296 297 def decode_test_configs(self, attrs, indices): 298 """Decode the test config/params from test name. 299 300 Remove redundant function calls when tests are similar. 301 Args: 302 attrs: a list of the attrs of the test config obj 303 indices: a list of the location indices of keyword in the test name. 304 """ 305 # Decode test parameters for the current test 306 test_params = self.current_test_name.split('_') 307 values = [test_params[x] for x in indices] 308 config_dict = dict(zip(attrs, values)) 309 self.test_configs = ObjNew(**config_dict) 310 311 def more_wait_first_test(self): 312 # For the first test, increase the offset for longer wait time 313 if self.current_test_name == self.tests[0]: 314 self.mon_info.offset = self.mon_offset + self.extra_wait 315 else: 316 self.mon_info.offset = self.mon_offset 317 318 def set_attenuation(self, atten_list): 319 """Function to set the attenuator to desired attenuations. 320 321 Args: 322 atten_list: list containing the attenuation for each attenuator. 323 """ 324 if len(atten_list) != self.num_atten: 325 raise Exception('List given does not have the correct length') 326 for i in range(self.num_atten): 327 self.attenuators[i].set_atten(atten_list[i]) 328 329 def measure_power_and_validate(self): 330 """The actual test flow and result processing and validate. 331 332 """ 333 result = self.collect_power_data() 334 self.pass_fail_check(result.average_current) 335 336 def collect_power_data(self): 337 """Measure power, plot and take log if needed. 338 339 Returns: 340 A MonsoonResult object. 341 """ 342 # Collecting current measurement data and plot 343 result = self.monsoon_data_collect_save() 344 self.power_result.metric_value = (result.average_current * 345 self.mon_voltage) 346 self.avg_current = result.average_current 347 348 plot_utils.monsoon_data_plot(self.mon_info, result) 349 plot_utils.monsoon_histogram_plot(self.mon_info, result) 350 351 return result 352 353 def pass_fail_check(self, average_current=None): 354 """Check the test result and decide if it passed or failed. 355 356 The threshold is provided in the config file. In this class, result is 357 current in mA. 358 """ 359 360 if not self.threshold or self.test_name not in self.threshold: 361 self.log.error("No threshold is provided for the test '{}' in " 362 "the configuration file.".format(self.test_name)) 363 return 364 365 current_threshold = self.threshold[self.test_name] 366 if average_current: 367 asserts.assert_true( 368 abs(average_current - current_threshold) / current_threshold < 369 self.pass_fail_tolerance, 370 'Measured average current in [{}]: {:.2f}mA, which is ' 371 'out of the acceptable range {:.2f}±{:.2f}mA'.format( 372 self.test_name, average_current, current_threshold, 373 self.pass_fail_tolerance * current_threshold)) 374 asserts.explicit_pass( 375 'Measurement finished for [{}]: {:.2f}mA, which is ' 376 'within the acceptable range {:.2f}±{:.2f}'.format( 377 self.test_name, average_current, current_threshold, 378 self.pass_fail_tolerance * current_threshold)) 379 else: 380 asserts.fail( 381 'Something happened, measurement is not complete, test failed') 382 383 def create_monsoon_info(self): 384 """Creates the config dictionary for monsoon 385 386 Returns: 387 mon_info: Dictionary with the monsoon packet config 388 """ 389 mon_info = ObjNew(dut=self.mon, 390 freq=self.mon_freq, 391 duration=self.mon_duration, 392 offset=self.mon_offset, 393 data_path=self.mon_data_path) 394 return mon_info 395 396 def monsoon_recover(self): 397 """Test loop to wait for monsoon recover from unexpected error. 398 399 Wait for a certain time duration, then quit.0 400 Args: 401 mon: monsoon object 402 Returns: 403 True/False 404 """ 405 try: 406 self.mon.reconnect_monsoon() 407 time.sleep(2) 408 self.mon.usb('on') 409 logging.info('Monsoon recovered from unexpected error') 410 time.sleep(2) 411 return True 412 except MonsoonError: 413 try: 414 self.log.info(self.mon_info.dut._mon.ser.in_waiting) 415 except AttributeError: 416 # This attribute does not exist for HVPMs. 417 pass 418 logging.warning('Unable to recover monsoon from unexpected error') 419 return False 420 421 def monsoon_data_collect_save(self): 422 """Current measurement and save the log file. 423 424 Collect current data using Monsoon box and return the path of the 425 log file. Take bug report if requested. 426 427 Returns: 428 A MonsoonResult object containing information about the gathered 429 data. 430 """ 431 432 tag = '{}_{}_{}'.format(self.test_name, self.dut.model, 433 self.dut.build_info['build_id']) 434 435 data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag)) 436 437 # If the specified Monsoon data file already exists (e.g., multiple 438 # measurements in a single test), write the results to a new file with 439 # the postfix "_#". 440 if os.path.exists(data_path): 441 highest_value = 1 442 for filename in os.listdir(os.path.dirname(data_path)): 443 match = re.match(r'{}_(\d+).txt'.format(tag), filename) 444 if match: 445 highest_value = max(highest_value, int(match.group(1))) 446 447 data_path = os.path.join(self.mon_info.data_path, 448 '%s_%s.txt' % (tag, highest_value + 1)) 449 450 total_expected_samples = self.mon_info.freq * self.mon_info.duration 451 min_required_samples = (total_expected_samples * MIN_PERCENT_SAMPLE / 452 100) 453 for retry_measure in range(1, MEASUREMENT_RETRY_COUNT + 1): 454 # Resets the battery status right before the test starts. 455 self.dut.adb.shell(RESET_BATTERY_STATS) 456 self.log.info('Starting power measurement. Duration: {}s. Offset: ' 457 '{}s. Voltage: {} V. attempt #{}.'.format( 458 self.mon_info.duration, self.mon_info.offset, 459 self.mon_voltage, retry_measure)) 460 # Start the power measurement using monsoon. 461 self.mon_info.dut.usb(PassthroughStates.AUTO) 462 result = self.mon_info.dut.measure_power( 463 self.mon_info.duration, 464 measure_after_seconds=self.mon_info.offset, 465 hz=self.mon_info.freq, 466 output_path=data_path) 467 self.mon_info.dut.usb(PassthroughStates.ON) 468 469 self.log.debug(result) 470 self.log.debug('Samples Gathered: %s. Max Samples: %s ' 471 'Min Samples Required: %s.' % 472 (result.num_samples, total_expected_samples, 473 min_required_samples)) 474 475 if result.num_samples <= min_required_samples: 476 retry_measure += 1 477 self.log.warning( 478 'More than {} percent of samples are missing due to ' 479 'dropped packets. Need to remeasure.'.format( 480 100 - MIN_PERCENT_SAMPLE)) 481 continue 482 483 self.log.info('Measurement successful after {} attempt(s).'.format( 484 retry_measure)) 485 return result 486 else: 487 try: 488 self.log.info(self.mon_info.dut._mon.ser.in_waiting) 489 except AttributeError: 490 # This attribute does not exist for HVPMs. 491 pass 492 self.log.error( 493 'Unable to gather enough samples to run validation.') 494 495 def process_iperf_results(self): 496 """Get the iperf results and process. 497 498 Returns: 499 throughput: the average throughput during tests. 500 """ 501 # Get IPERF results and add this to the plot title 502 RESULTS_DESTINATION = os.path.join( 503 self.iperf_server.log_path, 504 'iperf_client_output_{}.log'.format(self.current_test_name)) 505 self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION) 506 # Calculate the average throughput 507 if self.use_client_output: 508 iperf_file = RESULTS_DESTINATION 509 else: 510 iperf_file = self.iperf_server.log_files[-1] 511 try: 512 iperf_result = ipf.IPerfResult(iperf_file) 513 514 # Compute the throughput in Mbit/s 515 throughput = (math.fsum( 516 iperf_result.instantaneous_rates[self.start_meas_time:-1] 517 ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1]) 518 ) * 8 * (1.024**2) 519 520 self.log.info('The average throughput is {}'.format(throughput)) 521 except ValueError: 522 self.log.warning('Cannot get iperf result. Setting to 0') 523 throughput = 0 524 return throughput 525