1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.gallery3d.util;
18 
19 import com.android.gallery3d.common.Utils;
20 import com.android.gallery3d.util.ThreadPool.Job;
21 import com.android.gallery3d.util.ThreadPool.JobContext;
22 
23 import java.util.LinkedList;
24 
25 // Limit the number of concurrent jobs that has been submitted into a ThreadPool
26 @SuppressWarnings("rawtypes")
27 public class JobLimiter implements FutureListener {
28     private static final String TAG = "JobLimiter";
29 
30     // State Transition:
31     //      INIT -> DONE, CANCELLED
32     //      DONE -> CANCELLED
33     private static final int STATE_INIT = 0;
34     private static final int STATE_DONE = 1;
35     private static final int STATE_CANCELLED = 2;
36 
37     private final LinkedList<JobWrapper<?>> mJobs = new LinkedList<JobWrapper<?>>();
38     private final ThreadPool mPool;
39     private int mLimit;
40 
41     private static class JobWrapper<T> implements Future<T>, Job<T> {
42         private int mState = STATE_INIT;
43         private Job<T> mJob;
44         private Future<T> mDelegate;
45         private FutureListener<T> mListener;
46         private T mResult;
47 
JobWrapper(Job<T> job, FutureListener<T> listener)48         public JobWrapper(Job<T> job, FutureListener<T> listener) {
49             mJob = job;
50             mListener = listener;
51         }
52 
setFuture(Future<T> future)53         public synchronized void setFuture(Future<T> future) {
54             if (mState != STATE_INIT) return;
55             mDelegate = future;
56         }
57 
58         @Override
cancel()59         public void cancel() {
60             FutureListener<T> listener = null;
61             synchronized (this) {
62                 if (mState != STATE_DONE) {
63                     listener = mListener;
64                     mJob = null;
65                     mListener = null;
66                     if (mDelegate != null) {
67                         mDelegate.cancel();
68                         mDelegate = null;
69                     }
70                 }
71                 mState = STATE_CANCELLED;
72                 mResult = null;
73                 notifyAll();
74             }
75             if (listener != null) listener.onFutureDone(this);
76         }
77 
78         @Override
isCancelled()79         public synchronized boolean isCancelled() {
80             return mState == STATE_CANCELLED;
81         }
82 
83         @Override
isDone()84         public boolean isDone() {
85             // Both CANCELLED AND DONE is considered as done
86             return mState !=  STATE_INIT;
87         }
88 
89         @Override
get()90         public synchronized T get() {
91             while (mState == STATE_INIT) {
92                 // handle the interrupted exception of wait()
93                 Utils.waitWithoutInterrupt(this);
94             }
95             return mResult;
96         }
97 
98         @Override
waitDone()99         public void waitDone() {
100             get();
101         }
102 
103         @Override
run(JobContext jc)104         public T run(JobContext jc) {
105             Job<T> job = null;
106             synchronized (this) {
107                 if (mState == STATE_CANCELLED) return null;
108                 job = mJob;
109             }
110             T result  = null;
111             try {
112                 result = job.run(jc);
113             } catch (Throwable t) {
114                 Log.w(TAG, "error executing job: " + job, t);
115             }
116             FutureListener<T> listener = null;
117             synchronized (this) {
118                 if (mState == STATE_CANCELLED) return null;
119                 mState = STATE_DONE;
120                 listener = mListener;
121                 mListener = null;
122                 mJob = null;
123                 mResult = result;
124                 notifyAll();
125             }
126             if (listener != null) listener.onFutureDone(this);
127             return result;
128         }
129     }
130 
JobLimiter(ThreadPool pool, int limit)131     public JobLimiter(ThreadPool pool, int limit) {
132         mPool = Utils.checkNotNull(pool);
133         mLimit = limit;
134     }
135 
submit(Job<T> job, FutureListener<T> listener)136     public synchronized <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
137         JobWrapper<T> future = new JobWrapper<T>(Utils.checkNotNull(job), listener);
138         mJobs.addLast(future);
139         submitTasksIfAllowed();
140         return future;
141     }
142 
143     @SuppressWarnings({"rawtypes", "unchecked"})
submitTasksIfAllowed()144     private void submitTasksIfAllowed() {
145         while (mLimit > 0 && !mJobs.isEmpty()) {
146             JobWrapper wrapper = mJobs.removeFirst();
147             if (!wrapper.isCancelled()) {
148                 --mLimit;
149                 wrapper.setFuture(mPool.submit(wrapper, this));
150             }
151         }
152     }
153 
154     @Override
onFutureDone(Future future)155     public synchronized void onFutureDone(Future future) {
156         ++mLimit;
157         submitTasksIfAllowed();
158     }
159 }
160