1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import json
17import logging
18import math
19import os
20import re
21import time
22
23import acts.controllers.iperf_server as ipf
24from acts import asserts
25from acts import base_test
26from acts import utils
27from acts.controllers.monsoon_lib.api.common import MonsoonError
28from acts.controllers.monsoon_lib.api.common import PassthroughStates
29from acts.metrics.loggers.blackbox import BlackboxMetricLogger
30from acts.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
31from acts.test_utils.power import plot_utils
32from acts.test_utils.wifi import wifi_test_utils as wutils
33
# adb shell command issued before every measurement attempt to reset the
# battery statistics on the DUT.
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
# Not referenced in this file; presumably an iperf timeout in seconds --
# TODO confirm against the subclasses that use it.
IPERF_TIMEOUT = 180
# Default for pass_fail_tolerance: allowed deviation from the threshold,
# expressed as a fraction of the threshold.
THRESHOLD_TOLERANCE_DEFAULT = 0.2
# Not referenced in this file; presumably selectors for where a value is read
# from (DUT vs. access point) -- verify against callers.
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
# Default for mon_voltage, the voltage set on the Monsoon in setup_class.
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
# Same value that setup_class passes to the Monsoon's set_max_current().
MONSOON_MAX_CURRENT = 8.0
# Not referenced in this file; presumably a wait in seconds between Monsoon
# recovery attempts -- TODO confirm.
MONSOON_RETRY_INTERVAL = 300
# Default for mon_freq, passed as the 'hz' argument of measure_power().
DEFAULT_MONSOON_FREQUENCY = 500
# Maximum number of measurement attempts in monsoon_data_collect_save.
MEASUREMENT_RETRY_COUNT = 3
# Not referenced in this file; presumably the number of Monsoon recovery
# attempts -- TODO confirm.
RECOVER_MONSOON_RETRY_COUNT = 3
# Percentage of the expected samples a measurement must exceed to be accepted.
MIN_PERCENT_SAMPLE = 95
# Not referenced in this file; these look like key prefixes of a WiFi driver
# configuration file -- verify against callers.
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
# File on the DUT that process_iperf_results pulls as the iperf client output.
TEMP_FILE = '/sdcard/Download/tmp.log'
49
50
class ObjNew():
    """Lightweight container whose attributes come from keyword arguments.

    Every keyword argument given to the constructor is stored as an attribute
    with the same name on the instance.
    """
    def __init__(self, **kwargs):
        vars(self).update(kwargs)

    def __contains__(self, item):
        """Support the ``in`` operator for attribute names.

        Args:
            item: name of the attribute to look for
        Return:
            True if the object has an attribute with that name, else False
        """
        try:
            getattr(self, item)
        except AttributeError:
            return False
        return True
