1# Copyright 2017, The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Command Line Translator for atest.""" 16 17# pylint: disable=line-too-long 18# pylint: disable=too-many-lines 19 20from __future__ import print_function 21 22import json 23import logging 24import os 25import re 26import sys 27import time 28 29import atest_error 30import atest_utils 31import constants 32import test_finder_handler 33import test_mapping 34 35from metrics import metrics 36from metrics import metrics_utils 37from test_finders import module_finder 38 39FUZZY_FINDER = 'FUZZY' 40CACHE_FINDER = 'CACHE' 41 42# Pattern used to identify comments start with '//' or '#' in TEST_MAPPING. 43_COMMENTS_RE = re.compile(r'(?m)[\s\t]*(#|//).*|(\".*?\")') 44_COMMENTS = frozenset(['//', '#']) 45 46#pylint: disable=no-self-use 47class CLITranslator: 48 """ 49 CLITranslator class contains public method translate() and some private 50 helper methods. The atest tool can call the translate() method with a list 51 of strings, each string referencing a test to run. Translate() will 52 "translate" this list of test strings into a list of build targets and a 53 list of TradeFederation run commands. 54 55 Translation steps for a test string reference: 56 1. Narrow down the type of reference the test string could be, i.e. 57 whether it could be referencing a Module, Class, Package, etc. 58 2. Try to find the test files assuming the test string is one of these 59 types of reference. 60 3. If test files found, generate Build Targets and the Run Command. 61 """ 62 63 def __init__(self, module_info=None, print_cache_msg=True): 64 """CLITranslator constructor 65 66 Args: 67 module_info: ModuleInfo class that has cached module-info.json. 68 print_cache_msg: Boolean whether printing clear cache message or not. 69 True will print message while False won't print. 70 """ 71 self.mod_info = module_info 72 self.enable_file_patterns = False 73 self.msg = '' 74 if print_cache_msg: 75 self.msg = ('(Test info has been cached for speeding up the next ' 76 'run, if test info need to be updated, please add -c ' 77 'to clean the old cache.)') 78 79 # pylint: disable=too-many-locals 80 def _find_test_infos(self, test, tm_test_detail): 81 """Return set of TestInfos based on a given test. 82 83 Args: 84 test: A string representing test references. 85 tm_test_detail: The TestDetail of test configured in TEST_MAPPING 86 files. 87 88 Returns: 89 Set of TestInfos based on the given test. 90 """ 91 test_infos = set() 92 test_find_starts = time.time() 93 test_found = False 94 test_finders = [] 95 test_info_str = '' 96 find_test_err_msg = None 97 for finder in test_finder_handler.get_find_methods_for_test( 98 self.mod_info, test): 99 # For tests in TEST_MAPPING, find method is only related to 100 # test name, so the details can be set after test_info object 101 # is created. 102 try: 103 found_test_infos = finder.find_method( 104 finder.test_finder_instance, test) 105 except atest_error.TestDiscoveryException as e: 106 find_test_err_msg = e 107 if found_test_infos: 108 finder_info = finder.finder_info 109 for test_info in found_test_infos: 110 if tm_test_detail: 111 test_info.data[constants.TI_MODULE_ARG] = ( 112 tm_test_detail.options) 113 test_info.from_test_mapping = True 114 test_info.host = tm_test_detail.host 115 if finder_info != CACHE_FINDER: 116 test_info.test_finder = finder_info 117 test_infos.add(test_info) 118 test_found = True 119 print("Found '%s' as %s" % ( 120 atest_utils.colorize(test, constants.GREEN), 121 finder_info)) 122 if finder_info == CACHE_FINDER and test_infos: 123 test_finders.append(list(test_infos)[0].test_finder) 124 test_finders.append(finder_info) 125 test_info_str = ','.join([str(x) for x in found_test_infos]) 126 break 127 if not test_found: 128 f_results = self._fuzzy_search_and_msg(test, find_test_err_msg) 129 if f_results: 130 test_infos.update(f_results) 131 test_found = True 132 test_finders.append(FUZZY_FINDER) 133 metrics.FindTestFinishEvent( 134 duration=metrics_utils.convert_duration( 135 time.time() - test_find_starts), 136 success=test_found, 137 test_reference=test, 138 test_finders=test_finders, 139 test_info=test_info_str) 140 # Cache test_infos by default except running with TEST_MAPPING which may 141 # include customized flags and they are likely to mess up other 142 # non-test_mapping tests. 143 if test_infos and not tm_test_detail: 144 atest_utils.update_test_info_cache(test, test_infos) 145 print(self.msg) 146 return test_infos 147 148 def _fuzzy_search_and_msg(self, test, find_test_err_msg): 149 """ Fuzzy search and print message. 150 151 Args: 152 test: A string representing test references 153 find_test_err_msg: A string of find test error message. 154 155 Returns: 156 A list of TestInfos if found, otherwise None. 157 """ 158 print('No test found for: %s' % 159 atest_utils.colorize(test, constants.RED)) 160 # Currently we focus on guessing module names. Append names on 161 # results if more finders support fuzzy searching. 162 mod_finder = module_finder.ModuleFinder(self.mod_info) 163 results = mod_finder.get_fuzzy_searching_results(test) 164 if len(results) == 1 and self._confirm_running(results): 165 found_test_infos = mod_finder.find_test_by_module_name(results[0]) 166 # found_test_infos is a list with at most 1 element. 167 if found_test_infos: 168 return found_test_infos 169 elif len(results) > 1: 170 self._print_fuzzy_searching_results(results) 171 else: 172 print('No matching result for {0}.'.format(test)) 173 if find_test_err_msg: 174 print('%s\n' % (atest_utils.colorize( 175 find_test_err_msg, constants.MAGENTA))) 176 else: 177 print('(This can happen after a repo sync or if the test' 178 ' is new. Running with "%s" may resolve the issue.)' 179 '\n' % (atest_utils.colorize( 180 constants.REBUILD_MODULE_INFO_FLAG, 181 constants.RED))) 182 return None 183 184 def _get_test_infos(self, tests, test_mapping_test_details=None): 185 """Return set of TestInfos based on passed in tests. 186 187 Args: 188 tests: List of strings representing test references. 189 test_mapping_test_details: List of TestDetail for tests configured 190 in TEST_MAPPING files. 191 192 Returns: 193 Set of TestInfos based on the passed in tests. 194 """ 195 test_infos = set() 196 if not test_mapping_test_details: 197 test_mapping_test_details = [None] * len(tests) 198 for test, tm_test_detail in zip(tests, test_mapping_test_details): 199 found_test_infos = self._find_test_infos(test, tm_test_detail) 200 test_infos.update(found_test_infos) 201 return test_infos 202 203 def _confirm_running(self, results): 204 """Listen to an answer from raw input. 205 206 Args: 207 results: A list of results. 208 209 Returns: 210 True is the answer is affirmative. 211 """ 212 decision = input('Did you mean {0}? [Y/n] '.format( 213 atest_utils.colorize(results[0], constants.GREEN))) 214 return decision in constants.AFFIRMATIVES 215 216 def _print_fuzzy_searching_results(self, results): 217 """Print modules when fuzzy searching gives multiple results. 218 219 If the result is lengthy, just print the first 10 items only since we 220 have already given enough-accurate result. 221 222 Args: 223 results: A list of guessed testable module names. 224 225 """ 226 atest_utils.colorful_print('Did you mean the following modules?', 227 constants.WHITE) 228 for mod in results[:10]: 229 atest_utils.colorful_print(mod, constants.GREEN) 230 231 def filter_comments(self, test_mapping_file): 232 """Remove comments in TEST_MAPPING file to valid format. Only '//' and 233 '#' are regarded as comments. 234 235 Args: 236 test_mapping_file: Path to a TEST_MAPPING file. 237 238 Returns: 239 Valid json string without comments. 240 """ 241 def _replace(match): 242 """Replace comments if found matching the defined regular 243 expression. 244 245 Args: 246 match: The matched regex pattern 247 248 Returns: 249 "" if it matches _COMMENTS, otherwise original string. 250 """ 251 line = match.group(0).strip() 252 return "" if any(map(line.startswith, _COMMENTS)) else line 253 with open(test_mapping_file) as json_file: 254 return re.sub(_COMMENTS_RE, _replace, json_file.read()) 255 256 def _read_tests_in_test_mapping(self, test_mapping_file): 257 """Read tests from a TEST_MAPPING file. 258 259 Args: 260 test_mapping_file: Path to a TEST_MAPPING file. 261 262 Returns: 263 A tuple of (all_tests, imports), where 264 all_tests is a dictionary of all tests in the TEST_MAPPING file, 265 grouped by test group. 266 imports is a list of test_mapping.Import to include other test 267 mapping files. 268 """ 269 all_tests = {} 270 imports = [] 271 test_mapping_dict = json.loads(self.filter_comments(test_mapping_file)) 272 for test_group_name, test_list in test_mapping_dict.items(): 273 if test_group_name == constants.TEST_MAPPING_IMPORTS: 274 for import_detail in test_list: 275 imports.append( 276 test_mapping.Import(test_mapping_file, import_detail)) 277 else: 278 grouped_tests = all_tests.setdefault(test_group_name, set()) 279 tests = [] 280 for test in test_list: 281 if (self.enable_file_patterns and 282 not test_mapping.is_match_file_patterns( 283 test_mapping_file, test)): 284 continue 285 test_mod_info = self.mod_info.name_to_module_info.get( 286 test['name']) 287 if not test_mod_info: 288 print('WARNING: %s is not a valid build target and ' 289 'may not be discoverable by TreeHugger. If you ' 290 'want to specify a class or test-package, ' 291 'please set \'name\' to the test module and use ' 292 '\'options\' to specify the right tests via ' 293 '\'include-filter\'.\nNote: this can also occur ' 294 'if the test module is not built for your ' 295 'current lunch target.\n' % 296 atest_utils.colorize(test['name'], constants.RED)) 297 elif not any(x in test_mod_info['compatibility_suites'] for 298 x in constants.TEST_MAPPING_SUITES): 299 print('WARNING: Please add %s to either suite: %s for ' 300 'this TEST_MAPPING file to work with TreeHugger.' % 301 (atest_utils.colorize(test['name'], 302 constants.RED), 303 atest_utils.colorize(constants.TEST_MAPPING_SUITES, 304 constants.GREEN))) 305 tests.append(test_mapping.TestDetail(test)) 306 grouped_tests.update(tests) 307 return all_tests, imports 308 309 def _get_tests_from_test_mapping_files( 310 self, test_group, test_mapping_files): 311 """Get tests in the given test mapping files with the match group. 312 313 Args: 314 test_group: Group of tests to run. Default is set to `presubmit`. 315 test_mapping_files: A list of path of TEST_MAPPING files. 316 317 Returns: 318 A tuple of (tests, all_tests, imports), where, 319 tests is a set of tests (test_mapping.TestDetail) defined in 320 TEST_MAPPING file of the given path, and its parent directories, 321 with matching test_group. 322 all_tests is a dictionary of all tests in TEST_MAPPING files, 323 grouped by test group. 324 imports is a list of test_mapping.Import objects that contains the 325 details of where to import a TEST_MAPPING file. 326 """ 327 all_imports = [] 328 # Read and merge the tests in all TEST_MAPPING files. 329 merged_all_tests = {} 330 for test_mapping_file in test_mapping_files: 331 all_tests, imports = self._read_tests_in_test_mapping( 332 test_mapping_file) 333 all_imports.extend(imports) 334 for test_group_name, test_list in all_tests.items(): 335 grouped_tests = merged_all_tests.setdefault( 336 test_group_name, set()) 337 grouped_tests.update(test_list) 338 339 tests = set(merged_all_tests.get(test_group, [])) 340 if test_group == constants.TEST_GROUP_ALL: 341 for grouped_tests in merged_all_tests.values(): 342 tests.update(grouped_tests) 343 return tests, merged_all_tests, all_imports 344 345 # pylint: disable=too-many-arguments 346 # pylint: disable=too-many-locals 347 def _find_tests_by_test_mapping( 348 self, path='', test_group=constants.TEST_GROUP_PRESUBMIT, 349 file_name=constants.TEST_MAPPING, include_subdirs=False, 350 checked_files=None): 351 """Find tests defined in TEST_MAPPING in the given path. 352 353 Args: 354 path: A string of path in source. Default is set to '', i.e., CWD. 355 test_group: Group of tests to run. Default is set to `presubmit`. 356 file_name: Name of TEST_MAPPING file. Default is set to 357 `TEST_MAPPING`. The argument is added for testing purpose. 358 include_subdirs: True to include tests in TEST_MAPPING files in sub 359 directories. 360 checked_files: Paths of TEST_MAPPING files that have been checked. 361 362 Returns: 363 A tuple of (tests, all_tests), where, 364 tests is a set of tests (test_mapping.TestDetail) defined in 365 TEST_MAPPING file of the given path, and its parent directories, 366 with matching test_group. 367 all_tests is a dictionary of all tests in TEST_MAPPING files, 368 grouped by test group. 369 """ 370 path = os.path.realpath(path) 371 test_mapping_files = set() 372 all_tests = {} 373 test_mapping_file = os.path.join(path, file_name) 374 if os.path.exists(test_mapping_file): 375 test_mapping_files.add(test_mapping_file) 376 # Include all TEST_MAPPING files in sub-directories if `include_subdirs` 377 # is set to True. 378 if include_subdirs: 379 test_mapping_files.update(atest_utils.find_files(path, file_name)) 380 # Include all possible TEST_MAPPING files in parent directories. 381 root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, os.sep) 382 while path not in (root_dir, os.sep): 383 path = os.path.dirname(path) 384 test_mapping_file = os.path.join(path, file_name) 385 if os.path.exists(test_mapping_file): 386 test_mapping_files.add(test_mapping_file) 387 388 if checked_files is None: 389 checked_files = set() 390 test_mapping_files.difference_update(checked_files) 391 checked_files.update(test_mapping_files) 392 if not test_mapping_files: 393 return test_mapping_files, all_tests 394 395 tests, all_tests, imports = self._get_tests_from_test_mapping_files( 396 test_group, test_mapping_files) 397 398 # Load TEST_MAPPING files from imports recursively. 399 if imports: 400 for import_detail in imports: 401 path = import_detail.get_path() 402 # (b/110166535 #19) Import path might not exist if a project is 403 # located in different directory in different branches. 404 if path is None: 405 logging.warning( 406 'Failed to import TEST_MAPPING at %s', import_detail) 407 continue 408 # Search for tests based on the imported search path. 409 import_tests, import_all_tests = ( 410 self._find_tests_by_test_mapping( 411 path, test_group, file_name, include_subdirs, 412 checked_files)) 413 # Merge the collections 414 tests.update(import_tests) 415 for group, grouped_tests in import_all_tests.items(): 416 all_tests.setdefault(group, set()).update(grouped_tests) 417 418 return tests, all_tests 419 420 def _gather_build_targets(self, test_infos): 421 targets = set() 422 for test_info in test_infos: 423 targets |= test_info.build_targets 424 return targets 425 426 def _get_test_mapping_tests(self, args): 427 """Find the tests in TEST_MAPPING files. 428 429 Args: 430 args: arg parsed object. 431 432 Returns: 433 A tuple of (test_names, test_details_list), where 434 test_names: a list of test name 435 test_details_list: a list of test_mapping.TestDetail objects for 436 the tests in TEST_MAPPING files with matching test group. 437 """ 438 # Pull out tests from test mapping 439 src_path = '' 440 test_group = constants.TEST_GROUP_PRESUBMIT 441 if args.tests: 442 if ':' in args.tests[0]: 443 src_path, test_group = args.tests[0].split(':') 444 else: 445 src_path = args.tests[0] 446 447 test_details, all_test_details = self._find_tests_by_test_mapping( 448 path=src_path, test_group=test_group, 449 include_subdirs=args.include_subdirs, checked_files=set()) 450 test_details_list = list(test_details) 451 if not test_details_list: 452 logging.warning( 453 'No tests of group `%s` found in TEST_MAPPING at %s or its ' 454 'parent directories.\nYou might be missing atest arguments,' 455 ' try `atest --help` for more information', 456 test_group, os.path.realpath('')) 457 if all_test_details: 458 tests = '' 459 for test_group, test_list in all_test_details.items(): 460 tests += '%s:\n' % test_group 461 for test_detail in sorted(test_list): 462 tests += '\t%s\n' % test_detail 463 logging.warning( 464 'All available tests in TEST_MAPPING files are:\n%s', 465 tests) 466 metrics_utils.send_exit_event(constants.EXIT_CODE_TEST_NOT_FOUND) 467 sys.exit(constants.EXIT_CODE_TEST_NOT_FOUND) 468 469 logging.debug( 470 'Test details:\n%s', 471 '\n'.join([str(detail) for detail in test_details_list])) 472 test_names = [detail.name for detail in test_details_list] 473 return test_names, test_details_list 474 475 476 def translate(self, args): 477 """Translate atest command line into build targets and run commands. 478 479 Args: 480 args: arg parsed object. 481 482 Returns: 483 A tuple with set of build_target strings and list of TestInfos. 484 """ 485 tests = args.tests 486 # Test details from TEST_MAPPING files 487 test_details_list = None 488 if atest_utils.is_test_mapping(args): 489 if args.enable_file_patterns: 490 self.enable_file_patterns = True 491 tests, test_details_list = self._get_test_mapping_tests(args) 492 atest_utils.colorful_print("\nFinding Tests...", constants.CYAN) 493 logging.debug('Finding Tests: %s', tests) 494 start = time.time() 495 test_infos = self._get_test_infos(tests, test_details_list) 496 logging.debug('Found tests in %ss', time.time() - start) 497 for test_info in test_infos: 498 logging.debug('%s\n', test_info) 499 build_targets = self._gather_build_targets(test_infos) 500 return build_targets, test_infos 501