1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import os
19import shlex
20import signal
21import subprocess
22import sys
23import time
24from threading import Thread
25
26_on_windows = sys.platform == 'win32'
27
28
29class ProcessError(Exception):
30    """Raised when invalid operations are run on a Process."""
31
32
33class Process(object):
34    """A Process object used to run various commands.
35
36    Attributes:
37        _command: The initial command to run.
38        _subprocess_kwargs: The kwargs to send to Popen for more control over
39                            execution.
40        _process: The subprocess.Popen object currently executing a process.
41        _listening_thread: The thread that is listening for the process to stop.
42        _redirection_thread: The thread that is redirecting process output.
43        _on_output_callback: The callback to call when output is received.
44        _on_terminate_callback: The callback to call when the process terminates
45                                without stop() being called first.
46        _started: Whether or not start() was called.
47        _stopped: Whether or not stop() was called.
48    """
49
50    def __init__(self, command, **kwargs):
51        """Creates a Process object.
52
53        Note that this constructor does not begin the process. To start the
54        process, use Process.start().
55        """
56        # Split command string into list if shell=True is not specified
57        if not kwargs.get('shell', False) and isinstance(command, str):
58            command = shlex.split(command)
59        self._command = command
60        self._subprocess_kwargs = kwargs
61        if _on_windows:
62            self._subprocess_kwargs['creationflags'] = (
63                subprocess.CREATE_NEW_PROCESS_GROUP)
64        else:
65            self._subprocess_kwargs['start_new_session'] = True
66        self._process = None
67
68        self._listening_thread = None
69        self._redirection_thread = None
70        self._on_output_callback = lambda *args, **kw: None
71        self._binary_output = False
72        self._on_terminate_callback = lambda *args, **kw: ''
73
74        self._started = False
75        self._stopped = False
76
77    def set_on_output_callback(self, on_output_callback, binary=False):
78        """Sets the on_output_callback function.
79
80        Args:
81            on_output_callback: The function to be called when output is sent to
82                the output. The output callback has the following signature:
83
84                >>> def on_output_callback(output_line):
85                >>>     return None
86
87            binary: If True, read the process output as raw binary.
88        Returns:
89            self
90        """
91        self._on_output_callback = on_output_callback
92        self._binary_output = binary
93        return self
94
95    def set_on_terminate_callback(self, on_terminate_callback):
96        """Sets the on_self_terminate callback function.
97
98        Args:
99            on_terminate_callback: The function to be called when the process
100                has terminated on its own. The callback has the following
101                signature:
102
103                >>> def on_self_terminate_callback(popen_process):
104                >>>     return 'command to run' or None
105
106                If a string is returned, the string returned will be the command
107                line used to run the command again. If None is returned, the
108                process will end without restarting.
109
110        Returns:
111            self
112        """
113        self._on_terminate_callback = on_terminate_callback
114        return self
115
116    def start(self):
117        """Starts the process's execution."""
118        if self._started:
119            raise ProcessError('Process has already started.')
120        self._started = True
121        self._process = None
122
123        self._listening_thread = Thread(target=self._exec_loop)
124        self._listening_thread.start()
125
126        time_up_at = time.time() + 1
127
128        while self._process is None:
129            if time.time() > time_up_at:
130                raise OSError('Unable to open process!')
131
132        self._stopped = False
133
134    @staticmethod
135    def _get_timeout_left(timeout, start_time):
136        return max(.1, timeout - (time.time() - start_time))
137
138    def is_running(self):
139        """Checks that the underlying Popen process is still running
140
141        Returns:
142            True if the process is running.
143        """
144        return self._process is not None and self._process.poll() is None
145
146    def _join_threads(self):
147        """Waits for the threads associated with the process to terminate."""
148        if self._listening_thread is not None:
149            self._listening_thread.join()
150            self._listening_thread = None
151
152        if self._redirection_thread is not None:
153            self._redirection_thread.join()
154            self._redirection_thread = None
155
156    def _kill_process(self):
157        """Kills the underlying process/process group. Implementation is
158        platform-dependent."""
159        if _on_windows:
160            subprocess.check_call('taskkill /F /T /PID %s' % self._process.pid)
161        else:
162            pgid = os.getpgid(self._process.pid)
163            os.killpg(pgid, signal.SIGKILL)
164
165    def wait(self, kill_timeout=60.0):
166        """Waits for the process to finish execution.
167
168        If the process has reached the kill_timeout, the process will be killed
169        instead.
170
171        Note: the on_self_terminate callback will NOT be called when calling
172        this function.
173
174        Args:
175            kill_timeout: The amount of time to wait until killing the process.
176        """
177        if self._stopped:
178            raise ProcessError('Process is already being stopped.')
179        self._stopped = True
180
181        try:
182            self._process.wait(kill_timeout)
183        except subprocess.TimeoutExpired:
184            self._kill_process()
185        finally:
186            self._join_threads()
187            self._started = False
188
189    def stop(self):
190        """Stops the process.
191
192        This command is effectively equivalent to kill, but gives time to clean
193        up any related work on the process, such as output redirection.
194
195        Note: the on_self_terminate callback will NOT be called when calling
196        this function.
197        """
198        self.wait(0)
199
200    def _redirect_output(self):
201        """Redirects the output from the command into the on_output_callback."""
202        if self._binary_output:
203            while True:
204                data = self._process.stdout.read(1024)
205
206                if not data:
207                    return
208                else:
209                    self._on_output_callback(data)
210        else:
211            while True:
212                line = self._process.stdout.readline().decode('utf-8',
213                                                              errors='replace')
214
215                if not line:
216                    return
217                else:
218                    # Output the line without trailing \n and whitespace.
219                    self._on_output_callback(line.rstrip())
220
221    @staticmethod
222    def __start_process(command, **kwargs):
223        """A convenient wrapper function for starting the process."""
224        acts_logger = logging.getLogger()
225        acts_logger.debug(
226            'Starting command "%s" with kwargs %s', command, kwargs)
227        return subprocess.Popen(command, **kwargs)
228
229    def _exec_loop(self):
230        """Executes Popen in a loop.
231
232        When Popen terminates without stop() being called,
233        self._on_terminate_callback() will be called. The returned value from
234        _on_terminate_callback will then be used to determine if the loop should
235        continue and start up the process again. See set_on_terminate_callback()
236        for more information.
237        """
238        command = self._command
239        while True:
240            self._process = self.__start_process(command,
241                                                 stdout=subprocess.PIPE,
242                                                 stderr=subprocess.STDOUT,
243                                                 bufsize=1,
244                                                 **self._subprocess_kwargs)
245            self._redirection_thread = Thread(target=self._redirect_output)
246            self._redirection_thread.start()
247            self._process.wait()
248
249            if self._stopped:
250                logging.debug('The process for command %s was stopped.',
251                              command)
252                break
253            else:
254                logging.debug('The process for command %s terminated.',
255                              command)
256                # Wait for all output to be processed before sending
257                # _on_terminate_callback()
258                self._redirection_thread.join()
259                logging.debug('Beginning on_terminate_callback for %s.',
260                              command)
261                retry_value = self._on_terminate_callback(self._process)
262                if retry_value:
263                    command = retry_value
264                else:
265                    break
266