1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import time 18 19import scapy.all as scapy 20 21from acts import asserts 22from acts.metrics.loggers.blackbox import BlackboxMetricLogger 23from acts.test_utils.power import IperfHelper as IPH 24from acts.test_utils.power import plot_utils 25import acts.test_utils.power.cellular.cellular_power_base_test as PWCEL 26from acts.test_utils.tel import tel_test_utils as telutils 27 28 29class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest): 30 """ Cellular traffic power test. 31 32 Inherits from PowerCellularLabBaseTest. Parses config specific 33 to this kind of test. Contains methods to start data traffic 34 between a local instance of iPerf and one running in the dut. 35 36 """ 37 38 # Keywords for test name parameters 39 PARAM_DIRECTION = 'direction' 40 PARAM_DIRECTION_UL = 'ul' 41 PARAM_DIRECTION_DL = 'dl' 42 PARAM_DIRECTION_DL_UL = 'dlul' 43 PARAM_BANDWIDTH_LIMIT = 'blimit' 44 45 # Iperf waiting time 46 IPERF_MARGIN = 10 47 48 def __init__(self, controllers): 49 """ Class initialization. 50 51 Sets test parameters to initial values. 52 """ 53 54 super().__init__(controllers) 55 56 # These variables are passed to iPerf when starting data 57 # traffic with the -b parameter to limit throughput on 58 # the application layer. 59 self.bandwidth_limit_dl = None 60 self.bandwidth_limit_ul = None 61 62 # Throughput obtained from iPerf 63 self.iperf_results = {} 64 65 # Blackbox metrics loggers 66 67 self.dl_tput_logger = BlackboxMetricLogger.for_test_case( 68 metric_name='avg_dl_tput') 69 self.ul_tput_logger = BlackboxMetricLogger.for_test_case( 70 metric_name='avg_ul_tput') 71 72 def setup_class(self): 73 super().setup_class() 74 75 # Unpack test parameters used in this class 76 self.unpack_userparams(tcp_window_fraction=0, tcp_dumps=False) 77 78 # Verify that at least one PacketSender controller has been initialized 79 if not hasattr(self, 'packet_senders'): 80 raise RuntimeError('At least one packet sender controller needs ' 81 'to be defined in the test config files.') 82 83 def setup_test(self): 84 """ Executed before every test case. 85 86 Parses test configuration from the test name and prepares 87 the simulation for measurement. 88 """ 89 90 # Reset results at the start of the test 91 self.iperf_results = {} 92 93 # Call parent method first to setup simulation 94 if not super().setup_test(): 95 return False 96 97 # Traffic direction 98 99 values = self.consume_parameter(self.PARAM_DIRECTION, 1) 100 101 if not values: 102 self.log.warning("The keyword {} was not included in the testname " 103 "parameters. Setting to {} by default.".format( 104 self.PARAM_DIRECTION, 105 self.PARAM_DIRECTION_DL_UL)) 106 self.traffic_direction = self.PARAM_DIRECTION_DL_UL 107 elif values[1] in [ 108 self.PARAM_DIRECTION_DL, self.PARAM_DIRECTION_UL, 109 self.PARAM_DIRECTION_DL_UL 110 ]: 111 self.traffic_direction = values[1] 112 else: 113 self.log.error("The test name has to include parameter {} " 114 "followed by {}/{}/{}.".format( 115 self.PARAM_DIRECTION, self.PARAM_DIRECTION_UL, 116 self.PARAM_DIRECTION_DL, 117 self.PARAM_DIRECTION_DL_UL)) 118 return False 119 120 # Bandwidth limit 121 122 values = self.consume_parameter(self.PARAM_BANDWIDTH_LIMIT, 2) 123 124 if values: 125 self.bandwidth_limit_dl = values[1] 126 self.bandwidth_limit_ul = values[2] 127 else: 128 self.bandwidth_limit_dl = 0 129 self.bandwidth_limit_ul = 0 130 self.log.error( 131 "No bandwidth limit was indicated in the test parameters. " 132 "Setting to default value of 0 (no limit to bandwidth). To set " 133 "a different value include parameter '{}' followed by two " 134 "strings indicating downlink and uplink bandwidth limits for " 135 "iPerf.".format(self.PARAM_BANDWIDTH_LIMIT)) 136 137 # No errors when parsing parameters 138 return True 139 140 def teardown_test(self): 141 """Tear down necessary objects after test case is finished. 142 143 """ 144 145 super().teardown_test() 146 147 # Log the throughput values to Blackbox 148 self.dl_tput_logger.metric_value = self.iperf_results.get('DL', 0) 149 self.ul_tput_logger.metric_value = self.iperf_results.get('UL', 0) 150 151 # Log the throughput values to Spanner 152 self.power_logger.set_dl_tput(self.iperf_results.get('DL', 0)) 153 self.power_logger.set_ul_tput(self.iperf_results.get('UL', 0)) 154 155 try: 156 dl_max_throughput = self.simulation.maximum_downlink_throughput() 157 ul_max_throughput = self.simulation.maximum_uplink_throughput() 158 self.power_logger.set_dl_tput_threshold(dl_max_throughput) 159 self.power_logger.set_ul_tput_threshold(ul_max_throughput) 160 except NotImplementedError as e: 161 self.log.error("%s Downlink/uplink thresholds will not be " 162 "logged in the power proto" % e) 163 164 for ips in self.iperf_servers: 165 ips.stop() 166 167 def power_tel_traffic_test(self): 168 """ Measures power and throughput during data transmission. 169 170 Measurement step in this test. Starts iPerf client in the DUT and then 171 initiates power measurement. After that, DUT is connected again and 172 the result from iPerf is collected. Pass or fail is decided with a 173 threshold value. 174 """ 175 176 # Start data traffic 177 iperf_helpers = self.start_tel_traffic(self.dut) 178 179 # Measure power 180 result = self.collect_power_data() 181 182 # Wait for iPerf to finish 183 time.sleep(self.IPERF_MARGIN + 2) 184 185 # Collect throughput measurement 186 self.iperf_results = self.get_iperf_results(self.dut, iperf_helpers) 187 188 # Check if power measurement is below the required value 189 self.pass_fail_check(result.average_current) 190 191 return result.average_current, self.iperf_results 192 193 def get_iperf_results(self, device, iperf_helpers): 194 """ Pulls iperf results from the device. 195 196 Args: 197 device: the device from which iperf results need to be pulled. 198 199 Returns: 200 a dictionary containing DL/UL throughput in Mbit/s. 201 """ 202 203 # Pull TCP logs if enabled 204 if self.tcp_dumps: 205 self.log.info('Pulling TCP dumps.') 206 telutils.stop_adb_tcpdump(self.dut) 207 telutils.get_tcpdump_log(self.dut) 208 209 throughput = {} 210 211 for iph in iperf_helpers: 212 213 self.log.info("Getting {} throughput results.".format( 214 iph.traffic_direction)) 215 216 iperf_result = iph.process_iperf_results(device, self.log, 217 self.iperf_servers, 218 self.test_name) 219 220 throughput[iph.traffic_direction] = iperf_result 221 222 return throughput 223 224 def check_throughput_results(self, iperf_results): 225 """ Checks throughput results. 226 227 Compares the obtained throughput with the expected value 228 provided by the simulation class. 229 230 """ 231 232 for direction, throughput in iperf_results.items(): 233 try: 234 if direction == "UL": 235 expected_t = self.simulation.maximum_uplink_throughput() 236 elif direction == "DL": 237 expected_t = self.simulation.maximum_downlink_throughput() 238 else: 239 raise RuntimeError("Unexpected traffic direction value.") 240 except NotImplementedError: 241 # Some simulation classes might not have implemented the max 242 # throughput calculation yet. 243 self.log.debug("Expected throughput is not available for the " 244 "current simulation class.") 245 else: 246 247 self.log.info( 248 "The expected {} throughput is {} Mbit/s.".format( 249 direction, expected_t)) 250 asserts.assert_true( 251 0.90 < throughput / expected_t < 1.10, 252 "{} throughput differed more than 10% from the expected " 253 "value! ({}/{} = {})".format( 254 direction, round(throughput, 3), round(expected_t, 3), 255 round(throughput / expected_t, 3))) 256 257 def pass_fail_check(self, average_current=None): 258 """ Checks power consumption and throughput. 259 260 Uses the base class method to check power consumption. Also, compares 261 the obtained throughput with the expected value provided by the 262 simulation class. 263 264 """ 265 self.check_throughput_results(self.iperf_results) 266 super().pass_fail_check(average_current) 267 268 def start_tel_traffic(self, client_host): 269 """ Starts iPerf in the indicated device and initiates traffic. 270 271 Starts the required iperf clients and servers according to the traffic 272 pattern config in the current test. 273 274 Args: 275 client_host: device handler in which to start the iperf client. 276 277 Returns: 278 A list of iperf helpers. 279 """ 280 # The iPerf server is hosted in this computer 281 self.iperf_server_address = scapy.get_if_addr( 282 self.packet_senders[0].interface) 283 284 # Start iPerf traffic 285 iperf_helpers = [] 286 287 # If the tcp_window_fraction parameter was set, calculate the TCP 288 # window size as a fraction of the peak throughput. 289 ul_tcp_window = None 290 dl_tcp_window = None 291 if self.tcp_window_fraction == 0: 292 self.log.info("tcp_window_fraction was not indicated. " 293 "Disabling fixed TCP window.") 294 else: 295 try: 296 max_dl_tput = self.simulation.maximum_downlink_throughput() 297 max_ul_tput = self.simulation.maximum_uplink_throughput() 298 dl_tcp_window = max_dl_tput / self.tcp_window_fraction 299 ul_tcp_window = max_ul_tput / self.tcp_window_fraction 300 except NotImplementedError: 301 self.log.error("Maximum downlink/uplink throughput method not " 302 "implemented for %s." % 303 type(self.simulation).__name__) 304 305 if self.traffic_direction in [ 306 self.PARAM_DIRECTION_DL, self.PARAM_DIRECTION_DL_UL 307 ]: 308 # Downlink traffic 309 iperf_helpers.append( 310 self.start_iperf_traffic(client_host, 311 server_idx=len(iperf_helpers), 312 traffic_direction='DL', 313 window=dl_tcp_window, 314 bandwidth=self.bandwidth_limit_dl)) 315 316 if self.traffic_direction in [ 317 self.PARAM_DIRECTION_UL, self.PARAM_DIRECTION_DL_UL 318 ]: 319 # Uplink traffic 320 iperf_helpers.append( 321 self.start_iperf_traffic(client_host, 322 server_idx=len(iperf_helpers), 323 traffic_direction='UL', 324 window=ul_tcp_window, 325 bandwidth=self.bandwidth_limit_ul)) 326 327 # Enable TCP logger. 328 if self.tcp_dumps: 329 self.log.info('Enabling TCP logger.') 330 telutils.start_adb_tcpdump(self.dut) 331 332 return iperf_helpers 333 334 def start_iperf_traffic(self, 335 client_host, 336 server_idx, 337 traffic_direction, 338 bandwidth=0, 339 window=None): 340 """Starts iPerf data traffic. 341 342 Starts an iperf client in an android device and a server locally. 343 344 Args: 345 client_host: device handler in which to start the iperf client 346 server_idx: id of the iperf server to connect to 347 traffic_direction: has to be either 'UL' or 'DL' 348 bandwidth: bandwidth limit for data traffic 349 window: the tcp window. if None, no window will be passed to iperf 350 351 Returns: 352 An IperfHelper object for the started client/server pair. 353 """ 354 355 # Start the server locally 356 self.iperf_servers[server_idx].start() 357 358 config = { 359 'traffic_type': 'TCP', 360 'duration': 361 self.mon_duration + self.mon_offset + self.IPERF_MARGIN, 362 'start_meas_time': 4, 363 'server_idx': server_idx, 364 'port': self.iperf_servers[server_idx].port, 365 'traffic_direction': traffic_direction, 366 'window': window 367 } 368 369 # If bandwidth is equal to zero then no bandwidth requirements are set 370 if bandwidth > 0: 371 config['bandwidth'] = bandwidth 372 373 iph = IPH.IperfHelper(config) 374 375 # Start the client in the android device 376 client_host.adb.shell_nb( 377 "nohup >/dev/null 2>&1 sh -c 'iperf3 -c {} {} " 378 "&'".format(self.iperf_server_address, iph.iperf_args)) 379 380 self.log.info('{} iPerf started on port {}.'.format( 381 traffic_direction, iph.port)) 382 383 return iph 384 385 386class PowerTelRvRTest(PowerTelTrafficTest): 387 """ Gets Range vs Rate curves while measuring power consumption. 388 389 Uses PowerTelTrafficTest as a base class. 390 """ 391 392 # Test name configuration keywords 393 PARAM_SWEEP = "sweep" 394 PARAM_SWEEP_UPLINK = "uplink" 395 PARAM_SWEEP_DOWNLINK = "downlink" 396 397 # Sweep values. Need to be set before starting test by test 398 # function or child class. 399 downlink_power_sweep = None 400 uplink_power_sweep = None 401 402 def setup_test(self): 403 """ Executed before every test case. 404 405 Parses test configuration from the test name and prepares 406 the simulation for measurement. 407 """ 408 409 # Call parent method first to setup simulation 410 if not super().setup_test(): 411 return False 412 413 # Get which power value to sweep from config 414 415 try: 416 values = self.consume_parameter(self.PARAM_SWEEP, 1) 417 418 if values[1] == self.PARAM_SWEEP_UPLINK: 419 self.sweep = self.PARAM_SWEEP_UPLINK 420 elif values[1] == self.PARAM_SWEEP_DOWNLINK: 421 self.sweep = self.PARAM_SWEEP_DOWNLINK 422 else: 423 raise ValueError() 424 except: 425 self.log.error( 426 "The test name has to include parameter {} followed by " 427 "either {} or {}.".format(self.PARAM_SWEEP, 428 self.PARAM_SWEEP_DOWNLINK, 429 self.PARAM_SWEEP_UPLINK)) 430 return False 431 432 return True 433 434 def power_tel_rvr_test(self): 435 """ Main function for the RvR test. 436 437 Produces the RvR curve according to the indicated sweep values. 438 """ 439 440 if self.sweep == self.PARAM_SWEEP_DOWNLINK: 441 sweep_range = self.downlink_power_sweep 442 elif self.sweep == self.PARAM_SWEEP_UPLINK: 443 sweep_range = self.uplink_power_sweep 444 445 current = [] 446 throughput = [] 447 448 for pw in sweep_range: 449 450 if self.sweep == self.PARAM_SWEEP_DOWNLINK: 451 self.simulation.set_downlink_rx_power(self.simulation.bts1, pw) 452 elif self.sweep == self.PARAM_SWEEP_UPLINK: 453 self.simulation.set_uplink_tx_power(self.simulation.bts1, pw) 454 455 i, t = self.power_tel_traffic_test() 456 self.log.info("---------------------") 457 self.log.info("{} -- {} --".format(self.sweep, pw)) 458 self.log.info("{} ----- {}".format(i, t[0])) 459 self.log.info("---------------------") 460 461 current.append(i) 462 throughput.append(t[0]) 463 464 print(sweep_range) 465 print(current) 466 print(throughput) 467 468 469class PowerTelTxPowerSweepTest(PowerTelTrafficTest): 470 """ Gets Average Current vs Tx Power plot. 471 472 Uses PowerTelTrafficTest as a base class. 473 """ 474 475 # Test config keywords 476 KEY_TX_STEP = 'step' 477 KEY_UP_TOLERANCE = 'up_tolerance' 478 KEY_DOWN_TOLERANCE = 'down_tolerance' 479 480 # Test name parameters 481 PARAM_TX_POWER_SWEEP = 'sweep' 482 483 def setup_class(self): 484 super().setup_class() 485 self.unpack_userparams( 486 [self.KEY_TX_STEP, self.KEY_UP_TOLERANCE, self.KEY_DOWN_TOLERANCE]) 487 488 def setup_test(self): 489 """ Executed before every test case. 490 491 Parses test configuration from the test name and prepares 492 the simulation for measurement. 493 """ 494 # Call parent method first to setup simulation 495 if not super().setup_test(): 496 return False 497 498 # Determine power range to sweep from test case params 499 try: 500 values = self.consume_parameter(self.PARAM_TX_POWER_SWEEP, 2) 501 502 if len(values) == 3: 503 self.start_dbm = int(values[1].replace('n', '-')) 504 self.end_dbm = int(values[2].replace('n', '-')) 505 else: 506 raise ValueError('Not enough params specified for sweep.') 507 except ValueError as e: 508 self.log.error("Unable to parse test param sweep: {}".format(e)) 509 return False 510 511 return True 512 513 def pass_fail_check(self, currents, txs, iperf_results): 514 """ Compares the obtained throughput with the expected 515 value provided by the simulation class. Also, ensures 516 consecutive currents do not increase or decrease beyond 517 specified tolerance 518 """ 519 for iperf_result in iperf_results: 520 self.check_throughput_results(iperf_result) 521 522 # x = reference current value, y = next current value, i = index of x 523 for i, (x, y) in enumerate(zip(currents[::], currents[1::])): 524 measured_change = (y - x) / x * 100 525 asserts.assert_true( 526 -self.down_tolerance < measured_change < self.up_tolerance, 527 "Current went from {} to {} ({}%) between {} dBm and {} dBm. " 528 "Tolerance range: -{}% to {}%".format(x, y, measured_change, 529 txs[i], txs[i + 1], 530 self.down_tolerance, 531 self.up_tolerance)) 532 533 def create_power_plot(self, currents, txs): 534 """ Creates average current vs tx power plot 535 """ 536 tag = '{}_{}_{}'.format(self.test_name, self.dut.model, 537 self.dut.build_info['build_id']) 538 plot_utils.monsoon_tx_power_sweep_plot(self.mon_info, tag, currents, 539 txs) 540 541 def power_tel_tx_sweep(self): 542 """ Main function for the Tx power sweep test. 543 544 Produces a plot of power consumption vs tx power 545 """ 546 currents = [] 547 txs = [] 548 iperf_results = [] 549 for tx in range(self.start_dbm, self.end_dbm + 1, self.step): 550 551 self.simulation.set_uplink_tx_power(tx) 552 553 iperf_helpers = self.start_tel_traffic(self.dut) 554 555 # Measure power 556 result = self.collect_power_data() 557 558 # Wait for iPerf to finish 559 time.sleep(self.IPERF_MARGIN + 2) 560 561 # Collect and check throughput measurement 562 iperf_result = self.get_iperf_results(self.dut, iperf_helpers) 563 564 currents.append(result.average_current) 565 566 # Get the actual Tx power as measured from the callbox side 567 measured_tx = self.simulation.get_measured_ul_power() 568 569 txs.append(measured_tx) 570 iperf_results.append(iperf_result) 571 572 self.create_power_plot(currents, txs) 573 self.pass_fail_check(currents, txs, iperf_results) 574