#!/usr/bin/env python
#
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for atest_utils."""

import hashlib
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
import mock

import atest_error
import atest_utils
import constants
import unittest_utils
from test_finders import test_info

if sys.version_info[0] == 2:
    from StringIO import StringIO
else:
    from io import StringIO

TEST_MODULE_NAME_A = 'ModuleNameA'
TEST_RUNNER_A = 'FakeTestRunnerA'
TEST_BUILD_TARGET_A = {'bt1', 'bt2'}
TEST_DATA_A = {'test_data_a_1': 'a1',
               'test_data_a_2': 'a2'}
TEST_SUITE_A = 'FakeSuiteA'
TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
TEST_INSTALL_LOC_A = {'host', 'device'}
TEST_FINDER_A = 'MODULE'
TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
                                 TEST_INSTALL_LOC_A)
TEST_INFO_A.test_finder = TEST_FINDER_A


def _capture_stdout():
    """Return a context manager that swaps sys.stdout for a fresh StringIO.

    The patcher puts the previous sys.stdout back on exit, including when the
    body raises, so a failing call cannot leave stdout redirected for the
    tests that run afterwards.

    Returns:
        A mock.patch context manager; entering it yields the StringIO that
        receives everything written to sys.stdout inside the block.
    """
    return mock.patch('sys.stdout', new_callable=StringIO)


#pylint: disable=protected-access
class AtestUtilsUnittests(unittest.TestCase):
    """Unit tests for atest_utils.py"""

    def test_capture_fail_section_has_fail_section(self):
        """Test capture_fail_section when has fail section."""
        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
                     '[ 6% 191/2997] BBBBBB\n', 'CCCCC',
                     '[ 20% 322/2997] DDDDDD\n', 'EEEEE']
        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_capture_fail_section_no_fail_section(self):
        """Test capture_fail_section when no fail section."""
        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
        want_list = []
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_is_test_mapping(self):
        """Test method is_test_mapping."""
        tm_option_attributes = [
            'test_mapping',
            'include_subdirs'
        ]

        def make_args(tests, enabled_attr=None):
            """Build mock args with at most one test-mapping option set."""
            args = mock.Mock()
            for attr in tm_option_attributes:
                setattr(args, attr, attr == enabled_attr)
            args.tests = tests
            return args

        # Each test-mapping option on its own is enough.
        for attr_to_test in tm_option_attributes:
            self.assertTrue(
                atest_utils.is_test_mapping(make_args([], attr_to_test)),
                'Failed to validate option %s' % attr_to_test)

        # No option set: the result depends on the test references alone.
        self.assertTrue(
            atest_utils.is_test_mapping(make_args([':group_name'])))
        self.assertFalse(
            atest_utils.is_test_mapping(make_args([':test1', 'test2'])))
        self.assertFalse(
            atest_utils.is_test_mapping(make_args(['test2'])))

    @mock.patch('curses.tigetnum')
    def test_has_colors(self, mock_curses_tigetnum):
        """Test method _has_colors."""
        # stream is file I/O. A managed temporary file is used so nothing is
        # left on disk and the handle is closed even if the assertion fails.
        with tempfile.TemporaryFile() as stream:
            self.assertFalse(atest_utils._has_colors(stream))

        # stream is not a tty(terminal).
        stream = mock.Mock()
        stream.isatty.return_value = False
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors < 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 1
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors > 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 256
        self.assertTrue(atest_utils._has_colors(stream))

    @mock.patch('atest_utils._has_colors')
    def test_colorize(self, mock_has_colors):
        """Test method colorize."""
        original_str = "test string"
        green_no = 2

        cases = [
            # (_has_colors() result, highlight, expected string)
            (False, True, original_str),
            (True, True, '\x1b[1;42m%s\x1b[0m' % original_str),
            (True, False, '\x1b[1;32m%s\x1b[0m' % original_str),
        ]
        for has_colors, highlight, expected in cases:
            mock_has_colors.return_value = has_colors
            converted_str = atest_utils.colorize(original_str, green_no,
                                                 highlight=highlight)
            self.assertEqual(expected, converted_str)

    @mock.patch('atest_utils._has_colors')
    def test_colorful_print(self, mock_has_colors):
        """Test method colorful_print."""
        testing_str = "color_print_test"
        green_no = 2

        cases = [
            # (_has_colors() result, highlight, auto_wrap, expected output)
            (False, True, False, testing_str),
            (True, True, False, '\x1b[1;42m%s\x1b[0m' % testing_str),
            (True, False, False, '\x1b[1;32m%s\x1b[0m' % testing_str),
            (True, True, True, '\x1b[1;42m%s\x1b[0m\n' % testing_str),
            (True, False, True, '\x1b[1;32m%s\x1b[0m\n' % testing_str),
        ]
        for has_colors, highlight, auto_wrap, expected in cases:
            mock_has_colors.return_value = has_colors
            with _capture_stdout() as capture_output:
                atest_utils.colorful_print(testing_str, green_no,
                                           highlight=highlight,
                                           auto_wrap=auto_wrap)
            self.assertEqual(capture_output.getvalue(), expected)

    @mock.patch('socket.gethostname')
    @mock.patch('subprocess.check_output')
    def test_is_external_run(self, mock_output, mock_hostname):
        """Test method is_external_run."""
        mock_output.return_value = ''
        mock_hostname.return_value = ''
        self.assertTrue(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.com'
        self.assertTrue(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.google.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'abc.google.def.com'
        self.assertTrue(atest_utils.is_external_run())

        # Hostname is deliberately left at its previous value here.
        mock_output.return_value = 'test@google.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'c.googlers.com'
        self.assertFalse(atest_utils.is_external_run())

        mock_output.return_value = 'test@other.com'
        mock_hostname.return_value = 'a.googlers.com'
        self.assertTrue(atest_utils.is_external_run())

        mock_output.side_effect = OSError()
        self.assertTrue(atest_utils.is_external_run())

        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
        self.assertTrue(atest_utils.is_external_run())

    @mock.patch('metrics.metrics_base.get_user_type')
    def test_print_data_collection_notice(self, mock_get_user_type):
        """Test method print_data_collection_notice."""

        # get_user_type return 1(external).
        mock_get_user_type.return_value = 1
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect anonymous usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        with _capture_stdout() as capture_output:
            atest_utils.print_data_collection_notice()
        self.assertEqual(capture_output.getvalue(), notice_str)

        # get_user_type return 0(internal).
        mock_get_user_type.return_value = 0
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://cla.developers.google.com/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        with _capture_stdout() as capture_output:
            atest_utils.print_data_collection_notice()
        self.assertEqual(capture_output.getvalue(), notice_str)

    # NOTE(review): the '__builtin__' module only exists on Python 2, while
    # the StringIO import above also caters for Python 3 -- confirm which
    # interpreter runs this test before changing the patch target.
    @mock.patch('__builtin__.raw_input')
    @mock.patch('json.load')
    def test_update_test_runner_cmd(self, mock_json_load_data, mock_raw_input):
        """Test method handle_test_runner_cmd without enable do_verification."""
        former_cmd_str = 'Former cmds ='
        write_result_str = 'Save result mapping to test_result'
        tmp_file = tempfile.NamedTemporaryFile()
        # Closing the handle also removes the temporary file.
        self.addCleanup(tmp_file.close)
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']

        # Previous data is empty. Should not enter strtobool.
        # If entered, exception will be raised cause test fail.
        mock_json_load_data.return_value = {}
        with _capture_stdout() as capture_output:
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=False,
                                               result_path=tmp_file.name)
        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)

        # Previous data is the same as the new input. Should not enter
        # strtobool. If entered, exception will be raised cause test fail.
        mock_json_load_data.return_value = {input_cmd: runner_cmds}
        with _capture_stdout() as capture_output:
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=False,
                                               result_path=tmp_file.name)
        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)

        # Previous data has different cmds. Should enter strtobool not update,
        # should not find write_result_str.
        prev_cmds = ['cmd1']
        mock_raw_input.return_value = 'n'
        mock_json_load_data.return_value = {input_cmd: prev_cmds}
        with _capture_stdout() as capture_output:
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=False,
                                               result_path=tmp_file.name)
        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)

    @mock.patch('json.load')
    def test_verify_test_runner_cmd(self, mock_json_load_data):
        """Test method handle_test_runner_cmd without enable update_result."""
        tmp_file = tempfile.NamedTemporaryFile()
        # Closing the handle also removes the temporary file.
        self.addCleanup(tmp_file.close)
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']

        # Previous data is the same as the new input. Should not raise
        # exception.
        mock_json_load_data.return_value = {input_cmd: runner_cmds}
        atest_utils.handle_test_runner_cmd(input_cmd,
                                           runner_cmds,
                                           do_verification=True,
                                           result_path=tmp_file.name)

        # Previous data has different cmds. Should enter strtobool and hit
        # exception.
        prev_cmds = ['cmd1']
        mock_json_load_data.return_value = {input_cmd: prev_cmds}
        self.assertRaises(atest_error.DryRunVerificationError,
                          atest_utils.handle_test_runner_cmd,
                          input_cmd,
                          runner_cmds,
                          do_verification=True,
                          result_path=tmp_file.name)

    def test_get_test_info_cache_path(self):
        """Test method get_test_info_cache_path."""
        input_file_name = 'mytest_name'
        cache_root = '/a/b/c'
        name_digest = hashlib.md5(str(input_file_name).encode()).hexdigest()
        expect_hashed_name = '%s.cache' % name_digest
        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
                         atest_utils.get_test_info_cache_path(input_file_name,
                                                              cache_root))

    def test_get_and_load_cache(self):
        """Test method update_test_info_cache and load_test_info_cache."""
        test_reference = 'myTestRefA'
        test_cache_dir = tempfile.mkdtemp()
        # mkdtemp() leaves removal to the caller; clean up after the test.
        self.addCleanup(shutil.rmtree, test_cache_dir, ignore_errors=True)
        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
                                           test_cache_dir)
        unittest_utils.assert_equal_testinfo_sets(
            self, {TEST_INFO_A},
            atest_utils.load_test_info_cache(test_reference, test_cache_dir))

    @mock.patch('os.getcwd')
    def test_get_build_cmd(self, mock_cwd):
        """Test method get_build_cmd."""
        build_top = '/home/a/b/c'
        rel_path = 'd/e'
        mock_cwd.return_value = os.path.join(build_top, rel_path)
        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())

    @mock.patch('subprocess.check_output')
    def test_get_modified_files(self, mock_co):
        """Test method get_modified_files"""
        # Each get_modified_files() call consumes three check_output results.
        mock_co.side_effect = ['/a/b/',
                               '\n',
                               'test_fp1.java\nc/test_fp2.java']
        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
                         atest_utils.get_modified_files(''))
        mock_co.side_effect = ['/a/b/',
                               'test_fp4',
                               '/test_fp3.java']
        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
                         atest_utils.get_modified_files(''))


if __name__ == "__main__":
    unittest.main()