1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import itertools
17
18from acts.metrics.loggers.usage_metadata_logger import UsageMetadataPublisher
19from future import standard_library
20
21standard_library.install_aliases()
22
23import importlib
24import inspect
25import fnmatch
26import json
27import logging
28import os
29import pkgutil
30import sys
31
32from acts import base_test
33from acts import keys
34from acts import logger
35from acts import records
36from acts import signals
37from acts import utils
38from acts import error
39
40from mobly.records import ExceptionRecord
41
42
def _find_test_class():
    """Finds the test class in a test script.

    Walks through the members of the ``__main__`` module and collects the
    classes that derive from base_test.BaseTestClass. Exactly one such class
    is allowed in a test script.

    base_test.BaseTestClass itself is not counted, so a script that imports
    the base class by name is not rejected for containing two test classes.
    A class that is bound to several names in the module is counted once.

    Returns:
        The test class in the test module.

    Raises:
        SystemExit: via sys.exit(1), after logging an error, when the script
            does not contain exactly one test class.
    """
    test_classes = []
    main_module = sys.modules['__main__']
    for member in vars(main_module).values():
        if not inspect.isclass(member):
            continue
        if member is base_test.BaseTestClass:
            # issubclass(X, X) is True; the base class is not a test class.
            continue
        if not issubclass(member, base_test.BaseTestClass):
            continue
        # The same class may be reachable under more than one module-level
        # name (an alias); only count it once.
        if member not in test_classes:
            test_classes.append(member)
    if len(test_classes) != 1:
        logging.error('Expected 1 test class per file, found %s.',
                      [t.__name__ for t in test_classes])
        sys.exit(1)
    return test_classes[0]
63
64
def execute_one_test_class(test_class, test_config, test_identifier):
    """Executes one specific test class.

    You could call this function in your own cli test entry point if you choose
    not to use act.py.

    A TestRunner is created for the run and is always stopped before this
    function returns or raises.

    Args:
        test_class: A subclass of acts.base_test.BaseTestClass that has the test
                    logic to be executed.
        test_config: A dict representing one set of configs for a test run.
        test_identifier: A list of tuples specifying which test cases to run in
                         the test class.

    Returns:
        True if all tests passed without any error, False otherwise. False is
        also returned when the run itself fails with an exception; that
        exception is logged and not propagated.

    Raises:
        If signals.TestAbortAll is raised by a test run, pipe it through.
        Exceptions that do not derive from Exception (KeyboardInterrupt,
        SystemExit) are not swallowed either.
    """
    tr = TestRunner(test_config, test_identifier)
    try:
        tr.run(test_class)
        return tr.results.is_all_pass
    except signals.TestAbortAll:
        raise
    except Exception:
        # Only ordinary errors are absorbed here; a bare "except:" would also
        # hide Ctrl-C and interpreter shutdown requests.
        logging.exception('Exception when executing %s.', tr.testbed_name)
        return False
    finally:
        tr.stop()
