1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import os
22import statistics
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers.utils_lib import ssh
28from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
29from acts.test_utils.wifi import ota_chamber
30from acts.test_utils.wifi import ota_sniffer
31from acts.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts.test_utils.wifi import wifi_retail_ap as retail_ap
33from acts.test_utils.wifi import wifi_test_utils as wutils
34from functools import partial
35
36
37class WifiPingTest(base_test.BaseTestClass):
38    """Class for ping-based Wifi performance tests.
39
    This class implements WiFi ping performance tests such as range and RTT.
    The class sets up the AP in the desired configurations, configures
    and connects the phone to the AP, and runs ping tests. For an example
    config file to run this test class see
    example_connectivity_performance_ap_sta.json.
44    """
45
46    TEST_TIMEOUT = 10
47    RSSI_POLL_INTERVAL = 0.2
48    SHORT_SLEEP = 1
49    MED_SLEEP = 5
50    MAX_CONSECUTIVE_ZEROS = 5
51    DISCONNECTED_PING_RESULT = {
52        'connected': 0,
53        'rtt': [],
54        'time_stamp': [],
55        'ping_interarrivals': [],
56        'packet_loss_percentage': 100
57    }
58
59    def __init__(self, controllers):
60        base_test.BaseTestClass.__init__(self, controllers)
61        self.testcase_metric_logger = (
62            BlackboxMappedMetricLogger.for_test_case())
63        self.testclass_metric_logger = (
64            BlackboxMappedMetricLogger.for_test_class())
65        self.publish_testcase_metrics = True
66
67    def setup_class(self):
68        self.dut = self.android_devices[-1]
69        req_params = [
70            'ping_test_params', 'testbed_params', 'main_network',
71            'RetailAccessPoints', 'RemoteServer'
72        ]
73        opt_params = ['OTASniffer']
74        self.unpack_userparams(req_params, opt_params)
75        self.testclass_params = self.ping_test_params
76        self.num_atten = self.attenuators[0].instrument.num_atten
77        self.ping_server = ssh.connection.SshConnection(
78            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
79        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
80        if hasattr(self,
81                   'OTASniffer') and self.testbed_params['sniffer_enable']:
82            self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
83        self.log.info('Access Point Configuration: {}'.format(
84            self.access_point.ap_settings))
85        self.log_path = os.path.join(logging.log_path, 'results')
86        os.makedirs(self.log_path, exist_ok=True)
87        self.atten_dut_chain_map = {}
88        self.testclass_results = []
89
90        # Turn WiFi ON
91        if self.testclass_params.get('airplane_mode', 1):
92            self.log.info('Turning on airplane mode.')
93            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
94                                "Can not turn on airplane mode.")
95        wutils.wifi_toggle_state(self.dut, True)
96
97        # Configure test retries
98        self.user_params['retry_tests'] = [self.__class__.__name__]
99
100    def teardown_class(self):
101        # Turn WiFi OFF and reset AP
102        for dev in self.android_devices:
103            wutils.wifi_toggle_state(dev, False)
104        self.process_testclass_results()
105
106    def setup_test(self):
107        self.retry_flag = False
108
109    def teardown_test(self):
110        self.retry_flag = False
111
112    def on_retry(self):
113        """Function to control test logic on retried tests.
114
115        This function is automatically executed on tests that are being
116        retried. In this case the function resets wifi, toggles it off and on
117        and sets a retry_flag to enable further tweaking the test logic on
118        second attempts.
119        """
120        self.retry_flag = True
121        for dev in self.android_devices:
122            wutils.reset_wifi(dev)
123            wutils.toggle_wifi_off_and_on(dev)
124
125    def process_testclass_results(self):
126        """Saves all test results to enable comparison."""
127        testclass_summary = {}
128        for test in self.testclass_results:
129            if 'range' in test['test_name']:
130                testclass_summary[test['test_name']] = test['range']
131        # Save results
132        results_file_path = os.path.join(self.log_path,
133                                         'testclass_summary.json')
134        with open(results_file_path, 'w') as results_file:
135            json.dump(testclass_summary, results_file, indent=4)
136
137    def pass_fail_check_ping_rtt(self, result):
138        """Check the test result and decide if it passed or failed.
139
140        The function computes RTT statistics and fails any tests in which the
141        tail of the ping latency results exceeds the threshold defined in the
142        configuration file.
143
144        Args:
145            result: dict containing ping results and other meta data
146        """
147        ignored_fraction = (self.testclass_params['rtt_ignored_interval'] /
148                            self.testclass_params['rtt_ping_duration'])
149        sorted_rtt = [
150            sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
151            for x in result['ping_results']
152        ]
153        disconnected = any([len(x) == 0 for x in sorted_rtt])
154        if disconnected:
155            asserts.fail('Test failed. DUT disconnected at least once.')
156
157        rtt_at_test_percentile = [
158            x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
159                  len(x))] for x in sorted_rtt
160        ]
161        # Set blackbox metric
162        if self.publish_testcase_metrics:
163            self.testcase_metric_logger.add_metric('ping_rtt',
164                                                   max(rtt_at_test_percentile))
165        # Evaluate test pass/fail
166        rtt_failed = any([
167            rtt > self.testclass_params['rtt_threshold'] * 1000
168            for rtt in rtt_at_test_percentile
169        ])
170        if rtt_failed:
171            #TODO: figure out how to cleanly exclude RTT tests from retry
172            asserts.explicit_pass(
173                'Test failed. RTTs at test percentile = {}'.format(
174                    rtt_at_test_percentile))
175        else:
176            asserts.explicit_pass(
177                'Test Passed. RTTs at test percentile = {}'.format(
178                    rtt_at_test_percentile))
179
180    def pass_fail_check_ping_range(self, result):
181        """Check the test result and decide if it passed or failed.
182
183        Checks whether the attenuation at which ping packet losses begin to
184        exceed the threshold matches the range derived from golden
185        rate-vs-range result files. The test fails is ping range is
186        range_gap_threshold worse than RvR range.
187
188        Args:
189            result: dict containing ping results and meta data
190        """
191        # Get target range
192        #rvr_range = self.get_range_from_rvr()
193        # Set Blackbox metric
194        if self.publish_testcase_metrics:
195            self.testcase_metric_logger.add_metric('ping_range',
196                                                   result['range'])
197        # Evaluate test pass/fail
198        test_message = ('Attenuation at range is {}dB. '
199                        'LLStats at Range: {}'.format(
200                            result['range'], result['llstats_at_range']))
201        if result['peak_throughput_pct'] < 95:
202            asserts.fail("(RESULT NOT RELIABLE) {}".format(test_message))
203        else:
204            asserts.explicit_pass(test_message)
205
206    def pass_fail_check(self, result):
207        if 'range' in result['testcase_params']['test_type']:
208            self.pass_fail_check_ping_range(result)
209        else:
210            self.pass_fail_check_ping_rtt(result)
211
212    def process_ping_results(self, testcase_params, ping_range_result):
213        """Saves and plots ping results.
214
215        Args:
216            ping_range_result: dict containing ping results and metadata
217        """
218        # Compute range
219        ping_loss_over_att = [
220            x['packet_loss_percentage']
221            for x in ping_range_result['ping_results']
222        ]
223        ping_loss_above_threshold = [
224            x > self.testclass_params['range_ping_loss_threshold']
225            for x in ping_loss_over_att
226        ]
227        for idx in range(len(ping_loss_above_threshold)):
228            if all(ping_loss_above_threshold[idx:]):
229                range_index = max(idx, 1) - 1
230                break
231        else:
232            range_index = -1
233        ping_range_result['atten_at_range'] = testcase_params['atten_range'][
234            range_index]
235        ping_range_result['peak_throughput_pct'] = 100 - min(
236            ping_loss_over_att)
237        ping_range_result['range'] = (ping_range_result['atten_at_range'] +
238                                      ping_range_result['fixed_attenuation'])
239        ping_range_result['llstats_at_range'] = (
240            'TX MCS = {0} ({1:.1f}%). '
241            'RX MCS = {2} ({3:.1f}%)'.format(
242                ping_range_result['llstats'][range_index]['summary']
243                ['common_tx_mcs'], ping_range_result['llstats'][range_index]
244                ['summary']['common_tx_mcs_freq'] * 100,
245                ping_range_result['llstats'][range_index]['summary']
246                ['common_rx_mcs'], ping_range_result['llstats'][range_index]
247                ['summary']['common_rx_mcs_freq'] * 100))
248
249        # Save results
250        results_file_path = os.path.join(
251            self.log_path, '{}.json'.format(self.current_test_name))
252        with open(results_file_path, 'w') as results_file:
253            json.dump(ping_range_result, results_file, indent=4)
254
255        # Plot results
256        if 'range' not in self.current_test_name:
257            figure = wputils.BokehFigure(
258                self.current_test_name,
259                x_label='Timestamp (s)',
260                primary_y_label='Round Trip Time (ms)')
261            for idx, result in enumerate(ping_range_result['ping_results']):
262                if len(result['rtt']) > 1:
263                    x_data = [
264                        t - result['time_stamp'][0]
265                        for t in result['time_stamp']
266                    ]
267                    figure.add_line(
268                        x_data, result['rtt'], 'RTT @ {}dB'.format(
269                            ping_range_result['attenuation'][idx]))
270
271            output_file_path = os.path.join(
272                self.log_path, '{}.html'.format(self.current_test_name))
273            figure.generate_figure(output_file_path)
274
    def run_ping_test(self, testcase_params):
        """Main function to test ping.

        The function sweeps all attenuators over
        testcase_params['atten_range'] and, at each step, collects RSSI,
        incremental link layer stats, and ping statistics. The sweep stops
        early once MAX_CONSECUTIVE_ZEROS consecutive steps show 100% packet
        loss or a disconnection; the remaining attenuation points are then
        filled with DISCONNECTED_PING_RESULT.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing ping results and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        # Fixed attenuation is looked up per channel, keyed by the channel
        # number as a string
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        # NOTE(review): self.sniffer is only created in setup_class when the
        # OTASniffer param is also present; this check reads sniffer_enable
        # alone - confirm the config always provides both together.
        if self.testbed_params['sniffer_enable']:
            # Bandwidth is parsed from the mode string after its first three
            # characters (e.g. 'VHT80' -> 80). The capture duration covers
            # every attenuation step plus TEST_TIMEOUT.
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=int(testcase_params['channel']),
                bw=int(testcase_params['mode'][3:]),
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Run ping and sweep attenuation as needed
        # zero_counter counts consecutive steps with total loss/disconnect
        zero_counter = 0
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False)
            # Start RSSI polling before the ping is launched; its result is
            # collected after the ping call returns
            rssi_future = wputils.get_connected_rssi_nb(
                self.dut,
                int(testcase_params['ping_duration'] / 2 /
                    self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL,
                testcase_params['ping_duration'] / 2)
            # Refresh link layer stats
            llstats_obj.update_stats()
            current_ping_stats = wputils.get_ping_stats(
                self.ping_server, self.dut_ip,
                testcase_params['ping_duration'],
                testcase_params['ping_interval'], testcase_params['ping_size'])
            current_rssi = rssi_future.result()
            test_result['rssi_results'].append(current_rssi)
            # Second update so llstats_incremental reflects this step
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            if current_ping_stats['connected']:
                self.log.info(
                    'Attenuation = {0}dB\tPacket Loss = {1}%\t'
                    'Avg RTT = {2:.2f}ms\tRSSI = {3} [{4},{5}]\t'.format(
                        atten, current_ping_stats['packet_loss_percentage'],
                        statistics.mean(current_ping_stats['rtt']),
                        current_rssi['signal_poll_rssi']['mean'],
                        current_rssi['chain_0_rssi']['mean'],
                        current_rssi['chain_1_rssi']['mean']))
                # Any step with replies resets the consecutive-loss counter
                if current_ping_stats['packet_loss_percentage'] == 100:
                    zero_counter = zero_counter + 1
                else:
                    zero_counter = 0
            else:
                self.log.info(
                    'Attenuation = {}dB. Disconnected.'.format(atten))
                zero_counter = zero_counter + 1
            test_result['ping_results'].append(current_ping_stats.as_dict())
            if zero_counter == self.MAX_CONSECUTIVE_ZEROS:
                self.log.info('Ping loss stable at 100%. Stopping test now.')
                # Pad ping_results to one entry per attenuation point. Every
                # padded entry is the same class-level dict object; only
                # ping_results is padded, rssi_results and llstats are not.
                for idx in range(
                        len(testcase_params['atten_range']) -
                        len(test_result['ping_results'])):
                    test_result['ping_results'].append(
                        self.DISCONNECTED_PING_RESULT)
                break
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result
357
358    def setup_ap(self, testcase_params):
359        """Sets up the access point in the configuration required by the test.
360
361        Args:
362            testcase_params: dict containing AP and other test params
363        """
364        band = self.access_point.band_lookup_by_channel(
365            testcase_params['channel'])
366        if '2G' in band:
367            frequency = wutils.WifiEnums.channel_2G_to_freq[
368                testcase_params['channel']]
369        else:
370            frequency = wutils.WifiEnums.channel_5G_to_freq[
371                testcase_params['channel']]
372        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
373            self.access_point.set_region(self.testbed_params['DFS_region'])
374        else:
375            self.access_point.set_region(self.testbed_params['default_region'])
376        self.access_point.set_channel(band, testcase_params['channel'])
377        self.access_point.set_bandwidth(band, testcase_params['mode'])
378        if 'low' in testcase_params['ap_power']:
379            self.log.info('Setting low AP power.')
380            self.access_point.set_power(
381                band, self.testclass_params['low_ap_tx_power'])
382        self.log.info('Access Point Configuration: {}'.format(
383            self.access_point.ap_settings))
384
385    def setup_dut(self, testcase_params):
386        """Sets up the DUT in the configuration required by the test.
387
388        Args:
389            testcase_params: dict containing AP and other test params
390        """
391        # Check battery level before test
392        if not wputils.health_check(self.dut, 10):
393            asserts.skip('Battery level too low. Skipping test.')
394        # Turn screen off to preserve battery
395        self.dut.go_to_sleep()
396        if wputils.validate_network(self.dut,
397                                    testcase_params['test_network']['SSID']):
398            self.log.info('Already connected to desired network')
399        else:
400            wutils.reset_wifi(self.dut)
401            wutils.set_wifi_country_code(self.dut,
402                                         self.testclass_params['country_code'])
403            testcase_params['test_network']['channel'] = testcase_params[
404                'channel']
405            wutils.wifi_connect(self.dut,
406                                testcase_params['test_network'],
407                                num_of_tries=5,
408                                check_connectivity=True)
409        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
410        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
411            self.atten_dut_chain_map[testcase_params[
412                'channel']] = wputils.get_current_atten_dut_chain_map(
413                    self.attenuators, self.dut, self.ping_server)
414        self.log.info("Current Attenuator-DUT Chain Map: {}".format(
415            self.atten_dut_chain_map[testcase_params['channel']]))
416        for idx, atten in enumerate(self.attenuators):
417            if self.atten_dut_chain_map[testcase_params['channel']][
418                    idx] == testcase_params['attenuated_chain']:
419                atten.offset = atten.instrument.max_atten
420            else:
421                atten.offset = 0
422
423    def setup_ping_test(self, testcase_params):
424        """Function that gets devices ready for the test.
425
426        Args:
427            testcase_params: dict containing test-specific parameters
428        """
429        # Configure AP
430        self.setup_ap(testcase_params)
431        # Set attenuator to 0 dB
432        for attenuator in self.attenuators:
433            attenuator.set_atten(0, strict=False)
434        # Reset, configure, and connect DUT
435        self.setup_dut(testcase_params)
436
437    def get_range_start_atten(self, testcase_params):
438        """Gets the starting attenuation for this ping test.
439
440        This function is used to get the starting attenuation for ping range
441        tests. This implementation returns the default starting attenuation,
442        however, defining this function enables a more involved configuration
443        for over-the-air test classes.
444
445        Args:
446            testcase_params: dict containing all test params
447        """
448        return self.testclass_params['range_atten_start']
449
450    def compile_test_params(self, testcase_params):
451        band = self.access_point.band_lookup_by_channel(
452            testcase_params['channel'])
453        testcase_params['test_network'] = self.main_network[band]
454        if testcase_params['chain_mask'] in ['0', '1']:
455            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
456                1 if testcase_params['chain_mask'] == '0' else 0)
457        else:
458            # Set attenuated chain to -1. Do not set to None as this will be
459            # compared to RF chain map which may include None
460            testcase_params['attenuated_chain'] = -1
461        if testcase_params['test_type'] == 'test_ping_range':
462            testcase_params.update(
463                ping_interval=self.testclass_params['range_ping_interval'],
464                ping_duration=self.testclass_params['range_ping_duration'],
465                ping_size=self.testclass_params['ping_size'],
466            )
467        elif testcase_params['test_type'] == 'test_fast_ping_rtt':
468            testcase_params.update(
469                ping_interval=self.testclass_params['rtt_ping_interval']
470                ['fast'],
471                ping_duration=self.testclass_params['rtt_ping_duration'],
472                ping_size=self.testclass_params['ping_size'],
473            )
474        elif testcase_params['test_type'] == 'test_slow_ping_rtt':
475            testcase_params.update(
476                ping_interval=self.testclass_params['rtt_ping_interval']
477                ['slow'],
478                ping_duration=self.testclass_params['rtt_ping_duration'],
479                ping_size=self.testclass_params['ping_size'])
480
481        if testcase_params['test_type'] == 'test_ping_range':
482            start_atten = self.get_range_start_atten(testcase_params)
483            num_atten_steps = int(
484                (self.testclass_params['range_atten_stop'] - start_atten) /
485                self.testclass_params['range_atten_step'])
486            testcase_params['atten_range'] = [
487                start_atten + x * self.testclass_params['range_atten_step']
488                for x in range(0, num_atten_steps)
489            ]
490        else:
491            testcase_params['atten_range'] = self.testclass_params[
492                'rtt_test_attenuation']
493        return testcase_params
494
495    def _test_ping(self, testcase_params):
496        """ Function that gets called for each range test case
497
498        The function gets called in each range test case. It customizes the
499        range test based on the test name of the test that called it
500
501        Args:
502            testcase_params: dict containing preliminary set of parameters
503        """
504        # Compile test parameters from config and test name
505        testcase_params = self.compile_test_params(testcase_params)
506        # Run ping test
507        self.setup_ping_test(testcase_params)
508        ping_result = self.run_ping_test(testcase_params)
509        # Postprocess results
510        self.process_ping_results(testcase_params, ping_result)
511        self.testclass_results.append(ping_result)
512        self.pass_fail_check(ping_result)
513
514    def generate_test_cases(self, ap_power, channels, modes, chain_mask,
515                            test_types):
516        test_cases = []
517        allowed_configs = {
518            'VHT20': [
519                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
520                157, 161
521            ],
522            'VHT40': [36, 44, 149, 157],
523            'VHT80': [36, 149]
524        }
525        for channel, mode, chain, test_type in itertools.product(
526                channels, modes, chain_mask, test_types):
527            if channel not in allowed_configs[mode]:
528                continue
529            testcase_name = '{}_ch{}_{}_ch{}'.format(test_type, channel, mode,
530                                                     chain)
531            testcase_params = collections.OrderedDict(test_type=test_type,
532                                                      ap_power=ap_power,
533                                                      channel=channel,
534                                                      mode=mode,
535                                                      chain_mask=chain)
536            setattr(self, testcase_name,
537                    partial(self._test_ping, testcase_params))
538            test_cases.append(testcase_name)
539        return test_cases
540
541
class WifiPing_TwoChain_Test(WifiPingTest):
    """Range, fast-RTT and slow-RTT ping tests with the '2x2' chain mask."""
    def __init__(self, controllers):
        super().__init__(controllers)
        all_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        all_test_types = [
            'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt'
        ]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=all_channels,
            modes=['VHT20', 'VHT40', 'VHT80'],
            chain_mask=['2x2'],
            test_types=all_test_types)
553
554
class WifiPing_PerChainRange_Test(WifiPingTest):
    """Ping range tests for chain masks '0', '1' and '2x2'."""
    def __init__(self, controllers):
        super().__init__(controllers)
        all_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=all_channels,
            modes=['VHT20', 'VHT40', 'VHT80'],
            chain_mask=['0', '1', '2x2'],
            test_types=['test_ping_range'])
564
565
class WifiPing_LowPowerAP_Test(WifiPingTest):
    """Ping range tests run with the 'low_power' AP power setting."""
    def __init__(self, controllers):
        super().__init__(controllers)
        all_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=all_channels,
            modes=['VHT20', 'VHT40', 'VHT80'],
            chain_mask=['0', '1', '2x2'],
            test_types=['test_ping_range'])
575
576
# Over-the-air version of ping tests
578class WifiOtaPingTest(WifiPingTest):
579    """Class to test over-the-air ping
580
581    This class tests WiFi ping performance in an OTA chamber. It enables
582    setting turntable orientation and other chamber parameters to study
583    performance in varying channel conditions
584    """
585    def __init__(self, controllers):
586        base_test.BaseTestClass.__init__(self, controllers)
587        self.testcase_metric_logger = (
588            BlackboxMappedMetricLogger.for_test_case())
589        self.testclass_metric_logger = (
590            BlackboxMappedMetricLogger.for_test_class())
591        self.publish_testcase_metrics = False
592
593    def setup_class(self):
594        WifiPingTest.setup_class(self)
595        self.ota_chamber = ota_chamber.create(
596            self.user_params['OTAChamber'])[0]
597
598    def teardown_class(self):
599        WifiPingTest.teardown_class(self)
600        self.process_testclass_results()
601        self.ota_chamber.reset_chamber()
602
603    def process_testclass_results(self):
604        """Saves all test results to enable comparison."""
605        WifiPingTest.process_testclass_results(self)
606
607        range_vs_angle = collections.OrderedDict()
608        for test in self.testclass_results:
609            curr_params = test['testcase_params']
610            curr_config = curr_params['channel']
611            if curr_config in range_vs_angle:
612                if curr_params['position'] not in range_vs_angle[curr_config][
613                        'position']:
614                    range_vs_angle[curr_config]['position'].append(
615                        curr_params['position'])
616                    range_vs_angle[curr_config]['range'].append(test['range'])
617                    range_vs_angle[curr_config]['llstats_at_range'].append(
618                        test['llstats_at_range'])
619                else:
620                    range_vs_angle[curr_config]['range'][-1] = test['range']
621                    range_vs_angle[curr_config]['llstats_at_range'][-1] = test[
622                        'llstats_at_range']
623            else:
624                range_vs_angle[curr_config] = {
625                    'position': [curr_params['position']],
626                    'range': [test['range']],
627                    'llstats_at_range': [test['llstats_at_range']]
628                }
629        chamber_mode = self.testclass_results[0]['testcase_params'][
630            'chamber_mode']
631        if chamber_mode == 'orientation':
632            x_label = 'Angle (deg)'
633        elif chamber_mode == 'stepped stirrers':
634            x_label = 'Position Index'
635        figure = wputils.BokehFigure(
636            title='Range vs. Position',
637            x_label=x_label,
638            primary_y_label='Range (dB)',
639        )
640        for channel, channel_data in range_vs_angle.items():
641            figure.add_line(x_data=channel_data['position'],
642                            y_data=channel_data['range'],
643                            hover_text=channel_data['llstats_at_range'],
644                            legend='Channel {}'.format(channel))
645            average_range = sum(channel_data['range']) / len(
646                channel_data['range'])
647            self.log.info('Average range for Channel {} is: {}dB'.format(
648                channel, average_range))
649            metric_name = 'ota_summary_ch{}.avg_range'.format(channel)
650            self.testclass_metric_logger.add_metric(metric_name, average_range)
651        current_context = context.get_current_context().get_full_output_path()
652        plot_file_path = os.path.join(current_context, 'results.html')
653        figure.generate_figure(plot_file_path)
654
655        # Save results
656        results_file_path = os.path.join(current_context,
657                                         'testclass_summary.json')
658        with open(results_file_path, 'w') as results_file:
659            json.dump(range_vs_angle, results_file, indent=4)
660
661    def setup_ping_test(self, testcase_params):
662        WifiPingTest.setup_ping_test(self, testcase_params)
663        # Setup turntable
664        if testcase_params['chamber_mode'] == 'orientation':
665            self.ota_chamber.set_orientation(testcase_params['position'])
666        elif testcase_params['chamber_mode'] == 'stepped stirrers':
667            self.ota_chamber.step_stirrers(testcase_params['total_positions'])
668
669    def extract_test_id(self, testcase_params, id_fields):
670        test_id = collections.OrderedDict(
671            (param, testcase_params[param]) for param in id_fields)
672        return test_id
673
674    def get_range_start_atten(self, testcase_params):
675        """Gets the starting attenuation for this ping test.
676
677        The function gets the starting attenuation by checking whether a test
678        at the same configuration has executed. If so it sets the starting
679        point a configurable number of dBs below the reference test.
680
681        Returns:
682            start_atten: starting attenuation for current test
683        """
684        # If the test is being retried, start from the beginning
685        if self.retry_flag:
686            self.log.info('Retry flag set. Setting attenuation to minimum.')
687            return self.testclass_params['range_atten_start']
688        # Get the current and reference test config. The reference test is the
689        # one performed at the current MCS+1
690        ref_test_params = self.extract_test_id(testcase_params,
691                                               ['channel', 'mode'])
692        # Check if reference test has been run and set attenuation accordingly
693        previous_params = [
694            self.extract_test_id(result['testcase_params'],
695                                 ['channel', 'mode'])
696            for result in self.testclass_results
697        ]
698        try:
699            ref_index = previous_params[::-1].index(ref_test_params)
700            ref_index = len(previous_params) - 1 - ref_index
701            start_atten = self.testclass_results[ref_index][
702                'atten_at_range'] - (
703                    self.testclass_params['adjacent_range_test_gap'])
704        except ValueError:
705            self.log.info(
706                'Reference test not found. Starting from {} dB'.format(
707                    self.testclass_params['range_atten_start']))
708            start_atten = self.testclass_params['range_atten_start']
709        return start_atten
710
711    def generate_test_cases(self, ap_power, channels, modes, chamber_mode,
712                            positions):
713        test_cases = []
714        allowed_configs = {
715            'VHT20': [
716                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
717                157, 161
718            ],
719            'VHT40': [36, 44, 149, 157],
720            'VHT80': [36, 149]
721        }
722        for channel, mode, position in itertools.product(
723                channels, modes, positions):
724            if channel not in allowed_configs[mode]:
725                continue
726            testcase_name = 'test_ping_range_ch{}_{}_pos{}'.format(
727                channel, mode, position)
728            testcase_params = collections.OrderedDict(
729                test_type='test_ping_range',
730                ap_power=ap_power,
731                channel=channel,
732                mode=mode,
733                chain_mask='2x2',
734                chamber_mode=chamber_mode,
735                total_positions=len(positions),
736                position=position)
737            setattr(self, testcase_name,
738                    partial(self._test_ping, testcase_params))
739            test_cases.append(testcase_name)
740        return test_cases
741
742
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
    """Ping range tests at standard AP power in 'orientation' chamber mode.

    Covers channels 6, 36 and 149 in VHT20 at positions 0 to 350 in steps
    of 10.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149],
            modes=['VHT20'],
            chamber_mode='orientation',
            positions=orientations)
