1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.one.v2.sharedimagereader.imagedistributor; 18 19 import com.android.camera.async.BufferQueue; 20 import com.android.camera.async.BufferQueueController; 21 import com.android.camera.debug.Log; 22 import com.android.camera.debug.Logger; 23 import com.android.camera.one.v2.camera2proxy.ImageProxy; 24 25 import java.util.ArrayList; 26 import java.util.HashSet; 27 import java.util.List; 28 import java.util.Set; 29 30 import javax.annotation.ParametersAreNonnullByDefault; 31 import javax.annotation.concurrent.GuardedBy; 32 33 /** 34 * Distributes incoming images to output {@link BufferQueueController}s 35 * according to their timestamp. 36 */ 37 @ParametersAreNonnullByDefault 38 class ImageDistributorImpl implements ImageDistributor { 39 /** 40 * An input timestamp stream and an output image stream to receive images 41 * with timestamps which match those found in the input stream. 42 */ 43 private static class DispatchRecord { 44 public final BufferQueue<Long> timestampBufferQueue; 45 public final BufferQueueController<ImageProxy> imageStream; 46 DispatchRecord(BufferQueue<Long> timestampBufferQueue, BufferQueueController<ImageProxy> imageStream)47 private DispatchRecord(BufferQueue<Long> timestampBufferQueue, 48 BufferQueueController<ImageProxy> imageStream) { 49 this.timestampBufferQueue = timestampBufferQueue; 50 this.imageStream = imageStream; 51 } 52 } 53 54 private final Logger mLogger; 55 56 /** 57 * Contains pairs mapping {@link BufferQueue}s of timestamps of images to 58 * the {@link BufferQueueController} to receive images with those 59 * timestamps. 60 */ 61 @GuardedBy("mDispatchTable") 62 private final Set<DispatchRecord> mDispatchTable; 63 64 /** 65 * A stream to consume timestamps for all images captured by the underlying 66 * device. This is used as a kind of clock-signal to indicate when timestamp 67 * streams for entries in the {@link #mDispatchTable} are up-to-date. 68 */ 69 private final BufferQueue<Long> mGlobalTimestampBufferQueue; 70 71 /* 72 * @param globalTimestampStream A stream of timestamps for every capture 73 * processed by the underlying {@link CaptureSession}. This is used to 74 * synchronize all of the timestamp streams associated with each added 75 * output stream. 76 */ ImageDistributorImpl(Logger.Factory logFactory, BufferQueue<Long> globalTimestampBufferQueue)77 public ImageDistributorImpl(Logger.Factory logFactory, 78 BufferQueue<Long> globalTimestampBufferQueue) { 79 mLogger = logFactory.create(new Log.Tag("ImgDistributorImpl")); 80 mGlobalTimestampBufferQueue = globalTimestampBufferQueue; 81 mDispatchTable = new HashSet<>(); 82 } 83 84 /** 85 * Distributes the image to all added routes according to timestamp. Note 86 * that this waits until the global timestamp stream indicates that the next 87 * image has been captured to ensure that the timestamp streams for all 88 * routes are up-to-date. 89 * <p> 90 * If interrupted, this will close the image and return. 91 * </p> 92 * It is assumed that incoming images will have unique, increasing 93 * timestamps. 94 * 95 * @param image The image to distribute. 96 */ distributeImage(ImageProxy image)97 public void distributeImage(ImageProxy image) { 98 final long timestamp = image.getTimestamp(); 99 100 // Wait until the global timestamp stream indicates that either the 101 // *next* image has been captured, or the stream has been closed. Both 102 // of these conditions are sufficient to guarantee that all other 103 // timestamp streams should have an entry for the *current* image's 104 // timestamp (if the associated image stream needs the image). Note that 105 // this assumes that {@link #mGlobalImageTimestamp} and each timestamp 106 // stream associated with a {@link DispatchRecord} are updated on the 107 // same thread in order. 108 try { 109 while (true) { 110 if (mGlobalTimestampBufferQueue.getNext() > timestamp) { 111 break; 112 } 113 } 114 } catch (InterruptedException e) { 115 image.close(); 116 return; 117 } catch (BufferQueue.BufferQueueClosedException e) { 118 // If the stream is closed, then all other timestamp streams must be 119 // up-to-date. 120 } 121 122 List<BufferQueueController<ImageProxy>> streamsToReceiveImage = new ArrayList<>(); 123 Set<DispatchRecord> deadRecords = new HashSet<>(); 124 125 // mDispatchTable may be modified in {@link #addRoute} while iterating, 126 // so to avoid unnecessary locking, make a copy to iterate over. 127 Set<DispatchRecord> recordsToProcess; 128 synchronized (mDispatchTable) { 129 recordsToProcess = new HashSet<>(mDispatchTable); 130 } 131 for (DispatchRecord dispatchRecord : recordsToProcess) { 132 // If either the input timestampBufferQueue or the output 133 // imageStream is closed, then the route can be removed. 134 if (dispatchRecord.timestampBufferQueue.isClosed() || 135 dispatchRecord.imageStream.isClosed()) { 136 deadRecords.add(dispatchRecord); 137 } 138 Long requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext(); 139 while (requestedImageTimestamp != null && requestedImageTimestamp < timestamp) { 140 // This branch should only run if there is an error in the 141 // camera framework/driver. (Technically, we could get here if 142 // an ImageStream was not registered with the ImageDistributor 143 // before the image arrived, or if the timestamp stream was not 144 // updated appropriately. Both of these conditions would be 145 // serious app-level bugs, however, and are less likely 146 // than a framework/driver error.) 147 // If the current image is newer than the image requested by a 148 // stream in the dispatch table, then the driver must have 149 // skipped the requested image. 150 151 mLogger.e(String.format("Image (%d) expected, but never received! Instead, " + 152 "received (%d)! This is likely a camera driver error.", 153 requestedImageTimestamp, timestamp), new RuntimeException()); 154 155 // TODO There may be threads blocked, waiting to receive the 156 // requested image. 157 // This should propagate the absent-image through 158 // dispatchRecord.imageStream to avoid starvation. 159 160 dispatchRecord.timestampBufferQueue.discardNext(); 161 requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext(); 162 } 163 if (requestedImageTimestamp == null) { 164 continue; 165 } 166 if (requestedImageTimestamp == timestamp) { 167 // Discard the value we just looked at. 168 dispatchRecord.timestampBufferQueue.discardNext(); 169 streamsToReceiveImage.add(dispatchRecord.imageStream); 170 } 171 } 172 173 synchronized (mDispatchTable) { 174 mDispatchTable.removeAll(deadRecords); 175 } 176 177 int streamsToReceiveImageSize = streamsToReceiveImage.size(); 178 // If nobody needs the image, just close the image. 179 if (streamsToReceiveImageSize == 0) { 180 image.close(); 181 return; 182 } 183 184 RefCountedImageProxy sharedImage = new RefCountedImageProxy(image, 185 streamsToReceiveImageSize); 186 for (BufferQueueController<ImageProxy> outputStream : streamsToReceiveImage) { 187 // Wrap shared image to ensure that *each* stream must close the 188 // image before the underlying reference count is decremented, 189 // regardless of how many times it is closed from each stream. 190 ImageProxy singleCloseImage = new SingleCloseImageProxy(sharedImage); 191 outputStream.update(singleCloseImage); 192 } 193 } 194 195 /** 196 * Registers the given output image stream as a destination for images with 197 * timestamps present in inputTimestampStream. Note that 198 * inputTimestampStream is assumed to be synchronized with the global 199 * timestamp stream such that it must always contain a timestamp for a 200 * requested image before the global timestamp stream provides the timestamp 201 * for the *next* image. 202 * 203 * @param inputTimestampBufferQueue A stream of timestamps of images to be 204 * routed to the given output stream. This should be closed by 205 * the producer when no more images are expected. 206 * @param outputStream The image stream on which to add images. 207 */ 208 @Override addRoute(BufferQueue<Long> inputTimestampBufferQueue, BufferQueueController<ImageProxy> outputStream)209 public void addRoute(BufferQueue<Long> inputTimestampBufferQueue, 210 BufferQueueController<ImageProxy> outputStream) { 211 synchronized (mDispatchTable) { 212 mDispatchTable.add(new DispatchRecord(inputTimestampBufferQueue, outputStream)); 213 } 214 } 215 } 216