1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import math
22import numpy
23import os
24import statistics
25from acts import asserts
26from acts import base_test
27from acts import context
28from acts import utils
29from acts.controllers.utils_lib import ssh
30from acts.controllers import iperf_server as ipf
31from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
32from acts.test_utils.wifi import ota_chamber
33from acts.test_utils.wifi import wifi_performance_test_utils as wputils
34from acts.test_utils.wifi import wifi_retail_ap as retail_ap
35from acts.test_utils.wifi import wifi_test_utils as wutils
36from concurrent.futures import ThreadPoolExecutor
37from functools import partial
38
# Delays used to pace measurements and to size traffic sessions. They are
# added to iperf/ping durations, so presumably seconds -- TODO confirm.
SHORT_SLEEP = 1
MED_SLEEP = 6
# 10*log10(2): offset added to per-chain mean RSSI before it is compared with
# the predicted RSSI.
CONST_3dB = 3.01029995664
# Placeholder error/shift value used when no valid RSSI reading is available.
RSSI_ERROR_VAL = float('nan')
43
44
45class WifiRssiTest(base_test.BaseTestClass):
46    """Class to test WiFi RSSI reporting.
47
48    This class tests RSSI reporting on android devices. The class tests RSSI
49    accuracy by checking RSSI over a large attenuation range, checks for RSSI
    stability over time when attenuation is fixed, and checks that RSSI reacts
    quickly to changes in attenuation by checking RSSI trajectories over
    configurable attenuation waveforms. For an example config file to run this
    test class see example_connectivity_performance_ap_sta.json.
54    """
55    def __init__(self, controllers):
56        base_test.BaseTestClass.__init__(self, controllers)
57        self.testcase_metric_logger = (
58            BlackboxMappedMetricLogger.for_test_case())
59        self.testclass_metric_logger = (
60            BlackboxMappedMetricLogger.for_test_class())
61        self.publish_test_metrics = True
62
63    def setup_class(self):
64        self.dut = self.android_devices[0]
65        req_params = [
66            'RemoteServer', 'RetailAccessPoints', 'rssi_test_params',
67            'main_network', 'testbed_params'
68        ]
69        self.unpack_userparams(req_params)
70        self.testclass_params = self.rssi_test_params
71        self.num_atten = self.attenuators[0].instrument.num_atten
72        self.iperf_server = self.iperf_servers[0]
73        self.iperf_client = self.iperf_clients[0]
74        self.remote_server = ssh.connection.SshConnection(
75            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
76        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
77        self.log_path = os.path.join(logging.log_path, 'results')
78        os.makedirs(self.log_path, exist_ok=True)
79        self.log.info('Access Point Configuration: {}'.format(
80            self.access_point.ap_settings))
81        self.testclass_results = []
82
83        # Turn WiFi ON
84        if self.testclass_params.get('airplane_mode', 1):
85            self.log.info('Turning on airplane mode.')
86            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
87                                "Can not turn on airplane mode.")
88        wutils.wifi_toggle_state(self.dut, True)
89
    def teardown_test(self):
        """Stops the iperf server as part of test teardown."""
        self.iperf_server.stop()
