1# Copyright 2017, The Android Open Source Project
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
7#     http://www.apache.org/licenses/LICENSE-2.0
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15#pylint: disable=too-many-lines
17Command Line Translator for atest.
20from __future__ import print_function
22import fnmatch
23import json
24import logging
25import os
26import re
27import sys
28import time
30import atest_error
31import atest_utils
32import constants
33import test_finder_handler
34import test_mapping
36from metrics import metrics
37from metrics import metrics_utils
38from test_finders import module_finder
43# Pattern used to identify comments start with '//' or '#' in TEST_MAPPING.
44_COMMENTS_RE = re.compile(r'(?m)[\s\t]*(#|//).*|(\".*?\")')
45_COMMENTS = frozenset(['//', '#'])
47#pylint: disable=no-self-use
48class CLITranslator(object):
49    """
50    CLITranslator class contains public method translate() and some private
51    helper methods. The atest tool can call the translate() method with a list
52    of strings, each string referencing a test to run. Translate() will
53    "translate" this list of test strings into a list of build targets and a
54    list of TradeFederation run commands.
56    Translation steps for a test string reference:
57        1. Narrow down the type of reference the test string could be, i.e.
58           whether it could be referencing a Module, Class, Package, etc.
59        2. Try to find the test files assuming the test string is one of these
60           types of reference.
61        3. If test files found, generate Build Targets and the Run Command.
62    """
64    def __init__(self, module_info=None, print_cache_msg=True):
65        """CLITranslator constructor
67        Args:
68            module_info: ModuleInfo class that has cached module-info.json.
69            print_cache_msg: Boolean whether printing clear cache message or not.
70                             True will print message while False won't print.
71        """
72        self.mod_info = module_info
73        self.enable_file_patterns = False
74        self.msg = ''
75        if print_cache_msg:
76            self.msg = ('(Test info has been cached for speeding up the next '
77                        'run, if test info need to be updated, please add -c '
78                        'to clean the old cache.)')
80    # pylint: disable=too-many-locals
81    def _find_test_infos(self, test, tm_test_detail):
82        """Return set of TestInfos based on a given test.
84        Args:
85            test: A string representing test references.
86            tm_test_detail: The TestDetail of test configured in TEST_MAPPING
87                files.
89        Returns:
90            Set of TestInfos based on the given test.
91        """
92        test_infos = set()
93        test_find_starts = time.time()
94        test_found = False
95        test_finders = []
96        test_info_str = ''
97        find_test_err_msg = None
98        for finder in test_finder_handler.get_find_methods_for_test(
99                self.mod_info, test):
100            # For tests in TEST_MAPPING, find method is only related to
101            # test name, so the details can be set after test_info object
102            # is created.
103            try:
104                found_test_infos = finder.find_method(
105                    finder.test_finder_instance, test)
106            except atest_error.TestDiscoveryException as e:
107                find_test_err_msg = e
108            if found_test_infos:
109                finder_info = finder.finder_info
110                for test_info in found_test_infos:
111                    if tm_test_detail:
112                        test_info.data[constants.TI_MODULE_ARG] = (
113                            tm_test_detail.options)
114                        test_info.from_test_mapping = True
115                        test_info.host = tm_test_detail.host
116                    if finder_info != CACHE_FINDER:
117                        test_info.test_finder = finder_info
118                    test_infos.add(test_info)
119                test_found = True
120                print("Found '%s' as %s" % (
121                    atest_utils.colorize(test, constants.GREEN),
122                    finder_info))
123                if finder_info == CACHE_FINDER and test_infos:
124                    test_finders.append(list(test_infos)[0].test_finder)
125                test_finders.append(finder_info)
126                test_info_str = ','.join([str(x) for x in found_test_infos])
127                break
128        if not test_found:
129            f_results = self._fuzzy_search_and_msg(test, find_test_err_msg)
130            if f_results:
131                test_infos.update(f_results)
132                test_found = True
133                test_finders.append(FUZZY_FINDER)
134        metrics.FindTestFinishEvent(
135            duration=metrics_utils.convert_duration(
136                time.time() - test_find_starts),
137            success=test_found,
138            test_reference=test,
139            test_finders=test_finders,
140            test_info=test_info_str)
141        # Cache test_infos by default except running with TEST_MAPPING which may
142        # include customized flags and they are likely to mess up other
143        # non-test_mapping tests.
144        if test_infos and not tm_test_detail:
145            atest_utils.update_test_info_cache(test, test_infos)
146            print(self.msg)
147        return test_infos
149    def _fuzzy_search_and_msg(self, test, find_test_err_msg):
150        """ Fuzzy search and print message.
152        Args:
153            test: A string representing test references
154            find_test_err_msg: A string of find test error message.
156        Returns:
157            A list of TestInfos if found, otherwise None.
158        """
159        print('No test found for: %s' %
160              atest_utils.colorize(test, constants.RED))
161        # Currently we focus on guessing module names. Append names on
162        # results if more finders support fuzzy searching.
163        mod_finder = module_finder.ModuleFinder(self.mod_info)
164        results = mod_finder.get_fuzzy_searching_results(test)
165        if len(results) == 1 and self._confirm_running(results):
166            found_test_infos = mod_finder.find_test_by_module_name(results[0])
167            # found_test_infos is a list with at most 1 element.
168            if found_test_infos:
169                return found_test_infos
170        elif len(results) > 1:
171            self._print_fuzzy_searching_results(results)
172        else:
173            print('No matching result for {0}.'.format(test))
174        if find_test_err_msg:
175            print('%s\n' % (atest_utils.colorize(
176                find_test_err_msg, constants.MAGENTA)))
177        else:
178            print('(This can happen after a repo sync or if the test'
179                  ' is new. Running: with "%s" may resolve the issue.)'
180                  '\n' % (atest_utils.colorize(
181                      constants.REBUILD_MODULE_INFO_FLAG,
182                      constants.RED)))
183        return None
185    def _get_test_infos(self, tests, test_mapping_test_details=None):
186        """Return set of TestInfos based on passed in tests.
188        Args:
189            tests: List of strings representing test references.
190            test_mapping_test_details: List of TestDetail for tests configured
191                in TEST_MAPPING files.
193        Returns:
194            Set of TestInfos based on the passed in tests.
195        """
196        test_infos = set()
197        if not test_mapping_test_details:
198            test_mapping_test_details = [None] * len(tests)
199        for test, tm_test_detail in zip(tests, test_mapping_test_details):
200            found_test_infos = self._find_test_infos(test, tm_test_detail)
201            test_infos.update(found_test_infos)
202        return test_infos
204    def _confirm_running(self, results):
205        """Listen to an answer from raw input.
207        Args:
208            results: A list of results.
210        Returns:
211            True is the answer is affirmative.
212        """
213        decision = raw_input('Did you mean {0}? [Y/n] '.format(
214            atest_utils.colorize(results[0], constants.GREEN)))
215        return decision in constants.AFFIRMATIVES
217    def _print_fuzzy_searching_results(self, results):
218        """Print modules when fuzzy searching gives multiple results.
220        If the result is lengthy, just print the first 10 items only since we
221        have already given enough-accurate result.
223        Args:
224            results: A list of guessed testable module names.
226        """
227        atest_utils.colorful_print('Did you mean the following modules?',
228                                   constants.WHITE)
229        for mod in results[:10]:
230            atest_utils.colorful_print(mod, constants.GREEN)
232    def filter_comments(self, test_mapping_file):
233        """Remove comments in TEST_MAPPING file to valid format. Only '//' and
234        '#' are regarded as comments.
236        Args:
237            test_mapping_file: Path to a TEST_MAPPING file.
239        Returns:
240            Valid json string without comments.
241        """
242        def _replace(match):
243            """Replace comments if found matching the defined regular expression.
245            Args:
246                match: The matched regex pattern
248            Returns:
249                "" if it matches _COMMENTS, otherwise original string.
250            """
251            line = match.group(0).strip()
252            return "" if any(map(line.startswith, _COMMENTS)) else line
253        with open(test_mapping_file) as json_file:
254            return re.sub(_COMMENTS_RE, _replace, json_file.read())
256    def _read_tests_in_test_mapping(self, test_mapping_file):
257        """Read tests from a TEST_MAPPING file.
259        Args:
260            test_mapping_file: Path to a TEST_MAPPING file.
262        Returns:
263            A tuple of (all_tests, imports), where
264            all_tests is a dictionary of all tests in the TEST_MAPPING file,
265                grouped by test group.
266            imports is a list of test_mapping.Import to include other test
267                mapping files.
268        """
269        all_tests = {}
270        imports = []
271        test_mapping_dict = json.loads(self.filter_comments(test_mapping_file))
272        for test_group_name, test_list in test_mapping_dict.items():
273            if test_group_name == constants.TEST_MAPPING_IMPORTS:
274                for import_detail in test_list:
275                    imports.append(
276                        test_mapping.Import(test_mapping_file, import_detail))
277            else:
278                grouped_tests = all_tests.setdefault(test_group_name, set())
279                tests = []
280                for test in test_list:
281                    if (self.enable_file_patterns and
282                            not test_mapping.is_match_file_patterns(
283                                test_mapping_file, test)):
284                        continue
285                    test_mod_info = self.mod_info.name_to_module_info.get(
286                        test['name'])
287                    if not test_mod_info:
288                        print('WARNING: %s is not a valid build target and '
289                              'may not be discoverable by TreeHugger. If you '
290                              'want to specify a class or test-package, '
291                              'please set \'name\' to the test module and use '
292                              '\'options\' to specify the right tests via '
293                              '\'include-filter\'.\nNote: this can also occur '
294                              'if the test module is not built for your '
295                              'current lunch target.\n' %
296                              atest_utils.colorize(test['name'], constants.RED))
297                    elif not any(x in test_mod_info['compatibility_suites'] for
298                                 x in constants.TEST_MAPPING_SUITES):
299                        print('WARNING: Please add %s to either suite: %s for '
300                              'this TEST_MAPPING file to work with TreeHugger.' %
301                              (atest_utils.colorize(test['name'],
302                                                    constants.RED),
303                               atest_utils.colorize(constants.TEST_MAPPING_SUITES,
304                                                    constants.GREEN)))
305                    tests.append(test_mapping.TestDetail(test))
306                grouped_tests.update(tests)
307        return all_tests, imports
309    def _find_files(self, path, file_name=constants.TEST_MAPPING):
310        """Find all files with given name under the given path.
312        Args:
313            path: A string of path in source.
315        Returns:
316            A list of paths of the files with the matching name under the given
317            path.
318        """
319        test_mapping_files = []
320        for root, _, filenames in os.walk(path):
321            for filename in fnmatch.filter(filenames, file_name):
322                test_mapping_files.append(os.path.join(root, filename))
323        return test_mapping_files
325    def _get_tests_from_test_mapping_files(
326            self, test_group, test_mapping_files):
327        """Get tests in the given test mapping files with the match group.
329        Args:
330            test_group: Group of tests to run. Default is set to `presubmit`.
331            test_mapping_files: A list of path of TEST_MAPPING files.
333        Returns:
334            A tuple of (tests, all_tests, imports), where,
335            tests is a set of tests (test_mapping.TestDetail) defined in
336            TEST_MAPPING file of the given path, and its parent directories,
337            with matching test_group.
338            all_tests is a dictionary of all tests in TEST_MAPPING files,
339            grouped by test group.
340            imports is a list of test_mapping.Import objects that contains the
341            details of where to import a TEST_MAPPING file.
342        """
343        all_imports = []
344        # Read and merge the tests in all TEST_MAPPING files.
345        merged_all_tests = {}
346        for test_mapping_file in test_mapping_files:
347            all_tests, imports = self._read_tests_in_test_mapping(
348                test_mapping_file)
349            all_imports.extend(imports)
350            for test_group_name, test_list in all_tests.items():
351                grouped_tests = merged_all_tests.setdefault(
352                    test_group_name, set())
353                grouped_tests.update(test_list)
355        tests = set(merged_all_tests.get(test_group, []))
356        if test_group == constants.TEST_GROUP_ALL:
357            for grouped_tests in merged_all_tests.values():
358                tests.update(grouped_tests)
359        return tests, merged_all_tests, all_imports
361    # pylint: disable=too-many-arguments
362    # pylint: disable=too-many-locals
363    def _find_tests_by_test_mapping(
364            self, path='', test_group=constants.TEST_GROUP_PRESUBMIT,
365            file_name=constants.TEST_MAPPING, include_subdirs=False,
366            checked_files=None):
367        """Find tests defined in TEST_MAPPING in the given path.
369        Args:
370            path: A string of path in source. Default is set to '', i.e., CWD.
371            test_group: Group of tests to run. Default is set to `presubmit`.
372            file_name: Name of TEST_MAPPING file. Default is set to
373                `TEST_MAPPING`. The argument is added for testing purpose.
374            include_subdirs: True to include tests in TEST_MAPPING files in sub
375                directories.
376            checked_files: Paths of TEST_MAPPING files that have been checked.
378        Returns:
379            A tuple of (tests, all_tests), where,
380            tests is a set of tests (test_mapping.TestDetail) defined in
381            TEST_MAPPING file of the given path, and its parent directories,
382            with matching test_group.
383            all_tests is a dictionary of all tests in TEST_MAPPING files,
384            grouped by test group.
385        """
386        path = os.path.realpath(path)
387        test_mapping_files = set()
388        all_tests = {}
389        test_mapping_file = os.path.join(path, file_name)
390        if os.path.exists(test_mapping_file):
391            test_mapping_files.add(test_mapping_file)
392        # Include all TEST_MAPPING files in sub-directories if `include_subdirs`
393        # is set to True.
394        if include_subdirs:
395            test_mapping_files.update(self._find_files(path, file_name))
396        # Include all possible TEST_MAPPING files in parent directories.
397        root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, os.sep)
398        while path != root_dir and path != os.sep:
399            path = os.path.dirname(path)
400            test_mapping_file = os.path.join(path, file_name)
401            if os.path.exists(test_mapping_file):
402                test_mapping_files.add(test_mapping_file)
404        if checked_files is None:
405            checked_files = set()
406        test_mapping_files.difference_update(checked_files)
407        checked_files.update(test_mapping_files)
408        if not test_mapping_files:
409            return test_mapping_files, all_tests
411        tests, all_tests, imports = self._get_tests_from_test_mapping_files(
412            test_group, test_mapping_files)
414        # Load TEST_MAPPING files from imports recursively.
415        if imports:
416            for import_detail in imports:
417                path = import_detail.get_path()
418                # (b/110166535 #19) Import path might not exist if a project is
419                # located in different directory in different branches.
420                if path is None:
421                    logging.warn(
422                        'Failed to import TEST_MAPPING at %s', import_detail)
423                    continue
424                # Search for tests based on the imported search path.
425                import_tests, import_all_tests = (
426                    self._find_tests_by_test_mapping(
427                        path, test_group, file_name, include_subdirs,
428                        checked_files))
429                # Merge the collections
430                tests.update(import_tests)
431                for group, grouped_tests in import_all_tests.items():
432                    all_tests.setdefault(group, set()).update(grouped_tests)
434        return tests, all_tests
436    def _gather_build_targets(self, test_infos):
437        targets = set()
438        for test_info in test_infos:
439            targets |= test_info.build_targets
440        return targets
442    def _get_test_mapping_tests(self, args):
443        """Find the tests in TEST_MAPPING files.
445        Args:
446            args: arg parsed object.
448        Returns:
449            A tuple of (test_names, test_details_list), where
450            test_names: a list of test name
451            test_details_list: a list of test_mapping.TestDetail objects for
452                the tests in TEST_MAPPING files with matching test group.
453        """
454        # Pull out tests from test mapping
455        src_path = ''
456        test_group = constants.TEST_GROUP_PRESUBMIT
457        if args.tests:
458            if ':' in args.tests[0]:
459                src_path, test_group = args.tests[0].split(':')
460            else:
461                src_path = args.tests[0]
463        test_details, all_test_details = self._find_tests_by_test_mapping(
464            path=src_path, test_group=test_group,
465            include_subdirs=args.include_subdirs, checked_files=set())
466        test_details_list = list(test_details)
467        if not test_details_list:
468            logging.warn(
469                'No tests of group `%s` found in TEST_MAPPING at %s or its '
470                'parent directories.\nYou might be missing atest arguments,'
471                ' try `atest --help` for more information',
472                test_group, os.path.realpath(''))
473            if all_test_details:
474                tests = ''
475                for test_group, test_list in all_test_details.items():
476                    tests += '%s:\n' % test_group
477                    for test_detail in sorted(test_list):
478                        tests += '\t%s\n' % test_detail
479                logging.warn(
480                    'All available tests in TEST_MAPPING files are:\n%s',
481                    tests)
482            metrics_utils.send_exit_event(constants.EXIT_CODE_TEST_NOT_FOUND)
483            sys.exit(constants.EXIT_CODE_TEST_NOT_FOUND)
485        logging.debug(
486            'Test details:\n%s',
487            '\n'.join([str(detail) for detail in test_details_list]))
488        test_names = [detail.name for detail in test_details_list]
489        return test_names, test_details_list
492    def translate(self, args):
493        """Translate atest command line into build targets and run commands.
495        Args:
496            args: arg parsed object.
498        Returns:
499            A tuple with set of build_target strings and list of TestInfos.
500        """
501        tests = args.tests
502        # Test details from TEST_MAPPING files
503        test_details_list = None
504        if atest_utils.is_test_mapping(args):
505            if args.enable_file_patterns:
506                self.enable_file_patterns = True
507            tests, test_details_list = self._get_test_mapping_tests(args)
508        atest_utils.colorful_print("\nFinding Tests...", constants.CYAN)
509        logging.debug('Finding Tests: %s', tests)
510        start = time.time()
511        test_infos = self._get_test_infos(tests, test_details_list)
512        logging.debug('Found tests in %ss', time.time() - start)
513        for test_info in test_infos:
514            logging.debug('%s\n', test_info)
515        build_targets = self._gather_build_targets(test_infos)
516        return build_targets, test_infos