/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.tv.tuner.source;

import android.content.Context;
import android.net.Uri;
import android.os.Environment;
import android.support.annotation.Nullable;
import android.util.Log;
import android.util.SparseBooleanArray;
import com.android.tv.common.SoftPreconditions;
import com.android.tv.tuner.api.ScanChannel;
import com.android.tv.tuner.data.TunerChannel;
import com.android.tv.tuner.features.TunerFeatures;
import com.android.tv.tuner.ts.EventDetector.EventListener;
import com.android.tv.tuner.ts.TsParser;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.TransferListener;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
 * generated by capturing TV signal.
 *
 * <p>A {@link StreamingThread} reads PID-filtered packets from a {@link StreamProvider} into a
 * circular buffer; consumers pull data out of that buffer through {@link #readAt}. All shared
 * buffer state ({@code mBytesFetched}, {@code mLastReadPosition}, {@code mStreaming} and the buffer
 * contents) is guarded by {@code mCircularBufferMonitor}.
 */
public class FileTsStreamer implements TsStreamer {
    private static final String TAG = "FileTsStreamer";

    private static final int TS_PACKET_SIZE = 188;
    private static final int TS_SYNC_BYTE = 0x47;
    private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
    private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
    private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
    // The producer stops fetching once (unread bytes + PADDING_SIZE) would exceed the buffer size.
    private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
    private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
    private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
    private static final long NANOS_PER_MS = 1000000L;
    private static final String FILE_DIR =
            new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();

    // Virtual frequency base used for file-based source
    public static final int FREQ_BASE = 100;

    private final Object mCircularBufferMonitor = new Object();
    private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
    private final FileSourceEventDetector mEventDetector;
    private final Context mContext;

    // Total number of bytes written to the circular buffer since the streaming thread started.
    private long mBytesFetched;
    // End position (exclusive) of the most recent successful readAt() call.
    private long mLastReadPosition;
    private boolean mStreaming;

    private Thread mStreamingThread;
    // NOTE(review): written by startStream() callers and read by the streaming thread without
    // synchronization; a source replaced while streaming is never closed. Confirm whether
    // startStream() can really be called while a stream is active.
    private StreamProvider mSource;

    /**
     * A {@link TsDataSource} that reads from the owning {@link FileTsStreamer}'s circular buffer.
     * Positions exposed by this class are relative to the streamer's buffered position at the
     * time the data source was created (plus any later {@link #shiftStartPosition} offset).
     */
    public static class FileDataSource extends TsDataSource {
        private final FileTsStreamer mTsStreamer;
        private final AtomicLong mLastReadPosition = new AtomicLong(0);
        private Uri mUri;
        private long mStartBufferedPosition;

        private FileDataSource(FileTsStreamer tsStreamer) {
            mTsStreamer = tsStreamer;
            mStartBufferedPosition = tsStreamer.getBufferedPosition();
        }

        /** Returns the number of bytes buffered by the streamer since this source's start. */
        @Override
        public long getBufferedPosition() {
            return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
        }

        /** Returns the number of bytes successfully read through this source since open. */
        @Override
        public long getLastReadPosition() {
            return mLastReadPosition.get();
        }

        /**
         * Moves the start position forward by {@code offset} bytes. Expected to be called before
         * any data has been read and with an offset within the currently buffered range; both
         * conditions are checked with {@link SoftPreconditions}.
         */
        @Override
        public void shiftStartPosition(long offset) {
            SoftPreconditions.checkState(mLastReadPosition.get() == 0);
            SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
            mStartBufferedPosition += offset;
        }

        /**
         * Records the URI of {@code dataSpec} and resets the read position.
         *
         * @return {@code C.LENGTH_UNSET}, since the length of the stream is not known
         */
        @Override
        public long open(DataSpec dataSpec) {
            mUri = dataSpec.uri;
            mLastReadPosition.set(0);
            return com.google.android.exoplayer2.C.LENGTH_UNSET;
        }

        /** Does nothing; the underlying stream is owned by the {@link FileTsStreamer}. */
        @Override
        public void close() {}

        /**
         * Reads {@code readLength} bytes at the current read position into {@code buffer}.
         *
         * @return the number of bytes read, or {@code -1} if {@link FileTsStreamer#readAt} failed
         */
        @Override
        public int read(byte[] buffer, int offset, int readLength) throws IOException {
            int ret =
                    mTsStreamer.readAt(
                            mStartBufferedPosition + mLastReadPosition.get(),
                            buffer,
                            offset,
                            readLength);
            if (ret > 0) {
                mLastReadPosition.addAndGet(ret);
            }
            return ret;
        }

        // ExoPlayer V2 DataSource implementation.

        @Override
        public void addTransferListener(TransferListener transferListener) {
            // TODO: Implement to support metrics collection.
        }

        @Nullable
        @Override
        public Uri getUri() {
            return mUri;
        }
    }

    /**
     * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
     *
     * @param eventListener the listener for channel & program information
     * @param context the context used to query {@link TunerFeatures} flags
     */
    public FileTsStreamer(EventListener eventListener, Context context) {
        mEventDetector =
                new FileSourceEventDetector(
                        eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context));
        mContext = context;
    }

    /**
     * Starts streaming the file named by {@code channel.filename} (resolved against the
     * "Streams" directory on external storage) for channel scanning.
     *
     * @return {@code false} if the file could not be opened, {@code true} otherwise
     */
    @Override
    public boolean startStream(ScanChannel channel) {
        String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
        StreamProvider source = new StreamProvider(filepath);
        if (!source.isReady()) {
            // Keep any previously installed source untouched when the new file cannot be opened.
            return false;
        }
        mEventDetector.start(source, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
        addSectionPidFilters(source);
        // Publish the source only once it is fully configured.
        mSource = source;
        startStreamingThreadIfNeeded();
        return true;
    }

    /**
     * Starts streaming the file at {@code channel.getFilepath()} for playback, filtering on the
     * channel's video, audio and PCR PIDs in addition to the section PIDs.
     *
     * @return {@code false} if the file could not be opened, {@code true} otherwise
     */
    @Override
    public boolean startStream(TunerChannel channel) {
        Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
        StreamProvider source = new StreamProvider(channel.getFilepath());
        if (!source.isReady()) {
            // Keep any previously installed source untouched when the new file cannot be opened.
            return false;
        }
        mEventDetector.start(source, channel.getProgramNumber());
        source.addPidFilter(channel.getVideoPid());
        for (Integer i : channel.getAudioPids()) {
            source.addPidFilter(i);
        }
        source.addPidFilter(channel.getPcrPid());
        addSectionPidFilters(source);
        // Publish the source only once it is fully configured.
        mSource = source;
        startStreamingThreadIfNeeded();
        return true;
    }

    /** Adds the PAT/ATSC SI PIDs and, when file DVB is enabled, the DVB EIT/SDT PIDs. */
    private void addSectionPidFilters(StreamProvider source) {
        source.addPidFilter(TsParser.PAT_PID);
        source.addPidFilter(TsParser.ATSC_SI_BASE_PID);
        if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
            source.addPidFilter(TsParser.DVB_EIT_PID);
            source.addPidFilter(TsParser.DVB_SDT_PID);
        }
    }

    /** Marks the streamer as streaming and starts a new streaming thread unless one is active. */
    private void startStreamingThreadIfNeeded() {
        synchronized (mCircularBufferMonitor) {
            if (mStreaming) {
                return;
            }
            mStreaming = true;
        }

        mStreamingThread = new StreamingThread();
        mStreamingThread.start();
        Log.i(TAG, "Streaming started");
    }

    /**
     * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
     * device is overloaded this can take a while, but usually it returns pretty quickly.
     */
    @Override
    public void stopStream() {
        synchronized (mCircularBufferMonitor) {
            mStreaming = false;
            // Wake every waiter: the producer and any reader blocked in readAt() share this
            // monitor, and all of them must observe the stop request.
            mCircularBufferMonitor.notifyAll();
        }

        try {
            if (mStreamingThread != null) {
                mStreamingThread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public TsDataSource createDataSource() {
        return new FileDataSource(this);
    }

    /**
     * Returns the current buffered position from the file.
     *
     * @return the current buffered position
     */
    public long getBufferedPosition() {
        synchronized (mCircularBufferMonitor) {
            return mBytesFetched;
        }
    }

    /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */
    public static class StreamProvider {
        private final String mFilepath;
        private final SparseBooleanArray mPids = new SparseBooleanArray();
        private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];

        // Null when the file could not be opened or after close().
        private BufferedInputStream mInputStream;

        private StreamProvider(String filepath) {
            mFilepath = filepath;
            open(filepath);
        }

        /** Opens {@code filepath}; leaves {@code mInputStream} null on failure. */
        private void open(String filepath) {
            try {
                mInputStream = new BufferedInputStream(new FileInputStream(filepath));
            } catch (IOException e) {
                Log.e(TAG, "Error opening input stream", e);
                mInputStream = null;
            }
        }

        private boolean isReady() {
            return mInputStream != null;
        }

        /** Returns the file path of the MPEG-2 TS file. */
        public String getFilepath() {
            return mFilepath;
        }

        /** Adds a pid for filtering from the MPEG-2 TS file. */
        public void addPidFilter(int pid) {
            mPids.put(pid, true);
        }

        /** Returns whether the current pid filter is empty or not. */
        public boolean isFilterEmpty() {
            return mPids.size() == 0;
        }

        /** Clears the current pid filter. */
        public void clearPidFilter() {
            mPids.clear();
        }

        /**
         * Returns whether a pid is in the pid filter or not.
         *
         * @param pid the pid to check
         */
        public boolean isInFilter(int pid) {
            return mPids.get(pid);
        }

        /**
         * Reads the next chunk of the MPEG-2 TS file and copies the packets whose PID is in the
         * filter into {@code inputBuffer}. When the end of the file is reached the file is
         * reopened and reading restarts from the beginning.
         *
         * @param inputBuffer destination for the filtered packets; must hold at least {@code
         *     READ_BUFFER_SIZE} bytes
         * @return the number of bytes copied (possibly 0 if no packet matched), or {@code -1} if
         *     the file could not be read or the chunk does not start with a TS sync byte
         */
        private int read(byte[] inputBuffer) {
            int readSize = readInternal();
            if (readSize <= 0) {
                // Reached the end of stream (or a previous reopen failed). Restart from the
                // beginning.
                close();
                open(mFilepath);
                if (mInputStream == null) {
                    return -1;
                }
                readSize = readInternal();
                if (readSize <= 0) {
                    // Nothing readable even right after reopening; do not parse stale data.
                    return -1;
                }
            }

            if (mPreBuffer[0] != TS_SYNC_BYTE) {
                Log.e(TAG, "Error reading input stream - no TS sync found");
                return -1;
            }
            int filteredSize = 0;
            // Only whole packets are considered: a truncated trailing packet would otherwise be
            // completed with stale bytes left in mPreBuffer by an earlier read.
            for (int i = 0; i + TS_PACKET_SIZE <= readSize; i += TS_PACKET_SIZE) {
                if (mPreBuffer[i] != TS_SYNC_BYTE) {
                    continue;
                }
                // 13-bit PID: low 5 bits of byte 1 followed by all of byte 2.
                int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
                if (mPids.get(pid)) {
                    System.arraycopy(mPreBuffer, i, inputBuffer, filteredSize, TS_PACKET_SIZE);
                    filteredSize += TS_PACKET_SIZE;
                }
            }
            return filteredSize;
        }

        /**
         * Fills {@code mPreBuffer} from the input stream.
         *
         * @return the number of bytes read, or {@code -1} on end of stream, on I/O error, or when
         *     the stream is not open
         */
        private int readInternal() {
            if (mInputStream == null) {
                return -1;
            }
            try {
                return mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
            } catch (IOException e) {
                Log.e(TAG, "Error reading input stream", e);
                return -1;
            }
        }

        /** Closes the input stream if it is open. Safe to call repeatedly. */
        private void close() {
            if (mInputStream == null) {
                return;
            }
            try {
                mInputStream.close();
            } catch (IOException e) {
                Log.e(TAG, "Error closing input stream:", e);
            }
            mInputStream = null;
        }
    }

    /**
     * Reads data from internal buffer. Blocks until {@code amount} bytes starting at {@code pos}
     * have been fetched, the stream is stopped, no new data has arrived for {@code
     * READ_TIMEOUT_MS}, or the calling thread is interrupted.
     *
     * @param pos the position to read from
     * @param buffer to read
     * @param offset start position of the read buffer
     * @param amount number of bytes to read
     * @return number of read bytes when successful, {@code -1} otherwise
     * @throws IOException declared for callers; this implementation reports failures by
     *     returning {@code -1}
     */
    public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
        synchronized (mCircularBufferMonitor) {
            long lastSeenBytesFetched = mBytesFetched;
            long lastProgressTimeNs = System.nanoTime();
            while (mBytesFetched < pos + amount && mStreaming) {
                // Measure the timeout from the last observed progress so that neither spurious
                // wake-ups nor notifications meant for other waiters are mistaken for a stall,
                // and so that a stall after partial progress is still detected.
                long remainingMs =
                        READ_TIMEOUT_MS - (System.nanoTime() - lastProgressTimeNs) / NANOS_PER_MS;
                if (remainingMs <= 0) {
                    Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");

                    // Returning -1 will make demux report EOS so that the input service can retry
                    // the playback.
                    return -1;
                }
                try {
                    mCircularBufferMonitor.wait(remainingMs);
                } catch (InterruptedException e) {
                    // Waiting again with the interrupt flag set would make wait() throw
                    // immediately without ever releasing the monitor, starving the producer.
                    // Preserve the interrupt status and give up instead.
                    Thread.currentThread().interrupt();
                    Log.w(TAG, "Interrupted while waiting for data. returning -1.");
                    return -1;
                }
                if (mBytesFetched != lastSeenBytesFetched) {
                    lastSeenBytesFetched = mBytesFetched;
                    lastProgressTimeNs = System.nanoTime();
                }
            }
            if (!mStreaming) {
                Log.w(TAG, "Stream is already stopped.");
                return -1;
            }
            if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
                Log.e(TAG, "Demux is requesting the data which is already overwritten.");
                return -1;
            }
            // Copy out of the circular buffer, wrapping around to the start if necessary.
            int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
            int bytesToCopyInFirstPass = amount;
            if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
                bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
            }
            System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
            if (bytesToCopyInFirstPass < amount) {
                System.arraycopy(
                        mCircularBuffer,
                        0,
                        buffer,
                        offset + bytesToCopyInFirstPass,
                        amount - bytesToCopyInFirstPass);
            }
            mLastReadPosition = pos + amount;
            // Space was freed: wake the producer (and any other waiter sharing the monitor).
            mCircularBufferMonitor.notifyAll();
            return amount;
        }
    }

    /**
     * Adds {@link ScanChannel} instance for local files. Each regular file found in the "Streams"
     * directory is assigned a virtual frequency starting at {@link #FREQ_BASE} and increasing by
     * 100 per file.
     *
     * @param output a list of channels where the results will be placed in
     */
    public static void addLocalStreamFiles(List<ScanChannel> output) {
        File dir = new File(FILE_DIR);
        if (!dir.exists()) return;

        File[] tsFiles = dir.listFiles();
        if (tsFiles == null) return;
        int freq = FileTsStreamer.FREQ_BASE;
        for (File file : tsFiles) {
            if (!file.isFile()) continue;
            output.add(ScanChannel.forFile(freq, file.getName()));
            freq += 100;
        }
    }

    /**
     * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps
     * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started
     * and stopped by {@link #startStream} and {@link #stopStream()}, respectively.
     */
    private class StreamingThread extends Thread {
        @Override
        public void run() {
            byte[] dataBuffer = new byte[READ_BUFFER_SIZE];

            synchronized (mCircularBufferMonitor) {
                mBytesFetched = 0;
                mLastReadPosition = 0;
            }

            // Interrupts do not stop this thread (only stopStream() does). The interrupt status
            // is remembered and restored on exit rather than re-set inside the loops, where it
            // would make every subsequent wait()/sleep() throw immediately and busy-spin.
            boolean interrupted = false;
            try {
                while (true) {
                    synchronized (mCircularBufferMonitor) {
                        // Pause while the reader is too far behind to accept more data.
                        while ((mBytesFetched - mLastReadPosition + PADDING_SIZE)
                                        > CIRCULAR_BUFFER_SIZE
                                && mStreaming) {
                            try {
                                mCircularBufferMonitor.wait();
                            } catch (InterruptedException e) {
                                // Wait again.
                                interrupted = true;
                            }
                        }
                        if (!mStreaming) {
                            break;
                        }
                    }

                    int bytesWritten = mSource.read(dataBuffer);
                    if (bytesWritten <= 0) {
                        try {
                            // When buffer is underrun, we sleep for short time to prevent
                            // unnecessary CPU draining.
                            sleep(BUFFER_UNDERRUN_SLEEP_MS);
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                        continue;
                    }

                    mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);

                    synchronized (mCircularBufferMonitor) {
                        // Copy into the circular buffer, wrapping around if necessary.
                        int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
                        int bytesToCopyInFirstPass = bytesWritten;
                        if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
                            bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
                        }
                        System.arraycopy(
                                dataBuffer,
                                0,
                                mCircularBuffer,
                                posInBuffer,
                                bytesToCopyInFirstPass);
                        if (bytesToCopyInFirstPass < bytesWritten) {
                            System.arraycopy(
                                    dataBuffer,
                                    bytesToCopyInFirstPass,
                                    mCircularBuffer,
                                    0,
                                    bytesWritten - bytesToCopyInFirstPass);
                        }
                        mBytesFetched += bytesWritten;
                        // New data is available: wake every reader waiting in readAt().
                        mCircularBufferMonitor.notifyAll();
                    }
                }
            } finally {
                Log.i(TAG, "Streaming stopped");
                mSource.close();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}