92
93    def pass_fail_check_rssi_stability(self, testcase_params,
94                                       postprocessed_results):
95        """Check the test result and decide if it passed or failed.
96
97        Checks the RSSI test result and fails the test if the standard
98        deviation of signal_poll_rssi is beyond the threshold defined in the
99        config file.
100
101        Args:
102            testcase_params: dict containing test-specific parameters
103            postprocessed_results: compiled arrays of RSSI measurements
104        """
105        # Set Blackbox metric values
106        if self.publish_test_metrics:
107            self.testcase_metric_logger.add_metric(
108                'signal_poll_rssi_stdev',
109                max(postprocessed_results['signal_poll_rssi']['stdev']))
110            self.testcase_metric_logger.add_metric(
111                'chain_0_rssi_stdev',
112                max(postprocessed_results['chain_0_rssi']['stdev']))
113            self.testcase_metric_logger.add_metric(
114                'chain_1_rssi_stdev',
115                max(postprocessed_results['chain_1_rssi']['stdev']))
116
117        # Evaluate test pass/fail
118        test_failed = any([
119            stdev > self.testclass_params['stdev_tolerance']
120            for stdev in postprocessed_results['signal_poll_rssi']['stdev']
121        ])
122        test_message = (
123            'RSSI stability {0}. Standard deviation was {1} dB '
124            '(limit {2}), per chain standard deviation [{3}, {4}] dB'.format(
125                'failed' * test_failed + 'passed' * (not test_failed), [
126                    float('{:.2f}'.format(x))
127                    for x in postprocessed_results['signal_poll_rssi']['stdev']
128                ], self.testclass_params['stdev_tolerance'], [
129                    float('{:.2f}'.format(x))
130                    for x in postprocessed_results['chain_0_rssi']['stdev']
131                ], [
132                    float('{:.2f}'.format(x))
133                    for x in postprocessed_results['chain_1_rssi']['stdev']
134                ]))
135        if test_failed:
136            asserts.fail(test_message)
137        asserts.explicit_pass(test_message)
138
139    def pass_fail_check_rssi_accuracy(self, testcase_params,
140                                      postprocessed_results):
141        """Check the test result and decide if it passed or failed.
142
143        Checks the RSSI test result and compares and compute its deviation from
144        the predicted RSSI. This computation is done for all reported RSSI
145        values. The test fails if any of the RSSI values specified in
146        rssi_under_test have an average error beyond what is specified in the
147        configuration file.
148
149        Args:
150            postprocessed_results: compiled arrays of RSSI measurements
151            testcase_params: dict containing params such as list of RSSIs under
152            test, i.e., can cause test to fail and boolean indicating whether
153            to look at absolute RSSI accuracy, or centered RSSI accuracy.
154            Centered accuracy is computed after systematic RSSI shifts are
155            removed.
156        """
157        test_failed = False
158        test_message = ''
159        if testcase_params['absolute_accuracy']:
160            error_type = 'absolute'
161        else:
162            error_type = 'centered'
163
164        for key, val in postprocessed_results.items():
165            # Compute the error metrics ignoring invalid RSSI readings
166            # If all readings invalid, set error to RSSI_ERROR_VAL
167            if 'rssi' in key and 'predicted' not in key:
168                filtered_error = [x for x in val['error'] if not math.isnan(x)]
169                if filtered_error:
170                    avg_shift = statistics.mean(filtered_error)
171                    if testcase_params['absolute_accuracy']:
172                        avg_error = statistics.mean(
173                            [abs(x) for x in filtered_error])
174                    else:
175                        avg_error = statistics.mean(
176                            [abs(x - avg_shift) for x in filtered_error])
177                else:
178                    avg_error = RSSI_ERROR_VAL
179                    avg_shift = RSSI_ERROR_VAL
180                # Set Blackbox metric values
181                if self.publish_test_metrics:
182                    self.testcase_metric_logger.add_metric(
183                        '{}_error'.format(key), avg_error)
184                    self.testcase_metric_logger.add_metric(
185                        '{}_shift'.format(key), avg_shift)
186                # Evaluate test pass/fail
187                rssi_failure = (avg_error >
188                                self.testclass_params['abs_tolerance']
189                                ) or math.isnan(avg_error)
190                if rssi_failure and key in testcase_params['rssi_under_test']:
191                    test_message = test_message + (
192                        '{} failed ({} error = {:.2f} dB, '
193                        'shift = {:.2f} dB)\n').format(key, error_type,
194                                                       avg_error, avg_shift)
195                    test_failed = True
196                elif rssi_failure:
197                    test_message = test_message + (
198                        '{} failed (ignored) ({} error = {:.2f} dB, '
199                        'shift = {:.2f} dB)\n').format(key, error_type,
200                                                       avg_error, avg_shift)
201                else:
202                    test_message = test_message + (
203                        '{} passed ({} error = {:.2f} dB, '
204                        'shift = {:.2f} dB)\n').format(key, error_type,
205                                                       avg_error, avg_shift)
206        if test_failed:
207            asserts.fail(test_message)
208        asserts.explicit_pass(test_message)
209
210    def post_process_rssi_sweep(self, rssi_result):
211        """Postprocesses and saves JSON formatted results.
212
213        Args:
214            rssi_result: dict containing attenuation, rssi and other meta
215            data
216        Returns:
217            postprocessed_results: compiled arrays of RSSI data used in
218            pass/fail check
219        """
220        # Save output as text file
221        results_file_path = os.path.join(self.log_path, self.current_test_name)
222        with open(results_file_path, 'w') as results_file:
223            json.dump(rssi_result, results_file, indent=4)
224        # Compile results into arrays of RSSIs suitable for plotting
225        # yapf: disable
226        postprocessed_results = collections.OrderedDict(
227            [('signal_poll_rssi', {}),
228             ('signal_poll_avg_rssi', {}),
229             ('scan_rssi', {}),
230             ('chain_0_rssi', {}),
231             ('chain_1_rssi', {}),
232             ('total_attenuation', []),
233             ('predicted_rssi', [])])
234        # yapf: enable
235        for key, val in postprocessed_results.items():
236            if 'scan_rssi' in key:
237                postprocessed_results[key]['data'] = [
238                    x for data_point in rssi_result['rssi_result'] for x in
239                    data_point[key][rssi_result['connected_bssid']]['data']
240                ]
241                postprocessed_results[key]['mean'] = [
242                    x[key][rssi_result['connected_bssid']]['mean']
243                    for x in rssi_result['rssi_result']
244                ]
245                postprocessed_results[key]['stdev'] = [
246                    x[key][rssi_result['connected_bssid']]['stdev']
247                    for x in rssi_result['rssi_result']
248                ]
249            elif 'predicted_rssi' in key:
250                postprocessed_results['total_attenuation'] = [
251                    att + rssi_result['fixed_attenuation'] +
252                    rssi_result['dut_front_end_loss']
253                    for att in rssi_result['attenuation']
254                ]
255                postprocessed_results['predicted_rssi'] = [
256                    rssi_result['ap_tx_power'] - att
257                    for att in postprocessed_results['total_attenuation']
258                ]
259            elif 'rssi' in key:
260                postprocessed_results[key]['data'] = [
261                    x for data_point in rssi_result['rssi_result']
262                    for x in data_point[key]['data']
263                ]
264                postprocessed_results[key]['mean'] = [
265                    x[key]['mean'] for x in rssi_result['rssi_result']
266                ]
267                postprocessed_results[key]['stdev'] = [
268                    x[key]['stdev'] for x in rssi_result['rssi_result']
269                ]
270        # Compute RSSI errors
271        for key, val in postprocessed_results.items():
272            if 'chain' in key:
273                postprocessed_results[key]['error'] = [
274                    postprocessed_results[key]['mean'][idx] + CONST_3dB -
275                    postprocessed_results['predicted_rssi'][idx]
276                    for idx in range(
277                        len(postprocessed_results['predicted_rssi']))
278                ]
279            elif 'rssi' in key and 'predicted' not in key:
280                postprocessed_results[key]['error'] = [
281                    postprocessed_results[key]['mean'][idx] -
282                    postprocessed_results['predicted_rssi'][idx]
283                    for idx in range(
284                        len(postprocessed_results['predicted_rssi']))
285                ]
286        return postprocessed_results
287
288    def plot_rssi_vs_attenuation(self, postprocessed_results):
289        """Function to plot RSSI vs attenuation sweeps
290
291        Args:
292            postprocessed_results: compiled arrays of RSSI data.
293        """
294        figure = wputils.BokehFigure(self.current_test_name,
295                                     x_label='Attenuation (dB)',
296                                     primary_y_label='RSSI (dBm)')
297        figure.add_line(postprocessed_results['total_attenuation'],
298                        postprocessed_results['signal_poll_rssi']['mean'],
299                        'Signal Poll RSSI',
300                        marker='circle')
301        figure.add_line(postprocessed_results['total_attenuation'],
302                        postprocessed_results['scan_rssi']['mean'],
303                        'Scan RSSI',
304                        marker='circle')
305        figure.add_line(postprocessed_results['total_attenuation'],
306                        postprocessed_results['chain_0_rssi']['mean'],
307                        'Chain 0 RSSI',
308                        marker='circle')
309        figure.add_line(postprocessed_results['total_attenuation'],
310                        postprocessed_results['chain_1_rssi']['mean'],
311                        'Chain 1 RSSI',
312                        marker='circle')
313        figure.add_line(postprocessed_results['total_attenuation'],
314                        postprocessed_results['predicted_rssi'],
315                        'Predicted RSSI',
316                        marker='circle')
317
318        output_file_path = os.path.join(self.log_path,
319                                        self.current_test_name + '.html')
320        figure.generate_figure(output_file_path)
321
322    def plot_rssi_vs_time(self, rssi_result, postprocessed_results,
323                          center_curves):
324        """Function to plot RSSI vs time.
325
326        Args:
327            rssi_result: dict containing raw RSSI data
328            postprocessed_results: compiled arrays of RSSI data
329            center_curvers: boolean indicating whether to shift curves to align
330            them with predicted RSSIs
331        """
332        figure = wputils.BokehFigure(
333            self.current_test_name,
334            x_label='Time (s)',
335            primary_y_label=center_curves * 'Centered' + 'RSSI (dBm)',
336        )
337
338        # yapf: disable
339        rssi_time_series = collections.OrderedDict(
340            [('signal_poll_rssi', []),
341             ('signal_poll_avg_rssi', []),
342             ('scan_rssi', []),
343             ('chain_0_rssi', []),
344             ('chain_1_rssi', []),
345             ('predicted_rssi', [])])
346        # yapf: enable
347        for key, val in rssi_time_series.items():
348            if 'predicted_rssi' in key:
349                rssi_time_series[key] = [
350                    x for x in postprocessed_results[key] for copies in range(
351                        len(rssi_result['rssi_result'][0]['signal_poll_rssi']
352                            ['data']))
353                ]
354            elif 'rssi' in key:
355                if center_curves:
356                    filtered_error = [
357                        x for x in postprocessed_results[key]['error']
358                        if not math.isnan(x)
359                    ]
360                    if filtered_error:
361                        avg_shift = statistics.mean(filtered_error)
362                    else:
363                        avg_shift = 0
364                    rssi_time_series[key] = [
365                        x - avg_shift
366                        for x in postprocessed_results[key]['data']
367                    ]
368                else:
369                    rssi_time_series[key] = postprocessed_results[key]['data']
370            time_vec = [
371                self.testclass_params['polling_frequency'] * x
372                for x in range(len(rssi_time_series[key]))
373            ]
374            if len(rssi_time_series[key]) > 0:
375                figure.add_line(time_vec, rssi_time_series[key], key)
376
377        output_file_path = os.path.join(self.log_path,
378                                        self.current_test_name + '.html')
379        figure.generate_figure(output_file_path)
380
381    def plot_rssi_distribution(self, postprocessed_results):
382        """Function to plot RSSI distributions.
383
384        Args:
385            postprocessed_results: compiled arrays of RSSI data
386        """
387        monitored_rssis = ['signal_poll_rssi', 'chain_0_rssi', 'chain_1_rssi']
388
389        rssi_dist = collections.OrderedDict()
390        for rssi_key in monitored_rssis:
391            rssi_data = postprocessed_results[rssi_key]
392            rssi_dist[rssi_key] = collections.OrderedDict()
393            unique_rssi = sorted(set(rssi_data['data']))
394            rssi_counts = []
395            for value in unique_rssi:
396                rssi_counts.append(rssi_data['data'].count(value))
397            total_count = sum(rssi_counts)
398            rssi_dist[rssi_key]['rssi_values'] = unique_rssi
399            rssi_dist[rssi_key]['rssi_pdf'] = [
400                x / total_count for x in rssi_counts
401            ]
402            rssi_dist[rssi_key]['rssi_cdf'] = []
403            cum_prob = 0
404            for prob in rssi_dist[rssi_key]['rssi_pdf']:
405                cum_prob += prob
406                rssi_dist[rssi_key]['rssi_cdf'].append(cum_prob)
407
408        figure = wputils.BokehFigure(self.current_test_name,
409                                     x_label='RSSI (dBm)',
410                                     primary_y_label='p(RSSI = x)',
411                                     secondary_y_label='p(RSSI <= x)')
412        for rssi_key, rssi_data in rssi_dist.items():
413            figure.add_line(x_data=rssi_data['rssi_values'],
414                            y_data=rssi_data['rssi_pdf'],
415                            legend='{} PDF'.format(rssi_key),
416                            y_axis='default')
417            figure.add_line(x_data=rssi_data['rssi_values'],
418                            y_data=rssi_data['rssi_cdf'],
419                            legend='{} CDF'.format(rssi_key),
420                            y_axis='secondary')
421        output_file_path = os.path.join(self.log_path,
422                                        self.current_test_name + '_dist.html')
423        figure.generate_figure(output_file_path)
424
425    def run_rssi_test(self, testcase_params):
426        """Test function to run RSSI tests.
427
428        The function runs an RSSI test in the current device/AP configuration.
429        Function is called from another wrapper function that sets up the
430        testbed for the RvR test
431
432        Args:
433            testcase_params: dict containing test-specific parameters
434        Returns:
435            rssi_result: dict containing rssi_result and meta data
436        """
437        # Run test and log result
438        rssi_result = collections.OrderedDict()
439        rssi_result['test_name'] = self.current_test_name
440        rssi_result['testcase_params'] = testcase_params
441        rssi_result['ap_settings'] = self.access_point.ap_settings.copy()
442        rssi_result['attenuation'] = list(testcase_params['rssi_atten_range'])
443        rssi_result['connected_bssid'] = self.main_network[
444            testcase_params['band']].get('BSSID', '00:00:00:00')
445        channel_mode_combo = '{}_{}'.format(str(testcase_params['channel']),
446                                            testcase_params['mode'])
447        channel_str = str(testcase_params['channel'])
448        if channel_mode_combo in self.testbed_params['ap_tx_power']:
449            rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
450                channel_mode_combo]
451        else:
452            rssi_result['ap_tx_power'] = self.testbed_params['ap_tx_power'][
453                str(testcase_params['channel'])]
454        rssi_result['fixed_attenuation'] = self.testbed_params[
455            'fixed_attenuation'][channel_str]
456        rssi_result['dut_front_end_loss'] = self.testbed_params[
457            'dut_front_end_loss'][channel_str]
458
459        self.log.info('Start running RSSI test.')
460        rssi_result['rssi_result'] = []
461        rssi_result['llstats'] = []
462        llstats_obj = wputils.LinkLayerStats(self.dut)
463        # Start iperf traffic if required by test
464        if testcase_params['active_traffic'] and testcase_params[
465                'traffic_type'] == 'iperf':
466            self.iperf_server.start(tag=0)
467            if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
468                iperf_server_address = self.dut_ip
469            else:
470                iperf_server_address = wputils.get_server_address(
471                    self.remote_server, self.dut_ip, '255.255.255.0')
472            executor = ThreadPoolExecutor(max_workers=1)
473            thread_future = executor.submit(
474                self.iperf_client.start, iperf_server_address,
475                testcase_params['iperf_args'], 0,
476                testcase_params['traffic_timeout'] + SHORT_SLEEP)
477            executor.shutdown(wait=False)
478        elif testcase_params['active_traffic'] and testcase_params[
479                'traffic_type'] == 'ping':
480            thread_future = wputils.get_ping_stats_nb(
481                self.remote_server, self.dut_ip,
482                testcase_params['traffic_timeout'], 0.02, 64)
483        else:
484            thread_future = wputils.get_ping_stats_nb(
485                self.remote_server, self.dut_ip,
486                testcase_params['traffic_timeout'], 0.5, 64)
487        for atten in testcase_params['rssi_atten_range']:
488            # Set Attenuation
489            self.log.info('Setting attenuation to {} dB'.format(atten))
490            for attenuator in self.attenuators:
491                attenuator.set_atten(atten)
492            llstats_obj.update_stats()
493            current_rssi = collections.OrderedDict()
494            current_rssi = wputils.get_connected_rssi(
495                self.dut, testcase_params['connected_measurements'],
496                self.testclass_params['polling_frequency'],
497                testcase_params['first_measurement_delay'])
498            current_rssi['scan_rssi'] = wputils.get_scan_rssi(
499                self.dut, testcase_params['tracked_bssid'],
500                testcase_params['scan_measurements'])
501            rssi_result['rssi_result'].append(current_rssi)
502            llstats_obj.update_stats()
503            curr_llstats = llstats_obj.llstats_incremental.copy()
504            rssi_result['llstats'].append(curr_llstats)
505            self.log.info(
506                'Connected RSSI at {0:.2f} dB is {1:.2f} [{2:.2f}, {3:.2f}] dB'
507                .format(atten, current_rssi['signal_poll_rssi']['mean'],
508                        current_rssi['chain_0_rssi']['mean'],
509                        current_rssi['chain_1_rssi']['mean']))
510        # Stop iperf traffic if needed
511        for attenuator in self.attenuators:
512            attenuator.set_atten(0)
513        thread_future.result()
514        if testcase_params['active_traffic'] and testcase_params[
515                'traffic_type'] == 'iperf':
516            self.iperf_server.stop()
517        return rssi_result
518
519    def setup_ap(self, testcase_params):
520        """Function that gets devices ready for the test.
521
522        Args:
523            testcase_params: dict containing test-specific parameters
524        """
525        if '2G' in testcase_params['band']:
526            frequency = wutils.WifiEnums.channel_2G_to_freq[
527                testcase_params['channel']]
528        else:
529            frequency = wutils.WifiEnums.channel_5G_to_freq[
530                testcase_params['channel']]
531        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
532            self.access_point.set_region(self.testbed_params['DFS_region'])
533        else:
534            self.access_point.set_region(self.testbed_params['default_region'])
535        self.access_point.set_channel(testcase_params['band'],
536                                      testcase_params['channel'])
537        self.access_point.set_bandwidth(testcase_params['band'],
538                                        testcase_params['mode'])
539        self.log.info('Access Point Configuration: {}'.format(
540            self.access_point.ap_settings))
541
542    def setup_dut(self, testcase_params):
543        """Sets up the DUT in the configuration required by the test."""
544        # Check battery level before test
545        if not wputils.health_check(self.dut, 10):
546            asserts.skip('Battery level too low. Skipping test.')
547        # Turn screen off to preserve battery
548        self.dut.go_to_sleep()
549        if wputils.validate_network(self.dut,
550                                    testcase_params['test_network']['SSID']):
551            self.log.info('Already connected to desired network')
552        else:
553            wutils.wifi_toggle_state(self.dut, True)
554            wutils.reset_wifi(self.dut)
555            self.main_network[testcase_params['band']][
556                'channel'] = testcase_params['channel']
557            wutils.set_wifi_country_code(self.dut,
558                                         self.testclass_params['country_code'])
559            wutils.wifi_connect(self.dut,
560                                self.main_network[testcase_params['band']],
561                                num_of_tries=5)
562        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
563
    def setup_rssi_test(self, testcase_params):
        """Prepares the AP, attenuators and DUT for an RSSI test.

        The function sets up the AP in the correct channel and mode
        configuration, sets every attenuator to the first value of the
        attenuation range and connects the DUT to the network. It does not
        run the attenuation sweep itself and returns nothing.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Configure AP
        self.setup_ap(testcase_params)
        # Initialize attenuators
        for attenuator in self.attenuators:
            attenuator.set_atten(testcase_params['rssi_atten_range'][0])
        # Connect DUT to Network
        self.setup_dut(testcase_params)
583
584    def get_traffic_timeout(self, testcase_params):
585        """Function to comput iperf session length required in RSSI test.
586
587        Args:
588            testcase_params: dict containing test-specific parameters
589        Returns:
590            traffic_timeout: length of iperf session required in rssi test
591        """
592        atten_step_duration = testcase_params['first_measurement_delay'] + (
593            testcase_params['connected_measurements'] *
594            self.testclass_params['polling_frequency']
595        ) + testcase_params['scan_measurements'] * MED_SLEEP
596        timeout = len(testcase_params['rssi_atten_range']
597                      ) * atten_step_duration + MED_SLEEP
598        return timeout
599
600    def compile_rssi_vs_atten_test_params(self, testcase_params):
601        """Function to complete compiling test-specific parameters
602
603        Args:
604            testcase_params: dict containing test-specific parameters
605        """
606        testcase_params.update(
607            connected_measurements=self.
608            testclass_params['rssi_vs_atten_connected_measurements'],
609            scan_measurements=self.
610            testclass_params['rssi_vs_atten_scan_measurements'],
611            first_measurement_delay=MED_SLEEP,
612            rssi_under_test=self.testclass_params['rssi_vs_atten_metrics'],
613            absolute_accuracy=1)
614
615        testcase_params['band'] = self.access_point.band_lookup_by_channel(
616            testcase_params['channel'])
617        testcase_params['test_network'] = self.main_network[
618            testcase_params['band']]
619        testcase_params['tracked_bssid'] = [
620            self.main_network[testcase_params['band']].get(
621                'BSSID', '00:00:00:00')
622        ]
623
624        num_atten_steps = int((self.testclass_params['rssi_vs_atten_stop'] -
625                               self.testclass_params['rssi_vs_atten_start']) /
626                              self.testclass_params['rssi_vs_atten_step'])
627        testcase_params['rssi_atten_range'] = [
628            self.testclass_params['rssi_vs_atten_start'] +
629            x * self.testclass_params['rssi_vs_atten_step']
630            for x in range(0, num_atten_steps)
631        ]
632        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
633            testcase_params)
634
635        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
636            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
637                testcase_params['traffic_timeout'])
638        else:
639            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
640                testcase_params['traffic_timeout'])
641        return testcase_params
642
643    def compile_rssi_stability_test_params(self, testcase_params):
644        """Function to complete compiling test-specific parameters
645
646        Args:
647            testcase_params: dict containing test-specific parameters
648        """
649        testcase_params.update(
650            connected_measurements=int(
651                self.testclass_params['rssi_stability_duration'] /
652                self.testclass_params['polling_frequency']),
653            scan_measurements=0,
654            first_measurement_delay=MED_SLEEP,
655            rssi_atten_range=self.testclass_params['rssi_stability_atten'])
656        testcase_params['band'] = self.access_point.band_lookup_by_channel(
657            testcase_params['channel'])
658        testcase_params['test_network'] = self.main_network[
659            testcase_params['band']]
660        testcase_params['tracked_bssid'] = [
661            self.main_network[testcase_params['band']].get(
662                'BSSID', '00:00:00:00')
663        ]
664
665        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
666            testcase_params)
667        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
668            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
669                testcase_params['traffic_timeout'])
670        else:
671            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
672                testcase_params['traffic_timeout'])
673        return testcase_params
674
675    def compile_rssi_tracking_test_params(self, testcase_params):
676        """Function to complete compiling test-specific parameters
677
678        Args:
679            testcase_params: dict containing test-specific parameters
680        """
681        testcase_params.update(connected_measurements=int(
682            1 / self.testclass_params['polling_frequency']),
683                               scan_measurements=0,
684                               first_measurement_delay=0,
685                               rssi_under_test=['signal_poll_rssi'],
686                               absolute_accuracy=0)
687        testcase_params['band'] = self.access_point.band_lookup_by_channel(
688            testcase_params['channel'])
689        testcase_params['test_network'] = self.main_network[
690            testcase_params['band']]
691        testcase_params['tracked_bssid'] = [
692            self.main_network[testcase_params['band']].get(
693                'BSSID', '00:00:00:00')
694        ]
695
696        rssi_atten_range = []
697        for waveform in self.testclass_params['rssi_tracking_waveforms']:
698            waveform_vector = []
699            for section in range(len(waveform['atten_levels']) - 1):
700                section_limits = waveform['atten_levels'][section:section + 2]
701                up_down = (1 - 2 * (section_limits[1] < section_limits[0]))
702                temp_section = list(
703                    range(section_limits[0], section_limits[1] + up_down,
704                          up_down * waveform['step_size']))
705                temp_section = [
706                    temp_section[idx] for idx in range(len(temp_section))
707                    for n in range(waveform['step_duration'])
708                ]
709                waveform_vector += temp_section
710            waveform_vector = waveform_vector * waveform['repetitions']
711            rssi_atten_range = rssi_atten_range + waveform_vector
712        testcase_params['rssi_atten_range'] = rssi_atten_range
713        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
714            testcase_params)
715
716        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
717            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
718                testcase_params['traffic_timeout'])
719        else:
720            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
721                testcase_params['traffic_timeout'])
722        return testcase_params
723
724    def _test_rssi_vs_atten(self, testcase_params):
725        """Function that gets called for each test case of rssi_vs_atten
726
727        The function gets called in each rssi test case. The function
728        customizes the test based on the test name of the test that called it
729
730        Args:
731            testcase_params: dict containing test-specific parameters
732        """
733        testcase_params = self.compile_rssi_vs_atten_test_params(
734            testcase_params)
735
736        self.setup_rssi_test(testcase_params)
737        rssi_result = self.run_rssi_test(testcase_params)
738        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
739            rssi_result)
740        self.testclass_results.append(rssi_result)
741        self.plot_rssi_vs_attenuation(rssi_result['postprocessed_results'])
742        self.pass_fail_check_rssi_accuracy(
743            testcase_params, rssi_result['postprocessed_results'])
744
745    def _test_rssi_stability(self, testcase_params):
746        """ Function that gets called for each test case of rssi_stability
747
748        The function gets called in each stability test case. The function
749        customizes test based on the test name of the test that called it
750        """
751        testcase_params = self.compile_rssi_stability_test_params(
752            testcase_params)
753
754        self.setup_rssi_test(testcase_params)
755        rssi_result = self.run_rssi_test(testcase_params)
756        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
757            rssi_result)
758        self.testclass_results.append(rssi_result)
759        self.plot_rssi_vs_time(rssi_result,
760                               rssi_result['postprocessed_results'], 1)
761        self.plot_rssi_distribution(rssi_result['postprocessed_results'])
762        self.pass_fail_check_rssi_stability(
763            testcase_params, rssi_result['postprocessed_results'])
764
765    def _test_rssi_tracking(self, testcase_params):
766        """ Function that gets called for each test case of rssi_tracking
767
768        The function gets called in each rssi test case. The function
769        customizes the test based on the test name of the test that called it
770        """
771        testcase_params = self.compile_rssi_tracking_test_params(
772            testcase_params)
773
774        self.setup_rssi_test(testcase_params)
775        rssi_result = self.run_rssi_test(testcase_params)
776        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
777            rssi_result)
778        self.testclass_results.append(rssi_result)
779        self.plot_rssi_vs_time(rssi_result,
780                               rssi_result['postprocessed_results'], 1)
781        self.pass_fail_check_rssi_accuracy(
782            testcase_params, rssi_result['postprocessed_results'])
783
784    def generate_test_cases(self, test_types, channels, modes, traffic_modes):
785        """Function that auto-generates test cases for a test class."""
786        test_cases = []
787        allowed_configs = {
788            'VHT20': [
789                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
790                157, 161
791            ],
792            'VHT40': [36, 44, 149, 157],
793            'VHT80': [36, 149]
794        }
795
796        for channel, mode, traffic_mode, test_type in itertools.product(
797                channels, modes, traffic_modes, test_types):
798            if channel not in allowed_configs[mode]:
799                continue
800            test_name = test_type + '_ch{}_{}_{}'.format(
801                channel, mode, traffic_mode)
802            testcase_params = collections.OrderedDict(
803                channel=channel,
804                mode=mode,
805                active_traffic=(traffic_mode == 'ActiveTraffic'),
806                traffic_type=self.user_params['rssi_test_params']
807                ['traffic_type'],
808            )
809            test_function = getattr(self, '_{}'.format(test_type))
810            setattr(self, test_name, partial(test_function, testcase_params))
811            test_cases.append(test_name)
812        return test_cases
813
814
class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest):
    """RSSI stability and rssi_vs_atten tests on channels 1, 2, 6, 10, 11."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [1, 2, 6, 10, 11]
        self.tests = self.generate_test_cases(test_types, channels,
                                              ['VHT20'], ['ActiveTraffic'])
