#!/usr/bin/env python3
#
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for atest_utils."""

# pylint: disable=line-too-long

import hashlib
import os
import subprocess
import sys
import tempfile
import unittest

from io import StringIO
from unittest import mock

import atest_error
import atest_utils
import constants
import unittest_utils
import unittest_constants

from test_finders import test_info


TEST_MODULE_NAME_A = 'ModuleNameA'
TEST_RUNNER_A = 'FakeTestRunnerA'
TEST_BUILD_TARGET_A = {'bt1', 'bt2'}
TEST_DATA_A = {'test_data_a_1': 'a1',
               'test_data_a_2': 'a2'}
TEST_SUITE_A = 'FakeSuiteA'
TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
TEST_INSTALL_LOC_A = {'host', 'device'}
TEST_FINDER_A = 'MODULE'
TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
                                 TEST_INSTALL_LOC_A)
TEST_INFO_A.test_finder = TEST_FINDER_A
TEST_ZIP_DATA_DIR = 'zip_files'
TEST_SINGLE_ZIP_NAME = 'single_file.zip'
TEST_MULTI_ZIP_NAME = 'multi_file.zip'


def _captured_stdout():
    """Return a context manager that swaps sys.stdout for a fresh StringIO.

    The StringIO is what the ``with`` statement binds, and the previous
    sys.stdout is restored on exit even when the body raises, so a failing
    assertion cannot leave stdout redirected for later tests.
    """
    return mock.patch.object(sys, 'stdout', new_callable=StringIO)


#pylint: disable=protected-access
class AtestUtilsUnittests(unittest.TestCase):
    """Unit tests for atest_utils.py"""

    def test_capture_fail_section_has_fail_section(self):
        """Test capture_fail_section when has fail section."""
        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
                     '[ 6% 191/2997] BBBBBB\n', 'CCCCC',
                     '[ 20% 322/2997] DDDDDD\n', 'EEEEE']
        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_capture_fail_section_no_fail_section(self):
        """Test capture_fail_section when no fail section."""
        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
        want_list = []
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_is_test_mapping(self):
        """Test method is_test_mapping."""
        tm_option_attributes = [
            'test_mapping',
            'include_subdirs'
        ]

        def make_args(tests, enabled_option=None):
            """Build mock args with at most one test-mapping option enabled."""
            args = mock.Mock()
            for attr in tm_option_attributes:
                setattr(args, attr, attr == enabled_option)
            args.tests = tests
            return args

        # Each test-mapping option on its own must be recognised.
        for attr_to_test in tm_option_attributes:
            self.assertTrue(
                atest_utils.is_test_mapping(make_args([], attr_to_test)),
                'Failed to validate option %s' % attr_to_test)

        # No option set: only a single ':group' style reference qualifies.
        self.assertTrue(atest_utils.is_test_mapping(make_args([':group_name'])))
        self.assertFalse(
            atest_utils.is_test_mapping(make_args([':test1', 'test2'])))
        self.assertFalse(atest_utils.is_test_mapping(make_args(['test2'])))

    @mock.patch('curses.tigetnum')
    def test_has_colors(self, mock_curses_tigetnum):
        """Test method _has_colors."""
        # stream is file I/O. Use an anonymous temp file so nothing is left
        # behind in /tmp and the handle is closed even if the assert fails.
        with tempfile.TemporaryFile(mode='wb') as stream:
            self.assertFalse(atest_utils._has_colors(stream))

        # stream is not a tty(terminal).
        stream = mock.Mock()
        stream.isatty.return_value = False
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors < 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 1
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors > 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 256
        self.assertTrue(atest_utils._has_colors(stream))

    @mock.patch('atest_utils._has_colors')
    def test_colorize(self, mock_has_colors):
        """Test method colorize."""
        original_str = "test string"
        green_no = 2

        # _has_colors() return False.
        mock_has_colors.return_value = False
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=True)
        self.assertEqual(original_str, converted_str)

        # Green with highlight.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=True)
        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
        self.assertEqual(green_highlight_string, converted_str)

        # Green, no highlight.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=False)
        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
        self.assertEqual(green_no_highlight_string, converted_str)

    @mock.patch('atest_utils._has_colors')
    def test_colorful_print(self, mock_has_colors):
        """Test method colorful_print."""
        testing_str = "color_print_test"
        green_no = 2

        # (has_colors, highlight, auto_wrap, expected stdout)
        cases = [
            # _has_colors() return False.
            (False, True, False, testing_str),
            # Green with highlight, but no wrap.
            (True, True, False, '\x1b[1;42m%s\x1b[0m' % testing_str),
            # Green, no highlight, no wrap.
            (True, False, False, '\x1b[1;32m%s\x1b[0m' % testing_str),
            # Green with highlight and wrap.
            (True, True, True, '\x1b[1;42m%s\x1b[0m\n' % testing_str),
            # Green with wrap, but no highlight.
            (True, False, True, '\x1b[1;32m%s\x1b[0m\n' % testing_str),
        ]
        for has_colors, highlight, auto_wrap, expected in cases:
            with self.subTest(has_colors=has_colors, highlight=highlight,
                              auto_wrap=auto_wrap):
                mock_has_colors.return_value = has_colors
                with _captured_stdout() as capture_output:
                    atest_utils.colorful_print(testing_str, green_no,
                                               highlight=highlight,
                                               auto_wrap=auto_wrap)
                self.assertEqual(capture_output.getvalue(), expected)

    @mock.patch('socket.gethostname')
    @mock.patch('subprocess.check_output')
    def test_is_external_run(self, mock_output, mock_hostname):
        """Test method is_external_run."""
        mock_output.return_value = ''
        mock_hostname.return_value = ''
        self.assertTrue(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.com'
        self.assertTrue(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.google.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.google.def.com'
        self.assertTrue(atest_utils.is_external_run())

        # Hostname is intentionally left at the previous (external) value.
        mock_output.return_value = 'test@google.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'c.googlers.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'a.googlers.com'
        self.assertTrue(atest_utils.is_external_run())

        # A failing lookup command must be treated as an external run.
        mock_output.side_effect = OSError()
        self.assertTrue(atest_utils.is_external_run())

        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
        self.assertTrue(atest_utils.is_external_run())

    @mock.patch('metrics.metrics_base.get_user_type')
    def test_print_data_collection_notice(self, mock_get_user_type):
        """Test method print_data_collection_notice."""

        # get_user_type return 1(external).
        mock_get_user_type.return_value = 1
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect anonymous usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        with _captured_stdout() as capture_output:
            atest_utils.print_data_collection_notice()
        self.assertEqual(capture_output.getvalue(), notice_str)

        # get_user_type return 0(internal).
        mock_get_user_type.return_value = 0
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://cla.developers.google.com/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        with _captured_stdout() as capture_output:
            atest_utils.print_data_collection_notice()
        self.assertEqual(capture_output.getvalue(), notice_str)

    @mock.patch('builtins.input')
    @mock.patch('json.load')
    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
        """Test method handle_test_runner_cmd without enable do_verification."""
        former_cmd_str = 'Former cmds ='
        write_result_str = 'Save result mapping to test_result'
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']
        # The context manager closes (and removes) the temp file even when an
        # assertion below fails.
        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is empty. Should not enter strtobool.
            # If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)

            # Previous data is the same as the new input. Should not enter
            # strtobool. If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
            self.assertEqual(capture_output.getvalue().find(write_result_str), -1)

            # Previous data has different cmds. Should enter strtobool not
            # update, should not find write_result_str.
            prev_cmds = ['cmd1']
            mock_input.return_value = 'n'
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertEqual(capture_output.getvalue().find(write_result_str), -1)

    @mock.patch('json.load')
    def test_verify_test_runner_cmd(self, mock_json_load_data):
        """Test method handle_test_runner_cmd without enable update_result."""
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']
        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is the same as the new input. Should not raise
            # exception.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=True,
                                               result_path=tmp_file.name)
            # Previous data has different cmds. Should enter strtobool and hit
            # exception.
            prev_cmds = ['cmd1']
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            self.assertRaises(atest_error.DryRunVerificationError,
                              atest_utils.handle_test_runner_cmd,
                              input_cmd,
                              runner_cmds,
                              do_verification=True,
                              result_path=tmp_file.name)

    def test_get_test_info_cache_path(self):
        """Test method get_test_info_cache_path."""
        input_file_name = 'mytest_name'
        cache_root = '/a/b/c'
        name_digest = hashlib.md5(str(input_file_name).encode()).hexdigest()
        expect_hashed_name = '%s.cache' % name_digest
        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
                         atest_utils.get_test_info_cache_path(input_file_name,
                                                              cache_root))

    def test_get_and_load_cache(self):
        """Test method update_test_info_cache and load_test_info_cache."""
        test_reference = 'myTestRefA'
        # TemporaryDirectory removes the cache dir (and the cache file written
        # into it) once the round trip has been checked.
        with tempfile.TemporaryDirectory() as test_cache_dir:
            atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
                                               test_cache_dir)
            unittest_utils.assert_equal_testinfo_sets(
                self, {TEST_INFO_A},
                atest_utils.load_test_info_cache(test_reference,
                                                 test_cache_dir))

    @mock.patch('os.getcwd')
    def test_get_build_cmd(self, mock_cwd):
        """Test method get_build_cmd."""
        build_top = '/home/a/b/c'
        rel_path = 'd/e'
        mock_cwd.return_value = os.path.join(build_top, rel_path)
        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())

    @mock.patch('subprocess.check_output')
    def test_get_modified_files(self, mock_co):
        """Test method get_modified_files"""
        # check_output is called three times per get_modified_files() call;
        # each side_effect list supplies those three byte-string results.
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        '\n',
                                        'test_fp1.java\nc/test_fp2.java']]
        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
                         atest_utils.get_modified_files(''))
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        'test_fp4',
                                        '/test_fp3.java']]
        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
                         atest_utils.get_modified_files(''))

    def test_delimiter(self):
        """Test method delimiter"""
        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))

    def test_has_python_module(self):
        """Test method has_python_module"""
        self.assertFalse(atest_utils.has_python_module('M_M'))
        self.assertTrue(atest_utils.has_python_module('os'))

    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
    def test_read_zip_single_text(self, _matched):
        """Test method extract_zip_text include only one text file."""
        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
                                TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME)
        expect_content = '\nfile1_line1\nfile1_line2\n'
        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))

    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
    def test_read_zip_multi_text(self, _matched):
        """Test method extract_zip_text include multiple text files."""
        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
                                TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME)
        expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n'
                          'file2_line2\n')
        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))

    def test_matched_tf_error_log(self):
        """Test method matched_tf_error_log."""
        matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
        not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
        # Test matched content
        self.assertEqual(True,
                         atest_utils.matched_tf_error_log(matched_content))
        # Test not matched content
        self.assertEqual(False,
                         atest_utils.matched_tf_error_log(not_matched_content))

    @mock.patch('os.chmod')
    @mock.patch('shutil.copy2')
    @mock.patch('atest_utils.has_valid_cert')
    @mock.patch('subprocess.check_output')
    @mock.patch('os.path.exists')
    def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert,
                        _cpc, _cm):
        """Test method get_flakes."""
        # Test par file does not exist.
        mock_path_exists.return_value = False
        self.assertIsNone(atest_utils.get_flakes())
        # Test par file exists.
        mock_path_exists.return_value = True
        mock_output.return_value = (b'flake_percent:0.10001\n'
                                    b'postsubmit_flakes_per_week:12.0')
        mock_valid_cert.return_value = True
        expected_flake_info = {'flake_percent': '0.10001',
                               'postsubmit_flakes_per_week': '12.0'}
        self.assertEqual(expected_flake_info,
                         atest_utils.get_flakes())
        # Test no valid cert
        mock_valid_cert.return_value = False
        self.assertIsNone(atest_utils.get_flakes())

    @mock.patch('subprocess.check_call')
    def test_has_valid_cert(self, mock_call):
        """Test method has_valid_cert."""
        # raise subprocess.CalledProcessError
        # NOTE(review): this configures the auto-created child mock
        # `mock_call.raiseError`, not `mock_call` itself, so the patched
        # subprocess.check_call never raises here. If the intent is to cover
        # the CalledProcessError path, set
        # `mock_call.side_effect = subprocess.CalledProcessError(1, 'cmd')`
        # under a non-empty CERT_STATUS_CMD - confirm against has_valid_cert.
        mock_call.raiseError.side_effect = subprocess.CalledProcessError
        self.assertFalse(atest_utils.has_valid_cert())
        with mock.patch("constants.CERT_STATUS_CMD", ''):
            self.assertFalse(atest_utils.has_valid_cert())
        with mock.patch("constants.CERT_STATUS_CMD", 'CMD'):
            # has valid cert
            mock_call.return_value = 0
            self.assertTrue(atest_utils.has_valid_cert())
            # no valid cert
            mock_call.return_value = 4
            self.assertFalse(atest_utils.has_valid_cert())

    # pylint: disable=no-member
    def test_read_test_record_proto(self):
        """Test method read_test_record."""
        test_record_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
                                             "test_record.proto.testonly")
        test_record = atest_utils.read_test_record(test_record_file_path)
        self.assertEqual(test_record.children[0].inline_test_record.test_record_id,
                         'x86 hello_world_test')


if __name__ == "__main__":
    unittest.main()