1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Stress test utility for repeating actions repeatedly on android devices.
16
17Configures multiple devices to simultaneously run through the same set of
18actions over and over, while keeping logs from various sources. Primarily
19designed for playing audio to the devices and scanning their log output for
20events, while running other adb commands in between.
21"""
22from __future__ import absolute_import
23from __future__ import division
24from __future__ import print_function
25
26import datetime
27from email import encoders
28from email.mime import text
29import email.mime.base as base
30import email.mime.multipart as multipart
31import logging
32import mimetypes
33import os
34import platform
35import re
36import shlex
37import signal
38import smtplib
39import socket
40import subprocess
41import sys
42import tempfile
43import threading
44import time
45import uuid
46import wave
47from absl import app
48from absl import flags
49import pexpect
50import queue
51import stress_test_common
52import stress_test_pb2
53from google.protobuf import text_format
54
# Horizontal rule (73 dashes) drawn around the summary tables.
_SUMMARY_LINES = "-" * 73

# On Windows the pexpect name is rebound to None. ProcessLogger tests
# `if pexpect:` and falls back to subprocess when it is falsy.
if sys.platform.startswith("win"):
  pexpect = None

# Header row of the summary table.
_SUMMARY_COLUMNS = (
    "|        Event Type       |      Event Count     | Consecutive no event |")
# Row format: event name (left-aligned, truncated to 25 chars) followed by two
# right-aligned 22-wide integers.
_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|"
63
# Module-level handle to the absl flag registry; the rest of this file reads
# every option below as FLAGS.<name>.
FLAGS = flags.FLAGS

# Notification email recipient and on/off switch.
# NOTE(review): the help text promises a $USER@google.com default, but
# SendNotificationEmail in this file uses the flag value as-is - confirm the
# default is applied somewhere else.
flags.DEFINE_string("notification_address", "",
                    "Email address where to send notification events. Will "
                    "default to $USER@google.com if not provided. No emails "
                    "will be sent if suppress_notification_emails is True.")
flags.DEFINE_bool("suppress_notification_emails", False,
                  "Prevents emails from being sent as notifications if True.")

# Which test configuration to load.
flags.DEFINE_string("test_name", None,
                    "Name of stress test to run. For example, if you set this "
                    "to 'dsp_trigger_sw_rejection', the stress test in "
                    "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will "
                    "be loaded and executed.")
# flags.mark_flag_as_required("test_name")

# Output location, run length and device selection.
flags.DEFINE_string("output_root", "./",
                    "Path where directory should be generated containing all "
                    "logs from devices and moved files.")
flags.DEFINE_integer("num_iterations", None,
                     "If set to a positive number, the number of iterations of "
                     "the stress test to run. Otherwise, the test runs "
                     "forever.")
flags.DEFINE_list("devices", [],
                  "Serial numbers of devices that should be included in the "
                  "stress test. If empty, all devices will be used.")
flags.DEFINE_integer("print_summary_every_n", 10,
                     "Prints the summary to the log file every n iterations.")

# SMTP account and server used by SendNotificationEmail.
flags.DEFINE_string("email_sender_address", "",
                    "Account to use for sending notification emails.")
flags.DEFINE_string("email_sender_password", "",
                    "Password to use for notification email account.")
flags.DEFINE_string("email_smtp_server", "smtp.gmail.com",
                    "SMTP server to use for sending notification emails.")
flags.DEFINE_integer("email_smtp_port", 465,
                     "Port to use for the notification SMTP server.")

# Miscellaneous behaviour switches.
flags.DEFINE_integer("device_settle_time", 5,
                     "Time to wait for devices to settle.")
# The default for use_sox is computed at import time: True everywhere except
# when platform.system() reports "Windows".
flags.DEFINE_bool("use_sox", platform.system() != "Windows",
                  "Use sox for playback, otherwise, attempt to use platform "
                  "specific features.")
flags.DEFINE_bool("attach_bugreport", True,
                  "Attach bugreport to email if test failed.")
flags.DEFINE_bool("delete_data_dir", False,
                  "If true, code will delete all the files generated by this "
                  "test at the end.")

# Under Cygwin the settle time is raised from its default of 5 to 30.
# NOTE(review): this assignment runs at import time, before any command-line
# parsing visible in this section - confirm that an explicit
# --device_settle_time still takes precedence.
if platform.system().startswith("CYGWIN"):
  FLAGS.device_settle_time = 30
111
112
def QueueWorker(worker_queue):
  """Executes callables taken from worker_queue, forever.

  Meant to be used as a thread target. Each queued item is called with no
  arguments and then acknowledged with task_done(). A task that raises is
  logged and skipped so that later tasks still get their turn.

  Args:
    worker_queue: Queue-like object offering get() and task_done(); every item
        placed on it must be callable without arguments.
  """
  # A brand-new object() never equals a queued task, so this two-argument
  # iter() yields the result of every get() call and never stops.
  for task in iter(worker_queue.get, object()):
    try:
      task()
    except:  # pylint:disable=bare-except
      logging.exception("Exception in worker queue - task remains uncompleted.")
    worker_queue.task_done()
121
122
def SendNotificationEmail(subject, body, bugreport=None):
  """Sends an email with the specified subject and body.

  The body is included twice: as plain text and as HTML wrapped in <pre>. If a
  bugreport path is given and --attach_bugreport is set, that file is attached
  base64-encoded. Sending is best effort: any failure is logged and swallowed
  so that a mail problem never interrupts the stress test.

  Nothing is sent when --suppress_notification_emails is set.

  Args:
    subject: Subject of the email.
    body: Body of the email.
    bugreport: Optional path of a file to attach to the email.
  """
  if FLAGS.suppress_notification_emails:
    logging.info("Email with subject '%s' has been suppressed", subject)
    return
  try:
    # Assemble the message to send.
    recipient_address = FLAGS.notification_address
    message = multipart.MIMEMultipart("alternative")
    message["From"] = "Stress Test on %s" % socket.gethostname()
    message["To"] = recipient_address
    message["Subject"] = subject
    message.attach(text.MIMEText(body, "plain"))
    message.attach(text.MIMEText("<pre>%s</pre>" % body, "html"))

    if FLAGS.attach_bugreport and bugreport:
      ctype, encoding = mimetypes.guess_type(bugreport)
      if ctype is None or encoding is not None:
        # Unknown extension or a compressed file: guess_type() gives no usable
        # type, so fall back to a generic binary attachment instead of
        # failing the whole email on ctype.split().
        ctype = "application/octet-stream"
      maintype, subtype = ctype.split("/", 1)
      with open(bugreport, "rb") as fp:
        payload = fp.read()
      att = base.MIMEBase(maintype, subtype)
      att.set_payload(payload)
      encoders.encode_base64(att)
      att.add_header("Content-Disposition", "attachment", filename=bugreport)
      message.attach(att)

    # Send the message from our special account.
    server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port)
    try:
      server.login(FLAGS.email_sender_address, FLAGS.email_sender_password)
      server.sendmail(FLAGS.email_sender_address, recipient_address,
                      message.as_string())
      server.quit()
    finally:
      # Release the socket even when login/sendmail raised; close() is a
      # no-op after a successful quit().
      server.close()
    logging.info("Email with subject '%s' has been sent", subject)
  except Exception:  # pylint:disable=broad-except
    logging.exception("Failed to send notification email")
166
167
class ProcessLogger(threading.Thread):
  """Thread that runs a command, records its output and counts events.

  The command is spawned through pexpect when that module is available and
  through subprocess otherwise. Every output line is optionally written to a
  log file and, once `looking` is set, matched against the configured event
  regexes. The command always runs once; after it exits it is respawned for as
  long as restart_process stays true.
  """

  class EventScanner(object):
    """Counter for one named event, driven by a list of regexes."""

    def __init__(self, name, process_name, regexes):
      """Struct to store the data about an event.

      Args:
        name: Name of event.
        process_name: Name of the process that is being logged.
        regexes: An iterable of regex strings that indicate an event has
            happened.
      """

      self.name = name
      self.process_name = process_name
      self.searches = [re.compile(regex).search for regex in regexes]
      self.count = 0

    def ScanForEvent(self, line, lock=None):
      """Checks the line for matches. If found, updates the internal counter.

      The counter is incremented once per matching regex, so a line matched
      by two regexes counts twice.

      Args:
        line: One line of process output, as bytes or as decoded text.
        lock: Optional lock held while the counter is updated.
      """
      if isinstance(line, bytes):
        # Process output may contain arbitrary bytes. A strict decode would
        # raise here, and run() reacts to any exception by tearing the
        # process down and restarting it.
        line = line.decode("utf-8", "replace")

      for search in self.searches:
        if not search(line):
          continue
        if lock:
          with lock:
            self.count += 1
        else:
          self.count += 1
        logging.info("Event '%s' detected on %s", self.name,
                     self.process_name)

  def __init__(self, name, command, output, events,
               restart_process, repeats_output_when_opened):
    """Threaded class that monitors processes for events, and logs output.

    Args:
      name: The name of the process being logged.
      command: A list of arguments to be passed to the subprocess to execute.
      output: Name of output file to write process stdout to. If blank or None,
          will not be generated.
      events: An iterable of LoggingEventConfigs to look for in the output.
      restart_process: Restart the process if it terminates by itself. This
          should typically be true, but false for processes that only should be
          run once and have their output logged.
      repeats_output_when_opened: Set to true if the process will repeat the
          output of a previous call when it is restarted. This will prevent
          duplicate lines from being logged.
    """
    super(ProcessLogger, self).__init__()
    self.name = name
    self.command = command
    self.restart_process = restart_process
    self.repeats_output_when_opened = repeats_output_when_opened
    self.process = None
    # Guards restart_process and the event counters.
    self.lock = threading.Lock()
    # Events are only counted while this is True; lines are logged regardless.
    self.looking = False

    # Compile the list of regexes that we're supposed to be looking for.
    self.events = [
        ProcessLogger.EventScanner(event.name, self.name, event.regex)
        for event in events
    ]

    if output:
      stress_test_common.MakeDirsIfNeeded(os.path.dirname(output))
      self.output_fp = open(output, "w")
      logging.info("Logging device info to %s", output)
    else:
      self.output_fp = None

  def GetEventCountsSinceLastCall(self):
    """Returns the counts of all events since this method was last called."""
    event_map = {}
    with self.lock:
      for event in self.events:
        event_map[event.name] = event.count
        event.count = 0
    return event_map

  @staticmethod
  def _DecodeLine(line):
    """Returns line as text, replacing bytes that are not valid UTF-8."""
    if isinstance(line, bytes):
      return line.decode("utf-8", "replace")
    return line

  def _ShouldRestart(self):
    """Reads restart_process under the lock shared with StopLogging()."""
    with self.lock:
      return self.restart_process

  def _LogProcessExit(self):
    """Logs how the most recently spawned process ended."""
    process = self.process
    if process is None:
      logging.info("Process finished - never started")
    elif pexpect:
      if process.exitstatus is not None:
        logging.info("Process finished - exit code %d", process.exitstatus)
      elif process.signalstatus is not None:
        logging.info("Process finished - signal code %d",
                     process.signalstatus)
      else:
        logging.info("Process finished - no exit status")
    else:
      # returncode is only filled in by poll()/wait(); poll() does not block.
      process.poll()
      if process.returncode is not None:
        logging.info("Process finished - return code %d", process.returncode)
      else:
        logging.info("Process finished - no return code")

  def run(self):
    """Runs the command, logging its output, until told not to restart."""
    last_line = None
    should_log = True
    first_run = True
    last_run_time = None
    # The command always runs once; restart_process only decides whether it
    # is spawned again after it exits.
    while first_run or self._ShouldRestart():
      if first_run:
        first_run = False
      else:
        logging.info("Restarting process %s",
                     " ".join(str(arg) for arg in self.command))
        # Keep at least one second between spawn attempts.
        elapsed = (datetime.datetime.now() - last_run_time).total_seconds()
        if elapsed < 1.0:
          needed_delay = min(1.0, 1.0 - elapsed)
          logging.info("Delaying for %.2f seconds", needed_delay)
          time.sleep(needed_delay)

      # Stamp the attempt before spawning so that a failed spawn is rate
      # limited like any other run.
      last_run_time = datetime.datetime.now()
      process = None
      try:
        if pexpect:
          process = pexpect.spawn(" ".join(self.command), timeout=None)
          output_source = process
        else:
          process = subprocess.Popen(self.command, stdout=subprocess.PIPE)
          output_source = process.stdout
        self.process = process
        for line in output_source:
          # If the process we're logging likes to repeat its output, we need to
          # look for the last line we saw before we start doing anything with
          # these lines anymore.
          if self.repeats_output_when_opened and not should_log:
            if last_line == line:
              should_log = True
            continue

          text_line = self._DecodeLine(line)
          if self.output_fp:
            self.output_fp.write(text_line.rstrip())
            self.output_fp.write("\n")

          # Check every watched event against this line, but only once the
          # owner has asked us to start looking.
          if self.looking:
            for event in self.events:
              event.ScanForEvent(text_line, lock=self.lock)
          last_line = line
      except Exception:  # pylint:disable=broad-except
        logging.exception("Exception encountered running process")
      finally:
        # process is still None when the spawn itself failed.
        if process is not None:
          if pexpect:
            process.terminate()
          else:
            process.send_signal(signal.SIGTERM)
        # After a restart, skip the repeated output until the last line we
        # logged shows up again. If nothing was logged yet there is no line to
        # wait for, so keep logging.
        should_log = last_line is None

    try:
      self._LogProcessExit()
    finally:
      if self.output_fp:
        self.output_fp.close()

  def StopLogging(self):
    """Stops restarting the process and signals the running one to exit.

    Does nothing if no process has been spawned yet.
    """
    if self.process:
      with self.lock:
        self.restart_process = False

      if pexpect:
        self.process.kill(signal.SIGHUP)
        self.process.kill(signal.SIGINT)
      else:
        self.process.send_signal(signal.SIGTERM)
328
329
class Device(object):
  """Drives one adb-attached device for the stress test.

  Each instance owns a daemon worker thread fed by work_queue. Device setup,
  per-iteration event processing and asynchronous commands all run on that
  thread, so callers use WaitForTasks() to synchronise with it.
  """

  SECONDS_TO_SLEEP_DURING_ROOT = 0.5

  def __init__(self, serial_number, output_root, test_events, expected_result):
    """Responsible for monitoring a specific device, and pulling files from it.

    The actual work of the constructor will be handled asynchronously, you must
    call WaitForTasks() before using the device.

    Args:
      serial_number: The device serial number.
      output_root: The directory where to output log files/anything pulled from
          the device.
      test_events: The events (with conditions) that come from the StressTest
          that should be evaluated at every iteration, along with a list of
          actions to take when one of these events occur. For example, if there
          have not been any detected hotword triggers, a bugreport can be
          generated.
      expected_result: Expected event count to pass the test. May be None, in
          which case no expected counts are checked.
    """
    self.serial_number = serial_number
    self.output_root = output_root
    self.iteration = 0
    # Values substituted into %(key)s placeholders of configured commands and
    # paths.
    self.cmd_string_replacements = {
        "iteration": 0,
        "serial_number": serial_number,
        "output_root": output_root,
    }
    self.name = None
    self.process_loggers = []
    self.event_log = stress_test_pb2.EventLog()
    self.cnt_per_iteration = expected_result
    self.abort_requested = False
    self.remove_device = False
    self.test_events = test_events

    # Prepare the work queue, and offload the rest of the init into it.
    self.work_queue = queue.Queue()
    self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue])
    self.worker.daemon = True
    # The device name is not known yet; __init_async__ renames the thread.
    self.worker.name = serial_number
    self.worker.start()

    self.work_queue.put(self.__init_async__)

  @staticmethod
  def _ToText(data):
    """Returns command output as text; bytes are decoded as UTF-8."""
    if data is None:
      return ""
    if isinstance(data, bytes):
      return data.decode("utf-8", "replace")
    return data

  def _LogOutput(self, output):
    """Logs every line of a command's output, if there is any."""
    for line in self._ToText(output).strip().splitlines():
      logging.info("%s", line)

  def __init_async__(self):
    """Second half of construction; runs on the worker thread.

    Queries the device type, loads the device config, roots the device, runs
    the setup commands, starts one ProcessLogger per watched file and daemon
    process, and filters the test events down to those whose condition can be
    evaluated on this device.
    """
    # Get the device type, and append it to the serial number.
    self.device_type = self.Command(["shell", "getprop",
                                     "ro.product.name"]).strip().decode("utf-8")
    self.name = "%s_%s" % (self.device_type, self.serial_number)
    self.worker.name = self.name
    self.cmd_string_replacements["device"] = self.name
    logging.info("Setting up device %s", self.name)

    config = stress_test_common.LoadDeviceConfig(self.device_type,
                                                 self.serial_number)

    # Get the device ready.
    self.Root()

    # Run any setup commands.
    for cmd in config.setup_command:
      self._LogOutput(
          self.Command(shlex.split(cmd % self.cmd_string_replacements)))

    self.files_to_move = config.file_to_move

    self.event_names = {event.name for event in config.event}
    self.event_counter = {name: 0 for name in self.event_names}
    self.iterations_since_event = {name: 0 for name in self.event_names}

    for file_to_watch in config.file_to_watch:
      # Are there any events that match up with this file?
      events = [x for x in config.event if x.source == file_to_watch.source]

      if file_to_watch.source == "LOGCAT":
        command = [
            "adb", "-s", self.serial_number, "logcat", "-v", "usec", ""
        ]
        command.extend(["%s:S" % tag for tag in config.tag_to_suppress])
        name = "logcat_" + self.serial_number
      else:
        command = [
            "adb", "-s", self.serial_number, "shell",
            "while : ; do cat %s 2>&1; done" % file_to_watch.source
        ]
        name = "%s_%s" % (os.path.basename(
            file_to_watch.source), self.serial_number)

      process_logger = ProcessLogger(
          name, command, os.path.join(
              self.output_root,
              file_to_watch.destination % self.cmd_string_replacements),
          events, True, file_to_watch.repeats_output_on_open)
      self.process_loggers.append(process_logger)
      process_logger.start()

    # Add any of the background processes.
    for daemon_process in config.daemon_process:
      # Are there any events that match up with this process?
      events = [x for x in config.event if x.source == daemon_process.name]
      command = shlex.split(
          daemon_process.command % self.cmd_string_replacements)
      if daemon_process.destination:
        output = os.path.join(
            self.output_root,
            daemon_process.destination % self.cmd_string_replacements)
      else:
        output = None
      name = "%s_%s" % (daemon_process.name, self.serial_number)
      process_logger = ProcessLogger(name, command, output, events,
                                     daemon_process.restart,
                                     daemon_process.repeats_output_on_open)
      self.process_loggers.append(process_logger)
      process_logger.start()

    # Build up the list of events we can actually process.
    self.__UpdateEventCounters(number_of_iterations=0)
    candidate_events = self.test_events
    self.test_events = []
    for event in candidate_events:
      try:
        # SECURITY NOTE: the condition comes from the test config and is
        # evaluated as Python. Clearing __builtins__ is not a sandbox; only
        # run configs you trust.
        eval(event.condition,  # pylint:disable=eval-used
             {"__builtins__": None}, self.__ValuesInEval())
        self.test_events.append(event)
      except Exception as err:  # pylint:disable=broad-except
        logging.error("Test event %s is not compatible with %s", event.name,
                      self.name)
        logging.error(str(err))
    # Device specific events: those with a condition are evaluated like test
    # events, the others only contribute extra actions for a test event of the
    # same name.
    self.device_events = []
    for event in config.test_event:
      if not event.name:
        logging.error("Device %s test event is missing a name", self.name)
        continue
      if event.condition:
        self.test_events.append(event)
      else:
        self.device_events.append(event)

  def StartLookingForEvents(self):
    """Starts all child ProcessLoggers to start looking for events."""
    for process_logger in self.process_loggers:
      process_logger.looking = True

  def __ValuesInEval(self):
    """Returns the variables visible to event conditions.

    Each event name maps to its running count, and iterations_since_<name>
    maps to the number of iterations since that event was last seen.
    """
    values_in_eval = dict(self.event_counter)
    for key, value in self.iterations_since_event.items():
      values_in_eval["iterations_since_%s" % key] = value
    return values_in_eval

  def __GetExpectedEventCount(self, event):
    """Returns the expected per-iteration count for event, or -1 if unknown."""
    if event == "logcat_iteration" or self.cnt_per_iteration is None:
      # Without an expected_result there is nothing to compare against; do
      # not log an exception for every event on every iteration.
      return -1
    try:
      return getattr(self.cnt_per_iteration, event)
    except AttributeError:
      logging.exception("%s is not an attribute of expected_result", event)
      return -1

  def __UpdateEventCounters(self, number_of_iterations=1):
    """Folds the loggers' new event counts into the running totals.

    Counts above the expected total are capped, counts that differ from the
    expected total are reported in a table, and iterations_since_event is
    reset for every event seen and advanced for every event not seen.

    Args:
      number_of_iterations: How many iterations the collected counts cover.
    """
    visited_events = set()
    error_log = []
    for process_logger in self.process_loggers:
      new_counts = process_logger.GetEventCountsSinceLastCall()
      for event, count in new_counts.items():
        expected_count = self.__GetExpectedEventCount(event)
        expected_total = expected_count * number_of_iterations

        if expected_count > 0 and count > expected_total:
          logging.info(
              "[STRESS_TEST] In iteration %d, got duplicated %s : %d",
              self.iteration, self.name, count)
          logging.info("[STRESS_TEST] Will count only : %d", expected_total)
          count = expected_total

        if count:
          self.event_counter[event] += count
          visited_events.add(event)

        # Report when there is any missed event.
        if expected_count >= 0 and expected_total != count:
          error_log.append(
              _SUMMARY_COL_FORMATT % (event, count, expected_total))

    # Go clear all the events that weren't consecutive.
    for event in self.iterations_since_event:
      if event in visited_events:
        self.iterations_since_event[event] = 0
      else:
        self.iterations_since_event[event] += number_of_iterations

    if error_log:
      logging.info(_SUMMARY_LINES)
      logging.info(" iteration %d : Something wrong in %s.",
                   self.iteration, self.name)
      logging.info(_SUMMARY_LINES)
      logging.info(_SUMMARY_COLUMNS)
      logging.info(_SUMMARY_LINES)
      for line in error_log:
        logging.info(line)
      logging.info(_SUMMARY_LINES)

  def ProcessEvents(self):
    """Updates the event_counter and iterations_since_event maps."""
    self.work_queue.put(self.__ProcessEventsAsync)

  def __ProcessEventsAsync(self):
    """Runs one iteration's bookkeeping on the worker thread.

    Pulls the configured files, updates the event counters, evaluates every
    test event condition and runs the actions of those that are met, then
    advances the iteration number.
    """
    # Move any files to the local machine that should be moved.
    if self.files_to_move:
      for file_to_move in self.files_to_move:
        try:
          self.Command(["pull", file_to_move.source, file_to_move.destination])
        except Exception:  # pylint:disable=broad-except
          logging.exception("Failed to pull %s", file_to_move.source)

    self.__UpdateEventCounters()

    for event in self.test_events:
      # SECURITY NOTE: see __init_async__ - conditions are trusted config.
      if eval(event.condition,  # pylint:disable=eval-used
              {"__builtins__": None}, self.__ValuesInEval()):
        logging.info("Condition has been met for event '%s'", event.name)
        # Write the updated event log.
        event_log_details = self.event_log.event.add()
        event_log_details.iteration = self.iteration
        event_log_details.name = event.name
        with open(os.path.join(self.output_root,
                               "%s_event_log.ascii_proto" % self.name),
                  "w") as fp:
          text_format.PrintMessage(self.event_log, fp)

        # Do whatever other actions that are part of the event.
        self.__ProcessEventActionQueue(event)

        # Run any device specific actions for this event.
        for device_event in self.device_events:
          if device_event.name == event.name:
            self.__ProcessEventActionQueue(device_event)

    # Set up the next iteration.
    self.iteration += 1
    self.cmd_string_replacements["iteration"] = self.iteration

  def __ProcessEventActionQueue(self, event):
    """Runs the actions attached to event, in order.

    A bugreport taken by an earlier BUGREPORT action is handed to any later
    NOTIFY action of the same event. Actions that are not one of the built-in
    keywords are run as commands through Command().
    """
    bugreport = None
    for action in event.action:
      if action == "BUGREPORT":
        bugreport = self.TakeBugReport()
      elif action.startswith("DUMPSYS "):
        self.CaptureDumpsys(action[action.find(" ") + 1:])
      elif action == "NOTIFY":
        SendNotificationEmail(
            "%s had event '%s' occur" % (self.name, event.name),
            "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport)
      elif action == "REMOVE_DEVICE":
        logging.info("Removing %s from the test", self.serial_number)
        self.remove_device = True
      elif action == "ABORT":
        logging.info("Abort requested")
        self.abort_requested = True
      else:
        action %= self.cmd_string_replacements
        logging.info("Running command %s on %s", action, self.name)
        self._LogOutput(self.Command(shlex.split(action)))

  def Root(self):
    """Restarts adbd as root and waits for the device to come back."""
    self.Command(["root"])
    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)
    self.Command(["wait-for-device"])
    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)

  def Stop(self):
    """Stops all file loggers attached to this device."""
    for process_logger in self.process_loggers:
      process_logger.StopLogging()
    # NOTE(review): the list is cleared here, so a Join() after Stop() has no
    # logger threads left to join - confirm that this is intended.
    self.process_loggers = []

  def Join(self):
    """Waits for the remaining logger threads and all queued work."""
    for process_logger in self.process_loggers:
      process_logger.join()
    self.WaitForTasks()

  def AsyncCommand(self, command, log_output=False):
    """Queues command to be run on the worker thread.

    Args:
      command: Argument list, as accepted by Command().
      log_output: If true, the command's output is logged line by line.
    """
    self.work_queue.put(
        lambda: self.__AsyncCommand(command, log_output=log_output))

  def __AsyncCommand(self, command, log_output=False):
    """Runs command and optionally logs its output."""
    result = self.Command(command)
    if log_output:
      self._LogOutput(result)

  def Command(self, command):
    """Runs the provided command on this device.

    Args:
      command: Argument list. If the first element is one of bugreport, root,
          wait-for-device, shell or logcat, the list is passed to adb for this
          device. ["DUMPSYS", unit] captures a dumpsys. ["pull", source,
          destination] pulls every file that `ls source` lists on the device
          into destination under output_root. Anything else is run as a
          command on the host.

    Returns:
      The raw output of the adb or host command (bytes on Python 3), or ""
      for DUMPSYS and pull.
    """
    adb = ["adb", "-s", self.serial_number]
    verb = command[0]
    if verb in {"bugreport", "root", "wait-for-device", "shell", "logcat"}:
      return subprocess.check_output(adb + list(command))
    if verb == "DUMPSYS":
      self.CaptureDumpsys(command[1])
      return ""
    if verb == "pull":
      try:
        listing = subprocess.check_output(adb + ["shell", "ls", command[1]])
      except subprocess.CalledProcessError:
        return ""
      # check_output() returns bytes on Python 3; decode before comparing with
      # str and before handing the names back to adb.
      files = self._ToText(listing).strip().splitlines()
      if len(files) == 1 and "No such file or directory" in files[0]:
        return ""
      destination = os.path.join(self.output_root,
                                 command[2] % self.cmd_string_replacements)
      for source_file in files:
        stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination))
        logging.info("Moving %s from %s to %s", source_file, self.name,
                     destination)
        subprocess.check_output(adb + ["pull", source_file, destination])
        if FLAGS.delete_data_dir:
          subprocess.check_output(adb + ["shell", "rm", "-rf", source_file])
      # Return only after every listed file has been pulled.
      return ""
    return subprocess.check_output(command)

  def TakeBugReport(self):
    """Captures a bugreport and the ANR traces for the current iteration.

    Returns:
      Path of the bugreport zip under output_root.
    """
    logging.info("Capturing bugreport on %s", self.name)
    bugreport = os.path.join(self.output_root,
                             "%s_bugreport_iteration_%06d.zip" %
                             (self.name, self.iteration))
    sdk = int(self.Command(
        ["shell", "getprop", "ro.build.version.sdk"]).strip())
    if sdk >= 24:  # SDK 24 = Android N
      # adb writes the zipped report to the given host path itself. Its stdout
      # must not be written to the same path, or it clobbers the zip.
      self.Command(["bugreport", bugreport])
    else:
      # Older devices print a plain-text report on stdout; save it and zip it
      # on the host.
      bugreport_txt = os.path.join(self.output_root,
                                   "%s_bugreport_iteration_%06d.txt" %
                                   (self.name, self.iteration))
      with open(bugreport_txt, "wb") as bugreport_fp:
        bugreport_fp.write(self.Command(["bugreport"]))
      self.Command(["zip", bugreport, bugreport_txt])

    self.Command(["pull", "/data/anr/traces.txt",
                  "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)])
    self.Command(["pull", "/data/anr/traces.txt.bugreport",
                  "%s_traces_iteration_%06d.txt.bugreport" % (self.name,
                                                              self.iteration)])
    return bugreport

  def CaptureDumpsys(self, dumpsys_unit):
    """Saves `dumpsys <dumpsys_unit>` to <output_root>/<name>/<unit>_<iter>.txt."""
    logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name)
    dumpsys_dir = os.path.join(self.output_root, self.name)
    stress_test_common.MakeDirsIfNeeded(dumpsys_dir)
    dumpsys_path = os.path.join(
        dumpsys_dir, "%s_%06d.txt" % (dumpsys_unit, self.iteration))
    # Command() returns raw bytes, so the file is opened in binary mode.
    with open(dumpsys_path, "wb") as dumpsys_fp:
      dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit]))

  def WaitForTasks(self):
    """Blocks until everything queued on the worker thread has finished."""
    self.work_queue.join()

  def GetSummaryLines(self):
    """Returns the per-event summary table for this device as text lines."""
    lines = [
        "Device {}".format(self.name),
        _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES
    ]
    for event, count in sorted(self.event_counter.items()):
      lines.append(_SUMMARY_COL_FORMATT % (
          event, count, self.iterations_since_event[event]))
    lines.append(_SUMMARY_LINES)
    return lines