821
822
class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest):
    """RSSI stability and rssi_vs_atten tests on channels 36 through 161."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              ['ActiveTraffic'])
830
831
class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest):
    """RSSI stability and rssi_vs_atten tests on all listed channels."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              ['ActiveTraffic'])
839
840
class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest):
    """RSSI stability and rssi_vs_atten tests on channels 6, 36 and 149."""
    def __init__(self, controllers):
        super().__init__(controllers)
        test_types = ['test_rssi_stability', 'test_rssi_vs_atten']
        channels = [6, 36, 149]
        modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(test_types, channels, modes,
                                              ['NoTraffic'])
847
848
class WifiRssiTrackingTest(WifiRssiTest):
    """RSSI tracking tests on channels 6, 36 and 149, with and without traffic."""
    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 149]
        modes = ['VHT20', 'VHT40', 'VHT80']
        traffic_modes = ['ActiveTraffic', 'NoTraffic']
        self.tests = self.generate_test_cases(['test_rssi_tracking'],
                                              channels, modes, traffic_modes)
856
857
# Over-the-air version of RSSI tests
class WifiOtaRssiTest(WifiRssiTest):
    """Class to run WiFi RSSI tests over the air.

    This class implements WiFi RSSI tests in an OTA chamber.
    It allows setting orientation and other chamber parameters to study
    performance in varying channel conditions.
    """
    def __init__(self, controllers):
        # The base test class is initialised directly (WifiRssiTest.__init__
        # is not called) and the metric loggers are created here.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_test_metrics = False

    def setup_class(self):
        """Runs the common RSSI class setup and creates the OTA chamber."""
        WifiRssiTest.setup_class(self)
        # Only the first chamber returned by ota_chamber.create is used
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Resets the chamber and processes the results of the test class."""
        self.ota_chamber.reset_chamber()
        self.process_testclass_results()

    def teardown_test(self):
        """Resets the chamber if it is still in 'continuous' mode."""
        if self.ota_chamber.current_mode == 'continuous':
            self.ota_chamber.reset_chamber()

    def extract_test_id(self, testcase_params, id_fields):
        """Builds an ordered test identifier from selected parameters.

        Args:
            testcase_params: dict containing test-specific parameters
            id_fields: iterable of keys of testcase_params to keep
        Returns:
            test_id: OrderedDict mapping each field in id_fields, in order,
                to its value in testcase_params
        """
        test_id = collections.OrderedDict(
            (param, testcase_params[param]) for param in id_fields)
        return test_id

    def process_testclass_results(self):
        """Saves all test results to enable comparison.

        Groups the first mean value of each RSSI metric by channel,
        publishes the per-channel average of every metric, and, unless the
        chamber mode is 'StirrersOn', saves one RSSI vs. position plot per
        channel to 'results.html' in the current output path.

        Does nothing when no test results were collected.
        """
        if not self.testclass_results:
            # No test appended a result (e.g. every test failed early), so
            # there is nothing to publish and no chamber mode to read.
            logging.warning('No OTA RSSI results to process.')
            return

        testclass_data = collections.OrderedDict()
        for test_result in self.testclass_results:
            current_params = test_result['testcase_params']

            channel = current_params['channel']
            channel_data = testclass_data.setdefault(
                channel,
                collections.OrderedDict(orientation=[],
                                        rssi=collections.OrderedDict(
                                            signal_poll_rssi=[],
                                            chain_0_rssi=[],
                                            chain_1_rssi=[])))

            channel_data['orientation'].append(current_params['orientation'])
            channel_data['rssi']['signal_poll_rssi'].append(
                test_result['postprocessed_results']['signal_poll_rssi']
                ['mean'][0])
            channel_data['rssi']['chain_0_rssi'].append(
                test_result['postprocessed_results']['chain_0_rssi']['mean']
                [0])
            channel_data['rssi']['chain_1_rssi'].append(
                test_result['postprocessed_results']['chain_1_rssi']['mean']
                [0])

        # Publish test class metrics
        for channel, channel_data in testclass_data.items():
            for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
                metric_name = 'ota_summary_ch{}.avg_{}'.format(
                    channel, rssi_metric)
                metric_value = numpy.mean(rssi_metric_value)
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)

        # Plot test class results
        chamber_mode = self.testclass_results[0]['testcase_params'][
            'chamber_mode']
        if chamber_mode == 'StirrersOn':
            # No per-position plot is produced in this mode
            return
        x_labels = {
            'orientation': 'Angle (deg)',
            'stepped stirrers': 'Position Index'
        }
        # Fall back to a generic label so that an unrecognised chamber mode
        # does not abort result processing with an unbound x_label.
        x_label = x_labels.get(chamber_mode, 'Position')
        plots = []
        for channel, channel_data in testclass_data.items():
            current_plot = wputils.BokehFigure(
                title='Channel {} - Rssi vs. Position'.format(channel),
                x_label=x_label,
                primary_y_label='RSSI (dBm)',
            )
            for rssi_metric, rssi_metric_value in channel_data['rssi'].items():
                legend = rssi_metric
                current_plot.add_line(channel_data['orientation'],
                                      rssi_metric_value, legend)
            current_plot.generate_figure()
            plots.append(current_plot)
        current_context = context.get_current_context().get_full_output_path()
        plot_file_path = os.path.join(current_context, 'results.html')
        wputils.BokehFigure.save_figures(plots, plot_file_path)

    def setup_rssi_test(self, testcase_params):
        """Runs the common RSSI test setup and configures the chamber.

        Args:
            testcase_params: dict containing test-specific parameters; must
                include 'chamber_mode' and, unless the mode is 'StirrersOn',
                'orientation'
        """
        # Test setup
        WifiRssiTest.setup_rssi_test(self, testcase_params)
        if testcase_params['chamber_mode'] == 'StirrersOn':
            self.ota_chamber.start_continuous_stirrers()
        else:
            self.ota_chamber.set_orientation(testcase_params['orientation'])

    def compile_ota_rssi_test_params(self, testcase_params):
        """Function to complete compiling test-specific parameters

        The test duration is selected from the name of the running test.

        Args:
            testcase_params: dict containing test-specific parameters. The
                dict is updated in place.
        Returns:
            testcase_params: the same dict, extended with the measurement,
                network and traffic settings computed here.
        Raises:
            ValueError: if the name of the running test contains neither
                'rssi_over_orientation' nor 'rssi_variation', since no
                duration is defined for any other test type.
        """
        if "rssi_over_orientation" in self.test_name:
            rssi_test_duration = self.testclass_params[
                'rssi_over_orientation_duration']
        elif "rssi_variation" in self.test_name:
            rssi_test_duration = self.testclass_params[
                'rssi_variation_duration']
        else:
            # Fail with a clear message instead of an UnboundLocalError on
            # rssi_test_duration further down.
            raise ValueError(
                'Unsupported OTA RSSI test "{}": expected a test name '
                'containing "rssi_over_orientation" or '
                '"rssi_variation".'.format(self.test_name))

        testcase_params.update(
            connected_measurements=int(
                rssi_test_duration /
                self.testclass_params['polling_frequency']),
            scan_measurements=0,
            first_measurement_delay=MED_SLEEP,
            rssi_atten_range=[
                self.testclass_params['rssi_ota_test_attenuation']
            ])
        testcase_params['band'] = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[
            testcase_params['band']]
        testcase_params['tracked_bssid'] = [
            self.main_network[testcase_params['band']].get(
                'BSSID', '00:00:00:00')
        ]

        testcase_params['traffic_timeout'] = self.get_traffic_timeout(
            testcase_params)
        # The -R flag is only added when the iperf server is not an
        # IPerfServerOverAdb instance
        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
                testcase_params['traffic_timeout'])
        else:
            testcase_params['iperf_args'] = '-i 1 -t {} -J -R'.format(
                testcase_params['traffic_timeout'])
        return testcase_params

    def _test_ota_rssi(self, testcase_params):
        """Runs a single OTA RSSI test case.

        Compiles the test parameters, sets up and runs the RSSI test, then
        post-processes, stores and plots the result. No pass/fail check is
        performed here.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        testcase_params = self.compile_ota_rssi_test_params(testcase_params)

        self.setup_rssi_test(testcase_params)
        rssi_result = self.run_rssi_test(testcase_params)
        rssi_result['postprocessed_results'] = self.post_process_rssi_sweep(
            rssi_result)
        self.testclass_results.append(rssi_result)
        self.plot_rssi_vs_time(rssi_result,
                               rssi_result['postprocessed_results'], 1)
        self.plot_rssi_distribution(rssi_result['postprocessed_results'])

    def generate_test_cases(self, test_types, channels, modes, traffic_modes,
                            chamber_modes, orientations):
        """Auto-generates the OTA test cases of a test class.

        For every combination whose channel is allowed for the mode, a
        callable named
        '<test_type>_ch<channel>_<mode>_<traffic>_<orientation>deg' is
        attached to this instance. Every generated test runs
        _test_ota_rssi with the parameters of that combination.

        Args:
            test_types: list of test type names, used in the test names
            channels: list of channels to test
            modes: list of modes; each must be a key of allowed_configs
            traffic_modes: list of traffic mode names; 'ActiveTraffic'
                sets active_traffic to True
            chamber_modes: list of chamber modes
            orientations: list of orientations
        Returns:
            test_cases: list with the names of the generated test cases
        """
        test_cases = []
        allowed_configs = {
            'VHT20': [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 149, 153,
                157, 161
            ],
            'VHT40': [36, 44, 149, 157],
            'VHT80': [36, 149]
        }

        for (channel, mode, traffic, chamber_mode, orientation,
             test_type) in itertools.product(channels, modes, traffic_modes,
                                             chamber_modes, orientations,
                                             test_types):
            if channel not in allowed_configs[mode]:
                continue
            # NOTE: chamber_mode is not part of the test name, so passing
            # more than one chamber mode produces repeated names and the
            # later combination replaces the earlier attribute.
            test_name = test_type + '_ch{}_{}_{}_{}deg'.format(
                channel, mode, traffic, orientation)
            testcase_params = collections.OrderedDict(
                channel=channel,
                mode=mode,
                active_traffic=(traffic == 'ActiveTraffic'),
                traffic_type=self.user_params['rssi_test_params']
                ['traffic_type'],
                chamber_mode=chamber_mode,
                orientation=orientation)
            test_function = self._test_ota_rssi
            setattr(self, test_name, partial(test_function, testcase_params))
            test_cases.append(test_name)
        return test_cases
