1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for pull."""
15import unittest
16
17import os
18import tempfile
19import mock
20
21from acloud import errors
22from acloud.internal import constants
23from acloud.internal.lib import driver_test_lib
24from acloud.internal.lib import ssh
25from acloud.internal.lib import utils
26from acloud.list import list as list_instances
27from acloud.public import config
28from acloud.pull import pull
29
30
31class PullTest(driver_test_lib.BaseDriverTest):
32    """Test pull."""
33
34    # pylint: disable=no-member
35    def testPullFileFromInstance(self):
36        """test PullFileFromInstance."""
37        cfg = mock.MagicMock()
38        cfg.ssh_private_key_path = "fake_ssh_path"
39        cfg.extra_args_ssh_tunnel = ""
40        instance = mock.MagicMock()
41        instance.ip = "1.1.1.1"
42        # Multiple selected files case.
43        selected_files = ["file1.log", "file2.log"]
44        self.Patch(pull, "SelectLogFileToPull", return_value=selected_files)
45        self.Patch(pull, "GetDownloadLogFolder", return_value="fake_folder")
46        self.Patch(pull, "PullLogs")
47        self.Patch(pull, "DisplayLog")
48        pull.PullFileFromInstance(cfg, instance)
49        self.assertEqual(pull.DisplayLog.call_count, 0)
50
51        # Only one file selected case.
52        selected_files = ["file1.log"]
53        self.Patch(pull, "SelectLogFileToPull", return_value=selected_files)
54        pull.PullFileFromInstance(cfg, instance)
55        self.assertEqual(pull.DisplayLog.call_count, 1)
56
57    # pylint: disable=no-member
58    def testPullLogs(self):
59        """test PullLogs."""
60        _ssh = mock.MagicMock()
61        self.Patch(utils, "PrintColorString")
62        log_files = ["file1.log", "file2.log"]
63        download_folder = "/fake_folder"
64        pull.PullLogs(_ssh, log_files, download_folder)
65        self.assertEqual(_ssh.ScpPullFile.call_count, 2)
66        utils.PrintColorString.assert_called_once()
67
68    @mock.patch.object(ssh.Ssh, "Run")
69    def testDisplayLog(self, mock_ssh_run):
70        """Test DisplayLog."""
71        fake_ip = ssh.IP(external="1.1.1.1", internal="10.1.1.1")
72        _ssh = ssh.Ssh(ip=fake_ip,
73                       user=constants.GCE_USER,
74                       ssh_private_key_path="/fake/acloud_rea")
75        self.Patch(utils, "GetUserAnswerYes", return_value="Y")
76        log_file = "file1.log"
77        pull.DisplayLog(_ssh, log_file)
78        expected_cmd = "tail -f -n +1 %s" % log_file
79        mock_ssh_run.assert_has_calls([
80            mock.call(expected_cmd, show_output=True)])
81
82    def testGetDownloadLogFolder(self):
83        """test GetDownloadLogFolder."""
84        self.Patch(tempfile, "gettempdir", return_value="/tmp")
85        self.Patch(os.path, "exists", return_value=True)
86        instance = "instance"
87        expected_path = "/tmp/instance"
88        self.assertEqual(pull.GetDownloadLogFolder(instance), expected_path)
89
90    def testSelectLogFileToPull(self):
91        """test choose log files from the remote instance."""
92        _ssh = mock.MagicMock()
93
94        # Test only one log file case
95        log_files = ["file1.log"]
96        self.Patch(pull, "GetAllLogFilePaths", return_value=log_files)
97        expected_result = ["file1.log"]
98        self.assertEqual(pull.SelectLogFileToPull(_ssh), expected_result)
99
100        # Test no log files case
101        self.Patch(pull, "GetAllLogFilePaths", return_value=[])
102        with self.assertRaises(errors.CheckPathError):
103            pull.SelectLogFileToPull(_ssh)
104
105        # Test two log files case.
106        log_files = ["file1.log", "file2.log"]
107        choose_log = ["file2.log"]
108        self.Patch(pull, "GetAllLogFilePaths", return_value=log_files)
109        self.Patch(utils, "GetAnswerFromList", return_value=choose_log)
110        expected_result = ["file2.log"]
111        self.assertEqual(pull.SelectLogFileToPull(_ssh), expected_result)
112
113        # Test user provided file name exist.
114        log_files = ["/home/vsoc-01/cuttlefish_runtime/file1.log",
115                     "/home/vsoc-01/cuttlefish_runtime/file2.log"]
116        input_file = "file1.log"
117        self.Patch(pull, "GetAllLogFilePaths", return_value=log_files)
118        expected_result = ["/home/vsoc-01/cuttlefish_runtime/file1.log"]
119        self.assertEqual(pull.SelectLogFileToPull(_ssh, input_file), expected_result)
120
121        # Test user provided file name not exist.
122        log_files = ["/home/vsoc-01/cuttlefish_runtime/file1.log",
123                     "/home/vsoc-01/cuttlefish_runtime/file2.log"]
124        input_file = "not_exist.log"
125        self.Patch(pull, "GetAllLogFilePaths", return_value=log_files)
126        with self.assertRaises(errors.CheckPathError):
127            pull.SelectLogFileToPull(_ssh, input_file)
128
129    def testFilterLogfiles(self):
130        """test filer log file from black list."""
131        # Filter out file name is "kernel".
132        files = ["kernel.log", "logcat", "kernel"]
133        expected_result = ["kernel.log", "logcat"]
134        self.assertEqual(pull.FilterLogfiles(files), expected_result)
135
136        # Filter out file extension is ".img".
137        files = ["kernel.log", "system.img", "userdata.img", "launcher.log"]
138        expected_result = ["kernel.log", "launcher.log"]
139        self.assertEqual(pull.FilterLogfiles(files), expected_result)
140
141    @mock.patch.object(pull, "PullFileFromInstance")
142    def testRun(self, mock_pull_file):
143        """test Run."""
144        cfg = mock.MagicMock()
145        args = mock.MagicMock()
146        instance_obj = mock.MagicMock()
147        # Test case with provided instance name.
148        args.instance_name = "instance_1"
149        args.file_name = "file1.log"
150        args.no_prompt = True
151        self.Patch(config, "GetAcloudConfig", return_value=cfg)
152        self.Patch(list_instances, "GetInstancesFromInstanceNames",
153                   return_value=[instance_obj])
154        pull.Run(args)
155        mock_pull_file.assert_has_calls([
156            mock.call(cfg, instance_obj, args.file_name, args.no_prompt)])
157
158        # Test case for user select one instance to pull log.
159        selected_instance = mock.MagicMock()
160        self.Patch(list_instances, "ChooseOneRemoteInstance",
161                   return_value=selected_instance)
162        args.instance_name = None
163        pull.Run(args)
164        mock_pull_file.assert_has_calls([
165            mock.call(cfg, selected_instance, args.file_name, args.no_prompt)])
166
167
168if __name__ == '__main__':
169    unittest.main()
170