1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=line-too-long
20
21import hashlib
22import os
23import subprocess
24import sys
25import tempfile
26import unittest
27
28from io import StringIO
29from unittest import mock
30
31import atest_error
32import atest_utils
33import constants
34import unittest_utils
35import unittest_constants
36
37from test_finders import test_info
38
39
# Field values for the fake TestInfo used by test_get_and_load_cache.
TEST_MODULE_NAME_A = 'ModuleNameA'
TEST_RUNNER_A = 'FakeTestRunnerA'
TEST_BUILD_TARGET_A = {'bt1', 'bt2'}
TEST_DATA_A = {
    'test_data_a_1': 'a1',
    'test_data_a_2': 'a2',
}
TEST_SUITE_A = 'FakeSuiteA'
TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
TEST_INSTALL_LOC_A = {'host', 'device'}
TEST_FINDER_A = 'MODULE'
TEST_INFO_A = test_info.TestInfo(
    TEST_MODULE_NAME_A,
    TEST_RUNNER_A,
    TEST_BUILD_TARGET_A,
    TEST_DATA_A,
    TEST_SUITE_A,
    TEST_MODULE_CLASS_A,
    TEST_INSTALL_LOC_A)
# The finder is assigned as an attribute after construction.
TEST_INFO_A.test_finder = TEST_FINDER_A
# Sub-directory and file names of the zip fixtures; the zip tests join them
# onto unittest_constants.TEST_DATA_DIR.
TEST_ZIP_DATA_DIR = 'zip_files'
TEST_SINGLE_ZIP_NAME = 'single_file.zip'
TEST_MULTI_ZIP_NAME = 'multi_file.zip'
57
58#pylint: disable=protected-access
59class AtestUtilsUnittests(unittest.TestCase):
60    """Unit tests for atest_utils.py"""
61
62    def test_capture_fail_section_has_fail_section(self):
63        """Test capture_fail_section when has fail section."""
64        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
65                     '[  6% 191/2997] BBBBBB\n', 'CCCCC',
66                     '[  20% 322/2997] DDDDDD\n', 'EEEEE']
67        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
68        self.assertEqual(want_list,
69                         atest_utils._capture_fail_section(test_list))
70
71    def test_capture_fail_section_no_fail_section(self):
72        """Test capture_fail_section when no fail section."""
73        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
74        want_list = []
75        self.assertEqual(want_list,
76                         atest_utils._capture_fail_section(test_list))
77
78    def test_is_test_mapping(self):
79        """Test method is_test_mapping."""
80        tm_option_attributes = [
81            'test_mapping',
82            'include_subdirs'
83        ]
84        for attr_to_test in tm_option_attributes:
85            args = mock.Mock()
86            for attr in tm_option_attributes:
87                setattr(args, attr, attr == attr_to_test)
88            args.tests = []
89            self.assertTrue(
90                atest_utils.is_test_mapping(args),
91                'Failed to validate option %s' % attr_to_test)
92
93        args = mock.Mock()
94        for attr in tm_option_attributes:
95            setattr(args, attr, False)
96        args.tests = [':group_name']
97        self.assertTrue(atest_utils.is_test_mapping(args))
98
99        args = mock.Mock()
100        for attr in tm_option_attributes:
101            setattr(args, attr, False)
102        args.tests = [':test1', 'test2']
103        self.assertFalse(atest_utils.is_test_mapping(args))
104
105        args = mock.Mock()
106        for attr in tm_option_attributes:
107            setattr(args, attr, False)
108        args.tests = ['test2']
109        self.assertFalse(atest_utils.is_test_mapping(args))
110
111    @mock.patch('curses.tigetnum')
112    def test_has_colors(self, mock_curses_tigetnum):
113        """Test method _has_colors."""
114        # stream is file I/O
115        stream = open('/tmp/test_has_colors.txt', 'wb')
116        self.assertFalse(atest_utils._has_colors(stream))
117        stream.close()
118
119        # stream is not a tty(terminal).
120        stream = mock.Mock()
121        stream.isatty.return_value = False
122        self.assertFalse(atest_utils._has_colors(stream))
123
124        # stream is a tty(terminal) and colors < 2.
125        stream = mock.Mock()
126        stream.isatty.return_value = True
127        mock_curses_tigetnum.return_value = 1
128        self.assertFalse(atest_utils._has_colors(stream))
129
130        # stream is a tty(terminal) and colors > 2.
131        stream = mock.Mock()
132        stream.isatty.return_value = True
133        mock_curses_tigetnum.return_value = 256
134        self.assertTrue(atest_utils._has_colors(stream))
135
136
137    @mock.patch('atest_utils._has_colors')
138    def test_colorize(self, mock_has_colors):
139        """Test method colorize."""
140        original_str = "test string"
141        green_no = 2
142
143        # _has_colors() return False.
144        mock_has_colors.return_value = False
145        converted_str = atest_utils.colorize(original_str, green_no,
146                                             highlight=True)
147        self.assertEqual(original_str, converted_str)
148
149        # Green with highlight.
150        mock_has_colors.return_value = True
151        converted_str = atest_utils.colorize(original_str, green_no,
152                                             highlight=True)
153        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
154        self.assertEqual(green_highlight_string, converted_str)
155
156        # Green, no highlight.
157        mock_has_colors.return_value = True
158        converted_str = atest_utils.colorize(original_str, green_no,
159                                             highlight=False)
160        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
161        self.assertEqual(green_no_highlight_string, converted_str)
162
163
164    @mock.patch('atest_utils._has_colors')
165    def test_colorful_print(self, mock_has_colors):
166        """Test method colorful_print."""
167        testing_str = "color_print_test"
168        green_no = 2
169
170        # _has_colors() return False.
171        mock_has_colors.return_value = False
172        capture_output = StringIO()
173        sys.stdout = capture_output
174        atest_utils.colorful_print(testing_str, green_no, highlight=True,
175                                   auto_wrap=False)
176        sys.stdout = sys.__stdout__
177        uncolored_string = testing_str
178        self.assertEqual(capture_output.getvalue(), uncolored_string)
179
180        # Green with highlight, but no wrap.
181        mock_has_colors.return_value = True
182        capture_output = StringIO()
183        sys.stdout = capture_output
184        atest_utils.colorful_print(testing_str, green_no, highlight=True,
185                                   auto_wrap=False)
186        sys.stdout = sys.__stdout__
187        green_highlight_no_wrap_string = '\x1b[1;42m%s\x1b[0m' % testing_str
188        self.assertEqual(capture_output.getvalue(),
189                         green_highlight_no_wrap_string)
190
191        # Green, no highlight, no wrap.
192        mock_has_colors.return_value = True
193        capture_output = StringIO()
194        sys.stdout = capture_output
195        atest_utils.colorful_print(testing_str, green_no, highlight=False,
196                                   auto_wrap=False)
197        sys.stdout = sys.__stdout__
198        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
199        self.assertEqual(capture_output.getvalue(),
200                         green_no_high_no_wrap_string)
201
202        # Green with highlight and wrap.
203        mock_has_colors.return_value = True
204        capture_output = StringIO()
205        sys.stdout = capture_output
206        atest_utils.colorful_print(testing_str, green_no, highlight=True,
207                                   auto_wrap=True)
208        sys.stdout = sys.__stdout__
209        green_highlight_wrap_string = '\x1b[1;42m%s\x1b[0m\n' % testing_str
210        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
211
212        # Green with wrap, but no highlight.
213        mock_has_colors.return_value = True
214        capture_output = StringIO()
215        sys.stdout = capture_output
216        atest_utils.colorful_print(testing_str, green_no, highlight=False,
217                                   auto_wrap=True)
218        sys.stdout = sys.__stdout__
219        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
220        self.assertEqual(capture_output.getvalue(),
221                         green_wrap_no_highlight_string)
222
223    @mock.patch('socket.gethostname')
224    @mock.patch('subprocess.check_output')
225    def test_is_external_run(self, mock_output, mock_hostname):
226        """Test method is_external_run."""
227        mock_output.return_value = ''
228        mock_hostname.return_value = ''
229        self.assertTrue(atest_utils.is_external_run())
230
231        mock_output.return_value = 'test@other.com'
232        mock_hostname.return_value = 'abc.com'
233        self.assertTrue(atest_utils.is_external_run())
234
235        mock_output.return_value = 'test@other.com'
236        mock_hostname.return_value = 'abc.google.com'
237        self.assertFalse(atest_utils.is_external_run())
238
239        mock_output.return_value = 'test@other.com'
240        mock_hostname.return_value = 'abc.google.def.com'
241        self.assertTrue(atest_utils.is_external_run())
242
243        mock_output.return_value = 'test@google.com'
244        self.assertFalse(atest_utils.is_external_run())
245
246        mock_output.return_value = 'test@other.com'
247        mock_hostname.return_value = 'c.googlers.com'
248        self.assertFalse(atest_utils.is_external_run())
249
250        mock_output.return_value = 'test@other.com'
251        mock_hostname.return_value = 'a.googlers.com'
252        self.assertTrue(atest_utils.is_external_run())
253
254        mock_output.side_effect = OSError()
255        self.assertTrue(atest_utils.is_external_run())
256
257        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
258        self.assertTrue(atest_utils.is_external_run())
259
260    @mock.patch('metrics.metrics_base.get_user_type')
261    def test_print_data_collection_notice(self, mock_get_user_type):
262        """Test method print_data_collection_notice."""
263
264        # get_user_type return 1(external).
265        mock_get_user_type.return_value = 1
266        notice_str = ('\n==================\nNotice:\n'
267                      '  We collect anonymous usage statistics'
268                      ' in accordance with our'
269                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
270                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
271                      ' Privacy Policy (https://policies.google.com/privacy) and'
272                      ' Terms of Service (https://policies.google.com/terms).'
273                      '\n==================\n\n')
274        capture_output = StringIO()
275        sys.stdout = capture_output
276        atest_utils.print_data_collection_notice()
277        sys.stdout = sys.__stdout__
278        uncolored_string = notice_str
279        self.assertEqual(capture_output.getvalue(), uncolored_string)
280
281        # get_user_type return 0(internal).
282        mock_get_user_type.return_value = 0
283        notice_str = ('\n==================\nNotice:\n'
284                      '  We collect usage statistics'
285                      ' in accordance with our'
286                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
287                      ' Contributor License Agreement (https://cla.developers.google.com/),'
288                      ' Privacy Policy (https://policies.google.com/privacy) and'
289                      ' Terms of Service (https://policies.google.com/terms).'
290                      '\n==================\n\n')
291        capture_output = StringIO()
292        sys.stdout = capture_output
293        atest_utils.print_data_collection_notice()
294        sys.stdout = sys.__stdout__
295        uncolored_string = notice_str
296        self.assertEqual(capture_output.getvalue(), uncolored_string)
297
298    @mock.patch('builtins.input')
299    @mock.patch('json.load')
300    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
301        """Test method handle_test_runner_cmd without enable do_verification."""
302        former_cmd_str = 'Former cmds ='
303        write_result_str = 'Save result mapping to test_result'
304        tmp_file = tempfile.NamedTemporaryFile()
305        input_cmd = 'atest_args'
306        runner_cmds = ['cmd1', 'cmd2']
307        capture_output = StringIO()
308        sys.stdout = capture_output
309        # Previous data is empty. Should not enter strtobool.
310        # If entered, exception will be raised cause test fail.
311        mock_json_load_data.return_value = {}
312        atest_utils.handle_test_runner_cmd(input_cmd,
313                                           runner_cmds,
314                                           do_verification=False,
315                                           result_path=tmp_file.name)
316        sys.stdout = sys.__stdout__
317        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
318        # Previous data is the same as the new input. Should not enter strtobool.
319        # If entered, exception will be raised cause test fail
320        capture_output = StringIO()
321        sys.stdout = capture_output
322        mock_json_load_data.return_value = {input_cmd:runner_cmds}
323        atest_utils.handle_test_runner_cmd(input_cmd,
324                                           runner_cmds,
325                                           do_verification=False,
326                                           result_path=tmp_file.name)
327        sys.stdout = sys.__stdout__
328        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
329        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
330        # Previous data has different cmds. Should enter strtobool not update,
331        # should not find write_result_str.
332        prev_cmds = ['cmd1']
333        mock_input.return_value = 'n'
334        capture_output = StringIO()
335        sys.stdout = capture_output
336        mock_json_load_data.return_value = {input_cmd:prev_cmds}
337        atest_utils.handle_test_runner_cmd(input_cmd,
338                                           runner_cmds,
339                                           do_verification=False,
340                                           result_path=tmp_file.name)
341        sys.stdout = sys.__stdout__
342        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
343
344    @mock.patch('json.load')
345    def test_verify_test_runner_cmd(self, mock_json_load_data):
346        """Test method handle_test_runner_cmd without enable update_result."""
347        tmp_file = tempfile.NamedTemporaryFile()
348        input_cmd = 'atest_args'
349        runner_cmds = ['cmd1', 'cmd2']
350        # Previous data is the same as the new input. Should not raise exception.
351        mock_json_load_data.return_value = {input_cmd:runner_cmds}
352        atest_utils.handle_test_runner_cmd(input_cmd,
353                                           runner_cmds,
354                                           do_verification=True,
355                                           result_path=tmp_file.name)
356        # Previous data has different cmds. Should enter strtobool and hit
357        # exception.
358        prev_cmds = ['cmd1']
359        mock_json_load_data.return_value = {input_cmd:prev_cmds}
360        self.assertRaises(atest_error.DryRunVerificationError,
361                          atest_utils.handle_test_runner_cmd,
362                          input_cmd,
363                          runner_cmds,
364                          do_verification=True,
365                          result_path=tmp_file.name)
366
367    def test_get_test_info_cache_path(self):
368        """Test method get_test_info_cache_path."""
369        input_file_name = 'mytest_name'
370        cache_root = '/a/b/c'
371        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
372                                                       encode()).hexdigest())
373        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
374                         atest_utils.get_test_info_cache_path(input_file_name,
375                                                              cache_root))
376
377    def test_get_and_load_cache(self):
378        """Test method update_test_info_cache and load_test_info_cache."""
379        test_reference = 'myTestRefA'
380        test_cache_dir = tempfile.mkdtemp()
381        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
382                                           test_cache_dir)
383        unittest_utils.assert_equal_testinfo_sets(
384            self, set([TEST_INFO_A]),
385            atest_utils.load_test_info_cache(test_reference, test_cache_dir))
386
387    @mock.patch('os.getcwd')
388    def test_get_build_cmd(self, mock_cwd):
389        """Test method get_build_cmd."""
390        build_top = '/home/a/b/c'
391        rel_path = 'd/e'
392        mock_cwd.return_value = os.path.join(build_top, rel_path)
393        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
394        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
395            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
396            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())
397
398    @mock.patch('subprocess.check_output')
399    def test_get_modified_files(self, mock_co):
400        """Test method get_modified_files"""
401        mock_co.side_effect = [
402            x.encode('utf-8') for x in ['/a/b/',
403                                        '\n',
404                                        'test_fp1.java\nc/test_fp2.java']]
405        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
406                         atest_utils.get_modified_files(''))
407        mock_co.side_effect = [
408            x.encode('utf-8') for x in ['/a/b/',
409                                        'test_fp4',
410                                        '/test_fp3.java']]
411        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
412                         atest_utils.get_modified_files(''))
413
414    def test_delimiter(self):
415        """Test method delimiter"""
416        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
417
418    def test_has_python_module(self):
419        """Test method has_python_module"""
420        self.assertFalse(atest_utils.has_python_module('M_M'))
421        self.assertTrue(atest_utils.has_python_module('os'))
422
423    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
424    def test_read_zip_single_text(self, _matched):
425        """Test method extract_zip_text include only one text file."""
426        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
427                                TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME)
428        expect_content = '\nfile1_line1\nfile1_line2\n'
429        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
430
431    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
432    def test_read_zip_multi_text(self, _matched):
433        """Test method extract_zip_text include multiple text files."""
434        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
435                                TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME)
436        expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n'
437                          'file2_line2\n')
438        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
439
440    def test_matched_tf_error_log(self):
441        """Test method extract_zip_text include multiple text files."""
442        matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
443        not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
444        # Test matched content
445        self.assertEqual(True,
446                         atest_utils.matched_tf_error_log(matched_content))
447        # Test not matched content
448        self.assertEqual(False,
449                         atest_utils.matched_tf_error_log(not_matched_content))
450
451    @mock.patch('os.chmod')
452    @mock.patch('shutil.copy2')
453    @mock.patch('atest_utils.has_valid_cert')
454    @mock.patch('subprocess.check_output')
455    @mock.patch('os.path.exists')
456    def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert,
457                        _cpc, _cm):
458        """Test method get_flakes."""
459        # Test par file does not exist.
460        mock_path_exists.return_value = False
461        self.assertEqual(None, atest_utils.get_flakes())
462        # Test par file exists.
463        mock_path_exists.return_value = True
464        mock_output.return_value = (b'flake_percent:0.10001\n'
465                                    b'postsubmit_flakes_per_week:12.0')
466        mock_valid_cert.return_value = True
467        expected_flake_info = {'flake_percent':'0.10001',
468                               'postsubmit_flakes_per_week':'12.0'}
469        self.assertEqual(expected_flake_info,
470                         atest_utils.get_flakes())
471        # Test no valid cert
472        mock_valid_cert.return_value = False
473        self.assertEqual(None,
474                         atest_utils.get_flakes())
475
476    @mock.patch('subprocess.check_call')
477    def test_has_valid_cert(self, mock_call):
478        """Test method has_valid_cert."""
479        # raise subprocess.CalledProcessError
480        mock_call.raiseError.side_effect = subprocess.CalledProcessError
481        self.assertFalse(atest_utils.has_valid_cert())
482        with mock.patch("constants.CERT_STATUS_CMD", ''):
483            self.assertFalse(atest_utils.has_valid_cert())
484        with mock.patch("constants.CERT_STATUS_CMD", 'CMD'):
485            # has valid cert
486            mock_call.return_value = 0
487            self.assertTrue(atest_utils.has_valid_cert())
488            # no valid cert
489            mock_call.return_value = 4
490            self.assertFalse(atest_utils.has_valid_cert())
491
492    # pylint: disable=no-member
493    def test_read_test_record_proto(self):
494        """Test method read_test_record."""
495        test_record_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
496                                             "test_record.proto.testonly")
497        test_record = atest_utils.read_test_record(test_record_file_path)
498        self.assertEqual(test_record.children[0].inline_test_record.test_record_id,
499                         'x86 hello_world_test')
500
501
# Run all tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
504