1046
1047
class WifiOtaRssi_Accuracy_Test(WifiOtaRssiTest):
    """OTA RSSI tests on channels 6, 36 and 149 at 45 degree steps."""
    def __init__(self, controllers):
        super().__init__(controllers)
        # NOTE(review): compile_ota_rssi_test_params picks the test duration
        # from 'rssi_over_orientation' or 'rssi_variation' in the test name;
        # confirm that 'test_rssi_vs_atten' is a supported type here.
        test_types = ['test_rssi_vs_atten']
        channels = [6, 36, 149]
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(test_types, channels,
                                              ['VHT20'], ['ActiveTraffic'],
                                              ['orientation'], orientations)
1056
1057
class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest):
    """OTA RSSI variation tests on channels 6, 36 and 149 with stirrers on."""
    def __init__(self, controllers):
        # NOTE(review): this calls WifiRssiTest.__init__ directly and so
        # skips WifiOtaRssiTest.__init__ - confirm this is intentional.
        WifiRssiTest.__init__(self, controllers)
        test_types = ['test_rssi_variation']
        channels = [6, 36, 149]
        self.tests = self.generate_test_cases(test_types, channels,
                                              ['VHT20'], ['ActiveTraffic'],
                                              ['StirrersOn'], [0])
1065
1066
class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest):
    """OTA RSSI tests on channels 6, 36 and 149 at 10 degree steps."""
    def __init__(self, controllers):
        # NOTE(review): this calls WifiRssiTest.__init__ directly and so
        # skips WifiOtaRssiTest.__init__ - confirm this is intentional.
        WifiRssiTest.__init__(self, controllers)
        test_types = ['test_rssi_over_orientation']
        channels = [6, 36, 149]
        orientations = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(test_types, channels,
                                              ['VHT20'], ['ActiveTraffic'],
                                              ['orientation'], orientations)
1075