1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import os
18import re
19import time
20import logging
21import pandas as pd
22
23from acts import asserts
24from acts.libs.proc import job
25from acts.base_test import BaseTestClass
26
27from acts.test_utils.bt.bt_power_test_utils import MediaControl
28from acts.test_utils.bt.ble_performance_test_utils import run_ble_throughput_and_read_rssi
29from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
30
31import acts.test_utils.bt.bt_test_utils as bt_utils
32import acts.test_utils.wifi.wifi_performance_test_utils as wifi_utils
33
34PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
35
36FORCE_SAR_ADB_COMMAND = ('am broadcast -n'
37                         'com.google.android.apps.scone/.coex.TestReceiver -a '
38                         'com.google.android.apps.scone.coex.SIMULATE_STATE ')
39
40DEFAULT_DURATION = 5
41DEFAULT_MAX_ERROR_THRESHOLD = 2
42DEFAULT_AGG_MAX_ERROR_THRESHOLD = 2
43FIXED_ATTENUATION = 36
44
45
46class BtSarBaseTest(BaseTestClass):
47    """ Base class for all BT SAR Test classes.
48
49        This class implements functions common to BT SAR test Classes.
50    """
51    BACKUP_BT_SAR_TABLE_NAME = 'backup_bt_sar_table.csv'
52
53    def __init__(self, controllers):
54        BaseTestClass.__init__(self, controllers)
55        self.power_file_paths = [
56            '/vendor/etc/bluetooth_power_limits.csv',
57            '/data/vendor/radio/bluetooth_power_limits.csv'
58        ]
59        self.sar_file_name = os.path.basename(self.power_file_paths[0])
60        self.power_column = 'BluetoothPower'
61        self.REG_DOMAIN_DICT = {
62            ('us', 'ca', 'in'): 'US',
63            ('uk', 'fr', 'es', 'de', 'it', 'ie', 'sg', 'au', 'tw'): 'EU',
64            ('jp', ): 'JP'
65        }
66
67    def setup_class(self):
68        """Initializes common test hardware and parameters.
69
70        This function initializes hardware and compiles parameters that are
71        common to all tests in this class and derived classes.
72        """
73        super().setup_class()
74
75        self.test_params = self.user_params.get('bt_sar_test_params', {})
76        if not self.test_params:
77            self.log.warning(
78                'bt_sar_test_params was not found in the config file.')
79
80        self.user_params.update(self.test_params)
81        req_params = ['bt_devices', 'calibration_params']
82
83        self.unpack_userparams(
84            req_params,
85            country_code='us',
86            duration=DEFAULT_DURATION,
87            custom_sar_path=None,
88            music_files=None,
89            sort_order=None,
90            max_error_threshold=DEFAULT_MAX_ERROR_THRESHOLD,
91            agg_error_threshold=DEFAULT_AGG_MAX_ERROR_THRESHOLD,
92            tpc_threshold=[2, 8],
93        )
94
95        self.attenuator = self.attenuators[0]
96        self.dut = self.android_devices[0]
97        for key in self.REG_DOMAIN_DICT.keys():
98            if self.country_code.lower() in key:
99                self.reg_domain = self.REG_DOMAIN_DICT[key]
100
101        self.sar_version_2 = False
102
103        if 'Error' not in self.dut.adb.shell('bluetooth_sar_test -r'):
104            #Flag for SAR version 2
105            self.sar_version_2 = True
106            self.power_column = 'BluetoothEDRPower'
107            self.power_file_paths[0] = os.path.join(
108                os.path.dirname(self.power_file_paths[0]),
109                'bluetooth_power_limits_{}.csv'.format(self.reg_domain))
110            self.sar_file_name = os.path.basename(self.power_file_paths[0])
111
112        self.sar_file_path = self.power_file_paths[0]
113        self.atten_min = 0
114        self.atten_max = int(self.attenuator.get_max_atten())
115
116        # Initializing media controller
117        if self.music_files:
118            music_src = self.music_files[0]
119            music_dest = PHONE_MUSIC_FILE_DIRECTORY
120            success = self.dut.push_system_file(music_src, music_dest)
121            if success:
122                self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
123                                               os.path.basename(music_src))
124            # Initialize media_control class
125            self.media = MediaControl(self.dut, self.music_file)
126
127        #Initializing BT device controller
128        if self.bt_devices:
129            attr, idx = self.bt_devices.split(':')
130            self.bt_device_controller = getattr(self, attr)[int(idx)]
131            self.bt_device = bt_factory().generate(self.bt_device_controller)
132        else:
133            self.log.error('No BT devices config is provided!')
134
135        bt_utils.enable_bqr(self.android_devices)
136
137        self.log_path = os.path.join(logging.log_path, 'results')
138        os.makedirs(self.log_path, exist_ok=True)
139
140        # Reading BT SAR table from the phone
141        self.bt_sar_df = self.read_sar_table(self.dut)
142
143    def setup_test(self):
144        super().setup_test()
145
146        # Starting BT on the master
147        self.dut.droid.bluetoothFactoryReset()
148        bt_utils.enable_bluetooth(self.dut.droid, self.dut.ed)
149
150        # Starting BT on the slave
151        self.bt_device.reset()
152        self.bt_device.power_on()
153
154        # Connect master and slave
155        bt_utils.connect_phone_to_headset(self.dut, self.bt_device, 60)
156
157        # Playing music
158        self.media.play()
159
160        # Find and set PL10 level for the DUT
161        self.pl10_atten = self.set_PL10_atten_level(self.dut)
162        self.attenuator.set_atten(self.pl10_atten)
163
164    def teardown_test(self):
165        #Stopping Music
166        if hasattr(self, 'media'):
167            self.media.stop()
168
169        # Stopping BT on slave
170        self.bt_device.reset()
171        self.bt_device.power_off()
172
173        #Stopping BT on master
174        bt_utils.disable_bluetooth(self.dut.droid)
175
176        #Resetting the atten to initial levels
177        self.attenuator.set_atten(self.atten_min)
178        self.log.info('Attenuation set to {} dB'.format(self.atten_min))
179
180    def teardown_class(self):
181
182        super().teardown_class()
183        self.dut.droid.bluetoothFactoryReset()
184
185        # Stopping BT on slave
186        self.bt_device.reset()
187        self.bt_device.power_off()
188
189        #Stopping BT on master
190        bt_utils.disable_bluetooth(self.dut.droid)
191
192    def save_sar_plot(self, df):
193        """ Saves SAR plot to the path given.
194
195        Args:
196            df: Processed SAR table sweep results
197        """
198        self.plot.add_line(df.index,
199                           df['expected_tx_power'],
200                           legend='expected',
201                           marker='circle')
202        self.plot.add_line(df.index,
203                           df['measured_tx_power'],
204                           legend='measured',
205                           marker='circle')
206        self.plot.add_line(df.index,
207                           df['delta'],
208                           legend='delta',
209                           marker='circle')
210
211        results_file_path = os.path.join(
212            self.log_path, '{}.html'.format(self.current_test_name))
213        self.plot.generate_figure()
214        wifi_utils.BokehFigure.save_figures([self.plot], results_file_path)
215
216    def sweep_table(self,
217                    client_ad=None,
218                    server_ad=None,
219                    client_conn_id=None,
220                    gatt_server=None,
221                    gatt_callback=None,
222                    isBLE=False):
223        """Iterates over the BT SAR table and forces signal states.
224
225        Iterates over BT SAR table and forces signal states,
226        measuring RSSI and power level for each state.
227
228        Args:
229            client_ad: the Android device performing the connection.
230            server_ad: the Android device accepting the connection.
231            client_conn_id: the client connection ID.
232            gatt_server: the gatt server
233            gatt_callback: Gatt callback objec
234            isBLE : boolean variable for BLE connection
235        Returns:
236            sar_df : SAR table sweep results in pandas dataframe
237        """
238
239        sar_df = self.bt_sar_df.copy()
240        sar_df['power_cap'] = -128
241        sar_df['slave_rssi'] = -128
242        sar_df['master_rssi'] = -128
243        sar_df['ble_rssi'] = -128
244        sar_df['pwlv'] = -1
245
246        # Sorts the table
247        if self.sort_order:
248            if self.sort_order.lower() == 'ascending':
249                sar_df = sar_df.sort_values(by=[self.power_column],
250                                            ascending=True)
251            else:
252                sar_df = sar_df.sort_values(by=[self.power_column],
253                                            ascending=False)
254            sar_df = sar_df.reset_index(drop=True)
255
256        # Sweeping BT SAR table
257        for scenario in range(sar_df.shape[0]):
258            # Reading BT SAR Scenario from the table
259            read_scenario = sar_df.loc[scenario].to_dict()
260
261            start_time = self.dut.adb.shell('date +%s.%m')
262            time.sleep(1)
263
264            #Setting SAR State
265            self.set_sar_state(self.dut, read_scenario, self.country_code)
266
267            if isBLE:
268                sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap(
269                    self.dut, start_time, type='BLE')
270
271                sar_df.loc[scenario,
272                           'ble_rssi'] = run_ble_throughput_and_read_rssi(
273                               client_ad, server_ad, client_conn_id,
274                               gatt_server, gatt_callback)
275
276                self.log.info('scenario:{}, power_cap:{},  ble_rssi:{}'.format(
277                    scenario, sar_df.loc[scenario, 'power_cap'],
278                    sar_df.loc[scenario, 'ble_rssi']))
279            else:
280                sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap(
281                    self.dut, start_time)
282
283                processed_bqr_results = bt_utils.get_bt_metric(
284                    self.android_devices, self.duration)
285                sar_df.loc[scenario,
286                           'slave_rssi'] = processed_bqr_results['rssi'][
287                               self.bt_device_controller.serial]
288                sar_df.loc[scenario,
289                           'master_rssi'] = processed_bqr_results['rssi'][
290                               self.dut.serial]
291                sar_df.loc[scenario, 'pwlv'] = processed_bqr_results['pwlv'][
292                    self.dut.serial]
293                self.log.info(
294                    'scenario:{}, power_cap:{},  s_rssi:{}, m_rssi:{}, m_pwlv:{}'
295                    .format(scenario, sar_df.loc[scenario, 'power_cap'],
296                            sar_df.loc[scenario, 'slave_rssi'],
297                            sar_df.loc[scenario,
298                                       'master_rssi'], sar_df.loc[scenario,
299                                                                  'pwlv']))
300
301        self.log.info('BT SAR Table swept')
302
303        return sar_df
304
305    def process_table(self, sar_df):
306        """Processes the results of sweep_table and computes BT TX power.
307
308        Processes the results of sweep_table and computes BT TX power
309        after factoring in the path loss and FTM offsets.
310
311        Args:
312             sar_df: BT SAR table after the sweep
313
314        Returns:
315            sar_df: processed BT SAR table
316        """
317
318        sar_df['pathloss'] = self.calibration_params['pathloss']
319
320        if hasattr(self, 'pl10_atten'):
321            sar_df['atten'] = self.pl10_atten
322        else:
323            sar_df['atten'] = FIXED_ATTENUATION
324
325        # BT SAR Backoff for each scenario
326        if self.sar_version_2:
327            #Reads OTP values from the phone
328            self.otp = bt_utils.read_otp(self.dut)
329
330            #OTP backoff
331            edr_otp = min(0, float(self.otp['EDR']['10']))
332            bdr_otp = min(0, float(self.otp['BR']['10']))
333            ble_otp = min(0, float(self.otp['BLE']['10']))
334
335            # EDR TX Power for PL10
336            edr_tx_power_pl10 = self.calibration_params['target_power']['EDR'][
337                '10'] - edr_otp
338
339            # BDR TX Power for PL10
340            bdr_tx_power_pl10 = self.calibration_params['target_power']['BDR'][
341                '10'] - bdr_otp
342
343            # RSSI being measured is BDR
344            offset = bdr_tx_power_pl10 - edr_tx_power_pl10
345
346            # BDR-EDR offset
347            sar_df['offset'] = offset
348
349            # Max TX power permissible
350            sar_df['max_power'] = self.calibration_params['max_power']
351
352            # Adding a target power column
353            if 'ble_rssi' in sar_df.columns:
354                sar_df['target_power'] = self.calibration_params[
355                    'target_power']['BLE']['10'] - ble_otp
356            else:
357                sar_df['target_power'] = sar_df['pwlv'].astype(str).map(
358                    self.calibration_params['target_power']['EDR']) - edr_otp
359
360            #Translates power_cap values to expected TX power level
361            sar_df['cap_tx_power'] = sar_df['power_cap'] / 4.0
362
363            sar_df['expected_tx_power'] = sar_df[[
364                'cap_tx_power', 'target_power', 'max_power'
365            ]].min(axis=1)
366
367            if hasattr(self, 'pl10_atten'):
368                sar_df['measured_tx_power'] = sar_df['slave_rssi'] + sar_df[
369                    'pathloss'] + self.pl10_atten - offset
370            else:
371                sar_df['measured_tx_power'] = sar_df['ble_rssi'] + sar_df[
372                    'pathloss'] + FIXED_ATTENUATION
373
374        else:
375
376            # Adding a target power column
377            sar_df['target_power'] = sar_df['pwlv'].astype(str).map(
378                self.calibration_params['target_power']['EDR']['10'])
379
380            # Adding a ftm  power column
381            sar_df['ftm_power'] = sar_df['pwlv'].astype(str).map(
382                self.calibration_params['ftm_power']['EDR'])
383            sar_df[
384                'backoff'] = sar_df['target_power'] - sar_df['power_cap'] / 4.0
385
386            sar_df[
387                'expected_tx_power'] = sar_df['ftm_power'] - sar_df['backoff']
388            sar_df['measured_tx_power'] = sar_df['slave_rssi'] + sar_df[
389                'pathloss'] + self.pl10_atten
390
391        sar_df['delta'] = sar_df['expected_tx_power'] - sar_df[
392            'measured_tx_power']
393
394        self.log.info('Sweep results processed')
395
396        results_file_path = os.path.join(self.log_path, self.current_test_name)
397        sar_df.to_csv('{}.csv'.format(results_file_path))
398        self.save_sar_plot(sar_df)
399
400        return sar_df
401
402    def process_results(self, sar_df, type='EDR'):
403        """Determines the test results of the sweep.
404
405         Parses the processed table with computed BT TX power values
406         to return pass or fail.
407
408        Args:
409             sar_df: processed BT SAR table
410        """
411
412        # checks for errors at particular points in the sweep
413        max_error_result = abs(
414            sar_df['delta']) > self.max_error_threshold[type]
415        if False in max_error_result:
416            asserts.fail('Maximum Error Threshold Exceeded')
417
418        # checks for error accumulation across the sweep
419        if sar_df['delta'].sum() > self.agg_error_threshold[type]:
420            asserts.fail(
421                'Aggregate Error Threshold Exceeded. Error: {} Threshold: {}'.
422                format(sar_df['delta'].sum(), self.agg_error_threshold))
423
424        else:
425            asserts.explicit_pass('Measured and Expected Power Values in line')
426
427    def set_sar_state(self, ad, signal_dict, country_code='us'):
428        """Sets the SAR state corresponding to the BT SAR signal.
429
430        The SAR state is forced using an adb command that takes
431        device signals as input.
432
433        Args:
434            ad: android_device object.
435            signal_dict: dict of BT SAR signals read from the SAR file.
436        Returns:
437            enforced_state: dict of device signals.
438        """
439        signal_dict = {k: max(int(v), 0) for (k, v) in signal_dict.items()}
440        signal_dict["Wifi"] = signal_dict['WIFI5Ghz']
441        signal_dict['WIFI2Ghz'] = 0 if signal_dict['WIFI5Ghz'] else 1
442
443        device_state_dict = {
444            ('Earpiece', 'earpiece'): signal_dict['Head'],
445            ('Wifi', 'wifi'): signal_dict['WIFI5Ghz'],
446            ('Wifi 2.4G', 'wifi_24g'): signal_dict['WIFI2Ghz'],
447            ('Voice', 'voice'): 0,
448            ('Wifi AP', 'wifi_ap'): signal_dict['HotspotVoice'],
449            ('Bluetooth', 'bluetooth'): 1,
450            ('Bluetooth media', 'bt_media'): signal_dict['BTMedia'],
451            ('Radio', 'radio_power'): signal_dict['Cell'],
452            ('Motion', 'motion'): signal_dict['IMU'],
453            ('Bluetooth connected', 'bt_connected'): 1
454        }
455
456        if 'BTHotspot' in signal_dict.keys():
457            device_state_dict[('Bluetooth tethering',
458                               'bt_tethering')] = signal_dict['BTHotspot']
459
460        enforced_state = {}
461        sar_state_command = FORCE_SAR_ADB_COMMAND
462        for key in device_state_dict:
463            enforced_state[key[0]] = device_state_dict[key]
464            sar_state_command = '{} --ei {} {}'.format(sar_state_command,
465                                                       key[1],
466                                                       device_state_dict[key])
467        if self.sar_version_2:
468            sar_state_command = '{} --es country_iso "{}"'.format(
469                sar_state_command, country_code.lower())
470
471        #Forcing the SAR state
472        adb_output = ad.adb.shell(sar_state_command)
473
474        # Checking if command was successfully enforced
475        if 'result=0' in adb_output:
476            self.log.info('Requested BT SAR state successfully enforced.')
477            return enforced_state
478        else:
479            self.log.error("Couldn't force BT SAR state.")
480
481    def parse_bt_logs(self, ad, begin_time, regex=''):
482        """Returns bt software stats by parsing logcat since begin_time.
483
484        The quantity to be fetched is dictated by the regex provided.
485
486        Args:
487             ad: android_device object.
488             begin_time: time stamp to start the logcat parsing.
489             regex: regex for fetching the required BT software stats.
490
491        Returns:
492             stat: the desired BT stat.
493        """
494        # Waiting for logcat to update
495        time.sleep(1)
496        bt_adb_log = ad.adb.logcat('-b all -t %s' % begin_time)
497        for line in bt_adb_log.splitlines():
498            if re.findall(regex, line):
499                stat = re.findall(regex, line)[0]
500                return stat
501
502    def set_country_code(self, ad, cc):
503        """Sets the SAR regulatory domain as per given country code
504
505        The SAR regulatory domain is forced using an adb command that takes
506        country code as input.
507
508        Args:
509            ad: android_device object.
510            cc: country code
511        """
512
513        ad.adb.shell("{} --es country_iso {}".format(FORCE_SAR_ADB_COMMAND,
514                                                     cc))
515        self.log.info("Country Code set to {}".format(cc))
516
517    def get_country_code(self, ad, begin_time):
518        """Returns the enforced regulatory domain since begin_time
519
520        Returns enforced regulatory domain since begin_time by parsing logcat.
521        Function should follow a function call to set a country code
522
523        Args:
524            ad : android_device obj
525            begin_time: time stamp to start
526
527        Returns:
528            read enforced regulatory domain
529        """
530
531        reg_domain_regex = "updateRegulatoryDomain:\s+(\S+)"
532        reg_domain = self.parse_bt_logs(ad, begin_time, reg_domain_regex)
533        return reg_domain
534
535    def get_current_power_cap(self, ad, begin_time, type='EDR'):
536        """ Returns the enforced software EDR power cap since begin_time.
537
538        Returns the enforced EDR power cap since begin_time by parsing logcat.
539        Function should follow a function call that forces a SAR state
540
541        Args:
542            ad: android_device obj.
543            begin_time: time stamp to start.
544
545        Returns:
546            read enforced power cap
547        """
548        power_cap_regex_dict = {
549            'BDR': [
550                'Bluetooth powers: BR:\s+(\d+), EDR:\s+\d+',
551                'Bluetooth Tx Power Cap\s+(\d+)'
552            ],
553            'EDR': [
554                'Bluetooth powers: BR:\s+\d+, EDR:\s+(\d+)',
555                'Bluetooth Tx Power Cap\s+(\d+)'
556            ],
557            'BLE': [
558                'Bluetooth powers: BR:\s+\d+, EDR:\s+\d+, BLE:\s+(\d+)',
559                'Bluetooth Tx Power Cap\s+(\d+)'
560            ]
561        }
562
563        power_cap_regex_list = power_cap_regex_dict[type]
564
565        for power_cap_regex in power_cap_regex_list:
566            power_cap = self.parse_bt_logs(ad, begin_time, power_cap_regex)
567            if power_cap:
568                return int(power_cap)
569
570        raise ValueError('Failed to get TX power cap')
571
572    def get_current_device_state(self, ad, begin_time):
573        """ Returns the device state of the android dut since begin_time.
574
575        Returns the device state of the android dut by parsing logcat since
576        begin_time. Function should follow a function call that forces
577        a SAR state.
578
579        Args:
580            ad: android_device obj.
581            begin_time: time stamp to start.
582
583        Returns:
584            device_state: device state of the android device.
585        """
586
587        device_state_regex = 'updateDeviceState: DeviceState: ([\s*\S+\s]+)'
588        time.sleep(2)
589        device_state = self.parse_bt_logs(ad, begin_time, device_state_regex)
590        if device_state:
591            return device_state
592
593        raise ValueError("Couldn't fetch device state")
594
595    def read_sar_table(self, ad):
596        """Extracts the BT SAR table from the phone.
597
598        Extracts the BT SAR table from the phone into the android device
599        log path directory.
600
601        Args:
602            ad: android_device object.
603
604        Returns:
605            df : BT SAR table (as pandas DataFrame).
606        """
607        output_path = os.path.join(ad.device_log_path, self.sar_file_name)
608        ad.adb.pull('{} {}'.format(self.sar_file_path, output_path))
609        df = pd.read_csv(os.path.join(ad.device_log_path, self.sar_file_name))
610        self.log.info('BT SAR table read from the phone')
611        return df
612
613    def push_table(self, ad, src_path):
614        """Pushes a BT SAR table to the phone.
615
616        Pushes a BT SAR table to the android device and reboots the device.
617        Also creates a backup file if backup flag is True.
618
619        Args:
620            ad: android_device object.
621            src_path: path to the  BT SAR table.
622        """
623        #Copying the to-be-pushed file for logging
624        if os.path.dirname(src_path) != ad.device_log_path:
625            job.run('cp {} {}'.format(src_path, ad.device_log_path))
626
627        #Pushing the file provided in the config
628        ad.push_system_file(src_path, self.sar_file_path)
629        self.log.info('BT SAR table pushed')
630        ad.reboot()
631        self.bt_sar_df = self.read_sar_table(self.dut)
632
633    def set_PL10_atten_level(self, ad):
634        """Finds the attenuation level at which the phone is at PL10
635
636        Finds PL10 attenuation level by sweeping the attenuation range.
637        If the power level is not achieved during sweep,
638        returns the max atten level
639
640        Args:
641            ad: android object class
642        Returns:
643            atten : attenuation level when the phone is at PL10
644        """
645        BT_SAR_ATTEN_STEP = 3
646
647        for atten in range(self.atten_min, self.atten_max, BT_SAR_ATTEN_STEP):
648            self.attenuator.set_atten(atten)
649            # Sleep required for BQR to reflect the change in parameters
650            time.sleep(2)
651            metrics = bt_utils.get_bt_metric(ad)
652            if metrics['pwlv'][ad.serial] == 10:
653                self.log.info('PL10 located at {}'.format(atten +
654                                                          BT_SAR_ATTEN_STEP))
655                return atten + BT_SAR_ATTEN_STEP
656
657        self.log.warn(
658            "PL10 couldn't be located in the given attenuation range")
659