1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import itertools
18import pprint
19import time
20
21import acts.signals
22import acts.test_utils.wifi.wifi_test_utils as wutils
23
24from acts import asserts
25from acts.test_decorators import test_tracker_info
26from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
27from acts.controllers import iperf_server as ipf
28
29import json
30import logging
31import math
32import os
33from acts import utils
34import csv
35
36import serial
37import sys
38
39
40WifiEnums = wutils.WifiEnums
41
42
43class WifiRvrTWTest(WifiBaseTest):
44    """ Tests for wifi RVR performance
45
46        Test Bed Requirement:
47          * One Android device
48          * Wi-Fi networks visible to the device
49    """
50    TEST_TIMEOUT = 10
51
52    def setup_class(self):
53        super().setup_class()
54
55        self.dut = self.android_devices[0]
56        wutils.wifi_test_device_init(self.dut)
57
58        req_params = [ "iot_networks","rvr_test_params"]
59        opt_params = [ "angle_params","usb_port"]
60        self.unpack_userparams(req_param_names=req_params,
61                               opt_param_names=opt_params)
62
63        asserts.assert_true(
64            len(self.iot_networks) > 0,
65            "Need at least one iot network with psk.")
66
67        wutils.wifi_toggle_state(self.dut, True)
68        if "rvr_test_params" in self.user_params:
69            self.iperf_server = self.iperf_servers[0]
70            self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"]
71            self.MindB= self.rvr_test_params ["rvr_atten_MinDB"]
72            self.stepdB= self.rvr_test_params ["rvr_atten_step"]
73
74        if "angle_params" in self.user_params:
75            self.angle = self.angle_params
76
77        if "usb_port" in self.user_params:
78            self.T1=self.readport(self.usb_port["turntable"])
79            self.ATT1=self.readport(self.usb_port["atten1"])
80            self.ATT2=self.readport(self.usb_port["atten2"])
81            self.ATT3=self.readport(self.usb_port["atten3"])
82
83        # create hashmap for testcase name and SSIDs
84        self.iot_test_prefix = "test_iot_connection_to_"
85        self.ssid_map = {}
86        for network in self.iot_networks:
87            SSID = network['SSID'].replace('-','_')
88            self.ssid_map[SSID] = network
89
90        # create folder for rvr test result
91        self.log_path = os.path.join(logging.log_path, "rvr_results")
92        os.makedirs(self.log_path, exist_ok=True)
93
94        Header=("test_SSID","Turn table (angle)","Attenuator(dBm)",
95                "TX throughput (Mbps)","RX throughput (Mbps)",
96                "RSSI","Link speed","Frequency")
97        self.csv_write(Header)
98
99    def setup_test(self):
100        self.dut.droid.wakeLockAcquireBright()
101        self.dut.droid.wakeUpNow()
102
103    def teardown_test(self):
104        self.dut.droid.wakeLockRelease()
105        self.dut.droid.goToSleepNow()
106
107    def teardown_class(self):
108        if "rvr_test_params" in self.user_params:
109            self.iperf_server.stop()
110
111    def on_fail(self, test_name, begin_time):
112        self.dut.take_bug_report(test_name, begin_time)
113        self.dut.cat_adb_log(test_name, begin_time)
114
115    """Helper Functions"""
116
117    def csv_write(self,data):
118        """Output .CSV file for test result.
119
120        Args:
121            data: Dict containing attenuation, throughput and other meta data.
122        """
123        with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file:
124            csv_writer = csv.writer(csv_file,delimiter=',')
125            csv_writer.writerow(data)
126            csv_file.close()
127
128    def readport(self,com):
129        """Read com port for current test.
130
131        Args:
132            com: Attenuator or turn table com port
133        """
134        port=serial.Serial(com,9600,timeout=1)
135        time.sleep(1)
136        return port
137
138    def getdB(self,port):
139        """Get attenuator dB for current test.
140
141        Args:
142            port: Attenuator com port
143        """
144        port.write('V?;'.encode())
145        dB=port.readline().decode()
146        dB=dB.strip(';')
147        dB=dB[dB.find('V')+1:]
148        return int(dB)
149
150    def setdB(self,port,dB):
151        """Setup attenuator dB for current test.
152
153        Args:
154            port: Attenuator com port
155            dB: Attenuator setup dB
156        """
157        if dB<0:
158            dB=0
159        elif dB>101:
160            dB=101
161        self.log.info("Set dB to "+str(dB))
162        InputdB=str('V')+str(dB)+str(';')
163        port.write(InputdB.encode())
164        time.sleep(0.1)
165
166    def set_Three_Att_dB(self,port1,port2,port3,dB):
167        """Setup 3 attenuator dB for current test.
168
169        Args:
170            port1: Attenuator1 com port
171            port1: Attenuator2 com port
172            port1: Attenuator com port
173            dB: Attenuator setup dB
174        """
175        self.setdB(port1,dB)
176        self.setdB(port2,dB)
177        self.setdB(port3,dB)
178        self.checkdB(port1,dB)
179        self.checkdB(port2,dB)
180        self.checkdB(port3,dB)
181
182    def checkdB(self,port,dB):
183        """Check attenuator dB for current test.
184
185        Args:
186            port: Attenuator com port
187            dB: Attenuator setup dB
188        """
189        retry=0
190        while self.getdB(port)!=dB and retry<10:
191            retry=retry+1
192            self.log.info("Current dB = "+str(self.getdB(port)))
193            self.log.info("Fail to set Attenuator to "+str(dB)+", "
194                          +str(retry)+" times try to reset")
195            self.setdB(port,dB)
196        if retry ==10:
197            self.log.info("Retry Attenuator fail for 9 cycles, end test!")
198            sys.exit()
199        return 0
200
201    def getDG(self,port):
202        """Get turn table angle for current test.
203
204        Args:
205            port: Turn table com port
206        """
207        DG = ""
208        port.write('DG?;'.encode())
209        time.sleep(0.1)
210        data = port.readline().decode('utf-8')
211        for i in range(len(data)):
212            if (data[i].isdigit()) == True:
213                DG = DG + data[i]
214        if DG == "":
215            return -1
216        return int(DG)
217
218    def setDG(self,port,DG):
219        """Setup turn table angle for current test.
220
221        Args:
222            port: Turn table com port
223            DG: Turn table setup angle
224        """
225        if DG>359:
226            DG=359
227        elif DG<0:
228            DG=0
229        self.log.info("Set angle to "+str(DG))
230        InputDG=str('DG')+str(DG)+str(';')
231        port.write(InputDG.encode())
232
233    def checkDG(self,port,DG):
234        """Check turn table angle for current test.
235
236        Args:
237            port: Turn table com port
238            DG: Turn table setup angle
239        """
240        retrytime = self.TEST_TIMEOUT
241        retry = 0
242        while self.getDG(port)!=DG and retry<retrytime:
243            retry=retry+1
244            self.log.info('Current angle = '+str(self.getDG(port)))
245            self.log.info('Fail set angle to '+str(DG)+', '+str(retry)+' times try to reset')
246            self.setDG(port,DG)
247            time.sleep(10)
248        if retry == retrytime:
249            self.log.info('Retry turntable fail for '+str(retry)+' cycles, end test!')
250            sys.exit()
251        return 0
252
253    def getwifiinfo(self):
254        """Get WiFi RSSI/ link speed/ frequency for current test.
255
256        Returns:
257            [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency
258        """
259        def is_number(string):
260            for i in string:
261                if i.isdigit() == False:
262                    if (i=="-" or i=="."):
263                        continue
264                    return str(-1)
265            return string
266
267        try:
268            cmd = "adb shell iw wlan0 link"
269            wifiinfo = utils.subprocess.check_output(cmd,shell=True,
270                                                     timeout=self.TEST_TIMEOUT)
271            # Check RSSI
272            RSSI = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("signal:") +
273                                            7:wifiinfo.decode("utf-8").find("dBm") - 1]
274            RSSI = RSSI.strip(' ')
275            RSSI = is_number(RSSI)
276            # Check link speed
277            LS = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("bitrate:") +
278                                          8:wifiinfo.decode("utf-8").find("Bit/s") - 2]
279            LS = LS.strip(' ')
280            LS = is_number(LS)
281            # Check frequency
282            FR = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("freq:") +
283                                          6:wifiinfo.decode("utf-8").find("freq:") + 10]
284            FR = FR.strip(' ')
285            FR = is_number(FR)
286        except:
287            return -1, -1, -1
288        return [RSSI,LS,FR]
289
290    def post_process_results(self, rvr_result):
291        """Saves JSON formatted results.
292
293        Args:
294            rvr_result: Dict containing attenuation, throughput and other meta
295            data
296        """
297        # Save output as text file
298        data=(rvr_result["test_name"],rvr_result["test_angle"],rvr_result["test_dB"],
299              rvr_result["throughput_TX"][0],rvr_result["throughput_RX"][0],
300              rvr_result["test_RSSI"],rvr_result["test_LS"],rvr_result["test_FR"])
301        self.csv_write(data)
302
303        results_file_path = "{}/{}_angle{}_{}dB.json".format(self.log_path,
304                                                        self.ssid,
305                                                        self.angle[self.ag],self.DB)
306        with open(results_file_path, 'w') as results_file:
307            json.dump(rvr_result, results_file, indent=4)
308
309    def connect_to_wifi_network(self, network):
310        """Connection logic for psk wifi networks.
311
312        Args:
313            params: Dictionary with network info.
314        """
315        SSID = network[WifiEnums.SSID_KEY]
316        self.dut.ed.clear_all_events()
317        wutils.start_wifi_connection_scan(self.dut)
318        scan_results = self.dut.droid.wifiGetScanResults()
319        wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
320        wutils.wifi_connect(self.dut, network, num_of_tries=3)
321
322    def run_iperf_client(self, network):
323        """Run iperf TX throughput after connection.
324
325        Args:
326            params: Dictionary with network info.
327
328        Returns:
329            rvr_result: Dict containing rvr_results
330        """
331        rvr_result = []
332        self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format(
333            self.ssid,self.angle[self.ag],self.DB))
334        wait_time = 5
335        SSID = network[WifiEnums.SSID_KEY]
336        self.log.info("Starting iperf traffic TX through {}".format(SSID))
337        time.sleep(wait_time)
338        port_arg = "-p {} -J {}".format(self.iperf_server.port,
339                                        self.rvr_test_params["iperf_port_arg"])
340        success, data = self.dut.run_iperf_client(
341            self.rvr_test_params["iperf_server_address"],
342            port_arg,
343            timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
344        # Parse and log result
345        client_output_path = os.path.join(
346            self.iperf_server.log_path, "IperfDUT,{},TX_client_{}_angle{}_{}dB".format(
347                self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
348        with open(client_output_path, 'w') as out_file:
349            out_file.write("\n".join(data))
350        self.iperf_server.stop()
351
352        iperf_file = self.iperf_server.log_files[-1]
353        try:
354            iperf_result = ipf.IPerfResult(iperf_file)
355            curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
356                self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
357                    iperf_result.instantaneous_rates[self.rvr_test_params[
358                        "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
359        except:
360            self.log.warning(
361                "ValueError: Cannot get iperf result. Setting to 0")
362            curr_throughput = 0
363        rvr_result.append(curr_throughput)
364        self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
365            self.DB, curr_throughput))
366
367        self.log.debug(pprint.pformat(data))
368        asserts.assert_true(success, "Error occurred in iPerf traffic.")
369        return rvr_result
370
371    def run_iperf_server(self, network):
372        """Run iperf RX throughput after connection.
373
374        Args:
375            params: Dictionary with network info.
376
377        Returns:
378            rvr_result: Dict containing rvr_results
379        """
380        rvr_result = []
381        self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format(
382            self.ssid,self.angle[self.ag],self.DB))
383        wait_time = 5
384        SSID = network[WifiEnums.SSID_KEY]
385        self.log.info("Starting iperf traffic RX through {}".format(SSID))
386        time.sleep(wait_time)
387        port_arg = "-p {} -J -R {}".format(self.iperf_server.port,
388                                           self.rvr_test_params["iperf_port_arg"])
389        success, data = self.dut.run_iperf_client(
390            self.rvr_test_params["iperf_server_address"],
391            port_arg,
392            timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
393        # Parse and log result
394        client_output_path = os.path.join(
395        self.iperf_server.log_path, "IperfDUT,{},RX_server_{}_angle{}_{}dB".format(
396            self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
397        with open(client_output_path, 'w') as out_file:
398            out_file.write("\n".join(data))
399        self.iperf_server.stop()
400
401        iperf_file = client_output_path
402        try:
403            iperf_result = ipf.IPerfResult(iperf_file)
404            curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
405                self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
406                    iperf_result.instantaneous_rates[self.rvr_test_params[
407                        "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
408        except:
409            self.log.warning(
410                "ValueError: Cannot get iperf result. Setting to 0")
411            curr_throughput = 0
412        rvr_result.append(curr_throughput)
413        self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
414            self.DB, curr_throughput))
415
416        self.log.debug(pprint.pformat(data))
417        asserts.assert_true(success, "Error occurred in iPerf traffic.")
418        return rvr_result
419
420    def iperf_test_func(self,network):
421        """Main function to test iperf TX/RX.
422
423        Args:
424            params: Dictionary with network info
425        """
426        if "rvr_test_params" in self.user_params:
427            # Initialize
428            rvr_result = {}
429            # Run RvR and log result
430            wifiinfo = self.getwifiinfo()
431            rvr_result["throughput_TX"] = self.run_iperf_client(network)
432            rvr_result["throughput_RX"] = self.run_iperf_server(network)
433            rvr_result["test_name"] = self.ssid
434            rvr_result["test_angle"] = self.angle[self.ag]
435            rvr_result["test_dB"] = self.DB
436            rvr_result["test_RSSI"] = wifiinfo[0]
437            rvr_result["test_LS"] = wifiinfo[1]
438            rvr_result["test_FR"] = wifiinfo[2]
439            self.post_process_results(rvr_result)
440
441    def rvr_test(self,network):
442        """Test function to run RvR.
443
444        The function runs an RvR test in the current device/AP configuration.
445        Function is called from another wrapper function that sets up the
446        testbed for the RvR test
447
448        Args:
449            params: Dictionary with network info
450        """
451        wait_time = 5
452        utils.subprocess.check_output('adb root', shell=True, timeout=20)
453        self.ssid = network[WifiEnums.SSID_KEY]
454        self.log.info("Start rvr test")
455        for i in range(len(self.angle)):
456          self.setDG(self.T1,self.angle[i])
457          time.sleep(wait_time)
458          self.checkDG(self.T1,self.angle[i])
459          self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,0)
460          time.sleep(wait_time)
461          self.connect_to_wifi_network(network)
462          self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.MindB)
463          for j in range(self.MindB,self.MaxdB+self.stepdB,self.stepdB):
464            self.DB=j
465            self.ag=i
466            self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.DB)
467            self.iperf_test_func(network)
468          wutils.reset_wifi(self.dut)
469
470    """Tests"""
471
472    @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158")
473    def test_iot_connection_to_RVR_2G(self):
474        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
475        self.rvr_test(self.ssid_map[ssid_key])
476
477    @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac")
478    def test_iot_connection_to_RVR_5G(self):
479        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
480        self.rvr_test(self.ssid_map[ssid_key])