67
68
69class PowerBaseTest(base_test.BaseTestClass):
70    """Base class for all wireless power related tests.
71
72    """
73    def __init__(self, controllers):
74
75        base_test.BaseTestClass.__init__(self, controllers)
76        self.power_result = BlackboxMetricLogger.for_test_case(
77            metric_name='avg_power')
78        self.start_meas_time = 0
79        self.rockbottom_script = None
80        self.img_name = ''
81        self.dut = None
82        self.power_logger = PowerMetricLogger.for_test_case()
83        self.avg_current = 0
84
85    @property
86    def final_test(self):
87        return len(
88            self.results.requested
89        ) > 0 and self.current_test_name == self.results.requested[-1]
90
91    @property
92    def display_name_test_suite(self):
93        return getattr(self, '_display_name_test_suite',
94                       self.__class__.__name__)
95
96    @display_name_test_suite.setter
97    def display_name_test_suite(self, name):
98        self._display_name_test_suite = name
99
100    @property
101    def display_name_test_case(self):
102        default_test_name = getattr(self, 'test_name', None)
103        return getattr(self, '_display_name_test_case', default_test_name)
104
105    @display_name_test_case.setter
106    def display_name_test_case(self, name):
107        self._display_name_test_case = name
108
109    def setup_class(self):
110
111        self.log = logging.getLogger()
112        self.tests = self.get_existing_test_names()
113
114        # Obtain test parameters from user_params
115        TEST_PARAMS = self.TAG + '_params'
116        self.test_params = self.user_params.get(TEST_PARAMS, {})
117        if not self.test_params:
118            self.log.warning(TEST_PARAMS + ' was not found in the user '
119                             'parameters defined in the config file.')
120
121        # Override user_param values with test parameters
122        self.user_params.update(self.test_params)
123
124        # Unpack user_params with default values. All the usages of user_params
125        # as self attributes need to be included either as a required parameter
126        # or as a parameter with a default value.
127        req_params = ['custom_files', 'mon_duration']
128        self.unpack_userparams(req_params,
129                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
130                               mon_offset=0,
131                               bug_report=False,
132                               extra_wait=None,
133                               iperf_duration=None,
134                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
135                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT)
136
137        # Setup the must have controllers, phone and monsoon
138        self.dut = self.android_devices[0]
139        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
140        os.makedirs(self.mon_data_path, exist_ok=True)
141        self.mon = self.monsoons[0]
142        self.mon.set_max_current(8.0)
143        self.mon.set_voltage(self.mon_voltage)
144        self.mon.attach_device(self.dut)
145
146        # Unpack the thresholds file or fail class setup if it can't be found
147        for file in self.custom_files:
148            if 'pass_fail_threshold_' + self.dut.model in file:
149                self.threshold_file = file
150                break
151        else:
152            raise RuntimeError('Required test pass/fail threshold file is '
153                               'missing')
154
155        # Unpack the rockbottom script or fail class setup if it can't be found
156        for file in self.custom_files:
157            if 'rockbottom_' + self.dut.model in file:
158                self.rockbottom_script = file
159                break
160        else:
161            raise RuntimeError('Required rockbottom script is missing.')
162
163        # Unpack optional custom files
164        for file in self.custom_files:
165            if 'attenuator_setting' in file:
166                self.attenuation_file = file
167            elif 'network_config' in file:
168                self.network_file = file
169
170        if hasattr(self, 'attenuators'):
171            self.num_atten = self.attenuators[0].instrument.num_atten
172            self.atten_level = self.unpack_custom_file(self.attenuation_file)
173        self.threshold = self.unpack_custom_file(self.threshold_file)
174        self.mon_info = self.create_monsoon_info()
175
176        # Sync device time, timezone and country code
177        utils.require_sl4a((self.dut, ))
178        utils.sync_device_time(self.dut)
179        wutils.set_wifi_country_code(self.dut, 'US')
180
181        screen_on_img = self.user_params.get('screen_on_img', [])
182        if screen_on_img:
183            img_src = screen_on_img[0]
184            img_dest = '/sdcard/Pictures/'
185            success = self.dut.push_system_file(img_src, img_dest)
186            if success:
187                self.img_name = os.path.basename(img_src)
188
189    def setup_test(self):
190        """Set up test specific parameters or configs.
191
192        """
193        # Reset the power consumption to 0 before each tests
194        self.power_result.metric_value = 0
195        # Set the device into rockbottom state
196        self.dut_rockbottom()
197        wutils.reset_wifi(self.dut)
198        wutils.wifi_toggle_state(self.dut, False)
199
200        # Wait for extra time if needed for the first test
201        if self.extra_wait:
202            self.more_wait_first_test()
203
204    def teardown_test(self):
205        """Tear down necessary objects after test case is finished.
206
207        """
208        self.log.info('Tearing down the test case')
209        self.mon.usb('on')
210        self.power_logger.set_avg_power(self.power_result.metric_value)
211        self.power_logger.set_avg_current(self.avg_current)
212        self.power_logger.set_voltage(self.mon_voltage)
213        self.power_logger.set_testbed(self.testbed_name)
214
215        # If a threshold was provided, log it in the power proto
216        if self.threshold and self.test_name in self.threshold:
217            avg_current_threshold = self.threshold[self.test_name]
218            self.power_logger.set_avg_current_threshold(avg_current_threshold)
219
220        build_id = self.dut.build_info.get('build_id', '')
221        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
222        branch = self.user_params.get('branch', '')
223        target = self.dut.device_info.get('flavor', '')
224
225        self.power_logger.set_branch(branch)
226        self.power_logger.set_build_id(build_id)
227        self.power_logger.set_incremental_build_id(incr_build_id)
228        self.power_logger.set_target(target)
229
230        # Log the display name of the test suite and test case
231        if self.display_name_test_suite:
232            name = self.display_name_test_suite
233            self.power_logger.set_test_suite_display_name(name)
234
235        if self.display_name_test_case:
236            name = self.display_name_test_case
237            self.power_logger.set_test_case_display_name(name)
238
239        # Take Bugreport
240        if self.bug_report:
241            begin_time = utils.get_current_epoch_time()
242            self.dut.take_bug_report(self.test_name, begin_time)
243
244        # Allow the device to cooldown before executing the next test
245        cooldown = self.test_params.get('cooldown', None)
246        if cooldown and not self.final_test:
247            time.sleep(cooldown)
248
249    def teardown_class(self):
250        """Clean up the test class after tests finish running
251
252        """
253        self.log.info('Tearing down the test class')
254        if hasattr(self, 'monsoons'):
255            self.monsoons[0].usb('on')
256
257    def on_fail(self, test_name, begin_time):
258        self.power_logger.set_pass_fail_status('FAIL')
259
260    def on_pass(self, test_name, begin_time):
261        self.power_logger.set_pass_fail_status('PASS')
262
263    def dut_rockbottom(self):
264        """Set the dut to rockbottom state
265
266        """
267        # The rockbottom script might include a device reboot, so it is
268        # necessary to stop SL4A during its execution.
269        self.dut.stop_services()
270        self.log.info('Executing rockbottom script for ' + self.dut.model)
271        os.chmod(self.rockbottom_script, 0o777)
272        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
273                                    self.img_name))
274        # Make sure the DUT is in root mode after coming back
275        self.dut.root_adb()
276        # Restart SL4A
277        self.dut.start_services()
278
279    def unpack_custom_file(self, file, test_specific=True):
280        """Unpack the pass_fail_thresholds from a common file.
281
282        Args:
283            file: the common file containing pass fail threshold.
284            test_specific: if True, returns the JSON element within the file
285                that starts with the test class name.
286        """
287        with open(file, 'r') as f:
288            params = json.load(f)
289        if test_specific:
290            try:
291                return params[self.TAG]
292            except KeyError:
293                pass
294        else:
295            return params
296
297    def decode_test_configs(self, attrs, indices):
298        """Decode the test config/params from test name.
299
300        Remove redundant function calls when tests are similar.
301        Args:
302            attrs: a list of the attrs of the test config obj
303            indices: a list of the location indices of keyword in the test name.
304        """
305        # Decode test parameters for the current test
306        test_params = self.current_test_name.split('_')
307        values = [test_params[x] for x in indices]
308        config_dict = dict(zip(attrs, values))
309        self.test_configs = ObjNew(**config_dict)
310
311    def more_wait_first_test(self):
312        # For the first test, increase the offset for longer wait time
313        if self.current_test_name == self.tests[0]:
314            self.mon_info.offset = self.mon_offset + self.extra_wait
315        else:
316            self.mon_info.offset = self.mon_offset
317
318    def set_attenuation(self, atten_list):
319        """Function to set the attenuator to desired attenuations.
320
321        Args:
322            atten_list: list containing the attenuation for each attenuator.
323        """
324        if len(atten_list) != self.num_atten:
325            raise Exception('List given does not have the correct length')
326        for i in range(self.num_atten):
327            self.attenuators[i].set_atten(atten_list[i])
328
329    def measure_power_and_validate(self):
330        """The actual test flow and result processing and validate.
331
332        """
333        result = self.collect_power_data()
334        self.pass_fail_check(result.average_current)
335
336    def collect_power_data(self):
337        """Measure power, plot and take log if needed.
338
339        Returns:
340            A MonsoonResult object.
341        """
342        # Collecting current measurement data and plot
343        result = self.monsoon_data_collect_save()
344        self.power_result.metric_value = (result.average_current *
345                                          self.mon_voltage)
346        self.avg_current = result.average_current
347
348        plot_utils.monsoon_data_plot(self.mon_info, result)
349        plot_utils.monsoon_histogram_plot(self.mon_info, result)
350
351        return result
352
353    def pass_fail_check(self, average_current=None):
354        """Check the test result and decide if it passed or failed.
355
356        The threshold is provided in the config file. In this class, result is
357        current in mA.
358        """
359
360        if not self.threshold or self.test_name not in self.threshold:
361            self.log.error("No threshold is provided for the test '{}' in "
362                           "the configuration file.".format(self.test_name))
363            return
364
365        current_threshold = self.threshold[self.test_name]
366        if average_current:
367            asserts.assert_true(
368                abs(average_current - current_threshold) / current_threshold <
369                self.pass_fail_tolerance,
370                'Measured average current in [{}]: {:.2f}mA, which is '
371                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
372                    self.test_name, average_current, current_threshold,
373                    self.pass_fail_tolerance * current_threshold))
374            asserts.explicit_pass(
375                'Measurement finished for [{}]: {:.2f}mA, which is '
376                'within the acceptable range {:.2f}±{:.2f}'.format(
377                    self.test_name, average_current, current_threshold,
378                    self.pass_fail_tolerance * current_threshold))
379        else:
380            asserts.fail(
381                'Something happened, measurement is not complete, test failed')
382
383    def create_monsoon_info(self):
384        """Creates the config dictionary for monsoon
385
386        Returns:
387            mon_info: Dictionary with the monsoon packet config
388        """
389        mon_info = ObjNew(dut=self.mon,
390                          freq=self.mon_freq,
391                          duration=self.mon_duration,
392                          offset=self.mon_offset,
393                          data_path=self.mon_data_path)
394        return mon_info
395
396    def monsoon_recover(self):
397        """Test loop to wait for monsoon recover from unexpected error.
398
399        Wait for a certain time duration, then quit.0
400        Args:
401            mon: monsoon object
402        Returns:
403            True/False
404        """
405        try:
406            self.mon.reconnect_monsoon()
407            time.sleep(2)
408            self.mon.usb('on')
409            logging.info('Monsoon recovered from unexpected error')
410            time.sleep(2)
411            return True
412        except MonsoonError:
413            try:
414                self.log.info(self.mon_info.dut._mon.ser.in_waiting)
415            except AttributeError:
416                # This attribute does not exist for HVPMs.
417                pass
418            logging.warning('Unable to recover monsoon from unexpected error')
419            return False
420
421    def monsoon_data_collect_save(self):
422        """Current measurement and save the log file.
423
424        Collect current data using Monsoon box and return the path of the
425        log file. Take bug report if requested.
426
427        Returns:
428            A MonsoonResult object containing information about the gathered
429            data.
430        """
431
432        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
433                                self.dut.build_info['build_id'])
434
435        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
436
437        # If the specified Monsoon data file already exists (e.g., multiple
438        # measurements in a single test), write the results to a new file with
439        # the postfix "_#".
440        if os.path.exists(data_path):
441            highest_value = 1
442            for filename in os.listdir(os.path.dirname(data_path)):
443                match = re.match(r'{}_(\d+).txt'.format(tag), filename)
444                if match:
445                    highest_value = max(highest_value, int(match.group(1)))
446
447            data_path = os.path.join(self.mon_info.data_path,
448                                     '%s_%s.txt' % (tag, highest_value + 1))
449
450        total_expected_samples = self.mon_info.freq * self.mon_info.duration
451        min_required_samples = (total_expected_samples * MIN_PERCENT_SAMPLE /
452                                100)
453        for retry_measure in range(1, MEASUREMENT_RETRY_COUNT + 1):
454            # Resets the battery status right before the test starts.
455            self.dut.adb.shell(RESET_BATTERY_STATS)
456            self.log.info('Starting power measurement. Duration: {}s. Offset: '
457                          '{}s. Voltage: {} V. attempt #{}.'.format(
458                              self.mon_info.duration, self.mon_info.offset,
459                              self.mon_voltage, retry_measure))
460            # Start the power measurement using monsoon.
461            self.mon_info.dut.usb(PassthroughStates.AUTO)
462            result = self.mon_info.dut.measure_power(
463                self.mon_info.duration,
464                measure_after_seconds=self.mon_info.offset,
465                hz=self.mon_info.freq,
466                output_path=data_path)
467            self.mon_info.dut.usb(PassthroughStates.ON)
468
469            self.log.debug(result)
470            self.log.debug('Samples Gathered: %s. Max Samples: %s '
471                           'Min Samples Required: %s.' %
472                           (result.num_samples, total_expected_samples,
473                            min_required_samples))
474
475            if result.num_samples <= min_required_samples:
476                retry_measure += 1
477                self.log.warning(
478                    'More than {} percent of samples are missing due to '
479                    'dropped packets. Need to remeasure.'.format(
480                        100 - MIN_PERCENT_SAMPLE))
481                continue
482
483            self.log.info('Measurement successful after {} attempt(s).'.format(
484                retry_measure))
485            return result
486        else:
487            try:
488                self.log.info(self.mon_info.dut._mon.ser.in_waiting)
489            except AttributeError:
490                # This attribute does not exist for HVPMs.
491                pass
492            self.log.error(
493                'Unable to gather enough samples to run validation.')
494
495    def process_iperf_results(self):
496        """Get the iperf results and process.
497
498        Returns:
499             throughput: the average throughput during tests.
500        """
501        # Get IPERF results and add this to the plot title
502        RESULTS_DESTINATION = os.path.join(
503            self.iperf_server.log_path,
504            'iperf_client_output_{}.log'.format(self.current_test_name))
505        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
506        # Calculate the average throughput
507        if self.use_client_output:
508            iperf_file = RESULTS_DESTINATION
509        else:
510            iperf_file = self.iperf_server.log_files[-1]
511        try:
512            iperf_result = ipf.IPerfResult(iperf_file)
513
514            # Compute the throughput in Mbit/s
515            throughput = (math.fsum(
516                iperf_result.instantaneous_rates[self.start_meas_time:-1]
517            ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
518                          ) * 8 * (1.024**2)
519
520            self.log.info('The average throughput is {}'.format(throughput))
521        except ValueError:
522            self.log.warning('Cannot get iperf result. Setting to 0')
523            throughput = 0
524        return throughput
525