1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Stress test utility for repeating actions repeatedly on android devices. 16 17Configures multiple devices to simultaneously run through the same set of 18actions over and over, while keeping logs from various sources. Primarily 19designed for playing audio to the devices and scanning their log output for 20events, while running other adb commands in between. 21""" 22from __future__ import absolute_import 23from __future__ import division 24from __future__ import print_function 25 26import datetime 27from email import encoders 28from email.mime import text 29import email.mime.base as base 30import email.mime.multipart as multipart 31import logging 32import mimetypes 33import os 34import platform 35import re 36import shlex 37import signal 38import smtplib 39import socket 40import subprocess 41import sys 42import tempfile 43import threading 44import time 45import uuid 46import wave 47from absl import app 48from absl import flags 49import pexpect 50import queue 51import stress_test_common 52import stress_test_pb2 53from google.protobuf import text_format 54 55_SUMMARY_LINES = "-" * 73 56 57if sys.platform.startswith("win"): 58 pexpect = None 59 60_SUMMARY_COLUMNS = ( 61 "| Event Type | Event Count | Consecutive no event |") 62_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|" 63 64FLAGS = flags.FLAGS 65flags.DEFINE_string("notification_address", "", 66 "Email address where to send notification events. Will " 67 "default to $USER@google.com if not provided. No emails " 68 "will be sent if suppress_notification_emails is True.") 69flags.DEFINE_bool("suppress_notification_emails", False, 70 "Prevents emails from being sent as notifications if True.") 71flags.DEFINE_string("test_name", None, 72 "Name of stress test to run. For example, if you set this " 73 "to 'dsp_trigger_sw_rejection', the stress test in " 74 "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will " 75 "be loaded and executed.") 76# flags.mark_flag_as_required("test_name") 77flags.DEFINE_string("output_root", "./", 78 "Path where directory should be generated containing all " 79 "logs from devices and moved files.") 80flags.DEFINE_integer("num_iterations", None, 81 "If set to a positive number, the number of iterations of " 82 "the stress test to run. Otherwise, the test runs " 83 "forever.") 84flags.DEFINE_list("devices", [], 85 "Serial numbers of devices that should be included in the " 86 "stress test. If empty, all devices will be used.") 87flags.DEFINE_integer("print_summary_every_n", 10, 88 "Prints the summary to the log file every n iterations.") 89 90flags.DEFINE_string("email_sender_address", "", 91 "Account to use for sending notification emails.") 92flags.DEFINE_string("email_sender_password", "", 93 "Password to use for notification email account.") 94flags.DEFINE_string("email_smtp_server", "smtp.gmail.com", 95 "SMTP server to use for sending notification emails.") 96flags.DEFINE_integer("email_smtp_port", 465, 97 "Port to use for the notification SMTP server.") 98flags.DEFINE_integer("device_settle_time", 5, 99 "Time to wait for devices to settle.") 100flags.DEFINE_bool("use_sox", platform.system() != "Windows", 101 "Use sox for playback, otherwise, attempt to use platform " 102 "specific features.") 103flags.DEFINE_bool("attach_bugreport", True, 104 "Attach bugreport to email if test failed.") 105flags.DEFINE_bool("delete_data_dir", False, 106 "If true, code will delete all the files generated by this " 107 "test at the end.") 108 109if platform.system().startswith("CYGWIN"): 110 FLAGS.device_settle_time = 30 111 112 113def QueueWorker(worker_queue): 114 while True: 115 work = worker_queue.get() 116 try: 117 work() 118 except: # pylint:disable=bare-except 119 logging.exception("Exception in worker queue - task remains uncompleted.") 120 worker_queue.task_done() 121 122 123def SendNotificationEmail(subject, body, bugreport=None): 124 """Sends an email with the specified subject and body. 125 126 Also attach bugreport if bugreport location is provided as argument 127 128 Args: 129 subject: Subject of the email. 130 body: Body of the email. 131 bugreport: If provided, it will be attach to the email. 132 """ 133 if FLAGS.suppress_notification_emails: 134 logging.info("Email with subject '%s' has been suppressed", subject) 135 return 136 try: 137 # Assemble the message to send. 138 recpient_address = FLAGS.notification_address 139 message = multipart.MIMEMultipart("alternative") 140 message["From"] = "Stress Test on %s" % socket.gethostname() 141 message["To"] = recpient_address 142 message["Subject"] = subject 143 message.attach(text.MIMEText(body, "plain")) 144 message.attach(text.MIMEText("<pre>%s</pre>" % body, "html")) 145 146 if FLAGS.attach_bugreport and bugreport: 147 # buildozer: disable=unused-variable 148 ctype, _ = mimetypes.guess_type(bugreport) 149 maintype, subtype = ctype.split("/", 1) 150 with open(bugreport, "rb") as fp: 151 att = base.MIMEBase(maintype, subtype) 152 att.set_payload(fp.read()) 153 encoders.encode_base64(att) 154 att.add_header("Content-Disposition", "attachment", filename=bugreport) 155 message.attach(att) 156 157 # Send the message from our special account. 158 server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port) 159 server.login(FLAGS.email_sender_address, FLAGS.email_sender_password) 160 server.sendmail(FLAGS.email_sender_address, recpient_address, 161 message.as_string()) 162 server.quit() 163 logging.info("Email with subject '%s' has been sent", subject) 164 except: # pylint:disable=bare-except 165 logging.exception("Failed to send notification email") 166 167 168class ProcessLogger(threading.Thread): 169 170 class EventScanner(object): 171 172 def __init__(self, name, process_name, regexes): 173 """Struct to store the data about an event. 174 175 Args: 176 name: Name of event. 177 process_name: Name of the process that is being logged. 178 regexes: An iteratable of regex strings that indicate an event has 179 happened. 180 """ 181 182 self.name = name 183 self.process_name = process_name 184 self.searches = [re.compile(regex).search for regex in regexes] 185 self.count = 0 186 187 def ScanForEvent(self, line, lock=None): 188 """Checks the line for matches. If found, updates the internal counter.""" 189 190 for search in self.searches: 191 if search(line.decode("utf-8")): 192 # Grab the lock (if provided), update the counter, and release it. 193 if lock: lock.acquire() 194 self.count += 1 195 if lock: lock.release() 196 logging.info("Event '%s' detected on %s", self.name, 197 self.process_name) 198 199 def __init__(self, name, command, output, events, 200 restart_process, repeats_output_when_opened): 201 """Threaded class that monitors processes for events, and logs output. 202 203 Args: 204 name: The name of the process being logged. 205 command: A list of arguments to be passed to the subprocess to execute. 206 output: Name of output file to write process stdout to. If blank or None, 207 will not be generated. 208 events: An iterable of LoggingEventConfigs to look for in the output. 209 restart_process: Restart the process if it terminates by itself. This 210 should typically be true, but false for processes that only should be 211 run once and have their output logged. 212 repeats_output_when_opened: Set to true if the process will repeat the 213 output of a previous call when it is restarted. This will prevent 214 duplicate lines from being logged. 215 """ 216 super(ProcessLogger, self).__init__() 217 self.name = name 218 self.command = command 219 self.restart_process = restart_process 220 self.repeats_output_when_opened = repeats_output_when_opened 221 self.process = None 222 self.lock = threading.Lock() 223 self.looking = False 224 225 # Compile the list of regexes that we're supposed to be looking for. 226 self.events = [] 227 for event in events: 228 self.events.append(ProcessLogger.EventScanner(event.name, self.name, 229 event.regex)) 230 231 if output: 232 stress_test_common.MakeDirsIfNeeded(os.path.dirname(output)) 233 self.output_fp = open(output, "w") 234 logging.info("Logging device info to %s", output) 235 else: 236 self.output_fp = None 237 238 def GetEventCountsSinceLastCall(self): 239 """Returns the counts of all events since this method was last called.""" 240 event_map = {} 241 self.lock.acquire() 242 for event in self.events: 243 event_map[event.name] = event.count 244 event.count = 0 245 self.lock.release() 246 return event_map 247 248 def run(self): 249 last_line = None 250 should_log = True 251 first_run = True 252 self.lock.acquire() 253 last_run_time = 0 254 while self.restart_process: 255 self.lock.release() 256 if not first_run: 257 logging.info("Restarting process %s", "".join(str(self.command))) 258 time_since_last_run = datetime.datetime.now() - last_run_time 259 if time_since_last_run.total_seconds() < 1.0: 260 needed_delay = 1.0 - time_since_last_run.total_seconds() 261 logging.info("Delaying for %.2f seconds", needed_delay) 262 time.sleep(needed_delay) 263 else: 264 first_run = False 265 266 try: 267 if pexpect: 268 self.process = pexpect.spawn(" ".join(self.command), timeout=None) 269 output_source = self.process 270 else: 271 self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE) 272 output_source = self.process.stdout 273 last_run_time = datetime.datetime.now() 274 for line in output_source: 275 # If the process we're logging likes to repeat its output, we need to 276 # look for the last line we saw before we start doing anything with 277 # these lines anymore. 278 if self.repeats_output_when_opened: 279 if not should_log: 280 if last_line == line: 281 should_log = True 282 continue 283 284 if self.output_fp: 285 self.output_fp.write(line.decode("utf-8").rstrip()) 286 self.output_fp.write("\n") 287 288 # Loop through all events we're watching for, to see if they occur on 289 # this line. If they do, update the fact that we've seen this event. 290 for event in self.events: 291 if self.looking: 292 event.ScanForEvent(line, lock=self.lock) 293 last_line = line 294 except: # pylint:disable=bare-except 295 logging.exception("Exception encountered running process") 296 finally: 297 if pexpect: 298 self.process.terminate() 299 else: 300 self.process.send_signal(signal.SIGTERM) 301 should_log = False 302 self.lock.acquire() 303 self.lock.release() 304 if pexpect: 305 if self.process.exitstatus is not None: 306 logging.info("Process finished - exit code %d", self.process.exitstatus) 307 else: 308 logging.info("Process finished - signal code %d", 309 self.process.signalstatus) 310 else: 311 if self.process.returncode is not None: 312 logging.info("Process finished - return code %d", 313 self.process.returncode) 314 else: 315 logging.info("Process finished - no return code") 316 317 def StopLogging(self): 318 if self.process: 319 self.lock.acquire() 320 self.restart_process = False 321 self.lock.release() 322 323 if pexpect: 324 self.process.kill(signal.SIGHUP) 325 self.process.kill(signal.SIGINT) 326 else: 327 self.process.send_signal(signal.SIGTERM) 328 329 330class Device(object): 331 332 SECONDS_TO_SLEEP_DURING_ROOT = 0.5 333 334 def __init__(self, serial_number, output_root, test_events, expected_result): 335 """Responsible for monitoring a specific device, and pulling files from it. 336 337 The actual work of the constructor will be handled asynchronously, you must 338 call WaitForTasks() before using the device. 339 340 Args: 341 serial_number: The device serial number. 342 output_root: The directory where to output log files/anything pulled from 343 the device. 344 test_events: The events (with conditions) that come from the StressTest 345 that should be evaluated at every iteration, along with a list of 346 actions to take when one of these events occur. For example, if there 347 have not been any detected hotword triggers, a bugreport can be 348 generated. 349 expected_result: Expected event count to pass the test. 350 """ 351 self.serial_number = serial_number 352 self.output_root = output_root 353 self.cmd_string_replacements = {} 354 self.iteration = 0 355 self.cmd_string_replacements["iteration"] = 0 356 self.cmd_string_replacements["serial_number"] = serial_number 357 self.cmd_string_replacements["output_root"] = output_root 358 self.name = None 359 self.process_loggers = [] 360 self.event_log = stress_test_pb2.EventLog() 361 self.cnt_per_iteration = expected_result 362 363 # Prepare the work queue, and offload the rest of the init into it. 364 self.work_queue = queue.Queue() 365 self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue]) 366 self.worker.daemon = True 367 self.worker.name = self.name 368 self.worker.start() 369 self.abort_requested = False 370 self.remove_device = False 371 self.test_events = test_events 372 373 self.work_queue.put(self.__init_async__) 374 375 def __init_async__(self): 376 # Get the device type, and append it to the serial number. 377 self.device_type = self.Command(["shell", "getprop", 378 "ro.product.name"]).strip().decode("utf-8") 379 self.name = "%s_%s" % (self.device_type, self.serial_number) 380 self.worker.name = self.name 381 self.cmd_string_replacements["device"] = self.name 382 logging.info("Setting up device %s", self.name) 383 384 config = stress_test_common.LoadDeviceConfig(self.device_type, 385 self.serial_number) 386 387 # Get the device ready. 388 self.Root() 389 390 # Run any setup commands. 391 for cmd in config.setup_command: 392 result = self.Command( 393 shlex.split(cmd % self.cmd_string_replacements)).strip() 394 if result: 395 for line in result.splitlines(): 396 logging.info(line) 397 398 self.files_to_move = config.file_to_move 399 400 self.event_names = set([event.name for event in config.event]) 401 self.event_counter = {name: 0 for name in self.event_names} 402 self.iterations_since_event = {name: 0 for name in self.event_names} 403 404 for file_to_watch in config.file_to_watch: 405 # Are there any events that match up with this file? 406 events = [x for x in config.event if x.source == file_to_watch.source] 407 408 if file_to_watch.source == "LOGCAT": 409 command = [ 410 "adb", "-s", self.serial_number, "logcat", "-v", "usec", "" 411 ] 412 command.extend(["%s:S" % tag for tag in config.tag_to_suppress]) 413 name = "logcat_" + self.serial_number 414 else: 415 command = [ 416 "adb", "-s", self.serial_number, "shell", 417 "while : ; do cat %s 2>&1; done" % file_to_watch.source 418 ] 419 name = "%s_%s" % (os.path.basename( 420 file_to_watch.source), self.serial_number) 421 422 process_logger = ProcessLogger( 423 name, command, os.path.join( 424 self.output_root, 425 file_to_watch.destination % self.cmd_string_replacements), 426 events, True, file_to_watch.repeats_output_on_open) 427 self.process_loggers.append(process_logger) 428 process_logger.start() 429 430 # Add any of the background processes. 431 for daemon_process in config.daemon_process: 432 # Are there any events that match up with this file? 433 events = [x for x in config.event if x.source == daemon_process.name] 434 command = shlex.split( 435 daemon_process.command % self.cmd_string_replacements) 436 if daemon_process.destination: 437 output = os.path.join( 438 self.output_root, 439 daemon_process.destination % self.cmd_string_replacements) 440 else: 441 output = None 442 name = "%s_%s" % (daemon_process.name, self.serial_number) 443 process_logger = ProcessLogger(name, command, output, events, 444 daemon_process.restart, 445 daemon_process.repeats_output_on_open) 446 self.process_loggers.append(process_logger) 447 process_logger.start() 448 449 # Build up the list of events we can actually process. 450 self.__UpdateEventCounters(number_of_iterations=0) 451 test_events = self.test_events 452 self.test_events = [] 453 for event in test_events: 454 try: 455 eval(event.condition, # pylint:disable=eval-used 456 {"__builtins__": None}, self.__ValuesInEval()) 457 self.test_events.append(event) 458 except Exception as err: # pylint:disable=broad-except 459 logging.error("Test event %s is not compatible with %s", event.name, 460 self.name) 461 logging.error(str(err)) 462 # Make sure that device specific events don't have conditions. 463 self.device_events = [] 464 for event in config.test_event: 465 if not event.name: 466 logging.error("Device %s test event is missing a name", self.name) 467 continue 468 if event.condition: 469 self.test_events.append(event) 470 else: 471 self.device_events.append(event) 472 473 def StartLookingForEvents(self): 474 """Starts all child ProcessLoggers to start looking for events.""" 475 for process_logger in self.process_loggers: 476 process_logger.looking = True 477 478 def __ValuesInEval(self): 479 values_in_eval = {key: value for key, value 480 in list(self.event_counter.items())} 481 for key, value in list(self.iterations_since_event.items()): 482 values_in_eval["iterations_since_%s" % key] = value 483 return values_in_eval 484 485 def __GetExpectedEventCount(self, event): 486 if event == "logcat_iteration": 487 return -1 488 try: 489 event_cnt = getattr(self.cnt_per_iteration, event) 490 except AttributeError: 491 event_cnt = -1 492 logging.exception("%s is not an attribute of expected_result", event) 493 return event_cnt 494 495 def __UpdateEventCounters(self, number_of_iterations=1): 496 # Update the event counters 497 visited_events = set() 498 error_log = [] 499 for process_logger in self.process_loggers: 500 events = process_logger.GetEventCountsSinceLastCall() 501 for event, count in list(events.items()): 502 # Print log when there is any missed event 503 expected_count = self.__GetExpectedEventCount(event) 504 505 if expected_count > 0: 506 if count > expected_count * number_of_iterations: 507 logging.info( 508 "[STRESS_TEST] In iteration %d, got duplicated %s : %d", 509 self.iteration, self.name, count) 510 logging.info("[STRESS_TEST] Will count only : %d", 511 expected_count * number_of_iterations) 512 count = expected_count * number_of_iterations 513 514 if count: 515 self.event_counter[event] += count 516 visited_events.add(event) 517 518 if expected_count >= 0: 519 if expected_count * number_of_iterations != count: 520 error_log.append( 521 _SUMMARY_COL_FORMATT % 522 (event, count, expected_count * number_of_iterations)) 523 524 # Go clear all the events that weren't consecutive. 525 for event in self.iterations_since_event: 526 if event in visited_events: 527 self.iterations_since_event[event] = 0 528 else: 529 self.iterations_since_event[event] += number_of_iterations 530 531 if error_log: 532 logging.info(_SUMMARY_LINES) 533 logging.info(" iteration %d : Something wrong in %s.", 534 self.iteration, self.name) 535 logging.info(_SUMMARY_LINES) 536 logging.info(_SUMMARY_COLUMNS) 537 logging.info(_SUMMARY_LINES) 538 for line in error_log: 539 logging.info(line) 540 logging.info(_SUMMARY_LINES) 541 542 def ProcessEvents(self): 543 """Updates the event_counter and iterations_since_event maps.""" 544 self.work_queue.put(self.__ProcessEventsAsync) 545 546 def __ProcessEventsAsync(self): 547 # Move any files to the local machine that should be moved. 548 if self.files_to_move: 549 for file_to_move in self.files_to_move: 550 try: 551 self.Command(["pull", file_to_move.source, file_to_move.destination]) 552 except: # pylint:disable=bare-except 553 logging.exception("Failed to pull %s", file_to_move.source) 554 555 self.__UpdateEventCounters() 556 557 for event in self.test_events: 558 if eval(event.condition, # pylint:disable=eval-used 559 {"__builtins__": None}, self.__ValuesInEval()): 560 logging.info("Condition has been met for event '%s'", event.name) 561 # Write the updated event log. 562 event_log_details = self.event_log.event.add() 563 event_log_details.iteration = self.iteration 564 event_log_details.name = event.name 565 with open(os.path.join(self.output_root, 566 "%s_event_log.ascii_proto" % self.name), 567 "w") as fp: 568 text_format.PrintMessage(self.event_log, fp) 569 570 # Do whatever other actions that are part of the event. 571 self.__ProcessEventActionQueue(event) 572 573 # Run any device specific actions for this event. 574 for device_event in self.device_events: 575 if device_event.name == event.name: 576 self.__ProcessEventActionQueue(device_event) 577 578 # Set up the next iteration. 579 self.iteration += 1 580 self.cmd_string_replacements["iteration"] = self.iteration 581 582 def __ProcessEventActionQueue(self, event): 583 bugreport = None 584 for action in event.action: 585 if action == "BUGREPORT": 586 bugreport = self.TakeBugReport() 587 elif action.startswith("DUMPSYS "): 588 self.CaptureDumpsys(action[action.find(" ") + 1:]) 589 elif action == "NOTIFY": 590 SendNotificationEmail( 591 "%s had event '%s' occur" % (self.name, event.name), 592 "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport) 593 elif action == "REMOVE_DEVICE": 594 logging.info("Removing %s from the test", self.serial_number) 595 self.remove_device = True 596 elif action == "ABORT": 597 logging.info("Abort requested") 598 self.abort_requested = True 599 else: 600 action %= self.cmd_string_replacements 601 logging.info("Running command %s on %s", action, self.name) 602 result = self.Command(shlex.split(action)).strip() 603 if result: 604 for line in result.splitlines(): 605 logging.info(line) 606 607 def Root(self): 608 self.Command(["root"]) 609 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 610 self.Command(["wait-for-device"]) 611 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 612 613 def Stop(self): 614 """Stops all file loggers attached to this device.""" 615 for process_logger in self.process_loggers: 616 process_logger.StopLogging() 617 self.process_loggers = [] 618 619 def Join(self): 620 for process_logger in self.process_loggers: 621 process_logger.join() 622 self.WaitForTasks() 623 624 def AsyncCommand(self, command, log_output=False): 625 self.work_queue.put( 626 lambda: self.__AsyncCommand(command, log_output=log_output)) 627 628 def __AsyncCommand(self, command, log_output=False): 629 result = self.Command(command).strip() 630 if result and log_output: 631 for line in result.splitlines(): 632 logging.info(line.decode("utf-8")) 633 634 def Command(self, command): 635 """Runs the provided command on this device.""" 636 if command[0] in {"bugreport", "root", "wait-for-device", "shell", 637 "logcat"}: 638 return subprocess.check_output( 639 ["adb", "-s", self.serial_number] + command) 640 elif command[0] == "DUMPSYS": 641 self.CaptureDumpsys(command[1]) 642 return "" 643 elif command[0] == "pull": 644 try: 645 files = subprocess.check_output( 646 ["adb", "-s", self.serial_number, "shell", "ls", command[1]] 647 ).strip().splitlines() 648 except subprocess.CalledProcessError: 649 return "" 650 if len(files) == 1 and "No such file or directory" in files[0]: 651 return "" 652 for source_file in files: 653 destination = os.path.join(self.output_root, 654 command[2] % self.cmd_string_replacements) 655 stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination)) 656 logging.info("Moving %s from %s to %s", source_file, self.name, 657 destination) 658 subprocess.check_output(["adb", "-s", self.serial_number, "pull", 659 source_file, destination]) 660 if FLAGS.delete_data_dir: 661 subprocess.check_output([ 662 "adb", "-s", self.serial_number, "shell", "rm", "-rf", source_file 663 ]) 664 return "" 665 else: 666 return subprocess.check_output(command) 667 668 def TakeBugReport(self): 669 logging.info("Capturing bugreport on %s", self.name) 670 bugreport = os.path.join(self.output_root, 671 "%s_bugreport_iteration_%06d.zip" % 672 (self.name, self.iteration)) 673 sdk = int(self.Command( 674 ["shell", "getprop", "ro.build.version.sdk"]).strip()) 675 if sdk >= 24: # SDK 24 = Android N 676 with open(bugreport, "w") as bugreport_fp: 677 bugreport_fp.write(self.Command(["bugreport", bugreport])) 678 else: 679 bugreport_txt = os.path.join(self.output_root, 680 "%s_bugreport_iteration_%06d.txt" % 681 (self.name, self.iteration)) 682 with open(bugreport_txt, "w") as bugreport_fp: 683 bugreport_fp.write(self.Command(["bugreport"])) 684 self.Command(["zip", bugreport, bugreport_txt]) 685 686 self.Command(["pull", "/data/anr/traces.txt", 687 "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)]) 688 self.Command(["pull", "/data/anr/traces.txt.bugreport", 689 "%s_traces_iteration_%06d.txt.bugreport" % (self.name, 690 self.iteration)]) 691 return bugreport 692 693 def CaptureDumpsys(self, dumpsys_unit): 694 logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name) 695 stress_test_common.MakeDirsIfNeeded(os.path.join(self.output_root, 696 self.name)) 697 with open(os.path.join(self.output_root, self.name, 698 "%s_%06d.txt" % (dumpsys_unit, self.iteration)), 699 "w") as dumpsys_fp: 700 dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit])) 701 702 def WaitForTasks(self): 703 self.work_queue.join() 704 705 def GetSummaryLines(self): 706 lines = [ 707 "Device {}".format(self.name), 708 _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES 709 ] 710 for event, count in sorted(self.event_counter.items()): 711 lines.append(_SUMMARY_COL_FORMATT % ( 712 event, count, self.iterations_since_event[event])) 713 lines.append(_SUMMARY_LINES) 714 return lines 715 716 717def RunAsyncCommand(devices, command): 718 """Helper function for running async commands on many devices.""" 719 for device in devices: 720 device.AsyncCommand(command) 721 for device in devices: 722 device.WaitForTasks() 723 724 725class StressTest(object): 726 """Manages dispatching commands to devices/playing audio and events.""" 727 728 def __init__(self, output_root, test_name): 729 self.output_root = output_root 730 self.devices = [] 731 self.test_name = test_name 732 config = stress_test_pb2.StressTestConfig() 733 config_contents = stress_test_common.GetResourceContents( 734 os.path.join(stress_test_common.RESOURCE_DIR, 735 "stress_test.%s.ascii_proto" % test_name)) 736 text_format.Merge(config_contents, config) 737 self.events = config.event 738 self.setup_commands = config.setup_command 739 self.steps = config.step 740 self.audio_tempfiles = {} 741 self.uuid = str(uuid.uuid4()) 742 self.expected_result = None 743 self.iteration = 0 744 if config.expected_result: 745 self.expected_result = config.expected_result[0] 746 747 # Place all the audio files into temp files. 748 for step in self.steps: 749 if step.audio_file and step.audio_file not in self.audio_tempfiles: 750 # We can't delete the temp file on windows, since it gets nuked too 751 # early. 752 audio_tempfile = tempfile.NamedTemporaryFile( 753 delete=(platform.system() != "Windows"), 754 dir="." if platform.system().startswith("CYGWIN") else None 755 ) 756 if platform.system().startswith("CYGWIN"): 757 audio_tempfile.name = os.path.basename(audio_tempfile.name) 758 self.audio_tempfiles[step.audio_file] = audio_tempfile 759 if FLAGS.use_sox: 760 # Write out the raw PCM samples as a wave file. 761 audio_tempfile.write( 762 stress_test_common.GetResourceContents(step.audio_file)) 763 else: 764 # Make a temporary wave file for playout if we can't use sox. 765 wavefile = wave.open(audio_tempfile, "wb") 766 if step.audio_file_sample_rate <= 0: 767 step.audio_file_sample_rate = 16000 768 wavefile.setframerate(step.audio_file_sample_rate) 769 if step.audio_file_num_channels <= 0: 770 step.audio_file_num_channels = 1 771 wavefile.setnchannels(step.audio_file_num_channels) 772 if not step.audio_file_format: 773 wavefile.setsampwidth(2) 774 elif step.audio_file_format == "s8": 775 wavefile.setsampwidth(1) 776 elif step.audio_file_format == "s16": 777 wavefile.setsampwidth(2) 778 elif step.audio_file_format == "s32": 779 wavefile.setsampwidth(4) 780 else: 781 raise RuntimeError( 782 "Unsupported wave file format for %s" % step.audio_file) 783 wavefile.writeframes(stress_test_common.GetResourceContents( 784 step.audio_file)) 785 wavefile.close() 786 audio_tempfile.flush() 787 788 if platform.system() == "Windows": 789 audio_tempfile.close() 790 791 # Create all the devices that are attached to this machine. 792 for serial_number in self.GetActiveSerialNumbers(): 793 self.devices.append( 794 Device(serial_number, output_root, self.events, self.expected_result)) 795 if not self.devices: 796 raise app.UsageError("No devices connected") 797 798 self.devices.sort(key=lambda x: x.name) 799 800 # Make sure every device is done with their work for setup. 801 for device in self.devices: 802 device.WaitForTasks() 803 804 # Write out the info meta-data proto. Useful for doing analysis of the logs 805 # after the stress test has completed. 806 stress_test_info = stress_test_pb2.StressTestInfo() 807 stress_test_info.test_name = self.test_name 808 stress_test_info.test_description = config.description 809 stress_test_info.uuid = self.uuid 810 for device in self.devices: 811 device_pb = stress_test_info.device.add() 812 device_pb.device_type = device.device_type 813 device_pb.serial_number = device.serial_number 814 815 text_format.PrintMessage(stress_test_info, open(os.path.join( 816 self.output_root, "stress_test_info.ascii_proto"), "w")) 817 818 def GetActiveSerialNumbers(self): 819 serial_numbers = [] 820 for line in sorted( 821 subprocess.check_output(["adb", "devices"]).splitlines()): 822 if line.endswith(b"device"): 823 serial_number = line.split()[0].strip() 824 if FLAGS.devices and serial_number not in FLAGS.devices: 825 continue 826 serial_numbers.append(serial_number.decode("utf-8")) 827 return serial_numbers 828 829 def Start(self): 830 logging.info("Waiting for devices to settle") 831 time.sleep(5) 832 # Make a copy of the device list, as we'll be modifying this actual list. 833 devices = list(self.devices) 834 dropped_devices = [] 835 836 # If we have any setup commands, run them. 837 for command in self.setup_commands: 838 logging.info("Running command %s", command) 839 # Can't use the async command helper function since we need to get at 840 # the device cmd_string_replacements. 841 for device in devices: 842 device.AsyncCommand( 843 shlex.split(command % device.cmd_string_replacements), 844 log_output=True) 845 for device in devices: 846 device.WaitForTasks() 847 848 for device in devices: 849 device.StartLookingForEvents() 850 device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST", 851 "Starting {%s} TZ=$(getprop persist.sys.timezone) " 852 "YEAR=$(date +%%Y)" % self.uuid], True) 853 self.iteration = 0 854 while True: 855 logging.info("Starting iteration %d", self.iteration) 856 # Perform all the actions specified in the test. 857 RunAsyncCommand(devices, [ 858 "shell", "log", "-t", "STRESS_TEST", 859 "Performing iteration %d $(head -n 3 " 860 "/proc/timer_list | tail -n 1)" % self.iteration 861 ]) 862 863 for step in self.steps: 864 if step.delay_before: 865 logging.info("Waiting for %.2f seconds", step.delay_before) 866 time.sleep(step.delay_before) 867 868 if step.audio_file: 869 logging.info("Playing %s", step.audio_file) 870 RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST", 871 "Playing %s" % step.audio_file]) 872 873 if FLAGS.use_sox: 874 subprocess.check_call(["sox", "-q", 875 self.audio_tempfiles[step.audio_file].name, 876 "-d"]) 877 elif platform.system() == "Windows": 878 import winsound # pylint:disable=g-import-not-at-top 879 winsound.PlaySound(self.audio_tempfiles[step.audio_file].name, 880 winsound.SND_FILENAME | winsound.SND_NODEFAULT) 881 else: 882 raise app.RuntimeError("Unsupported platform for audio playback") 883 884 if step.command: 885 logging.info("Running command %s", step.command) 886 # Can't use the async command helper function since we need to get at 887 # the device cmd_string_replacements. 888 for device in devices: 889 device.AsyncCommand( 890 shlex.split(step.command % device.cmd_string_replacements), 891 log_output=True) 892 for device in devices: 893 device.WaitForTasks() 894 895 if step.delay_after: 896 logging.info("Waiting for %.2f seconds", step.delay_after) 897 time.sleep(step.delay_after) 898 899 RunAsyncCommand(devices, [ 900 "shell", "log", "-t", "STRESS_TEST", 901 "Iteration %d complete $(head -n 3 " 902 "/proc/timer_list | tail -n 1)" % self.iteration 903 ]) 904 self.iteration += 1 905 906 # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for 907 # a bit. This throws off the event counts, so we should probably add some 908 # synchronization rules before we trigger any events. 909 910 # Go through each device, update the event counter, and see if we need to 911 # trigger any events. 912 devices_to_remove = [] 913 abort_requested = False 914 active_devices = self.GetActiveSerialNumbers() 915 for device in devices: 916 if device.serial_number in active_devices: 917 device.ProcessEvents() 918 else: 919 logging.error("Dropped device %s", device.name) 920 SendNotificationEmail( 921 "Dropped device %s" % device.name, 922 "Device %s is not longer present in the system" % device.name) 923 dropped_devices.append(device) 924 devices_to_remove.append(device) 925 926 # Check to see if any of the dropped devices have come back. If yes, grab 927 # a bug report. 928 for device in dropped_devices: 929 if device.serial_number in active_devices: 930 logging.info("Device %s reappeared", device.name) 931 device.Root() 932 device.TakeBugReport() 933 934 dropped_devices = [d for d in dropped_devices 935 if d.serial_number not in active_devices] 936 937 for device in devices: 938 device.WaitForTasks() 939 if device.remove_device: 940 devices_to_remove.append(device) 941 if device.abort_requested: 942 abort_requested = True 943 944 # Remove devices from our list of things to monitor if they've been marked 945 # for deletion. 946 if devices_to_remove: 947 for device in devices_to_remove: 948 device.Stop() 949 devices = [d for d in devices if d not in devices_to_remove] 950 951 # Print out the iteration summary. 952 if self.iteration % FLAGS.print_summary_every_n == 0: 953 for line in self.GetSummaryLines(): 954 logging.info(line) 955 956 # See if we need to break out of the outer loop. 957 if abort_requested or not devices: 958 break 959 if FLAGS.num_iterations: 960 if self.iteration >= FLAGS.num_iterations: 961 logging.info("Completed full iteration : %d", self.iteration) 962 break 963 SendNotificationEmail( 964 "Stress test %s completed" % (FLAGS.test_name), 965 "\n".join(["Summary:"] + self.GetSummaryLines())) 966 967 def Stop(self): 968 logging.debug("Stopping devices") 969 for device in self.devices: 970 device.Stop() 971 for device in self.devices: 972 device.Join() 973 974 def GetSummaryLines(self): 975 lines = [ 976 _SUMMARY_LINES, 977 "Conducted %d iterations out of %d" % 978 (self.iteration, FLAGS.num_iterations), 979 _SUMMARY_LINES 980 ] 981 for device in self.devices: 982 lines.extend(device.GetSummaryLines()) 983 lines.append(_SUMMARY_LINES) 984 return lines 985 986 987def main(unused_argv): 988 # Check to make sure that there are no other instances of ADB running - if 989 # there are, print a warning and wait a bit for them to see it and decide if 990 # they want to keep running, knowing that logs may be invalid. 991 try: 992 if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"): 993 print("It looks like there are other instances of adb running. If these " 994 "other instances are also cating log files, you will not be " 995 "capturing everything in this stress test (so logs will be " 996 "invalid).") 997 print("Continuing in 3...", end=" ") 998 sys.stdout.flush() 999 for i in [2, 1, 0]: 1000 time.sleep(1) 1001 if i: 1002 print("%d..." % i, end=" ") 1003 else: 1004 print("") 1005 sys.stdout.flush() 1006 except OSError: 1007 print("Unexpected error:", sys.exc_info()[0]) 1008 if sys.platform.startswith("win"): 1009 pass 1010 else: 1011 raise 1012 1013 # Make the base output directory. 1014 output_root = os.path.join(FLAGS.output_root, "%s_%s" % ( 1015 FLAGS.test_name, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))) 1016 # output_root = os.path.join(FLAGS.output_root, FLAGS.test_name) 1017 stress_test_common.MakeDirsIfNeeded(output_root) 1018 1019 # Set up logging. 1020 formatter = logging.Formatter( 1021 "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s") 1022 root_logger = logging.getLogger() 1023 root_logger.setLevel(logging.INFO) 1024 root_logger.setLevel(logging.DEBUG) 1025 1026 file_handler = logging.FileHandler(os.path.join(output_root, 1027 "stress_test.log")) 1028 file_handler.setFormatter(formatter) 1029 root_logger.addHandler(file_handler) 1030 1031 console_handler = logging.StreamHandler() 1032 console_handler.setFormatter(formatter) 1033 root_logger.addHandler(console_handler) 1034 1035 stress_test = StressTest(output_root, FLAGS.test_name) 1036 try: 1037 stress_test.Start() 1038 finally: 1039 logging.info("Stopping device logging threads") 1040 stress_test.Stop() 1041 for line in stress_test.GetSummaryLines(): 1042 logging.info(line) 1043 if FLAGS.delete_data_dir: 1044 print("Deleting Data Dir") 1045 subprocess.check_output(["rm", "-r", "-f", output_root]) 1046 1047 1048if __name__ == "__main__": 1049 app.run(main) 1050