1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License 15 */ 16 17 package com.android.voicemail.impl.scheduling; 18 19 import android.annotation.TargetApi; 20 import android.content.Context; 21 import android.content.Intent; 22 import android.os.Build.VERSION_CODES; 23 import android.os.Bundle; 24 import android.os.Handler; 25 import android.os.HandlerThread; 26 import android.os.Looper; 27 import android.os.Message; 28 import android.support.annotation.MainThread; 29 import android.support.annotation.Nullable; 30 import android.support.annotation.VisibleForTesting; 31 import android.support.annotation.WorkerThread; 32 import com.android.voicemail.impl.Assert; 33 import com.android.voicemail.impl.NeededForTesting; 34 import com.android.voicemail.impl.VvmLog; 35 import com.android.voicemail.impl.scheduling.TaskQueue.NextTask; 36 import java.util.List; 37 38 /** 39 * A singleton to queue and run {@link Task} with the {@link android.app.job.JobScheduler}. A task 40 * is queued by sending a broadcast to {@link TaskReceiver}. The intent should contain enough 41 * information in {@link Intent#getExtras()} to construct the task (see {@link 42 * Tasks#createIntent(Context, Class)}). 43 * 44 * <p>The executor will only exist when {@link TaskSchedulerJobService} is running. 45 * 46 * <p>All tasks are ran in the background with a wakelock being held by the {@link 47 * android.app.job.JobScheduler}, which is between {@link #onStartJob(Job, List)} and {@link 48 * #finishJobAsync()}. The {@link TaskSchedulerJobService} also has a {@link TaskQueue}, but the 49 * data is stored in the {@link android.app.job.JobScheduler} instead of the process memory, so if 50 * the process is killed the queued tasks will be restored. If a new task is added, a new {@link 51 * TaskSchedulerJobService} will be scheduled to run the task. If the job is already scheduled, the 52 * new task will be pushed into the queue of the scheduled job. If the job is already running, the 53 * job will be queued in process memory. 54 * 55 * <p>Only one task will be ran at a time, and same task cannot exist in the queue at the same time. 56 * Refer to {@link TaskQueue} for queuing and execution order. 57 * 58 * <p>If there are still tasks in the queue but none are executable immediately, the service will 59 * enter a "sleep", pushing all remaining task into a new job and end the current job. 60 * 61 * <p>The executor will be started when {@link TaskSchedulerJobService} is running, and stopped when 62 * there are no more tasks in the queue or when the executor is put to sleep. 63 * 64 * <p>{@link android.app.job.JobScheduler} is not used directly due to: 65 * 66 * <ul> 67 * <li>The {@link android.telecom.PhoneAccountHandle} used to differentiate task can not be easily 68 * mapped into an integer for job id 69 * <li>A job cannot be mutated to store information such as retry count. 70 * </ul> 71 */ 72 @TargetApi(VERSION_CODES.O) 73 final class TaskExecutor { 74 75 /** 76 * An entity that holds execution resources for the {@link TaskExecutor} to run, usually a {@link 77 * android.app.job.JobService}. 78 */ 79 interface Job { 80 81 /** 82 * Signals to Job to end and release its' resources. This is an asynchronous call and may not 83 * take effect immediately. 84 */ 85 @MainThread finishAsync()86 void finishAsync(); 87 88 /** Whether the call to {@link #finishAsync()} has actually taken effect. */ 89 @MainThread isFinished()90 boolean isFinished(); 91 } 92 93 private static final String TAG = "VvmTaskExecutor"; 94 95 private static final int READY_TOLERANCE_MILLISECONDS = 100; 96 97 /** 98 * Threshold to determine whether to do a short or long sleep when a task is scheduled in the 99 * future. 100 * 101 * <p>A short sleep will continue the job and use {@link Handler#postDelayed(Runnable, long)} to 102 * wait for the next task. 103 * 104 * <p>A long sleep will finish the job and schedule a new one. The exact execution time is 105 * subjected to {@link android.app.job.JobScheduler} battery optimization, and is not exact. 106 */ 107 private static final int SHORT_SLEEP_THRESHOLD_MILLISECONDS = 10_000; 108 /** 109 * When there are no more tasks to be run the service should be stopped. But when all tasks has 110 * finished there might still be more tasks in the message queue waiting to be processed, 111 * especially the ones submitted in {@link Task#onCompleted()}. Wait for a while before stopping 112 * the service to make sure there are no pending messages. 113 */ 114 private static final int STOP_DELAY_MILLISECONDS = 5_000; 115 116 /** Interval between polling of whether the job is finished. */ 117 private static final int TERMINATE_POLLING_INTERVAL_MILLISECONDS = 1_000; 118 119 // The thread to run tasks on 120 private final WorkerThreadHandler workerThreadHandler; 121 122 private static TaskExecutor instance; 123 124 /** 125 * Used by tests to turn task handling into a single threaded process by calling {@link 126 * Handler#handleMessage(Message)} directly 127 */ 128 private MessageSender messageSender = new MessageSender(); 129 130 private final MainThreadHandler mainThreadHandler; 131 132 private final Context appContext; 133 134 /** Main thread only, access through {@link #getTasks()} */ 135 private final TaskQueue tasks = new TaskQueue(); 136 137 private boolean isWorkerThreadBusy = false; 138 139 private boolean isTerminating = false; 140 141 private Job job; 142 143 private final Runnable stopServiceWithDelay = 144 new Runnable() { 145 @MainThread 146 @Override 147 public void run() { 148 VvmLog.i(TAG, "Stopping service"); 149 if (!isJobRunning() || isTerminating()) { 150 VvmLog.e(TAG, "Service already stopped"); 151 return; 152 } 153 scheduleJobAndTerminate(0, true); 154 } 155 }; 156 157 /** 158 * Reschedule the {@link TaskSchedulerJobService} and terminate the executor when the {@link Job} 159 * is truly finished. If the job is still not finished, this runnable will requeue itself on the 160 * main thread. The requeue is only expected to happen a few times. 161 */ 162 private class JobFinishedPoller implements Runnable { 163 164 private final long delayMillis; 165 private final boolean isNewJob; 166 private int invocationCounter = 0; 167 JobFinishedPoller(long delayMillis, boolean isNewJob)168 JobFinishedPoller(long delayMillis, boolean isNewJob) { 169 this.delayMillis = delayMillis; 170 this.isNewJob = isNewJob; 171 } 172 173 @Override run()174 public void run() { 175 // The job should be finished relatively quickly. Assert to make sure this assumption is true. 176 Assert.isTrue(invocationCounter < 10); 177 invocationCounter++; 178 if (job.isFinished()) { 179 VvmLog.i("JobFinishedPoller.run", "Job finished"); 180 if (!getTasks().isEmpty()) { 181 TaskSchedulerJobService.scheduleJob( 182 appContext, serializePendingTasks(), delayMillis, isNewJob); 183 tasks.clear(); 184 } 185 terminate(); 186 return; 187 } 188 VvmLog.w("JobFinishedPoller.run", "Job still running"); 189 mainThreadHandler.postDelayed(this, TERMINATE_POLLING_INTERVAL_MILLISECONDS); 190 } 191 }; 192 193 /** Should attempt to run the next task when a task has finished or been added. */ 194 private boolean taskAutoRunDisabledForTesting = false; 195 196 /** Handles execution of the background task in teh worker thread. */ 197 @VisibleForTesting 198 final class WorkerThreadHandler extends Handler { 199 200 public WorkerThreadHandler(Looper looper) { 201 super(looper); 202 } 203 @Override 204 @WorkerThread 205 public void handleMessage(Message msg) { 206 Assert.isNotMainThread(); 207 Task task = (Task) msg.obj; 208 try { 209 VvmLog.i(TAG, "executing task " + task); 210 task.onExecuteInBackgroundThread(); 211 } catch (Throwable throwable) { 212 VvmLog.e(TAG, "Exception while executing task " + task + ":", throwable); 213 } 214 215 Message schedulerMessage = mainThreadHandler.obtainMessage(); 216 schedulerMessage.obj = task; 217 messageSender.send(schedulerMessage); 218 } 219 } 220 221 /** Handles completion of the background task in the main thread. */ 222 @VisibleForTesting 223 final class MainThreadHandler extends Handler { 224 225 public MainThreadHandler(Looper looper) { 226 super(looper); 227 } 228 229 @Override 230 @MainThread 231 public void handleMessage(Message msg) { 232 Assert.isMainThread(); 233 Task task = (Task) msg.obj; 234 getTasks().remove(task); 235 task.onCompleted(); 236 isWorkerThreadBusy = false; 237 if (!isJobRunning() || isTerminating()) { 238 // TaskExecutor was terminated when the task is running in background, don't need to run the 239 // next task or terminate again 240 return; 241 } 242 maybeRunNextTask(); 243 } 244 } 245 246 /** Starts a new TaskExecutor. May only be called by {@link TaskSchedulerJobService}. */ 247 @MainThread 248 static void createRunningInstance(Context context) { 249 Assert.isMainThread(); 250 Assert.isTrue(instance == null); 251 instance = new TaskExecutor(context); 252 } 253 254 /** @return the currently running instance, or {@code null} if the executor is not running. */ 255 @MainThread 256 @Nullable 257 static TaskExecutor getRunningInstance() { 258 return instance; 259 } 260 261 private TaskExecutor(Context context) { 262 this.appContext = context.getApplicationContext(); 263 HandlerThread thread = new HandlerThread("VvmTaskExecutor"); 264 thread.start(); 265 266 workerThreadHandler = new WorkerThreadHandler(thread.getLooper()); 267 mainThreadHandler = new MainThreadHandler(Looper.getMainLooper()); 268 } 269 270 @VisibleForTesting 271 void terminate() { 272 VvmLog.i(TAG, "terminated"); 273 Assert.isMainThread(); 274 job = null; 275 workerThreadHandler.getLooper().quit(); 276 instance = null; 277 TaskReceiver.resendDeferredBroadcasts(appContext); 278 } 279 280 @MainThread 281 void addTask(Task task) { 282 Assert.isMainThread(); 283 getTasks().add(task); 284 VvmLog.i(TAG, task + " added"); 285 mainThreadHandler.removeCallbacks(stopServiceWithDelay); 286 maybeRunNextTask(); 287 } 288 289 @MainThread 290 @VisibleForTesting 291 TaskQueue getTasks() { 292 Assert.isMainThread(); 293 return tasks; 294 } 295 296 @MainThread 297 private void maybeRunNextTask() { 298 Assert.isMainThread(); 299 300 if (isWorkerThreadBusy) { 301 return; 302 } 303 if (taskAutoRunDisabledForTesting) { 304 // If taskAutoRunDisabledForTesting is true, runNextTask() must be explicitly called 305 // to run the next task. 306 return; 307 } 308 309 runNextTask(); 310 } 311 312 @VisibleForTesting 313 @MainThread 314 void runNextTask() { 315 Assert.isMainThread(); 316 if (getTasks().isEmpty()) { 317 prepareStop(); 318 return; 319 } 320 NextTask nextTask = getTasks().getNextTask(READY_TOLERANCE_MILLISECONDS); 321 322 if (nextTask.task != null) { 323 nextTask.task.onBeforeExecute(); 324 Message message = workerThreadHandler.obtainMessage(); 325 message.obj = nextTask.task; 326 isWorkerThreadBusy = true; 327 messageSender.send(message); 328 return; 329 } 330 VvmLog.i(TAG, "minimal wait time:" + nextTask.minimalWaitTimeMillis); 331 if (!taskAutoRunDisabledForTesting && nextTask.minimalWaitTimeMillis != null) { 332 // No tasks are currently ready. Sleep until the next one should be. 333 // If a new task is added during the sleep the service will wake immediately. 334 sleep(nextTask.minimalWaitTimeMillis); 335 } 336 } 337 338 @MainThread 339 private void sleep(long timeMillis) { 340 VvmLog.i(TAG, "sleep for " + timeMillis + " millis"); 341 if (timeMillis < SHORT_SLEEP_THRESHOLD_MILLISECONDS) { 342 mainThreadHandler.postDelayed( 343 new Runnable() { 344 @Override 345 public void run() { 346 maybeRunNextTask(); 347 } 348 }, 349 timeMillis); 350 return; 351 } 352 scheduleJobAndTerminate(timeMillis, false); 353 } 354 355 private List<Bundle> serializePendingTasks() { 356 return getTasks().toBundles(); 357 } 358 359 private void prepareStop() { 360 VvmLog.i( 361 TAG, 362 "no more tasks, stopping service if no task are added in " 363 + STOP_DELAY_MILLISECONDS 364 + " millis"); 365 mainThreadHandler.postDelayed(stopServiceWithDelay, STOP_DELAY_MILLISECONDS); 366 } 367 368 @NeededForTesting 369 static class MessageSender { 370 371 public void send(Message message) { 372 message.sendToTarget(); 373 } 374 } 375 376 @NeededForTesting 377 void setTaskAutoRunDisabledForTest(boolean value) { 378 taskAutoRunDisabledForTesting = value; 379 } 380 381 @NeededForTesting 382 void setMessageSenderForTest(MessageSender sender) { 383 messageSender = sender; 384 } 385 386 /** 387 * The {@link TaskSchedulerJobService} has started and all queued task should be executed in the 388 * worker thread. 389 */ 390 @MainThread 391 public void onStartJob(Job job, List<Bundle> pendingTasks) { 392 VvmLog.i(TAG, "onStartJob"); 393 this.job = job; 394 tasks.fromBundles(appContext, pendingTasks); 395 maybeRunNextTask(); 396 } 397 398 /** 399 * The {@link TaskSchedulerJobService} is being terminated by the system (timeout or network 400 * lost). A new job will be queued to resume all pending tasks. The current unfinished job may be 401 * ran again. 402 */ 403 @MainThread 404 public void onStopJob() { 405 VvmLog.e(TAG, "onStopJob"); 406 if (isJobRunning() && !isTerminating()) { 407 scheduleJobAndTerminate(0, true); 408 } 409 } 410 411 /** 412 * Send all pending tasks and schedule a new {@link TaskSchedulerJobService}. The current executor 413 * will start the termination process, but restarted when the scheduled job runs in the future. 414 * 415 * @param delayMillis the delay before stating the job, see {@link 416 * android.app.job.JobInfo.Builder#setMinimumLatency(long)}. This must be 0 if {@code 417 * isNewJob} is true. 418 * @param isNewJob a new job will be requested to run immediately, bypassing all requirements. 419 */ 420 @MainThread 421 @VisibleForTesting 422 void scheduleJobAndTerminate(long delayMillis, boolean isNewJob) { 423 Assert.isMainThread(); 424 finishJobAsync(); 425 mainThreadHandler.post(new JobFinishedPoller(delayMillis, isNewJob)); 426 } 427 428 /** 429 * Whether the TaskExecutor is still terminating. {@link TaskReceiver} should defer all new task 430 * until {@link #getRunningInstance()} returns {@code null} so a new job can be started. {@link 431 * #scheduleJobAndTerminate(long, boolean)} does not run immediately because the job can only be 432 * scheduled after the main thread has returned. The TaskExecutor will be in a intermediate state 433 * between scheduleJobAndTerminate() and terminate(). In this state, {@link #getRunningInstance()} 434 * returns non-null because it has not been fully stopped yet, but the TaskExecutor cannot do 435 * anything. A new job should not be scheduled either because the current job might still be 436 * running. 437 */ 438 @MainThread 439 public boolean isTerminating() { 440 return isTerminating; 441 } 442 443 /** 444 * Signals {@link TaskSchedulerJobService} the current session of tasks has finished, and the wake 445 * lock can be released. Note: this only takes effect after the main thread has been returned. If 446 * a new job need to be scheduled, it should be posted on the main thread handler instead of 447 * calling directly. 448 */ 449 @MainThread 450 private void finishJobAsync() { 451 Assert.isTrue(!isTerminating()); 452 Assert.isMainThread(); 453 VvmLog.i(TAG, "finishing Job"); 454 job.finishAsync(); 455 isTerminating = true; 456 mainThreadHandler.removeCallbacks(stopServiceWithDelay); 457 } 458 459 private boolean isJobRunning() { 460 return job != null; 461 } 462 } 463