1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.one.v2.sharedimagereader.imagedistributor;
18 
19 import com.android.camera.async.BufferQueue;
20 import com.android.camera.async.BufferQueueController;
21 import com.android.camera.debug.Log;
22 import com.android.camera.debug.Logger;
23 import com.android.camera.one.v2.camera2proxy.ImageProxy;
24 
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Set;
29 
30 import javax.annotation.ParametersAreNonnullByDefault;
31 import javax.annotation.concurrent.GuardedBy;
32 
33 /**
34  * Distributes incoming images to output {@link BufferQueueController}s
35  * according to their timestamp.
36  */
37 @ParametersAreNonnullByDefault
38 class ImageDistributorImpl implements ImageDistributor {
39     /**
40      * An input timestamp stream and an output image stream to receive images
41      * with timestamps which match those found in the input stream.
42      */
43     private static class DispatchRecord {
44         public final BufferQueue<Long> timestampBufferQueue;
45         public final BufferQueueController<ImageProxy> imageStream;
46 
DispatchRecord(BufferQueue<Long> timestampBufferQueue, BufferQueueController<ImageProxy> imageStream)47         private DispatchRecord(BufferQueue<Long> timestampBufferQueue,
48                 BufferQueueController<ImageProxy> imageStream) {
49             this.timestampBufferQueue = timestampBufferQueue;
50             this.imageStream = imageStream;
51         }
52     }
53 
54     private final Logger mLogger;
55 
56     /**
57      * Contains pairs mapping {@link BufferQueue}s of timestamps of images to
58      * the {@link BufferQueueController} to receive images with those
59      * timestamps.
60      */
61     @GuardedBy("mDispatchTable")
62     private final Set<DispatchRecord> mDispatchTable;
63 
64     /**
65      * A stream to consume timestamps for all images captured by the underlying
66      * device. This is used as a kind of clock-signal to indicate when timestamp
67      * streams for entries in the {@link #mDispatchTable} are up-to-date.
68      */
69     private final BufferQueue<Long> mGlobalTimestampBufferQueue;
70 
71     /*
72      * @param globalTimestampStream A stream of timestamps for every capture
73      * processed by the underlying {@link CaptureSession}. This is used to
74      * synchronize all of the timestamp streams associated with each added
75      * output stream.
76      */
ImageDistributorImpl(Logger.Factory logFactory, BufferQueue<Long> globalTimestampBufferQueue)77     public ImageDistributorImpl(Logger.Factory logFactory,
78             BufferQueue<Long> globalTimestampBufferQueue) {
79         mLogger = logFactory.create(new Log.Tag("ImgDistributorImpl"));
80         mGlobalTimestampBufferQueue = globalTimestampBufferQueue;
81         mDispatchTable = new HashSet<>();
82     }
83 
84     /**
85      * Distributes the image to all added routes according to timestamp. Note
86      * that this waits until the global timestamp stream indicates that the next
87      * image has been captured to ensure that the timestamp streams for all
88      * routes are up-to-date.
89      * <p>
90      * If interrupted, this will close the image and return.
91      * </p>
92      * It is assumed that incoming images will have unique, increasing
93      * timestamps.
94      *
95      * @param image The image to distribute.
96      */
distributeImage(ImageProxy image)97     public void distributeImage(ImageProxy image) {
98         final long timestamp = image.getTimestamp();
99 
100         // Wait until the global timestamp stream indicates that either the
101         // *next* image has been captured, or the stream has been closed. Both
102         // of these conditions are sufficient to guarantee that all other
103         // timestamp streams should have an entry for the *current* image's
104         // timestamp (if the associated image stream needs the image). Note that
105         // this assumes that {@link #mGlobalImageTimestamp} and each timestamp
106         // stream associated with a {@link DispatchRecord} are updated on the
107         // same thread in order.
108         try {
109             while (true) {
110                 if (mGlobalTimestampBufferQueue.getNext() > timestamp) {
111                     break;
112                 }
113             }
114         } catch (InterruptedException e) {
115             image.close();
116             return;
117         } catch (BufferQueue.BufferQueueClosedException e) {
118             // If the stream is closed, then all other timestamp streams must be
119             // up-to-date.
120         }
121 
122         List<BufferQueueController<ImageProxy>> streamsToReceiveImage = new ArrayList<>();
123         Set<DispatchRecord> deadRecords = new HashSet<>();
124 
125         // mDispatchTable may be modified in {@link #addRoute} while iterating,
126         // so to avoid unnecessary locking, make a copy to iterate over.
127         Set<DispatchRecord> recordsToProcess;
128         synchronized (mDispatchTable) {
129             recordsToProcess = new HashSet<>(mDispatchTable);
130         }
131         for (DispatchRecord dispatchRecord : recordsToProcess) {
132             // If either the input timestampBufferQueue or the output
133             // imageStream is closed, then the route can be removed.
134             if (dispatchRecord.timestampBufferQueue.isClosed() ||
135                     dispatchRecord.imageStream.isClosed()) {
136                 deadRecords.add(dispatchRecord);
137             }
138             Long requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext();
139             while (requestedImageTimestamp != null && requestedImageTimestamp < timestamp) {
140                 // This branch should only run if there is an error in the
141                 // camera framework/driver. (Technically, we could get here if
142                 // an ImageStream was not registered with the ImageDistributor
143                 // before the image arrived, or if the timestamp stream was not
144                 // updated appropriately. Both of these conditions would be
145                 // serious app-level bugs, however, and are less likely
146                 // than a framework/driver error.)
147                 // If the current image is newer than the image requested by a
148                 // stream in the dispatch table, then the driver must have
149                 // skipped the requested image.
150 
151                 mLogger.e(String.format("Image (%d) expected, but never received!  Instead, " +
152                         "received (%d)!  This is likely a camera driver error.",
153                         requestedImageTimestamp, timestamp), new RuntimeException());
154 
155                 // TODO There may be threads blocked, waiting to receive the
156                 // requested image.
157                 // This should propagate the absent-image through
158                 // dispatchRecord.imageStream to avoid starvation.
159 
160                 dispatchRecord.timestampBufferQueue.discardNext();
161                 requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext();
162             }
163             if (requestedImageTimestamp == null) {
164                 continue;
165             }
166             if (requestedImageTimestamp == timestamp) {
167                 // Discard the value we just looked at.
168                 dispatchRecord.timestampBufferQueue.discardNext();
169                 streamsToReceiveImage.add(dispatchRecord.imageStream);
170             }
171         }
172 
173         synchronized (mDispatchTable) {
174             mDispatchTable.removeAll(deadRecords);
175         }
176 
177         int streamsToReceiveImageSize = streamsToReceiveImage.size();
178         // If nobody needs the image, just close the image.
179         if (streamsToReceiveImageSize == 0) {
180             image.close();
181             return;
182         }
183 
184         RefCountedImageProxy sharedImage = new RefCountedImageProxy(image,
185                 streamsToReceiveImageSize);
186         for (BufferQueueController<ImageProxy> outputStream : streamsToReceiveImage) {
187             // Wrap shared image to ensure that *each* stream must close the
188             // image before the underlying reference count is decremented,
189             // regardless of how many times it is closed from each stream.
190             ImageProxy singleCloseImage = new SingleCloseImageProxy(sharedImage);
191             outputStream.update(singleCloseImage);
192         }
193     }
194 
195     /**
196      * Registers the given output image stream as a destination for images with
197      * timestamps present in inputTimestampStream. Note that
198      * inputTimestampStream is assumed to be synchronized with the global
199      * timestamp stream such that it must always contain a timestamp for a
200      * requested image before the global timestamp stream provides the timestamp
201      * for the *next* image.
202      *
203      * @param inputTimestampBufferQueue A stream of timestamps of images to be
204      *            routed to the given output stream. This should be closed by
205      *            the producer when no more images are expected.
206      * @param outputStream The image stream on which to add images.
207      */
208     @Override
addRoute(BufferQueue<Long> inputTimestampBufferQueue, BufferQueueController<ImageProxy> outputStream)209     public void addRoute(BufferQueue<Long> inputTimestampBufferQueue,
210             BufferQueueController<ImageProxy> outputStream) {
211         synchronized (mDispatchTable) {
212             mDispatchTable.add(new DispatchRecord(inputTimestampBufferQueue, outputStream));
213         }
214     }
215 }
216