#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import csv
import itertools
import json
import logging
import math
import os
import pprint
import sys
import time

import serial

import acts.signals
import acts.test_utils.wifi.wifi_test_utils as wutils

from acts import asserts
from acts import utils
from acts.controllers import iperf_server as ipf
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest


WifiEnums = wutils.WifiEnums


class WifiRvrTWTest(WifiBaseTest):
    """ Tests for wifi RVR performance

        Test Bed Requirement:
          * One Android device
          * Wi-Fi networks visible to the device
    """
    # Seconds; used as the adb command timeout, as slack added to the iperf
    # duration, and as the retry budget for the turn table.
    TEST_TIMEOUT = 10

    def setup_class(self):
        """Unpack the user params, open the serial ports and start the CSV.

        Reads the required "iot_networks" / "rvr_test_params" and the
        optional "angle_params" / "usb_port" user parameters, builds the
        SSID lookup used by the test cases, and writes the header row of
        the result CSV.
        """
        super().setup_class()

        self.dut = self.android_devices[0]
        wutils.wifi_test_device_init(self.dut)

        req_params = ["iot_networks", "rvr_test_params"]
        opt_params = ["angle_params", "usb_port"]
        self.unpack_userparams(req_param_names=req_params,
                               opt_param_names=opt_params)

        asserts.assert_true(
            len(self.iot_networks) > 0,
            "Need at least one iot network with psk.")

        wutils.wifi_toggle_state(self.dut, True)
        if "rvr_test_params" in self.user_params:
            self.iperf_server = self.iperf_servers[0]
            self.MaxdB = self.rvr_test_params["rvr_atten_MaxDB"]
            self.MindB = self.rvr_test_params["rvr_atten_MinDB"]
            self.stepdB = self.rvr_test_params["rvr_atten_step"]

        if "angle_params" in self.user_params:
            self.angle = self.angle_params

        if "usb_port" in self.user_params:
            self.T1 = self.readport(self.usb_port["turntable"])
            self.ATT1 = self.readport(self.usb_port["atten1"])
            self.ATT2 = self.readport(self.usb_port["atten2"])
            self.ATT3 = self.readport(self.usb_port["atten3"])

        # create hashmap for testcase name and SSIDs
        self.iot_test_prefix = "test_iot_connection_to_"
        self.ssid_map = {}
        for network in self.iot_networks:
            SSID = network['SSID'].replace('-', '_')
            self.ssid_map[SSID] = network

        # create folder for rvr test result
        self.log_path = os.path.join(logging.log_path, "rvr_results")
        os.makedirs(self.log_path, exist_ok=True)

        Header = ("test_SSID", "Turn table (angle)", "Attenuator(dBm)",
                  "TX throughput (Mbps)", "RX throughput (Mbps)",
                  "RSSI", "Link speed", "Frequency")
        self.csv_write(Header)

    def setup_test(self):
        """Wake the device and hold a bright wake lock for the test."""
        self.dut.droid.wakeLockAcquireBright()
        self.dut.droid.wakeUpNow()

    def teardown_test(self):
        """Release the wake lock and put the device back to sleep."""
        self.dut.droid.wakeLockRelease()
        self.dut.droid.goToSleepNow()

    def teardown_class(self):
        """Stop the iperf server and close any serial ports that were opened."""
        if "rvr_test_params" in self.user_params:
            self.iperf_server.stop()
        # The ports only exist when "usb_port" was configured; close whichever
        # ones setup_class managed to open so the devices are released.
        for attr_name in ("T1", "ATT1", "ATT2", "ATT3"):
            port = getattr(self, attr_name, None)
            if port is not None:
                port.close()

    def on_fail(self, test_name, begin_time):
        """Collect a bug report and the adb log for a failed test."""
        self.dut.take_bug_report(test_name, begin_time)
        self.dut.cat_adb_log(test_name, begin_time)

    # Helper Functions

    def csv_write(self, data):
        """Append one row to Result.csv in the rvr result folder.

        Args:
            data: Sequence of cell values forming a single CSV row (the
                  header tuple or one measurement tuple).
        """
        result_path = os.path.join(self.log_path, "Result.csv")
        # The with-statement closes the file; no explicit close() is needed.
        with open(result_path, "a", newline="") as csv_file:
            csv.writer(csv_file, delimiter=',').writerow(data)

    def readport(self, com):
        """Open a com port for the current test.

        Args:
            com: Attenuator or turn table com port

        Returns:
            The opened serial.Serial object (9600 baud, 1 second timeout).
        """
        port = serial.Serial(com, 9600, timeout=1)
        # Pause after opening before the port is first used.
        time.sleep(1)
        return port

    def getdB(self, port):
        """Get attenuator dB for current test.

        Args:
            port: Attenuator com port

        Returns:
            The integer parsed from the text following 'V' in the reply.

        Raises:
            ValueError: If the reply does not contain a parsable integer
                        (for example an empty reply after a read timeout).
        """
        port.write('V?;'.encode())
        reply = port.readline().decode()
        # Drop the ';' terminator together with any surrounding whitespace or
        # line ending so a trailing CR/LF cannot hide the terminator.
        reply = reply.strip(' \t\r\n;')
        reply = reply[reply.find('V') + 1:]
        return int(reply)

    def setdB(self, port, dB):
        """Setup attenuator dB for current test.

        Args:
            port: Attenuator com port
            dB: Attenuator setup dB, clamped to the range 0..101
        """
        dB = max(0, min(dB, 101))
        self.log.info("Set dB to " + str(dB))
        command = 'V' + str(dB) + ';'
        port.write(command.encode())
        time.sleep(0.1)

    def set_Three_Att_dB(self, port1, port2, port3, dB):
        """Setup 3 attenuator dB for current test.

        All three attenuators are programmed first and verified afterwards.

        Args:
            port1: Attenuator1 com port
            port2: Attenuator2 com port
            port3: Attenuator3 com port
            dB: Attenuator setup dB
        """
        ports = (port1, port2, port3)
        for port in ports:
            self.setdB(port, dB)
        for port in ports:
            self.checkdB(port, dB)

    def checkdB(self, port, dB):
        """Check attenuator dB for current test, re-sending it on mismatch.

        Args:
            port: Attenuator com port
            dB: Attenuator setup dB

        Returns:
            0 once the attenuator reports the requested value. If it still
            does not after all retries, the process exits via sys.exit().
        """
        max_retries = 10
        # setdB clamps the value it sends, so compare against the same
        # clamped target; otherwise an out-of-range request never matches.
        dB = max(0, min(dB, 101))
        for retry in range(1, max_retries + 1):
            current = self.getdB(port)
            if current == dB:
                return 0
            self.log.info("Current dB = " + str(current))
            self.log.info("Fail to set Attenuator to " + str(dB) + ", "
                          + str(retry) + " times try to reset")
            self.setdB(port, dB)
        # The last retry may have succeeded; only abort when the final
        # read-back still disagrees.
        if self.getdB(port) != dB:
            self.log.info("Retry Attenuator fail for " + str(max_retries)
                          + " cycles, end test!")
            sys.exit()
        return 0

    def getDG(self, port):
        """Get turn table angle for current test.

        Args:
            port: Turn table com port

        Returns:
            The integer formed by all digit characters of the reply, or -1
            when the reply contains no digits.
        """
        port.write('DG?;'.encode())
        time.sleep(0.1)
        data = port.readline().decode('utf-8')
        digits = "".join(ch for ch in data if ch.isdigit())
        if digits == "":
            return -1
        return int(digits)

    def setDG(self, port, DG):
        """Setup turn table angle for current test.

        Args:
            port: Turn table com port
            DG: Turn table setup angle, clamped to the range 0..359
        """
        DG = max(0, min(DG, 359))
        self.log.info("Set angle to " + str(DG))
        command = 'DG' + str(DG) + ';'
        port.write(command.encode())

    def checkDG(self, port, DG):
        """Check turn table angle for current test, re-sending it on mismatch.

        Args:
            port: Turn table com port
            DG: Turn table setup angle

        Returns:
            0 once the turn table reports the requested angle. If it still
            does not after all retries, the process exits via sys.exit().
        """
        retrytime = self.TEST_TIMEOUT
        # setDG clamps the value it sends, so compare against the same
        # clamped target.
        DG = max(0, min(DG, 359))
        for retry in range(1, retrytime + 1):
            current = self.getDG(port)
            if current == DG:
                return 0
            self.log.info('Current angle = ' + str(current))
            self.log.info('Fail set angle to ' + str(DG) + ', ' + str(retry)
                          + ' times try to reset')
            self.setDG(port, DG)
            # Wait before reading the angle back.
            time.sleep(10)
        # The last retry may have succeeded; only abort when the final
        # read-back still disagrees.
        if self.getDG(port) != DG:
            self.log.info('Retry turntable fail for ' + str(retrytime)
                          + ' cycles, end test!')
            sys.exit()
        return 0

    def getwifiinfo(self):
        """Get WiFi RSSI/ link speed/ frequency for current test.

        Runs "iw wlan0 link" through adb and slices the three fields out of
        its output.

        Returns:
            [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency as strings, with
            "-1" for a field that is not numeric. [-1, -1, -1] when the
            command fails or its output cannot be decoded.
        """
        def is_number(string):
            # Accept only digits, '-' and '.'; anything else marks the
            # field as invalid.
            if all(ch.isdigit() or ch in ("-", ".") for ch in string):
                return string
            return str(-1)

        try:
            cmd = "adb shell iw wlan0 link"
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)
            # Decode once and slice the same text for every field.
            wifiinfo = raw_output.decode("utf-8")
            # Check RSSI
            RSSI = wifiinfo[wifiinfo.find("signal:") + 7:
                            wifiinfo.find("dBm") - 1]
            RSSI = is_number(RSSI.strip(' '))
            # Check link speed
            LS = wifiinfo[wifiinfo.find("bitrate:") + 8:
                          wifiinfo.find("Bit/s") - 2]
            LS = is_number(LS.strip(' '))
            # Check frequency
            FR = wifiinfo[wifiinfo.find("freq:") + 6:
                          wifiinfo.find("freq:") + 10]
            FR = is_number(FR.strip(' '))
        except Exception as err:
            # Best effort: link info is metadata only, so a failure here must
            # not abort the measurement. KeyboardInterrupt/SystemExit still
            # propagate.
            self.log.warning("Cannot get wifi link info: {}".format(err))
            return [-1, -1, -1]
        return [RSSI, LS, FR]

    def post_process_results(self, rvr_result):
        """Saves the result as a CSV row and as a JSON file.

        Args:
            rvr_result: Dict containing attenuation, throughput and other meta
                        data
        """
        # Append the measurement to the shared CSV file
        data = (rvr_result["test_name"], rvr_result["test_angle"],
                rvr_result["test_dB"],
                rvr_result["throughput_TX"][0], rvr_result["throughput_RX"][0],
                rvr_result["test_RSSI"], rvr_result["test_LS"],
                rvr_result["test_FR"])
        self.csv_write(data)

        results_file_path = "{}/{}_angle{}_{}dB.json".format(
            self.log_path, self.ssid, self.angle[self.ag], self.DB)
        with open(results_file_path, 'w') as results_file:
            json.dump(rvr_result, results_file, indent=4)

    def connect_to_wifi_network(self, network):
        """Connection logic for psk wifi networks.

        Args:
            network: Dictionary with network info.
        """
        SSID = network[WifiEnums.SSID_KEY]
        self.dut.ed.clear_all_events()
        wutils.start_wifi_connection_scan(self.dut)
        scan_results = self.dut.droid.wifiGetScanResults()
        wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
        wutils.wifi_connect(self.dut, network, num_of_tries=3)

    def _average_throughput(self, iperf_file):
        """Compute the scaled average rate from an iperf result file.

        The first "iperf_ignored_interval" samples and the last sample are
        dropped before averaging.

        Args:
            iperf_file: Path of the iperf output file to parse.

        Returns:
            The mean of the remaining instantaneous rates multiplied by
            8 * 1.024**2, or 0 when the file cannot be parsed or no sample
            remains.
        """
        try:
            iperf_result = ipf.IPerfResult(iperf_file)
            ignored = self.rvr_test_params["iperf_ignored_interval"]
            rates = iperf_result.instantaneous_rates[ignored:-1]
            return (math.fsum(rates) / len(rates)) * 8 * (1.024**2)
        except Exception as err:
            # Best effort: an unparsable or empty result is reported as zero
            # throughput instead of aborting the sweep.
            self.log.warning(
                "Cannot get iperf result ({}). Setting to 0".format(err))
            return 0

    def run_iperf_client(self, network):
        """Run iperf TX throughput after connection.

        Args:
            network: Dictionary with network info.

        Returns:
            rvr_result: List holding the single TX throughput value.
        """
        rvr_result = []
        self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format(
            self.ssid, self.angle[self.ag], self.DB))
        try:
            wait_time = 5
            SSID = network[WifiEnums.SSID_KEY]
            self.log.info("Starting iperf traffic TX through {}".format(SSID))
            time.sleep(wait_time)
            port_arg = "-p {} -J {}".format(
                self.iperf_server.port,
                self.rvr_test_params["iperf_port_arg"])
            success, data = self.dut.run_iperf_client(
                self.rvr_test_params["iperf_server_address"],
                port_arg,
                timeout=self.rvr_test_params["iperf_duration"]
                + self.TEST_TIMEOUT)
            # Parse and log result
            client_output_path = os.path.join(
                self.iperf_server.log_path,
                "IperfDUT,{},TX_client_{}_angle{}_{}dB".format(
                    self.iperf_server.port, self.ssid,
                    self.angle[self.ag], self.DB))
            with open(client_output_path, 'w') as out_file:
                out_file.write("\n".join(data))
        finally:
            # Always stop the server, even when the client run raised.
            self.iperf_server.stop()

        # TX throughput is read from the server-side log.
        iperf_file = self.iperf_server.log_files[-1]
        curr_throughput = self._average_throughput(iperf_file)
        rvr_result.append(curr_throughput)
        self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
            self.DB, curr_throughput))

        self.log.debug(pprint.pformat(data))
        asserts.assert_true(success, "Error occurred in iPerf traffic.")
        return rvr_result

    def run_iperf_server(self, network):
        """Run iperf RX throughput after connection.

        Args:
            network: Dictionary with network info.

        Returns:
            rvr_result: List holding the single RX throughput value.
        """
        rvr_result = []
        self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format(
            self.ssid, self.angle[self.ag], self.DB))
        try:
            wait_time = 5
            SSID = network[WifiEnums.SSID_KEY]
            self.log.info("Starting iperf traffic RX through {}".format(SSID))
            time.sleep(wait_time)
            port_arg = "-p {} -J -R {}".format(
                self.iperf_server.port,
                self.rvr_test_params["iperf_port_arg"])
            success, data = self.dut.run_iperf_client(
                self.rvr_test_params["iperf_server_address"],
                port_arg,
                timeout=self.rvr_test_params["iperf_duration"]
                + self.TEST_TIMEOUT)
            # Parse and log result
            client_output_path = os.path.join(
                self.iperf_server.log_path,
                "IperfDUT,{},RX_server_{}_angle{}_{}dB".format(
                    self.iperf_server.port, self.ssid,
                    self.angle[self.ag], self.DB))
            with open(client_output_path, 'w') as out_file:
                out_file.write("\n".join(data))
        finally:
            # Always stop the server, even when the client run raised.
            self.iperf_server.stop()

        # RX throughput is read from the output captured on the device side.
        iperf_file = client_output_path
        curr_throughput = self._average_throughput(iperf_file)
        rvr_result.append(curr_throughput)
        self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
            self.DB, curr_throughput))

        self.log.debug(pprint.pformat(data))
        asserts.assert_true(success, "Error occurred in iPerf traffic.")
        return rvr_result

    def iperf_test_func(self, network):
        """Main function to test iperf TX/RX.

        Does nothing unless "rvr_test_params" is present in the user params.

        Args:
            network: Dictionary with network info
        """
        if "rvr_test_params" not in self.user_params:
            return
        # Initialize
        rvr_result = {}
        # Run RvR and log result; link info is sampled before the traffic runs
        wifiinfo = self.getwifiinfo()
        rvr_result["throughput_TX"] = self.run_iperf_client(network)
        rvr_result["throughput_RX"] = self.run_iperf_server(network)
        rvr_result["test_name"] = self.ssid
        rvr_result["test_angle"] = self.angle[self.ag]
        rvr_result["test_dB"] = self.DB
        rvr_result["test_RSSI"] = wifiinfo[0]
        rvr_result["test_LS"] = wifiinfo[1]
        rvr_result["test_FR"] = wifiinfo[2]
        self.post_process_results(rvr_result)

    def rvr_test(self, network):
        """Test function to run RvR.

        The function runs an RvR test in the current device/AP configuration.
        Function is called from another wrapper function that sets up the
        testbed for the RvR test

        For every configured angle the turn table is positioned, the device
        connects at 0 dB attenuation, and throughput is measured at each
        attenuation step from MindB up to MaxdB.

        Args:
            network: Dictionary with network info
        """
        wait_time = 5
        utils.subprocess.check_output('adb root', shell=True, timeout=20)
        self.ssid = network[WifiEnums.SSID_KEY]
        self.log.info("Start rvr test")
        for angle_index, angle in enumerate(self.angle):
            # Index of the current angle, read by the result/iperf helpers.
            self.ag = angle_index
            self.setDG(self.T1, angle)
            time.sleep(wait_time)
            self.checkDG(self.T1, angle)
            # Connect with no attenuation, then move to the sweep start.
            self.set_Three_Att_dB(self.ATT1, self.ATT2, self.ATT3, 0)
            time.sleep(wait_time)
            self.connect_to_wifi_network(network)
            self.set_Three_Att_dB(self.ATT1, self.ATT2, self.ATT3, self.MindB)
            for atten in range(self.MindB, self.MaxdB + self.stepdB,
                               self.stepdB):
                self.DB = atten
                self.set_Three_Att_dB(self.ATT1, self.ATT2, self.ATT3,
                                      self.DB)
                self.iperf_test_func(network)
            wutils.reset_wifi(self.dut)

    # Tests

    @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158")
    def test_iot_connection_to_RVR_2G(self):
        """Run the RvR sweep on the network keyed by this test's name suffix."""
        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
        self.rvr_test(self.ssid_map[ssid_key])

    @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac")
    def test_iot_connection_to_RVR_5G(self):
        """Run the RvR sweep on the network keyed by this test's name suffix."""
        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
        self.rvr_test(self.ssid_map[ssid_key])