1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.gallery3d.util; 18 19 import com.android.gallery3d.common.Utils; 20 import com.android.gallery3d.util.ThreadPool.Job; 21 import com.android.gallery3d.util.ThreadPool.JobContext; 22 23 import java.util.LinkedList; 24 25 // Limit the number of concurrent jobs that has been submitted into a ThreadPool 26 @SuppressWarnings("rawtypes") 27 public class JobLimiter implements FutureListener { 28 private static final String TAG = "JobLimiter"; 29 30 // State Transition: 31 // INIT -> DONE, CANCELLED 32 // DONE -> CANCELLED 33 private static final int STATE_INIT = 0; 34 private static final int STATE_DONE = 1; 35 private static final int STATE_CANCELLED = 2; 36 37 private final LinkedList<JobWrapper<?>> mJobs = new LinkedList<JobWrapper<?>>(); 38 private final ThreadPool mPool; 39 private int mLimit; 40 41 private static class JobWrapper<T> implements Future<T>, Job<T> { 42 private int mState = STATE_INIT; 43 private Job<T> mJob; 44 private Future<T> mDelegate; 45 private FutureListener<T> mListener; 46 private T mResult; 47 JobWrapper(Job<T> job, FutureListener<T> listener)48 public JobWrapper(Job<T> job, FutureListener<T> listener) { 49 mJob = job; 50 mListener = listener; 51 } 52 setFuture(Future<T> future)53 public synchronized void setFuture(Future<T> future) { 54 if (mState != STATE_INIT) return; 55 mDelegate = future; 56 } 57 58 @Override cancel()59 public void cancel() { 60 FutureListener<T> listener = null; 61 synchronized (this) { 62 if (mState != STATE_DONE) { 63 listener = mListener; 64 mJob = null; 65 mListener = null; 66 if (mDelegate != null) { 67 mDelegate.cancel(); 68 mDelegate = null; 69 } 70 } 71 mState = STATE_CANCELLED; 72 mResult = null; 73 notifyAll(); 74 } 75 if (listener != null) listener.onFutureDone(this); 76 } 77 78 @Override isCancelled()79 public synchronized boolean isCancelled() { 80 return mState == STATE_CANCELLED; 81 } 82 83 @Override isDone()84 public boolean isDone() { 85 // Both CANCELLED AND DONE is considered as done 86 return mState != STATE_INIT; 87 } 88 89 @Override get()90 public synchronized T get() { 91 while (mState == STATE_INIT) { 92 // handle the interrupted exception of wait() 93 Utils.waitWithoutInterrupt(this); 94 } 95 return mResult; 96 } 97 98 @Override waitDone()99 public void waitDone() { 100 get(); 101 } 102 103 @Override run(JobContext jc)104 public T run(JobContext jc) { 105 Job<T> job = null; 106 synchronized (this) { 107 if (mState == STATE_CANCELLED) return null; 108 job = mJob; 109 } 110 T result = null; 111 try { 112 result = job.run(jc); 113 } catch (Throwable t) { 114 Log.w(TAG, "error executing job: " + job, t); 115 } 116 FutureListener<T> listener = null; 117 synchronized (this) { 118 if (mState == STATE_CANCELLED) return null; 119 mState = STATE_DONE; 120 listener = mListener; 121 mListener = null; 122 mJob = null; 123 mResult = result; 124 notifyAll(); 125 } 126 if (listener != null) listener.onFutureDone(this); 127 return result; 128 } 129 } 130 JobLimiter(ThreadPool pool, int limit)131 public JobLimiter(ThreadPool pool, int limit) { 132 mPool = Utils.checkNotNull(pool); 133 mLimit = limit; 134 } 135 submit(Job<T> job, FutureListener<T> listener)136 public synchronized <T> Future<T> submit(Job<T> job, FutureListener<T> listener) { 137 JobWrapper<T> future = new JobWrapper<T>(Utils.checkNotNull(job), listener); 138 mJobs.addLast(future); 139 submitTasksIfAllowed(); 140 return future; 141 } 142 143 @SuppressWarnings({"rawtypes", "unchecked"}) submitTasksIfAllowed()144 private void submitTasksIfAllowed() { 145 while (mLimit > 0 && !mJobs.isEmpty()) { 146 JobWrapper wrapper = mJobs.removeFirst(); 147 if (!wrapper.isCancelled()) { 148 --mLimit; 149 wrapper.setFuture(mPool.submit(wrapper, this)); 150 } 151 } 152 } 153 154 @Override onFutureDone(Future future)155 public synchronized void onFutureDone(Future future) { 156 ++mLimit; 157 submitTasksIfAllowed(); 158 } 159 } 160