1#!/usr/bin/env python
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19import hashlib
20import os
21import subprocess
22import sys
23import tempfile
24import unittest
25import mock
26
27import atest_error
28import atest_utils
29import constants
30import unittest_utils
31from test_finders import test_info
32
33if sys.version_info[0] == 2:
34    from StringIO import StringIO
35else:
36    from io import StringIO
37
38TEST_MODULE_NAME_A = 'ModuleNameA'
39TEST_RUNNER_A = 'FakeTestRunnerA'
40TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
41TEST_DATA_A = {'test_data_a_1': 'a1',
42               'test_data_a_2': 'a2'}
43TEST_SUITE_A = 'FakeSuiteA'
44TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
45TEST_INSTALL_LOC_A = set(['host', 'device'])
46TEST_FINDER_A = 'MODULE'
47TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
48                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
49                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
50                                 TEST_INSTALL_LOC_A)
51TEST_INFO_A.test_finder = TEST_FINDER_A
52
53#pylint: disable=protected-access
54class AtestUtilsUnittests(unittest.TestCase):
55    """Unit tests for atest_utils.py"""
56
57    def test_capture_fail_section_has_fail_section(self):
58        """Test capture_fail_section when has fail section."""
59        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
60                     '[  6% 191/2997] BBBBBB\n', 'CCCCC',
61                     '[  20% 322/2997] DDDDDD\n', 'EEEEE']
62        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
63        self.assertEqual(want_list,
64                         atest_utils._capture_fail_section(test_list))
65
66    def test_capture_fail_section_no_fail_section(self):
67        """Test capture_fail_section when no fail section."""
68        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
69        want_list = []
70        self.assertEqual(want_list,
71                         atest_utils._capture_fail_section(test_list))
72
73    def test_is_test_mapping(self):
74        """Test method is_test_mapping."""
75        tm_option_attributes = [
76            'test_mapping',
77            'include_subdirs'
78        ]
79        for attr_to_test in tm_option_attributes:
80            args = mock.Mock()
81            for attr in tm_option_attributes:
82                setattr(args, attr, attr == attr_to_test)
83            args.tests = []
84            self.assertTrue(
85                atest_utils.is_test_mapping(args),
86                'Failed to validate option %s' % attr_to_test)
87
88        args = mock.Mock()
89        for attr in tm_option_attributes:
90            setattr(args, attr, False)
91        args.tests = [':group_name']
92        self.assertTrue(atest_utils.is_test_mapping(args))
93
94        args = mock.Mock()
95        for attr in tm_option_attributes:
96            setattr(args, attr, False)
97        args.tests = [':test1', 'test2']
98        self.assertFalse(atest_utils.is_test_mapping(args))
99
100        args = mock.Mock()
101        for attr in tm_option_attributes:
102            setattr(args, attr, False)
103        args.tests = ['test2']
104        self.assertFalse(atest_utils.is_test_mapping(args))
105
106    @mock.patch('curses.tigetnum')
107    def test_has_colors(self, mock_curses_tigetnum):
108        """Test method _has_colors."""
109        # stream is file I/O
110        stream = open('/tmp/test_has_colors.txt', 'wb')
111        self.assertFalse(atest_utils._has_colors(stream))
112        stream.close()
113
114        # stream is not a tty(terminal).
115        stream = mock.Mock()
116        stream.isatty.return_value = False
117        self.assertFalse(atest_utils._has_colors(stream))
118
119        # stream is a tty(terminal) and colors < 2.
120        stream = mock.Mock()
121        stream.isatty.return_value = True
122        mock_curses_tigetnum.return_value = 1
123        self.assertFalse(atest_utils._has_colors(stream))
124
125        # stream is a tty(terminal) and colors > 2.
126        stream = mock.Mock()
127        stream.isatty.return_value = True
128        mock_curses_tigetnum.return_value = 256
129        self.assertTrue(atest_utils._has_colors(stream))
130
131
132    @mock.patch('atest_utils._has_colors')
133    def test_colorize(self, mock_has_colors):
134        """Test method colorize."""
135        original_str = "test string"
136        green_no = 2
137
138        # _has_colors() return False.
139        mock_has_colors.return_value = False
140        converted_str = atest_utils.colorize(original_str, green_no,
141                                             highlight=True)
142        self.assertEqual(original_str, converted_str)
143
144        # Green with highlight.
145        mock_has_colors.return_value = True
146        converted_str = atest_utils.colorize(original_str, green_no,
147                                             highlight=True)
148        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
149        self.assertEqual(green_highlight_string, converted_str)
150
151        # Green, no highlight.
152        mock_has_colors.return_value = True
153        converted_str = atest_utils.colorize(original_str, green_no,
154                                             highlight=False)
155        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
156        self.assertEqual(green_no_highlight_string, converted_str)
157
158
159    @mock.patch('atest_utils._has_colors')
160    def test_colorful_print(self, mock_has_colors):
161        """Test method colorful_print."""
162        testing_str = "color_print_test"
163        green_no = 2
164
165        # _has_colors() return False.
166        mock_has_colors.return_value = False
167        capture_output = StringIO()
168        sys.stdout = capture_output
169        atest_utils.colorful_print(testing_str, green_no, highlight=True,
170                                   auto_wrap=False)
171        sys.stdout = sys.__stdout__
172        uncolored_string = testing_str
173        self.assertEqual(capture_output.getvalue(), uncolored_string)
174
175        # Green with highlight, but no wrap.
176        mock_has_colors.return_value = True
177        capture_output = StringIO()
178        sys.stdout = capture_output
179        atest_utils.colorful_print(testing_str, green_no, highlight=True,
180                                   auto_wrap=False)
181        sys.stdout = sys.__stdout__
182        green_highlight_no_wrap_string = '\x1b[1;42m%s\x1b[0m' % testing_str
183        self.assertEqual(capture_output.getvalue(),
184                         green_highlight_no_wrap_string)
185
186        # Green, no highlight, no wrap.
187        mock_has_colors.return_value = True
188        capture_output = StringIO()
189        sys.stdout = capture_output
190        atest_utils.colorful_print(testing_str, green_no, highlight=False,
191                                   auto_wrap=False)
192        sys.stdout = sys.__stdout__
193        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
194        self.assertEqual(capture_output.getvalue(),
195                         green_no_high_no_wrap_string)
196
197        # Green with highlight and wrap.
198        mock_has_colors.return_value = True
199        capture_output = StringIO()
200        sys.stdout = capture_output
201        atest_utils.colorful_print(testing_str, green_no, highlight=True,
202                                   auto_wrap=True)
203        sys.stdout = sys.__stdout__
204        green_highlight_wrap_string = '\x1b[1;42m%s\x1b[0m\n' % testing_str
205        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
206
207        # Green with wrap, but no highlight.
208        mock_has_colors.return_value = True
209        capture_output = StringIO()
210        sys.stdout = capture_output
211        atest_utils.colorful_print(testing_str, green_no, highlight=False,
212                                   auto_wrap=True)
213        sys.stdout = sys.__stdout__
214        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
215        self.assertEqual(capture_output.getvalue(),
216                         green_wrap_no_highlight_string)
217
218    @mock.patch('socket.gethostname')
219    @mock.patch('subprocess.check_output')
220    def test_is_external_run(self, mock_output, mock_hostname):
221        """Test method is_external_run."""
222        mock_output.return_value = ''
223        mock_hostname.return_value = ''
224        self.assertTrue(atest_utils.is_external_run())
225
226        mock_output.return_value = 'test@other.com'
227        mock_hostname.return_value = 'abc.com'
228        self.assertTrue(atest_utils.is_external_run())
229
230        mock_output.return_value = 'test@other.com'
231        mock_hostname.return_value = 'abc.google.com'
232        self.assertFalse(atest_utils.is_external_run())
233
234        mock_output.return_value = 'test@other.com'
235        mock_hostname.return_value = 'abc.google.def.com'
236        self.assertTrue(atest_utils.is_external_run())
237
238        mock_output.return_value = 'test@google.com'
239        self.assertFalse(atest_utils.is_external_run())
240
241        mock_output.return_value = 'test@other.com'
242        mock_hostname.return_value = 'c.googlers.com'
243        self.assertFalse(atest_utils.is_external_run())
244
245        mock_output.return_value = 'test@other.com'
246        mock_hostname.return_value = 'a.googlers.com'
247        self.assertTrue(atest_utils.is_external_run())
248
249        mock_output.side_effect = OSError()
250        self.assertTrue(atest_utils.is_external_run())
251
252        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
253        self.assertTrue(atest_utils.is_external_run())
254
255    @mock.patch('metrics.metrics_base.get_user_type')
256    def test_print_data_collection_notice(self, mock_get_user_type):
257        """Test method print_data_collection_notice."""
258
259        # get_user_type return 1(external).
260        mock_get_user_type.return_value = 1
261        notice_str = ('\n==================\nNotice:\n'
262                      '  We collect anonymous usage statistics'
263                      ' in accordance with our'
264                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
265                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
266                      ' Privacy Policy (https://policies.google.com/privacy) and'
267                      ' Terms of Service (https://policies.google.com/terms).'
268                      '\n==================\n\n')
269        capture_output = StringIO()
270        sys.stdout = capture_output
271        atest_utils.print_data_collection_notice()
272        sys.stdout = sys.__stdout__
273        uncolored_string = notice_str
274        self.assertEqual(capture_output.getvalue(), uncolored_string)
275
276        # get_user_type return 0(internal).
277        mock_get_user_type.return_value = 0
278        notice_str = ('\n==================\nNotice:\n'
279                      '  We collect usage statistics'
280                      ' in accordance with our'
281                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
282                      ' Contributor License Agreement (https://cla.developers.google.com/),'
283                      ' Privacy Policy (https://policies.google.com/privacy) and'
284                      ' Terms of Service (https://policies.google.com/terms).'
285                      '\n==================\n\n')
286        capture_output = StringIO()
287        sys.stdout = capture_output
288        atest_utils.print_data_collection_notice()
289        sys.stdout = sys.__stdout__
290        uncolored_string = notice_str
291        self.assertEqual(capture_output.getvalue(), uncolored_string)
292
293    @mock.patch('__builtin__.raw_input')
294    @mock.patch('json.load')
295    def test_update_test_runner_cmd(self, mock_json_load_data, mock_raw_input):
296        """Test method handle_test_runner_cmd without enable do_verification."""
297        former_cmd_str = 'Former cmds ='
298        write_result_str = 'Save result mapping to test_result'
299        tmp_file = tempfile.NamedTemporaryFile()
300        input_cmd = 'atest_args'
301        runner_cmds = ['cmd1', 'cmd2']
302        capture_output = StringIO()
303        sys.stdout = capture_output
304        # Previous data is empty. Should not enter strtobool.
305        # If entered, exception will be raised cause test fail.
306        mock_json_load_data.return_value = {}
307        atest_utils.handle_test_runner_cmd(input_cmd,
308                                           runner_cmds,
309                                           do_verification=False,
310                                           result_path=tmp_file.name)
311        sys.stdout = sys.__stdout__
312        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
313        # Previous data is the same as the new input. Should not enter strtobool.
314        # If entered, exception will be raised cause test fail
315        capture_output = StringIO()
316        sys.stdout = capture_output
317        mock_json_load_data.return_value = {input_cmd:runner_cmds}
318        atest_utils.handle_test_runner_cmd(input_cmd,
319                                           runner_cmds,
320                                           do_verification=False,
321                                           result_path=tmp_file.name)
322        sys.stdout = sys.__stdout__
323        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
324        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
325        # Previous data has different cmds. Should enter strtobool not update,
326        # should not find write_result_str.
327        prev_cmds = ['cmd1']
328        mock_raw_input.return_value = 'n'
329        capture_output = StringIO()
330        sys.stdout = capture_output
331        mock_json_load_data.return_value = {input_cmd:prev_cmds}
332        atest_utils.handle_test_runner_cmd(input_cmd,
333                                           runner_cmds,
334                                           do_verification=False,
335                                           result_path=tmp_file.name)
336        sys.stdout = sys.__stdout__
337        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
338
339    @mock.patch('json.load')
340    def test_verify_test_runner_cmd(self, mock_json_load_data):
341        """Test method handle_test_runner_cmd without enable update_result."""
342        tmp_file = tempfile.NamedTemporaryFile()
343        input_cmd = 'atest_args'
344        runner_cmds = ['cmd1', 'cmd2']
345        # Previous data is the same as the new input. Should not raise exception.
346        mock_json_load_data.return_value = {input_cmd:runner_cmds}
347        atest_utils.handle_test_runner_cmd(input_cmd,
348                                           runner_cmds,
349                                           do_verification=True,
350                                           result_path=tmp_file.name)
351        # Previous data has different cmds. Should enter strtobool and hit
352        # exception.
353        prev_cmds = ['cmd1']
354        mock_json_load_data.return_value = {input_cmd:prev_cmds}
355        self.assertRaises(atest_error.DryRunVerificationError,
356                          atest_utils.handle_test_runner_cmd,
357                          input_cmd,
358                          runner_cmds,
359                          do_verification=True,
360                          result_path=tmp_file.name)
361
362    def test_get_test_info_cache_path(self):
363        """Test method get_test_info_cache_path."""
364        input_file_name = 'mytest_name'
365        cache_root = '/a/b/c'
366        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
367                                                       encode()).hexdigest())
368        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
369                         atest_utils.get_test_info_cache_path(input_file_name,
370                                                              cache_root))
371
372    def test_get_and_load_cache(self):
373        """Test method update_test_info_cache and load_test_info_cache."""
374        test_reference = 'myTestRefA'
375        test_cache_dir = tempfile.mkdtemp()
376        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
377                                           test_cache_dir)
378        unittest_utils.assert_equal_testinfo_sets(
379            self, set([TEST_INFO_A]),
380            atest_utils.load_test_info_cache(test_reference, test_cache_dir))
381
382    @mock.patch('os.getcwd')
383    def test_get_build_cmd(self, mock_cwd):
384        """Test method get_build_cmd."""
385        build_top = '/home/a/b/c'
386        rel_path = 'd/e'
387        mock_cwd.return_value = os.path.join(build_top, rel_path)
388        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
389        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
390            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
391            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())
392
393    @mock.patch('subprocess.check_output')
394    def test_get_modified_files(self, mock_co):
395        """Test method get_modified_files"""
396        mock_co.side_effect = ['/a/b/',
397                               '\n',
398                               'test_fp1.java\nc/test_fp2.java']
399        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
400                         atest_utils.get_modified_files(''))
401        mock_co.side_effect = ['/a/b/',
402                               'test_fp4',
403                               '/test_fp3.java']
404        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
405                         atest_utils.get_modified_files(''))
406
407
408if __name__ == "__main__":
409    unittest.main()
410