715
716
def RunAsyncCommand(devices, command):
  """Runs one command on every device and waits for all of them to finish.

  The command is first queued on each device in turn, and only then is each
  device waited on, so the devices can work through it concurrently.

  Args:
    devices: Devices to run the command on; iterated twice.
    command: Argument list handed unchanged to each device's AsyncCommand().
  """
  # Phase 1: fan the command out without blocking.
  for target in devices:
    target.AsyncCommand(command)
  # Phase 2: barrier - block until each device has drained its queue.
  for target in devices:
    target.WaitForTasks()
723
724
class StressTest(object):
  """Manages dispatching commands to devices/playing audio and events.

  Constructing an instance loads the "stress_test.<test_name>.ascii_proto"
  configuration resource, stages every audio file referenced by a step into a
  temporary file, creates a Device for each attached adb device and writes a
  StressTestInfo proto describing the run into the output directory.

  Attributes:
    output_root: Directory that the info proto is written to; also handed to
      every Device.
    test_name: Name of the test, used to locate the configuration resource.
    devices: Device objects created at construction time, sorted by name.
    events: The `event` entries of the test configuration.
    setup_commands: The `setup_command` entries of the test configuration.
    steps: The `step` entries of the test configuration.
    audio_tempfiles: Maps a step's audio_file to the temporary file object
      holding the audio that gets played for it.
    uuid: Random identifier of this run; logged on the devices when the test
      starts and stored in the info proto.
    expected_result: First `expected_result` entry of the configuration, or
      None if the configuration has none.
    iteration: Number of iterations completed so far.
  """

  # Sample width in bytes for every supported audio_file_format value. An
  # unset (empty) format falls back to 16 bit samples.
  _SAMPLE_WIDTHS = {"": 2, "s8": 1, "s16": 2, "s32": 4}

  def __init__(self, output_root, test_name):
    """Loads the test configuration and prepares all attached devices.

    Args:
      output_root: Directory where the info proto and device output go.
      test_name: Name of the stress test configuration to load.

    Raises:
      RuntimeError: If a step uses an unsupported audio file format.
      app.UsageError: If no (matching) device is attached.
    """
    self.output_root = output_root
    self.devices = []
    self.test_name = test_name
    config = stress_test_pb2.StressTestConfig()
    config_contents = stress_test_common.GetResourceContents(
        os.path.join(stress_test_common.RESOURCE_DIR,
                     "stress_test.%s.ascii_proto" % test_name))
    text_format.Merge(config_contents, config)
    self.events = config.event
    self.setup_commands = config.setup_command
    self.steps = config.step
    self.audio_tempfiles = {}
    self.uuid = str(uuid.uuid4())
    self.expected_result = None
    self.iteration = 0
    if config.expected_result:
      self.expected_result = config.expected_result[0]

    # Place all the audio files into temp files. Steps sharing an audio file
    # share a single temp file.
    for step in self.steps:
      if step.audio_file and step.audio_file not in self.audio_tempfiles:
        self.audio_tempfiles[step.audio_file] = self._StageAudioFile(step)

    # Create all the devices that are attached to this machine.
    for serial_number in self.GetActiveSerialNumbers():
      self.devices.append(
          Device(serial_number, output_root, self.events, self.expected_result))
    if not self.devices:
      raise app.UsageError("No devices connected")

    self.devices.sort(key=lambda x: x.name)

    # Make sure every device is done with their work for setup.
    for device in self.devices:
      device.WaitForTasks()

    self._WriteInfoProto(config.description)

  def _StageAudioFile(self, step):
    """Copies the audio resource of `step` into a new temporary file.

    Args:
      step: Configuration step whose audio_file should be staged. When a wave
        file has to be generated, a sample rate <= 0 is replaced by 16000 and
        a channel count <= 0 is replaced by 1 on the step itself.

    Returns:
      The temporary file object; its `name` is what is handed to the player.

    Raises:
      RuntimeError: If step.audio_file_format is not a supported format.
    """
    system = platform.system()
    on_windows = system == "Windows"
    on_cygwin = system.startswith("CYGWIN")
    # We can't delete the temp file on windows, since it gets nuked too early.
    audio_tempfile = tempfile.NamedTemporaryFile(
        delete=not on_windows, dir="." if on_cygwin else None)
    if on_cygwin:
      audio_tempfile.name = os.path.basename(audio_tempfile.name)

    if FLAGS.use_sox:
      # The resource bytes are stored unmodified and later passed to sox.
      audio_tempfile.write(
          stress_test_common.GetResourceContents(step.audio_file))
    else:
      # Make a temporary wave file for playout if we can't use sox.
      wavefile = wave.open(audio_tempfile, "wb")
      if step.audio_file_sample_rate <= 0:
        step.audio_file_sample_rate = 16000
      wavefile.setframerate(step.audio_file_sample_rate)
      if step.audio_file_num_channels <= 0:
        step.audio_file_num_channels = 1
      wavefile.setnchannels(step.audio_file_num_channels)
      sample_width = self._SAMPLE_WIDTHS.get(step.audio_file_format or "")
      if sample_width is None:
        raise RuntimeError(
            "Unsupported wave file format for %s" % step.audio_file)
      wavefile.setsampwidth(sample_width)
      wavefile.writeframes(
          stress_test_common.GetResourceContents(step.audio_file))
      wavefile.close()
    audio_tempfile.flush()

    # On Windows the file was created with delete=False, so it can be closed
    # here and is still available by name for playback.
    if on_windows:
      audio_tempfile.close()
    return audio_tempfile

  def _WriteInfoProto(self, description):
    """Writes the StressTestInfo meta-data proto into the output directory.

    Useful for doing analysis of the logs after the stress test has completed.

    Args:
      description: Description of the test taken from its configuration.
    """
    stress_test_info = stress_test_pb2.StressTestInfo()
    stress_test_info.test_name = self.test_name
    stress_test_info.test_description = description
    stress_test_info.uuid = self.uuid
    for device in self.devices:
      device_pb = stress_test_info.device.add()
      device_pb.device_type = device.device_type
      device_pb.serial_number = device.serial_number

    info_path = os.path.join(self.output_root, "stress_test_info.ascii_proto")
    # Close the file deterministically instead of leaking the handle.
    with open(info_path, "w") as info_file:
      text_format.PrintMessage(stress_test_info, info_file)

  def GetActiveSerialNumbers(self):
    """Returns the serial numbers of the devices reported by `adb devices`.

    Only lines ending in "device" are considered. If FLAGS.devices is set,
    serial numbers that are not contained in it are skipped.

    Returns:
      A list of serial number strings, in sorted order of the adb output lines.
    """
    serial_numbers = []
    adb_output = subprocess.check_output(["adb", "devices"])
    for line in sorted(adb_output.splitlines()):
      if not line.endswith(b"device"):
        continue
      # Decode before filtering: comparing the raw bytes against the text
      # values of the flag never matches on Python 3.
      serial_number = line.split()[0].decode("utf-8")
      if FLAGS.devices and serial_number not in FLAGS.devices:
        continue
      serial_numbers.append(serial_number)
    return serial_numbers

  def _RunCommandOnDevices(self, devices, command):
    """Runs an adb command on every device and waits for all to finish.

    The async command helper function can't be used here since the command
    needs each device's own cmd_string_replacements substituted in.

    Args:
      devices: Devices to run the command on.
      command: Command template, %-formatted with the device replacements and
        then split with shlex.
    """
    logging.info("Running command %s", command)
    for device in devices:
      device.AsyncCommand(
          shlex.split(command % device.cmd_string_replacements),
          log_output=True)
    for device in devices:
      device.WaitForTasks()

  def _PlayAudio(self, step):
    """Plays the staged audio file of `step` on the host.

    Args:
      step: Configuration step whose audio_file has been staged.

    Raises:
      RuntimeError: If sox is not enabled and the host is not Windows.
    """
    audio_path = self.audio_tempfiles[step.audio_file].name
    if FLAGS.use_sox:
      subprocess.check_call(["sox", "-q", audio_path, "-d"])
    elif platform.system() == "Windows":
      import winsound  # pylint:disable=g-import-not-at-top
      winsound.PlaySound(audio_path,
                         winsound.SND_FILENAME | winsound.SND_NODEFAULT)
    else:
      raise RuntimeError("Unsupported platform for audio playback")

  def _RunStep(self, devices, step):
    """Performs one configured step: delay, audio, command, delay.

    Args:
      devices: Devices that are still being monitored.
      step: Configuration step to perform.
    """
    if step.delay_before:
      logging.info("Waiting for %.2f seconds", step.delay_before)
      time.sleep(step.delay_before)

    if step.audio_file:
      logging.info("Playing %s", step.audio_file)
      RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST",
                                "Playing %s" % step.audio_file])
      self._PlayAudio(step)

    if step.command:
      self._RunCommandOnDevices(devices, step.command)

    if step.delay_after:
      logging.info("Waiting for %.2f seconds", step.delay_after)
      time.sleep(step.delay_after)

  def Start(self):
    """Runs the stress test until it is aborted or has completed.

    Runs the setup commands, starts event detection on every device and then
    repeats the configured steps. After every iteration the events of each
    device are processed, and devices that disappeared from adb or asked to be
    removed are stopped and no longer monitored. The loop ends when a device
    requests an abort, when no devices are left, or once FLAGS.num_iterations
    (if set) iterations were performed. A notification email with the summary
    is sent at the end.

    Raises:
      RuntimeError: If audio has to be played on an unsupported platform.
    """
    logging.info("Waiting for devices to settle")
    time.sleep(5)
    # Work on a copy of the device list: devices get pruned from the copy,
    # while self.devices keeps all of them for Stop() and the summary.
    devices = list(self.devices)
    dropped_devices = []

    # If we have any setup commands, run them.
    for command in self.setup_commands:
      self._RunCommandOnDevices(devices, command)

    for device in devices:
      device.StartLookingForEvents()
      device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST",
                           "Starting {%s} TZ=$(getprop persist.sys.timezone) "
                           "YEAR=$(date +%%Y)" % self.uuid], True)
    self.iteration = 0
    while True:
      logging.info("Starting iteration %d", self.iteration)
      # Perform all the actions specified in the test.
      RunAsyncCommand(devices, [
          "shell", "log", "-t", "STRESS_TEST",
          "Performing iteration %d $(head -n 3 "
          "/proc/timer_list | tail -n 1)" % self.iteration
      ])

      for step in self.steps:
        self._RunStep(devices, step)

      RunAsyncCommand(devices, [
          "shell", "log", "-t", "STRESS_TEST",
          "Iteration %d complete $(head -n 3 "
          "/proc/timer_list | tail -n 1)" % self.iteration
      ])
      self.iteration += 1

      # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for
      # a bit. This throws off the event counts, so we should probably add some
      # synchronization rules before we trigger any events.

      # Go through each device, update the event counter, and see if we need to
      # trigger any events.
      devices_to_remove = []
      abort_requested = False
      active_devices = self.GetActiveSerialNumbers()
      for device in devices:
        if device.serial_number in active_devices:
          device.ProcessEvents()
          continue
        logging.error("Dropped device %s", device.name)
        SendNotificationEmail(
            "Dropped device %s" % device.name,
            "Device %s is not longer present in the system" % device.name)
        dropped_devices.append(device)
        devices_to_remove.append(device)

      # Check to see if any of the dropped devices have come back. If yes, grab
      # a bug report and stop tracking them as dropped.
      still_dropped = []
      for device in dropped_devices:
        if device.serial_number in active_devices:
          logging.info("Device %s reappeared", device.name)
          device.Root()
          device.TakeBugReport()
        else:
          still_dropped.append(device)
      dropped_devices = still_dropped

      for device in devices:
        device.WaitForTasks()
        # A dropped device is already scheduled for removal; don't list it a
        # second time or it would be stopped twice below.
        if device.remove_device and device not in devices_to_remove:
          devices_to_remove.append(device)
        if device.abort_requested:
          abort_requested = True

      # Remove devices from our list of things to monitor if they've been marked
      # for deletion.
      if devices_to_remove:
        for device in devices_to_remove:
          device.Stop()
        devices = [d for d in devices if d not in devices_to_remove]

      # Print out the iteration summary.
      if self.iteration % FLAGS.print_summary_every_n == 0:
        for line in self.GetSummaryLines():
          logging.info(line)

      # See if we need to break out of the outer loop.
      if abort_requested or not devices:
        break
      if FLAGS.num_iterations and self.iteration >= FLAGS.num_iterations:
        logging.info("Completed full iteration : %d", self.iteration)
        break
    SendNotificationEmail(
        "Stress test %s completed" % (FLAGS.test_name),
        "\n".join(["Summary:"] + self.GetSummaryLines()))

  def Stop(self):
    """Stops every device and then joins every device."""
    logging.debug("Stopping devices")
    for device in self.devices:
      device.Stop()
    for device in self.devices:
      device.Join()

  def GetSummaryLines(self):
    """Builds the text summary of the test run.

    Returns:
      A list of lines: a header with the iteration count followed by the
      summary lines of every device, framed by separator lines.
    """
    lines = [
        _SUMMARY_LINES,
        # Fall back to 0 so an unset iteration limit can't break formatting.
        "Conducted %d iterations out of %d" %
        (self.iteration, FLAGS.num_iterations or 0),
        _SUMMARY_LINES
    ]
    for device in self.devices:
      lines.extend(device.GetSummaryLines())
    lines.append(_SUMMARY_LINES)
    return lines
