#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import os
import shlex
import signal
import subprocess
import sys
import time
from threading import Thread

_on_windows = sys.platform == 'win32'


class ProcessError(Exception):
    """Raised when invalid operations are run on a Process."""


class Process(object):
    """A Process object used to run various commands.

    Attributes:
        _command: The initial command to run.
        _subprocess_kwargs: The kwargs to send to Popen for more control over
                            execution.
        _process: The subprocess.Popen object currently executing a process.
        _listening_thread: The thread that is listening for the process to stop.
        _redirection_thread: The thread that is redirecting process output.
        _on_output_callback: The callback to call when output is received.
        _binary_output: Whether output is handed to the output callback as
                        raw bytes chunks instead of decoded lines.
        _on_terminate_callback: The callback to call when the process terminates
                                without stop() being called first.
        _started: Whether or not start() was called.
        _stopped: Whether or not stop() was called.
    """

    def __init__(self, command, **kwargs):
        """Creates a Process object.

        Note that this constructor does not begin the process. To start the
        process, use Process.start().

        Args:
            command: The command to run, either as a string or as a list of
                arguments. A string is split with shlex unless shell=True is
                given in kwargs.
            **kwargs: Extra keyword arguments forwarded to subprocess.Popen.
        """
        # Split command string into list if shell=True is not specified
        if not kwargs.get('shell', False) and isinstance(command, str):
            command = shlex.split(command)
        self._command = command
        self._subprocess_kwargs = kwargs
        # Put the child in its own process group/session so that
        # _kill_process() can take down the whole group at once.
        if _on_windows:
            self._subprocess_kwargs['creationflags'] = (
                subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self._subprocess_kwargs['start_new_session'] = True
        self._process = None

        self._listening_thread = None
        self._redirection_thread = None
        self._on_output_callback = lambda *args, **kw: None
        self._binary_output = False
        # The default returns a falsy value, so the process is not restarted.
        self._on_terminate_callback = lambda *args, **kw: ''

        self._started = False
        self._stopped = False

    def set_on_output_callback(self, on_output_callback, binary=False):
        """Sets the on_output_callback function.

        Args:
            on_output_callback: The function to be called when output is sent to
                the output. The output callback has the following signature:

                >>> def on_output_callback(output_line):
                >>>     return None

            binary: If True, read the process output as raw binary.
        Returns:
            self
        """
        self._on_output_callback = on_output_callback
        self._binary_output = binary
        return self

    def set_on_terminate_callback(self, on_terminate_callback):
        """Sets the on_self_terminate callback function.

        Args:
            on_terminate_callback: The function to be called when the process
                has terminated on its own. The callback has the following
                signature:

                >>> def on_self_terminate_callback(popen_process):
                >>>     return 'command to run' or None

                If a string is returned, the string returned will be the command
                line used to run the command again. If None is returned, the
                process will end without restarting.

        Returns:
            self
        """
        self._on_terminate_callback = on_terminate_callback
        return self

    def start(self):
        """Starts the process's execution.

        Raises:
            ProcessError: If start() has already been called and the process
                has not been waited on/stopped since.
            OSError: If the underlying process could not be opened within
                one second.
        """
        if self._started:
            raise ProcessError('Process has already started.')
        self._started = True
        self._process = None
        # Clear the stop flag BEFORE the worker thread launches. Otherwise a
        # restarted process that exits quickly would observe the stale value
        # left by the previous wait()/stop() and skip on_terminate_callback.
        self._stopped = False

        worker = Thread(target=self._exec_loop)
        self._listening_thread = worker
        worker.start()

        time_up_at = time.time() + 1

        while self._process is None:
            if time.time() > time_up_at:
                raise OSError('Unable to open process!')
            # _process is assigned before the worker can exit normally, so a
            # dead worker with no process means Popen itself failed. Checking
            # liveness first and _process second avoids a false positive.
            if not worker.is_alive() and self._process is None:
                raise OSError('Unable to open process!')
            # Yield instead of spinning so the worker thread can make progress.
            time.sleep(0.001)

    @staticmethod
    def _get_timeout_left(timeout, start_time):
        """Returns the time remaining of timeout, with a floor of 0.1.

        Args:
            timeout: The total timeout, in seconds.
            start_time: The time.time() value at which the timeout began.
        """
        return max(.1, timeout - (time.time() - start_time))

    def is_running(self):
        """Checks that the underlying Popen process is still running

        Returns:
            True if the process is running.
        """
        return self._process is not None and self._process.poll() is None

    def _join_threads(self):
        """Waits for the threads associated with the process to terminate."""
        if self._listening_thread is not None:
            self._listening_thread.join()
            self._listening_thread = None

        if self._redirection_thread is not None:
            self._redirection_thread.join()
            self._redirection_thread = None

    def _kill_process(self):
        """Kills the underlying process/process group. Implementation is
        platform-dependent."""
        if _on_windows:
            subprocess.check_call('taskkill /F /T /PID %s' % self._process.pid)
        else:
            try:
                pgid = os.getpgid(self._process.pid)
                os.killpg(pgid, signal.SIGKILL)
            except ProcessLookupError:
                # The process finished and was reaped between the timed-out
                # wait and this kill; there is nothing left to terminate.
                logging.debug('Process %s exited before it could be killed.',
                              self._process.pid)

    def wait(self, kill_timeout=60.0):
        """Waits for the process to finish execution.

        If the process has reached the kill_timeout, the process will be killed
        instead.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.

        Args:
            kill_timeout: The amount of time to wait until killing the process.

        Raises:
            ProcessError: If the process is already being stopped, or if it
                has never been started.
        """
        if self._stopped:
            raise ProcessError('Process is already being stopped.')
        if self._process is None:
            # Fail with a clear error, and without touching any state,
            # rather than dereferencing None below.
            raise ProcessError('Process has not been started.')
        # Set before waiting so _exec_loop treats the exit as a requested
        # stop and does not invoke the on_terminate callback.
        self._stopped = True

        try:
            self._process.wait(kill_timeout)
        except subprocess.TimeoutExpired:
            self._kill_process()
        finally:
            self._join_threads()
            self._started = False

    def stop(self):
        """Stops the process.

        This command is effectively equivalent to kill, but gives time to clean
        up any related work on the process, such as output redirection.

        Note: the on_self_terminate callback will NOT be called when calling
        this function.

        Raises:
            ProcessError: If the process is already being stopped, or if it
                has never been started.
        """
        self.wait(0)

    def _redirect_output(self):
        """Redirects the output from the command into the on_output_callback."""
        if self._binary_output:
            # Forward raw chunks of up to 1024 bytes until end of stream.
            while True:
                data = self._process.stdout.read(1024)

                if not data:
                    return
                else:
                    self._on_output_callback(data)
        else:
            while True:
                line = self._process.stdout.readline().decode('utf-8',
                                                              errors='replace')

                if not line:
                    return
                else:
                    # Output the line without trailing \n and whitespace.
                    self._on_output_callback(line.rstrip())

    @staticmethod
    def __start_process(command, **kwargs):
        """A convenient wrapper function for starting the process."""
        acts_logger = logging.getLogger()
        acts_logger.debug(
            'Starting command "%s" with kwargs %s', command, kwargs)
        return subprocess.Popen(command, **kwargs)

    def _exec_loop(self):
        """Executes Popen in a loop.

        When Popen terminates without stop() being called,
        self._on_terminate_callback() will be called. The returned value from
        _on_terminate_callback will then be used to determine if the loop should
        continue and start up the process again. See set_on_terminate_callback()
        for more information.
        """
        command = self._command
        while True:
            # stderr is merged into stdout so a single redirection thread
            # sees all of the process output.
            self._process = self.__start_process(command,
                                                 stdout=subprocess.PIPE,
                                                 stderr=subprocess.STDOUT,
                                                 bufsize=1,
                                                 **self._subprocess_kwargs)
            self._redirection_thread = Thread(target=self._redirect_output)
            self._redirection_thread.start()
            self._process.wait()

            if self._stopped:
                logging.debug('The process for command %s was stopped.',
                              command)
                break
            else:
                logging.debug('The process for command %s terminated.',
                              command)
                # Wait for all output to be processed before sending
                # _on_terminate_callback()
                self._redirection_thread.join()
                logging.debug('Beginning on_terminate_callback for %s.',
                              command)
                retry_value = self._on_terminate_callback(self._process)
                if retry_value:
                    command = retry_value
                else:
                    break