752
753
class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
    """Ping range tests at standard AP power in 'orientation' chamber mode.

    Covers channels 1, 6, 11, 36, 40, 44, 48, 149, 153, 157 and 161 in VHT20
    at positions 0 to 315 in steps of 45.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='standard',
                                              channels=test_channels,
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
763
764
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
    """Ping range tests at standard AP power in 'stepped stirrers' mode.

    Covers channels 6, 36 and 149 in VHT20 over 100 positions (0 to 99).
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149],
            modes=['VHT20'],
            chamber_mode='stepped stirrers',
            positions=stirrer_positions)
773
774
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
    """Ping range tests at low AP power in 'orientation' chamber mode.

    Covers channels 6, 36 and 149 in VHT20 at positions 0 to 350 in steps
    of 10.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['VHT20'],
            chamber_mode='orientation',
            positions=orientations)
784
785
class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
    """Ping range tests at low AP power in 'orientation' chamber mode.

    Covers channels 1, 6, 11, 36, 40, 44, 48, 149, 153, 157 and 161 in VHT20
    at positions 0 to 315 in steps of 45.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(ap_power='low_power',
                                              channels=test_channels,
                                              modes=['VHT20'],
                                              chamber_mode='orientation',
                                              positions=orientations)
795
796
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
    """Ping range tests at low AP power in 'stepped stirrers' mode.

    Covers channels 6, 36 and 149 in VHT20 over 100 positions (0 to 99).
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        stirrer_positions = list(range(100))
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['VHT20'],
            chamber_mode='stepped stirrers',
            positions=stirrer_positions)
805