1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import logging
21import numpy
22import os
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers import iperf_client
28from acts.controllers.utils_lib import ssh
29from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
30from acts.test_utils.wifi import ota_chamber
31from acts.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts.test_utils.wifi import wifi_test_utils as wutils
33from acts.test_utils.wifi import wifi_retail_ap as retail_ap
34from functools import partial
35from WifiRvrTest import WifiRvrTest
36from WifiPingTest import WifiPingTest
37
38
class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
    """Class to test WiFi sensitivity.

    This class measures WiFi sensitivity per rate. It heavily leverages the
    WifiRvrTest and WifiPingTest classes and introduces minor differences to
    set specific rates on the access point, and implements a different
    pass/fail check. For an example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    # NOTE(review): not referenced in the visible part of this file;
    # presumably consumed by the parent test classes - confirm.
    RSSI_POLL_INTERVAL = 0.2
    # Maps a channel number to the modes that may be tested on it.
    # generate_test_cases() drops any requested mode not listed here.
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    # One testable rate: rate/MCS identifier, number of spatial streams and
    # the data rate reported in the 'Rate (Mbps)' column of results.csv.
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
    # Rates to test per mode. For legacy modes the 'mcs' field holds the rate
    # itself. Lists are ordered from highest to lowest rate within each stream
    # count; get_start_atten() relies on this order for legacy rates.
    # NOTE(review): the VHT40 and VHT80 data_rate values are identical to the
    # VHT20 ones, and HT20 lists 26 (MCS3) and 43.4 (MCS10) where VHT20 lists
    # 28.9 and 43.3 - confirm these against the intended rate tables.
    #yapf:disable
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable
125
    def __init__(self, controllers):
        """Initializes the base test class and the blackbox metric loggers.

        Args:
            controllers: passed through unchanged to
                base_test.BaseTestClass.__init__
        """
        # BaseTestClass.__init__ is called directly instead of via super(), so
        # the __init__ of WifiRvrTest and WifiPingTest is not executed here.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        # NOTE(review): this flag is not read in the visible part of the file;
        # presumably it is consumed by the parent test classes - confirm.
        self.publish_testcase_metrics = True
133
134    def setup_class(self):
135        """Initializes common test hardware and parameters.
136
137        This function initializes hardwares and compiles parameters that are
138        common to all tests in this class.
139        """
140        self.dut = self.android_devices[-1]
141        req_params = [
142            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
143            'RemoteServer'
144        ]
145        opt_params = ['main_network']
146        self.unpack_userparams(req_params, opt_params)
147        self.testclass_params = self.sensitivity_test_params
148        self.num_atten = self.attenuators[0].instrument.num_atten
149        self.ping_server = ssh.connection.SshConnection(
150            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
151        self.iperf_server = self.iperf_servers[0]
152        self.iperf_client = self.iperf_clients[0]
153        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
154        self.log.info('Access Point Configuration: {}'.format(
155            self.access_point.ap_settings))
156        self.log_path = os.path.join(logging.log_path, 'results')
157        os.makedirs(self.log_path, exist_ok=True)
158        self.atten_dut_chain_map = {}
159        self.testclass_results = []
160
161        # Turn WiFi ON
162        if self.testclass_params.get('airplane_mode', 1):
163            self.log.info('Turning on airplane mode.')
164            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
165                                "Can not turn on airplane mode.")
166        wutils.wifi_toggle_state(self.dut, True)
167
168        # Configure test retries
169        self.user_params['retry_tests'] = [self.__class__.__name__]
170
171    def teardown_class(self):
172        # Turn WiFi OFF
173        for dev in self.android_devices:
174            wutils.wifi_toggle_state(dev, False)
175        self.process_testclass_results()
176
    def setup_test(self):
        """Resets retry_flag to False; on_retry() sets it for retried tests."""
        self.retry_flag = False
179
    def teardown_test(self):
        """Resets retry_flag to False once a test case is done."""
        self.retry_flag = False
182
183    def on_retry(self):
184        """Function to control test logic on retried tests.
185
186        This function is automatically executed on tests that are being
187        retried. In this case the function resets wifi, toggles it off and on
188        and sets a retry_flag to enable further tweaking the test logic on
189        second attempts.
190        """
191        self.retry_flag = True
192        for dev in self.android_devices:
193            wutils.reset_wifi(dev)
194            wutils.toggle_wifi_off_and_on(dev)
195
196    def pass_fail_check(self, result):
197        """Checks sensitivity results and decides on pass/fail.
198
199        Args:
200            result: dict containing attenuation, throughput and other meta
201                data
202        """
203        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
204            result['peak_throughput_pct'], result['sensitivity']))
205        if result['peak_throughput_pct'] < 95:
206            asserts.fail('Result unreliable. {}'.format(result_string))
207        else:
208            asserts.explicit_pass('Test Passed. {}'.format(result_string))
209
210    def process_testclass_results(self):
211        """Saves and plots test results from all executed test cases."""
212        # write json output
213        testclass_results_dict = collections.OrderedDict()
214        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
215        channels_tested = []
216        for result in self.testclass_results:
217            testcase_params = result['testcase_params']
218            test_id = self.extract_test_id(testcase_params, id_fields)
219            test_id = tuple(test_id.items())
220            if test_id not in testclass_results_dict:
221                testclass_results_dict[test_id] = collections.OrderedDict()
222            channel = testcase_params['channel']
223            if channel not in channels_tested:
224                channels_tested.append(channel)
225            if result['peak_throughput_pct'] >= 95:
226                testclass_results_dict[test_id][channel] = result[
227                    'sensitivity']
228            else:
229                testclass_results_dict[test_id][channel] = ''
230
231        # calculate average metrics
232        metrics_dict = collections.OrderedDict()
233        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
234        for test_id in testclass_results_dict.keys():
235            for channel in testclass_results_dict[test_id].keys():
236                metric_tag = collections.OrderedDict(test_id, channel=channel)
237                metric_tag = self.extract_test_id(metric_tag, id_fields)
238                metric_tag = tuple(metric_tag.items())
239                metrics_dict.setdefault(metric_tag, [])
240                sensitivity_result = testclass_results_dict[test_id][channel]
241                if sensitivity_result != '':
242                    metrics_dict[metric_tag].append(sensitivity_result)
243        for metric_tag_tuple, metric_data in metrics_dict.items():
244            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
245            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
246                metric_tag_dict['channel'], metric_tag_dict['mode'],
247                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
248            metric_key = "{}.avg_sensitivity".format(metric_tag)
249            metric_value = numpy.nanmean(metric_data)
250            self.testclass_metric_logger.add_metric(metric_key, metric_value)
251
252        # write csv
253        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
254        for channel in channels_tested:
255            csv_header.append('Ch. ' + str(channel))
256        results_file_path = os.path.join(self.log_path, 'results.csv')
257        with open(results_file_path, mode='w') as csv_file:
258            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
259            writer.writeheader()
260            for test_id, test_results in testclass_results_dict.items():
261                test_id_dict = dict(test_id)
262                if 'legacy' in test_id_dict['mode']:
263                    rate_list = self.VALID_RATES['legacy_2GHz']
264                else:
265                    rate_list = self.VALID_RATES[test_id_dict['mode']]
266                data_rate = next(rate.data_rate for rate in rate_list
267                                 if rate[:-1] == (test_id_dict['rate'],
268                                                  test_id_dict['num_streams']))
269                row_value = {
270                    'Mode': test_id_dict['mode'],
271                    'MCS': test_id_dict['rate'],
272                    'Streams': test_id_dict['num_streams'],
273                    'Chain': test_id_dict['chain_mask'],
274                    'Rate (Mbps)': data_rate,
275                }
276                for channel in channels_tested:
277                    row_value['Ch. ' + str(channel)] = test_results.pop(
278                        channel, ' ')
279                writer.writerow(row_value)
280
281        if not self.testclass_params['traffic_type'].lower() == 'ping':
282            WifiRvrTest.process_testclass_results(self)
283
284    def process_rvr_test_results(self, testcase_params, rvr_result):
285        """Post processes RvR results to compute sensitivity.
286
287        Takes in the results of the RvR tests and computes the sensitivity of
288        the current rate by looking at the point at which throughput drops
289        below the percentage specified in the config file. The function then
290        calls on its parent class process_test_results to plot the result.
291
292        Args:
293            rvr_result: dict containing attenuation, throughput and other meta
294            data
295        """
296        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
297        rvr_result['peak_throughput_pct'] = 100
298        throughput_check = [
299            throughput < rvr_result['peak_throughput'] *
300            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
301            for throughput in rvr_result['throughput_receive']
302        ]
303        consistency_check = [
304            idx for idx in range(len(throughput_check))
305            if all(throughput_check[idx:])
306        ]
307        rvr_result['atten_at_range'] = rvr_result['attenuation'][
308            consistency_check[0] - 1]
309        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
310            rvr_result['atten_at_range'])
311        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
312            self.testbed_params['ap_tx_power_offset'][str(
313                testcase_params['channel'])] - rvr_result['range'])
314        WifiRvrTest.process_test_results(self, rvr_result)
315
316    def process_ping_test_results(self, testcase_params, ping_result):
317        """Post processes RvR results to compute sensitivity.
318
319        Takes in the results of the RvR tests and computes the sensitivity of
320        the current rate by looking at the point at which throughput drops
321        below the percentage specified in the config file. The function then
322        calls on its parent class process_test_results to plot the result.
323
324        Args:
325            rvr_result: dict containing attenuation, throughput and other meta
326            data
327        """
328        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
329        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
330            self.testbed_params['ap_tx_power_offset'][str(
331                testcase_params['channel'])] - ping_result['range'])
332
333    def setup_sensitivity_test(self, testcase_params):
334        if testcase_params['traffic_type'].lower() == 'ping':
335            self.setup_ping_test(testcase_params)
336            self.run_sensitivity_test = self.run_ping_test
337            self.process_sensitivity_test_results = (
338                self.process_ping_test_results)
339        else:
340            self.setup_rvr_test(testcase_params)
341            self.run_sensitivity_test = self.run_rvr_test
342            self.process_sensitivity_test_results = (
343                self.process_rvr_test_results)
344
345    def setup_ap(self, testcase_params):
346        """Sets up the AP and attenuator to compensate for AP chain imbalance.
347
348        Args:
349            testcase_params: dict containing AP and other test params
350        """
351        band = self.access_point.band_lookup_by_channel(
352            testcase_params['channel'])
353        if '2G' in band:
354            frequency = wutils.WifiEnums.channel_2G_to_freq[
355                testcase_params['channel']]
356        else:
357            frequency = wutils.WifiEnums.channel_5G_to_freq[
358                testcase_params['channel']]
359        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
360            self.access_point.set_region(self.testbed_params['DFS_region'])
361        else:
362            self.access_point.set_region(self.testbed_params['default_region'])
363        self.access_point.set_channel(band, testcase_params['channel'])
364        self.access_point.set_bandwidth(band, testcase_params['mode'])
365        self.access_point.set_power(band, testcase_params['ap_tx_power'])
366        self.access_point.set_rate(band, testcase_params['mode'],
367                                   testcase_params['num_streams'],
368                                   testcase_params['rate'],
369                                   testcase_params['short_gi'])
370        # Set attenuator offsets and set attenuators to initial condition
371        atten_offsets = self.testbed_params['chain_offset'][str(
372            testcase_params['channel'])]
373        for atten in self.attenuators:
374            if 'AP-Chain-0' in atten.path:
375                atten.offset = atten_offsets[0]
376            elif 'AP-Chain-1' in atten.path:
377                atten.offset = atten_offsets[1]
378            else:
379                atten.offset = 0
380        self.log.info('Access Point Configuration: {}'.format(
381            self.access_point.ap_settings))
382
383    def setup_dut(self, testcase_params):
384        """Sets up the DUT in the configuration required by the test.
385
386        Args:
387            testcase_params: dict containing AP and other test params
388        """
389        # Check battery level before test
390        if not wputils.health_check(self.dut, 10):
391            asserts.skip('Battery level too low. Skipping test.')
392        # Turn screen off to preserve battery
393        self.dut.go_to_sleep()
394        if wputils.validate_network(self.dut,
395                                    testcase_params['test_network']['SSID']):
396            self.log.info('Already connected to desired network')
397        else:
398            wutils.reset_wifi(self.dut)
399            wutils.set_wifi_country_code(self.dut,
400                                         self.testclass_params['country_code'])
401            testcase_params['test_network']['channel'] = testcase_params[
402                'channel']
403            wutils.wifi_connect(self.dut,
404                                testcase_params['test_network'],
405                                num_of_tries=5,
406                                check_connectivity=False)
407        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
408        # Activate/attenuate the correct chains
409        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
410            self.atten_dut_chain_map[testcase_params[
411                'channel']] = wputils.get_current_atten_dut_chain_map(
412                    self.attenuators, self.dut, self.ping_server)
413        self.log.info("Current Attenuator-DUT Chain Map: {}".format(
414            self.atten_dut_chain_map[testcase_params['channel']]))
415        for idx, atten in enumerate(self.attenuators):
416            if self.atten_dut_chain_map[testcase_params['channel']][
417                    idx] == testcase_params['attenuated_chain']:
418                atten.offset = atten.instrument.max_atten
419
420    def extract_test_id(self, testcase_params, id_fields):
421        test_id = collections.OrderedDict(
422            (param, testcase_params[param]) for param in id_fields)
423        return test_id
424
425    def get_start_atten(self, testcase_params):
426        """Gets the starting attenuation for this sensitivity test.
427
428        The function gets the starting attenuation by checking whether a test
429        as the next higher MCS has been executed. If so it sets the starting
430        point a configurable number of dBs below the next MCS's sensitivity.
431
432        Returns:
433            start_atten: starting attenuation for current test
434        """
435        # If the test is being retried, start from the beginning
436        if self.retry_flag:
437            self.log.info('Retry flag set. Setting attenuation to minimum.')
438            return self.testclass_params['atten_start']
439        # Get the current and reference test config. The reference test is the
440        # one performed at the current MCS+1
441        current_rate = testcase_params['rate']
442        ref_test_params = self.extract_test_id(
443            testcase_params,
444            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
445        if 'legacy' in testcase_params['mode']:
446            if testcase_params['channel'] <= 13:
447                rate_list = self.VALID_RATES['legacy_2GHz']
448            else:
449                rate_list = self.VALID_RATES['legacy_5GHz']
450            ref_index = max(
451                0,
452                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
453                - 1)
454            ref_test_params['rate'] = rate_list[ref_index].mcs
455        else:
456            ref_test_params['rate'] = current_rate + 1
457
458        # Check if reference test has been run and set attenuation accordingly
459        previous_params = [
460            self.extract_test_id(
461                result['testcase_params'],
462                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
463            for result in self.testclass_results
464        ]
465
466        try:
467            ref_index = previous_params.index(ref_test_params)
468            start_atten = self.testclass_results[ref_index][
469                'atten_at_range'] - (
470                    self.testclass_params['adjacent_mcs_range_gap'])
471        except ValueError:
472            self.log.warning(
473                'Reference test not found. Starting from {} dB'.format(
474                    self.testclass_params['atten_start']))
475            start_atten = self.testclass_params['atten_start']
476            start_atten = max(start_atten, 0)
477        return start_atten
478
479    def compile_test_params(self, testcase_params):
480        """Function that generates test params based on the test name."""
481        band = self.access_point.band_lookup_by_channel(
482            testcase_params['channel'])
483        testcase_params['test_network'] = self.main_network[band]
484        if testcase_params['chain_mask'] in ['0', '1']:
485            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
486                1 if testcase_params['chain_mask'] == '0' else 0)
487        else:
488            # Set attenuated chain to -1. Do not set to None as this will be
489            # compared to RF chain map which may include None
490            testcase_params['attenuated_chain'] = -1
491
492        self.testclass_params[
493            'range_ping_loss_threshold'] = 100 - self.testclass_params[
494                'throughput_pct_at_sensitivity']
495        if self.testclass_params['traffic_type'] == 'UDP':
496            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
497                self.testclass_params['iperf_duration'],
498                self.testclass_params['UDP_rates'][testcase_params['mode']])
499        elif self.testclass_params['traffic_type'] == 'TCP':
500            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
501                self.testclass_params['iperf_duration'])
502
503        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
504                self.iperf_client, iperf_client.IPerfClientOverAdb):
505            testcase_params['iperf_args'] += ' -R'
506            testcase_params['use_client_output'] = True
507        else:
508            testcase_params['use_client_output'] = False
509
510        return testcase_params
511
512    def _test_sensitivity(self, testcase_params):
513        """ Function that gets called for each test case
514
515        The function gets called in each rvr test case. The function customizes
516        the rvr test based on the test name of the test that called it
517        """
518        # Compile test parameters from config and test name
519        testcase_params = self.compile_test_params(testcase_params)
520        testcase_params.update(self.testclass_params)
521        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
522        num_atten_steps = int(
523            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
524            testcase_params['atten_step'])
525        testcase_params['atten_range'] = [
526            testcase_params['atten_start'] + x * testcase_params['atten_step']
527            for x in range(0, num_atten_steps)
528        ]
529
530        # Prepare devices and run test
531        self.setup_sensitivity_test(testcase_params)
532        result = self.run_sensitivity_test(testcase_params)
533        self.process_sensitivity_test_results(testcase_params, result)
534
535        # Post-process results
536        self.testclass_results.append(result)
537        self.pass_fail_check(result)
538
539    def generate_test_cases(self, channels, modes, chain_mask):
540        """Function that auto-generates test cases for a test class."""
541        test_cases = []
542        for channel in channels:
543            requested_modes = [
544                mode for mode in modes
545                if mode in self.VALID_TEST_CONFIGS[channel]
546            ]
547            for mode in requested_modes:
548                if 'VHT' in mode:
549                    rates = self.VALID_RATES[mode]
550                elif 'HT' in mode:
551                    rates = self.VALID_RATES[mode]
552                elif 'legacy' in mode and channel < 14:
553                    rates = self.VALID_RATES['legacy_2GHz']
554                elif 'legacy' in mode and channel > 14:
555                    rates = self.VALID_RATES['legacy_5GHz']
556                else:
557                    raise ValueError('Invalid test mode.')
558                for chain, rate in itertools.product(chain_mask, rates):
559                    testcase_params = collections.OrderedDict(
560                        channel=channel,
561                        mode=mode,
562                        rate=rate.mcs,
563                        num_streams=rate.streams,
564                        short_gi=1,
565                        chain_mask=chain)
566                    if chain in ['0', '1'] and rate[1] == 2:
567                        # Do not test 2-stream rates in single chain mode
568                        continue
569                    if 'legacy' in mode:
570                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
571                                         '_ch{}'.format(
572                                             channel, mode,
573                                             str(rate.mcs).replace('.', 'p'),
574                                             rate.streams, chain))
575                    else:
576                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
577                                         '_ch{}'.format(
578                                             channel, mode, rate.mcs,
579                                             rate.streams, chain))
580                    setattr(self, testcase_name,
581                            partial(self._test_sensitivity, testcase_params))
582                    test_cases.append(testcase_name)
583        return test_cases
584
585
class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 6, 36-48 and 149-161."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
592
593
class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on the sample channels 6, 36 and 149."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 149]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
600
601
class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 1, 2, 6, 10 and 11 in VHT20 mode."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [1, 2, 6, 10, 11]
        modes = ['VHT20']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
607
608
class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36-48 and 149-161."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
615
616
class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36, 40, 44 and 48."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
623
624
class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 149, 153, 157 and 161."""

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
631
632
# Over-the-air version of sensitivity tests
634class WifiOtaSensitivityTest(WifiSensitivityTest):
635    """Class to test over-the-air senstivity.
636
637    This class implements measures WiFi sensitivity tests in an OTA chamber.
638    It allows setting orientation and other chamber parameters to study
639    performance in varying channel conditions
640    """
    def __init__(self, controllers):
        """Initializes the base test class and the blackbox metric loggers.

        Unlike WifiSensitivityTest.__init__, publish_testcase_metrics is set
        to False here.

        Args:
            controllers: passed through unchanged to
                base_test.BaseTestClass.__init__
        """
        # BaseTestClass.__init__ is called directly, so the __init__ of the
        # parent test classes is not executed here.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False