94
95
class TestRunner(object):
    """The class that instantiates test classes, executes test cases, and
    reports results.

    Attributes:
        test_run_config: The TestRunConfig object specifying what tests to run.
        testbed_name: The testbed name read from test_run_config.
        id: A string that is the unique identifier of this test run, built as
            '<testbed_name>@<start timestamp>'.
        log: The logger object used throughout this test run.
        test_classes: A dictionary where we can look up the test classes by name
            to instantiate. Supports unix shell style wildcards. Empty until
            run() fills it in.
        run_list: A list of tuples specifying what tests to run.
        results: The test result object used to record the results of this test
            run.
        running: A boolean signifies whether this test run is ongoing or not.
        usage_publisher: The UsageMetadataPublisher whose publish() is called
            when the run is stopped.
    """
    def __init__(self, test_configs, run_list):
        """Prepares the log directory, the logger and the result records.

        As a side effect, test_configs.log_path is rewritten to
        <log_path>/<testbed_name>/<start timestamp> (absolute), and a
        summary writer is attached to test_configs.

        Args:
            test_configs: The test run config object. Its testbed_name,
                log_path, user_params and controller_configs are read here.
            run_list: A list of (test class name, test case names) tuples.
        """
        self.test_run_config = test_configs
        self.testbed_name = self.test_run_config.testbed_name
        start_time = logger.get_log_file_timestamp()
        self.id = '{}@{}'.format(self.testbed_name, start_time)
        self.test_run_config.log_path = os.path.abspath(
            os.path.join(self.test_run_config.log_path, self.testbed_name,
                         start_time))
        logger.setup_test_logger(self.log_path, self.testbed_name)
        self.log = logging.getLogger()
        self.test_run_config.summary_writer = records.TestSummaryWriter(
            os.path.join(self.log_path, records.OUTPUT_FILE_SUMMARY))
        self.run_list = run_list
        # Filled in by run(). Defined here so that run_test_class() can be
        # called before run() without raising AttributeError.
        self.test_classes = {}
        self.dump_config()
        self.results = records.TestResult()
        self.running = False
        self.usage_publisher = UsageMetadataPublisher()

    @property
    def log_path(self):
        """The path to write logs of this test run to."""
        return self.test_run_config.log_path

    @property
    def summary_writer(self):
        """The object responsible for writing summary and results data."""
        return self.test_run_config.summary_writer

    def import_test_modules(self, test_paths):
        """Imports test classes from test scripts.

        1. Locate all .py files under test paths whose name ends with 'Test'
           or '_test'.
        2. Import the .py files as modules.
        3. Find the module members that are classes named '*Test'.
        4. Categorize the test classes by name.

        The directory of every located file is added to sys.path (once) so
        the file can be imported by its bare module name.

        Args:
            test_paths: A list of directory paths where the test files reside.

        Returns:
            A dictionary where keys are test class name strings, values are
            actual test classes that can be instantiated.

        Raises:
            ValueError: if a module fails to import and its name matches a
                test class on the run list. Import failures of other modules
                are logged at debug level and skipped.
        """
        def is_testfile_name(name, ext):
            return ext == '.py' and name.endswith(('Test', '_test'))

        file_list = utils.find_files(test_paths, is_testfile_name)
        test_classes = {}
        for path, name, _ in file_list:
            # Avoid growing sys.path with one duplicate per test file.
            if path not in sys.path:
                sys.path.append(path)
            try:
                with utils.SuppressLogOutput(
                        log_levels=[logging.INFO, logging.ERROR]):
                    module = importlib.import_module(name)
            except Exception as e:
                logging.debug('Failed to import %s: %s', path, str(e))
                # Only block if a test class on the run list causes an
                # import error. We need to check against both naming
                # conventions: AaaBbb and aaa_bbb.
                alt_name = name.replace('_', '').lower()
                for test_cls_name, _ in self.run_list:
                    if (name == test_cls_name
                            or alt_name == test_cls_name.lower()):
                        msg = ('Encountered error importing test class %s, '
                               'abort.') % test_cls_name
                        # This exception is logged here to help with debugging
                        # under py2, because "raise X from Y" syntax is only
                        # supported under py3.
                        self.log.exception(msg)
                        raise ValueError(msg)
                continue
            for member_name in dir(module):
                if member_name.startswith('__'):
                    continue
                if not member_name.endswith('Test'):
                    continue
                candidate = getattr(module, member_name)
                if inspect.isclass(candidate):
                    test_classes[member_name] = candidate
        return test_classes

    def set_test_util_logs(self, module=None):
        """Sets the log object to each test util module.

        This recursively include all modules under acts.test_utils and sets the
        main test logger to each module.

        Args:
            module: A module under acts.test_utils. When omitted, the
                recursion starts at acts.test_utils itself.
        """
        # Initial condition of recursion.
        if not module:
            module = importlib.import_module('acts.test_utils')
        # Somehow pkgutil.walk_packages is not working for me.
        # Using iter_modules for now.
        pkg_iter = pkgutil.iter_modules(module.__path__, module.__name__ + '.')
        for _, module_name, ispkg in pkg_iter:
            m = importlib.import_module(module_name)
            if ispkg:
                # Packages only get recursed into; the logger is set on the
                # plain modules they contain.
                self.set_test_util_logs(module=m)
            else:
                self.log.debug('Setting logger to test util module %s',
                               module_name)
                m.log = self.log

    def run_test_class(self, test_cls_name, test_cases=None):
        """Instantiates and executes a test class.

        If test_cases is None, the test cases listed by self.tests will be
        executed instead. If self.tests is empty as well, no test case in this
        test class will be executed.

        test_cls_name is matched against the known test class names with unix
        shell style wildcards; every matching class is executed. If nothing
        matches, a skipped record is added to the results and the method
        returns without raising.

        Args:
            test_cls_name: Name of the test class to execute.
            test_cases: List of test case names to execute within the class.

        Raises:
            signals.TestAbortAll: re-raised from the test class after the
                results carried by the signal were merged into self.results.
        """
        matches = fnmatch.filter(self.test_classes.keys(), test_cls_name)
        if not matches:
            self.log.info(
                'Cannot find test class %s or classes matching pattern, '
                'skipping for now.', test_cls_name)
            record = records.TestResultRecord('*all*', test_cls_name)
            record.test_skip(signals.TestSkip('Test class does not exist.'))
            self.results.add_record(record)
            return
        if matches != [test_cls_name]:
            self.log.info('Found classes matching pattern %s: %s',
                          test_cls_name, matches)

        for test_cls_name_match in matches:
            test_cls = self.test_classes[test_cls_name_match]
            test_cls_instance = test_cls(self.test_run_config)
            try:
                cls_result = test_cls_instance.run(test_cases)
                self.results += cls_result
            except signals.TestAbortAll as e:
                self.results += e.results
                # Bare raise keeps the original traceback intact.
                raise

    def run(self, test_class=None):
        """Executes test cases.

        This will instantiate controller and test classes, and execute test
        classes. This can be called multiple times to repeatedly execute the
        requested test cases.

        A call to TestRunner.stop should eventually happen to conclude the life
        cycle of a TestRunner.

        Args:
            test_class: A test class. If provided, run this class; otherwise,
                        import modules in under test_paths based on run_list.

        Raises:
            signals.TestAbortAll: propagated from a test class; the remaining
                entries of the run list are not executed.
        """
        self.running = True

        if test_class:
            self.test_classes = {test_class.__name__: test_class}
        else:
            t_paths = self.test_run_config.controller_configs[
                keys.Config.key_test_paths.value]
            self.test_classes = self.import_test_modules(t_paths)
        self.log.debug('Executing run list %s.', self.run_list)
        for test_cls_name, test_case_names in self.run_list:
            # self.running is cleared by stop(); stop iterating once that
            # has happened.
            if not self.running:
                break

            if test_case_names:
                self.log.debug('Executing test cases %s in test class %s.',
                               test_case_names, test_cls_name)
            else:
                self.log.debug('Executing test class %s', test_cls_name)

            try:
                self.run_test_class(test_cls_name, test_case_names)
            except error.ActsError as e:
                # Record the failure and carry on with the next class.
                self.results.error.append(ExceptionRecord(e))
                self.log.error('Test Runner Error: %s', e.details)
            except signals.TestAbortAll as e:
                self.log.warning(
                    'Abort all subsequent test classes. Reason: %s', e)
                raise

    def stop(self):
        """Releases resources from test run. Should always be called after
        TestRunner.run finishes.

        This function concludes a test run and writes out a test report. It
        does nothing when the runner is not running.
        """
        if not self.running:
            return
        msg = '\nSummary for test run %s: %s\n' % (
            self.id, self.results.summary_str())
        self._write_results_to_file()
        self.log.info(msg.strip())
        logger.kill_test_logger(self.log)
        self.usage_publisher.publish()
        self.running = False

    def _write_results_to_file(self):
        """Writes test results to file(s) in a serializable format."""
        # Old JSON format
        path = os.path.join(self.log_path, 'test_run_summary.json')
        with open(path, 'w') as f:
            f.write(self.results.json_str())
        # New YAML format
        self.summary_writer.dump(self.results.summary_dict(),
                                 records.TestSummaryEntryType.SUMMARY)

    def dump_config(self):
        """Writes the test config to a JSON file under self.log_path

        The user params and the controller configs are merged into one
        dictionary; on a key collision the controller config entry wins.
        """
        config_path = os.path.join(self.log_path, 'test_configs.json')
        merged_config = dict(
            itertools.chain(
                self.test_run_config.user_params.items(),
                self.test_run_config.controller_configs.items()))
        # NOTE: the file is opened in append mode, so calling this more than
        # once for the same log path adds a second JSON document to the file.
        with open(config_path, 'a') as f:
            # skipkeys drops entries whose key is not a basic JSON key type
            # instead of raising TypeError.
            json.dump(merged_config, f, skipkeys=True, indent=4)

    def write_test_campaign(self):
        """Log test campaign file.

        Writes one section per run list entry to test_campaign.log under
        self.log_path: the test class name followed by its test case names.
        An entry without explicit test cases (None or empty) gets a section
        with no test case names.
        """
        path = os.path.join(self.log_path, 'test_campaign.log')
        with open(path, 'w') as f:
            for test_class, test_cases in self.run_list:
                # test_cases is None when the whole class is requested;
                # str.join(None) would raise TypeError.
                f.write('%s:\n%s' % (test_class,
                                     ',\n'.join(test_cases or [])))
                f.write('\n\n')
344