985
986
def main(unused_argv):
  """Runs the stress test selected by the command line flags.

  Warns if other adb instances appear to be running, creates a timestamped
  output directory below FLAGS.output_root, sets up logging to a file in that
  directory and to the console, and then runs the StressTest. The devices are
  always stopped and the summary logged, even if the test raises.

  Args:
    unused_argv: Remaining command line arguments; ignored.
  """
  import shutil  # pylint:disable=g-import-not-at-top

  # Check to make sure that there are no other instances of ADB running - if
  # there are, print a warning and wait a bit for them to see it and decide if
  # they want to keep running, knowing that logs may be invalid.
  try:
    if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"):
      print("It looks like there are other instances of adb running. If these "
            "other instances are also cating log files, you will not be "
            "capturing everything in this stress test (so logs will be "
            "invalid).")
      print("Continuing in 3...", end=" ")
      sys.stdout.flush()
      for remaining in (2, 1, 0):
        time.sleep(1)
        if remaining:
          print("%d..." % remaining, end=" ")
        else:
          print("")
        sys.stdout.flush()
  except OSError:
    print("Unexpected error:", sys.exc_info()[0])
    # The check is best effort on Windows; everywhere else the error is
    # propagated.
    if not sys.platform.startswith("win"):
      raise

  # Make the base output directory.
  timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
  output_root = os.path.join(FLAGS.output_root,
                             "%s_%s" % (FLAGS.test_name, timestamp))
  stress_test_common.MakeDirsIfNeeded(output_root)

  # Set up logging.
  formatter = logging.Formatter(
      "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s")
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.DEBUG)

  file_handler = logging.FileHandler(os.path.join(output_root,
                                                  "stress_test.log"))
  file_handler.setFormatter(formatter)
  root_logger.addHandler(file_handler)

  console_handler = logging.StreamHandler()
  console_handler.setFormatter(formatter)
  root_logger.addHandler(console_handler)

  stress_test = StressTest(output_root, FLAGS.test_name)
  try:
    stress_test.Start()
  finally:
    logging.info("Stopping device logging threads")
    stress_test.Stop()
    for line in stress_test.GetSummaryLines():
      logging.info(line)
    if FLAGS.delete_data_dir:
      print("Deleting Data Dir")
      # Close the log file first so the directory can also be removed on
      # platforms that refuse to delete files that are still open.
      root_logger.removeHandler(file_handler)
      file_handler.close()
      # Portable replacement for "rm -r -f"; errors are ignored like -f does.
      shutil.rmtree(output_root, ignore_errors=True)
1046
1047
# Script entry point: hand control to absl, which invokes main().
if __name__ == "__main__":
  app.run(main)
1050