648
649    def setup_class(self):
650        WifiSensitivityTest.setup_class(self)
651        self.current_chain_mask = '2x2'
652        self.ota_chamber = ota_chamber.create(
653            self.user_params['OTAChamber'])[0]
654
655    def teardown_class(self):
656        WifiSensitivityTest.teardown_class(self)
657        self.ota_chamber.reset_chamber()
658
659    def setup_sensitivity_test(self, testcase_params):
660        # Setup turntable
661        self.ota_chamber.set_orientation(testcase_params['orientation'])
662        # Continue test setup
663        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)
664
665    def setup_dut(self, testcase_params):
666        """Sets up the DUT in the configuration required by the test.
667
668        Args:
669            testcase_params: dict containing AP and other test params
670        """
671        # Configure the right INI settings
672        if testcase_params['chain_mask'] != self.current_chain_mask:
673            self.log.info('Updating WiFi chain mask to: {}'.format(
674                testcase_params['chain_mask']))
675            self.current_chain_mask = testcase_params['chain_mask']
676            if testcase_params['chain_mask'] in ['0', '1']:
677                wputils.set_ini_single_chain_mode(
678                    self.dut, int(testcase_params['chain_mask']))
679            else:
680                wputils.set_ini_two_chain_mode(self.dut)
681        # Check battery level before test
682        if not wputils.health_check(self.dut, 10):
683            asserts.skip('Battery level too low. Skipping test.')
684        # Turn screen off to preserve battery
685        self.dut.go_to_sleep()
686        if wputils.validate_network(self.dut,
687                                    testcase_params['test_network']['SSID']):
688            self.log.info('Already connected to desired network')
689        else:
690            wutils.reset_wifi(self.dut)
691            wutils.set_wifi_country_code(self.dut,
692                                         self.testclass_params['country_code'])
693            testcase_params['test_network']['channel'] = testcase_params[
694                'channel']
695            wutils.wifi_connect(self.dut,
696                                testcase_params['test_network'],
697                                num_of_tries=5,
698                                check_connectivity=False)
699        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
700
701    def process_testclass_results(self):
702        """Saves and plots test results from all executed test cases."""
703        testclass_results_dict = collections.OrderedDict()
704        id_fields = ['channel', 'mode', 'rate']
705        plots = []
706        for result in self.testclass_results:
707            test_id = self.extract_test_id(result['testcase_params'],
708                                           id_fields)
709            test_id = tuple(test_id.items())
710            chain_mask = result['testcase_params']['chain_mask']
711            num_streams = result['testcase_params']['num_streams']
712            line_id = (chain_mask, num_streams)
713            if test_id not in testclass_results_dict:
714                testclass_results_dict[test_id] = collections.OrderedDict()
715            if line_id not in testclass_results_dict[test_id]:
716                testclass_results_dict[test_id][line_id] = {
717                    'orientation': [],
718                    'sensitivity': []
719                }
720            orientation = result['testcase_params']['orientation']
721            if result['peak_throughput_pct'] >= 95:
722                sensitivity = result['sensitivity']
723            else:
724                sensitivity = float('nan')
725            if orientation not in testclass_results_dict[test_id][line_id][
726                    'orientation']:
727                testclass_results_dict[test_id][line_id]['orientation'].append(
728                    orientation)
729                testclass_results_dict[test_id][line_id]['sensitivity'].append(
730                    sensitivity)
731            else:
732                testclass_results_dict[test_id][line_id]['sensitivity'][
733                    -1] = sensitivity
734
735        for test_id, test_data in testclass_results_dict.items():
736            test_id_dict = dict(test_id)
737            if 'legacy' in test_id_dict['mode']:
738                test_id_str = 'Channel {} - {} {}Mbps'.format(
739                    test_id_dict['channel'], test_id_dict['mode'],
740                    test_id_dict['rate'])
741            else:
742                test_id_str = 'Channel {} - {} MCS{}'.format(
743                    test_id_dict['channel'], test_id_dict['mode'],
744                    test_id_dict['rate'])
745            curr_plot = wputils.BokehFigure(
746                title=str(test_id_str),
747                x_label='Orientation (deg)',
748                primary_y_label='Sensitivity (dBm)')
749            for line_id, line_results in test_data.items():
750                curr_plot.add_line(line_results['orientation'],
751                                   line_results['sensitivity'],
752                                   legend='Nss{} - Chain Mask {}'.format(
753                                       line_id[1], line_id[0]),
754                                   marker='circle')
755                if 'legacy' in test_id_dict['mode']:
756                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
757                        test_id_dict['channel'], test_id_dict['mode'],
758                        test_id_dict['rate'], line_id[0])
759                else:
760                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
761                        test_id_dict['channel'], test_id_dict['mode'],
762                        test_id_dict['rate'], line_id[1], line_id[0])
763
764                metric_name = metric_tag + '.avg_sensitivity'
765                metric_value = numpy.nanmean(line_results['sensitivity'])
766                self.testclass_metric_logger.add_metric(
767                    metric_name, metric_value)
768                self.log.info(("Average Sensitivity for {}: {:.1f}").format(
769                    metric_tag, metric_value))
770            current_context = (
771                context.get_current_context().get_full_output_path())
772            output_file_path = os.path.join(current_context,
773                                            str(test_id_str) + '.html')
774            curr_plot.generate_figure(output_file_path)
775            plots.append(curr_plot)
776        output_file_path = os.path.join(current_context, 'results.html')
777        wputils.BokehFigure.save_figures(plots, output_file_path)
778
779    def get_start_atten(self, testcase_params):
780        """Gets the starting attenuation for this sensitivity test.
781
782        The function gets the starting attenuation by checking whether a test
783        at the same rate configuration has executed. If so it sets the starting
784        point a configurable number of dBs below the reference test.
785
786        Returns:
787            start_atten: starting attenuation for current test
788        """
789        # If the test is being retried, start from the beginning
790        if self.retry_flag:
791            self.log.info('Retry flag set. Setting attenuation to minimum.')
792            return self.testclass_params['atten_start']
793        # Get the current and reference test config. The reference test is the
794        # one performed at the current MCS+1
795        ref_test_params = self.extract_test_id(
796            testcase_params,
797            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
798        # Check if reference test has been run and set attenuation accordingly
799        previous_params = [
800            self.extract_test_id(
801                result['testcase_params'],
802                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
803            for result in self.testclass_results
804        ]
805        try:
806            ref_index = previous_params[::-1].index(ref_test_params)
807            ref_index = len(previous_params) - 1 - ref_index
808            start_atten = self.testclass_results[ref_index][
809                'atten_at_range'] - (
810                    self.testclass_params['adjacent_mcs_range_gap'])
811        except ValueError:
812            print('Reference test not found. Starting from {} dB'.format(
813                self.testclass_params['atten_start']))
814            start_atten = self.testclass_params['atten_start']
815        start_atten = max(start_atten, 0)
816        return start_atten
817
818    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
819                            angles):
820        """Function that auto-generates test cases for a test class."""
821        test_cases = []
822        for channel in channels:
823            requested_modes = [
824                mode for mode in modes
825                if mode in self.VALID_TEST_CONFIGS[channel]
826            ]
827            for chain, mode in itertools.product(chain_mask, requested_modes):
828                if 'VHT' in mode:
829                    valid_rates = self.VALID_RATES[mode]
830                elif 'HT' in mode:
831                    valid_rates = self.VALID_RATES[mode]
832                elif 'legacy' in mode and channel < 14:
833                    valid_rates = self.VALID_RATES['legacy_2GHz']
834                elif 'legacy' in mode and channel > 14:
835                    valid_rates = self.VALID_RATES['legacy_5GHz']
836                else:
837                    raise ValueError('Invalid test mode.')
838                for rate, angle in itertools.product(valid_rates, angles):
839                    testcase_params = collections.OrderedDict(
840                        channel=channel,
841                        mode=mode,
842                        rate=rate.mcs,
843                        num_streams=rate.streams,
844                        short_gi=1,
845                        chain_mask=chain,
846                        orientation=angle)
847                    if rate not in requested_rates:
848                        continue
849                    if str(chain) in ['0', '1'] and rate[1] == 2:
850                        # Do not test 2-stream rates in single chain mode
851                        continue
852                    if 'legacy' in mode:
853                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
854                                         '_ch{}_{}deg'.format(
855                                             channel, mode,
856                                             str(rate.mcs).replace('.', 'p'),
857                                             rate.streams, chain, angle))
858                    else:
859                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
860                                         '_ch{}_{}deg'.format(
861                                             channel, mode, rate.mcs,
862                                             rate.streams, chain, angle))
863                    setattr(self, testcase_name,
864                            partial(self._test_sensitivity, testcase_params))
865                    test_cases.append(testcase_name)
866        return test_cases
867
868
class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests at orientations from 0 to 350 in steps of 10."""

    def __init__(self, controllers):
        """Initializes the class and generates its list of test cases.

        Args:
            controllers: forwarded unchanged to the parent constructor.
        """
        WifiOtaSensitivityTest.__init__(self, controllers)
        # RateTuple arguments: (mcs, streams, third value -- presumably the
        # data rate in Mbps; confirm against the RateTuple definition).
        rate_specs = [
            (8, 1, 86.7),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20', 'VHT80']
        chain_masks = ['2x2']
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, chain_masks,
                                              orientations)
883
884
class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
    """Per-chain OTA sensitivity tests at orientations in steps of 10.

    Test cases are generated for chain masks '0', '1' and '2x2'.
    """

    def __init__(self, controllers):
        """Initializes the class and generates its list of test cases.

        Args:
            controllers: forwarded unchanged to the parent constructor.
        """
        WifiOtaSensitivityTest.__init__(self, controllers)
        # RateTuple arguments: (mcs, streams, third value -- presumably the
        # data rate in Mbps; confirm against the RateTuple definition).
        rate_specs = [
            (2, 1, 21.7),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20']
        chain_masks = ['0', '1', '2x2']
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, chain_masks,
                                              orientations)
897
898
class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests at orientations from 0 to 330 in steps of 30."""

    def __init__(self, controllers):
        """Initializes the class and generates its list of test cases.

        Args:
            controllers: forwarded unchanged to the parent constructor.
        """
        WifiOtaSensitivityTest.__init__(self, controllers)
        # RateTuple arguments: (mcs, streams, third value -- presumably the
        # data rate in Mbps; confirm against the RateTuple definition).
        rate_specs = [
            # Single-stream rates
            (9, 1, 96),
            (8, 1, 86.7),
            (7, 1, 72.2),
            (4, 1, 43.3),
            (2, 1, 21.7),
            (0, 1, 7.2),
            # Two-stream rates
            (9, 2, 192),
            (8, 2, 173.3),
            (7, 2, 144.4),
            (4, 2, 86.7),
            (2, 2, 43.3),
            (0, 2, 14.4),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20', 'VHT80']
        chain_masks = ['2x2']
        orientations = list(range(0, 360, 30))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, chain_masks,
                                              orientations)
921
922
class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests at orientations from 0 to 315 in steps of 45."""

    def __init__(self, controllers):
        """Initializes the class and generates its list of test cases.

        Args:
            controllers: forwarded unchanged to the parent constructor.
        """
        WifiOtaSensitivityTest.__init__(self, controllers)
        # RateTuple arguments: (mcs, streams, third value -- presumably the
        # data rate in Mbps; confirm against the RateTuple definition).
        rate_specs = [
            (8, 1, 86.7),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        requested_modes = ['VHT20', 'VHT80']
        chain_masks = ['2x2']
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, chain_masks,
                                              orientations)
935