#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import shutil
import tempfile

import environment_variables as env
import syzkaller_test_case

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import config_parser
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.common import cmd_utils
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.gcs import gcs_api_utils


class SyzkallerTest(base_test.BaseTestClass):
    """Runs Syzkaller tests on target."""

    start_vts_agents = False

    def setUpClass(self):
        """Reads user parameters and prepares the host-side environment.

        Creates a temporary directory, records the Syzkaller paths located
        under it, builds the GCS helper and selects the first device.

        Attributes:
            _env: dict, a mapping from key to environment path of this test.
            _gcs_api_utils: a GcsApiUtils object used for uploading logs.
            _dut: an android device controller object used for getting device
                  info.
        """
        required_params = [
            keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
            keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME,
            keys.ConfigKeys.IKEY_SYZKALLER_PACKAGES_PATH,
            keys.ConfigKeys.IKEY_SYZKALLER_TEMPLATE_PATH
        ]
        # NOTE(review): the self.* parameters read below are presumably
        # populated by getUserParams -- confirm against base_test.
        self.getUserParams(required_params)

        # Every path used by this test lives under one temporary directory so
        # that tearDownClass can remove everything with a single rmtree.
        _temp_dir = tempfile.mkdtemp()
        self._env = dict()
        self._env['temp_dir'] = _temp_dir
        self._env['syzkaller_dir'] = os.path.join(_temp_dir,
                                                  env.syzkaller_dir)
        self._env['syzkaller_bin_dir'] = os.path.join(_temp_dir,
                                                      env.syzkaller_bin_dir)
        self._env['syzkaller_work_dir'] = os.path.join(
            _temp_dir, env.syzkaller_work_dir)
        self._env['template_cfg'] = os.path.join(_temp_dir, env.template_cfg)

        self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
            self.service_key_json_path, self.fuzzing_gcs_bucket_name)
        self._dut = self.android_devices[0]

    def FetchSyzkaller(self):
        """Copies Syzkaller and its template config into the temp directory.

        The package tree is copied from self.syzkaller_packages_path and the
        template from self.syzkaller_template_path. If either copy fails, all
        tests are skipped. Afterwards every file found under the Syzkaller
        bin directory is made executable.
        """
        try:
            logging.info('Fetching Syzkaller program.')
            shutil.copytree(self.syzkaller_packages_path,
                            self._env['syzkaller_dir'])
            logging.info('Fetching Syzkaller template configuration.')
            shutil.copy(self.syzkaller_template_path, self._env['temp_dir'])
        except EnvironmentError as e:
            # EnvironmentError covers IOError, OSError and shutil.Error on
            # both Python 2 and Python 3, so any copy failure (for example a
            # missing source directory) leads to a skip instead of an
            # unhandled exception.
            logging.exception(e)
            self.skipAllTests(
                'Syzkaller program is not available. Skipping all tests.')

        for root, dirs, files in os.walk(self._env['syzkaller_bin_dir']):
            for filepath in files:
                # 0o755 is the octal literal form accepted by Python 2.6+
                # and Python 3.
                os.chmod(os.path.join(root, filepath), 0o755)

    def CreateTestCases(self):
        """Create syzkaller test cases.

        Returns:
            test_cases, list, the list of test_cases for this test.
        """
        # NOTE(review): the meaning of the positional arguments is defined by
        # SyzkallerTestCase, which is not visible here -- confirm there.
        test_cases = [
            syzkaller_test_case.SyzkallerTestCase(
                self._env, 'linux/arm64', self._dut.serial, 'adb', 'false',
                12)
        ]
        return test_cases

    def RunTestCase(self, test_case):
        """Run a syzkaller test case.

        Executes the test case's run command on the host, logs stderr on a
        non-zero exit code (stdout otherwise), then reports the result.

        Args:
            test_case: SyzkallerTestCase object, the test case to run.
        """
        test_command = test_case.GetRunCommand()
        # NOTE(review): timeout is presumably in seconds (5 hours) -- confirm
        # against cmd_utils.ExecuteOneShellCommand.
        stdout, stderr, err_code = cmd_utils.ExecuteOneShellCommand(
            test_command, timeout=18000)
        if err_code:
            logging.error(stderr)
        else:
            logging.info(stdout)
        self.ReportTestCase(test_case)

    def ReportTestCase(self, test_case):
        """Asserts the result of the test case and uploads report to GCS.

        The work directory is uploaded even when the assertion raises, so the
        reports of a failing (crashing) run are not lost.

        Args:
            test_case: SyzkallerTestCase object, the test case to report.
        """
        try:
            self.AssertTestCaseResult(test_case)
        finally:
            self._gcs_api_utils.UploadDir(test_case._work_dir_path,
                                          'kernelfuzz_reports')

    def AssertTestCaseResult(self, test_case):
        """Asserts that test case finished as expected.

        If crash reports were generated during the test, reports test as
        failure. If crash reports were not generated during the test, reports
        test as success.

        Args:
            test_case: SyzkallerTestCase object, the test case to assert
                       as failure or success.
        """
        logging.info('Test case results.')
        crash_dir = os.path.join(test_case._work_dir_path, 'crashes')
        if os.listdir(crash_dir):
            # Format the message before the call: a second positional
            # argument to asserts.fail is not treated as a format argument.
            asserts.fail(
                '%s caused crash in our device.' % test_case._test_name)
        else:
            logging.info('%s did not cause crash in our device.',
                         test_case._test_name)

    def tearDownClass(self):
        """Removes the temporary directory used for Syzkaller."""
        logging.debug('Temporary directory %s is being deleted',
                      self._env['temp_dir'])
        try:
            shutil.rmtree(self._env['temp_dir'])
        except OSError as e:
            # Cleanup is best effort; a failure is logged, not raised.
            logging.exception(e)

    def generateKernelFuzzerTests(self):
        """Runs kernel fuzzer tests."""
        self.FetchSyzkaller()
        self.runGeneratedTests(
            test_func=self.RunTestCase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x._test_name)


if __name__ == '__main__':
    test_runner.main()