1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.content.Context;
20 import android.net.Uri;
21 import android.os.Environment;
22 import android.support.annotation.Nullable;
23 import android.util.Log;
24 import android.util.SparseBooleanArray;
25 import com.android.tv.common.SoftPreconditions;
26 import com.android.tv.tuner.api.ScanChannel;
27 import com.android.tv.tuner.data.TunerChannel;
28 import com.android.tv.tuner.features.TunerFeatures;
29 import com.android.tv.tuner.ts.EventDetector.EventListener;
30 import com.android.tv.tuner.ts.TsParser;
31 import com.google.android.exoplayer2.upstream.DataSpec;
32 import com.google.android.exoplayer2.upstream.TransferListener;
33 import java.io.BufferedInputStream;
34 import java.io.File;
35 import java.io.FileInputStream;
36 import java.io.IOException;
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicLong;
39 
40 /**
41  * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
42  * generated by capturing TV signal.
43  */
44 public class FileTsStreamer implements TsStreamer {
45     private static final String TAG = "FileTsStreamer";
46 
47     private static final int TS_PACKET_SIZE = 188;
48     private static final int TS_SYNC_BYTE = 0x47;
49     private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
50     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
51     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
52     private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
53     private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
54     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
55     private static final String FILE_DIR =
56             new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();
57 
58     // Virtual frequency base used for file-based source
59     public static final int FREQ_BASE = 100;
60 
61     private final Object mCircularBufferMonitor = new Object();
62     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
63     private final FileSourceEventDetector mEventDetector;
64     private final Context mContext;
65 
66     private long mBytesFetched;
67     private long mLastReadPosition;
68     private boolean mStreaming;
69 
70     private Thread mStreamingThread;
71     private StreamProvider mSource;
72 
73     public static class FileDataSource extends TsDataSource {
74         private final FileTsStreamer mTsStreamer;
75         private final AtomicLong mLastReadPosition = new AtomicLong(0);
76         private Uri mUri;
77         private long mStartBufferedPosition;
78 
FileDataSource(FileTsStreamer tsStreamer)79         private FileDataSource(FileTsStreamer tsStreamer) {
80             mTsStreamer = tsStreamer;
81             mStartBufferedPosition = tsStreamer.getBufferedPosition();
82         }
83 
84         @Override
getBufferedPosition()85         public long getBufferedPosition() {
86             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
87         }
88 
89         @Override
getLastReadPosition()90         public long getLastReadPosition() {
91             return mLastReadPosition.get();
92         }
93 
94         @Override
shiftStartPosition(long offset)95         public void shiftStartPosition(long offset) {
96             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
97             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
98             mStartBufferedPosition += offset;
99         }
100 
101         @Override
open(DataSpec dataSpec)102         public long open(DataSpec dataSpec) {
103             mUri = dataSpec.uri;
104             mLastReadPosition.set(0);
105             return com.google.android.exoplayer2.C.LENGTH_UNSET;
106         }
107 
108         @Override
close()109         public void close() {}
110 
111         @Override
read(byte[] buffer, int offset, int readLength)112         public int read(byte[] buffer, int offset, int readLength) throws IOException {
113             int ret =
114                     mTsStreamer.readAt(
115                             mStartBufferedPosition + mLastReadPosition.get(),
116                             buffer,
117                             offset,
118                             readLength);
119             if (ret > 0) {
120                 mLastReadPosition.addAndGet(ret);
121             }
122             return ret;
123         }
124 
125         // ExoPlayer V2 DataSource implementation.
126 
127         @Override
addTransferListener(TransferListener transferListener)128         public void addTransferListener(TransferListener transferListener) {
129             // TODO: Implement to support metrics collection.
130         }
131 
132         @Nullable
133         @Override
getUri()134         public Uri getUri() {
135             return mUri;
136         }
137     }
138 
139     /**
140      * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
141      *
142      * @param eventListener the listener for channel & program information
143      */
FileTsStreamer(EventListener eventListener, Context context)144     public FileTsStreamer(EventListener eventListener, Context context) {
145         mEventDetector =
146                 new FileSourceEventDetector(
147                         eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context));
148         mContext = context;
149     }
150 
151     @Override
startStream(ScanChannel channel)152     public boolean startStream(ScanChannel channel) {
153         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
154         mSource = new StreamProvider(filepath);
155         if (!mSource.isReady()) {
156             return false;
157         }
158         mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
159         mSource.addPidFilter(TsParser.PAT_PID);
160         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
161         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
162             mSource.addPidFilter(TsParser.DVB_EIT_PID);
163             mSource.addPidFilter(TsParser.DVB_SDT_PID);
164         }
165         synchronized (mCircularBufferMonitor) {
166             if (mStreaming) {
167                 return true;
168             }
169             mStreaming = true;
170         }
171 
172         mStreamingThread = new StreamingThread();
173         mStreamingThread.start();
174         Log.i(TAG, "Streaming started");
175         return true;
176     }
177 
178     @Override
startStream(TunerChannel channel)179     public boolean startStream(TunerChannel channel) {
180         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
181         mSource = new StreamProvider(channel.getFilepath());
182         if (!mSource.isReady()) {
183             return false;
184         }
185         mEventDetector.start(mSource, channel.getProgramNumber());
186         mSource.addPidFilter(channel.getVideoPid());
187         for (Integer i : channel.getAudioPids()) {
188             mSource.addPidFilter(i);
189         }
190         mSource.addPidFilter(channel.getPcrPid());
191         mSource.addPidFilter(TsParser.PAT_PID);
192         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
193         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
194             mSource.addPidFilter(TsParser.DVB_EIT_PID);
195             mSource.addPidFilter(TsParser.DVB_SDT_PID);
196         }
197         synchronized (mCircularBufferMonitor) {
198             if (mStreaming) {
199                 return true;
200             }
201             mStreaming = true;
202         }
203 
204         mStreamingThread = new StreamingThread();
205         mStreamingThread.start();
206         Log.i(TAG, "Streaming started");
207         return true;
208     }
209 
210     /**
211      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
212      * device is overloaded this can take a while, but usually it returns pretty quickly.
213      */
214     @Override
stopStream()215     public void stopStream() {
216         synchronized (mCircularBufferMonitor) {
217             mStreaming = false;
218             mCircularBufferMonitor.notify();
219         }
220 
221         try {
222             if (mStreamingThread != null) {
223                 mStreamingThread.join();
224             }
225         } catch (InterruptedException e) {
226             Thread.currentThread().interrupt();
227         }
228     }
229 
230     @Override
createDataSource()231     public TsDataSource createDataSource() {
232         return new FileDataSource(this);
233     }
234 
235     /**
236      * Returns the current buffered position from the file.
237      *
238      * @return the current buffered position
239      */
getBufferedPosition()240     public long getBufferedPosition() {
241         synchronized (mCircularBufferMonitor) {
242             return mBytesFetched;
243         }
244     }
245 
246     /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */
247     public static class StreamProvider {
248         private final String mFilepath;
249         private final SparseBooleanArray mPids = new SparseBooleanArray();
250         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
251 
252         private BufferedInputStream mInputStream;
253 
StreamProvider(String filepath)254         private StreamProvider(String filepath) {
255             mFilepath = filepath;
256             open(filepath);
257         }
258 
open(String filepath)259         private void open(String filepath) {
260             try {
261                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
262             } catch (IOException e) {
263                 Log.e(TAG, "Error opening input stream", e);
264                 mInputStream = null;
265             }
266         }
267 
isReady()268         private boolean isReady() {
269             return mInputStream != null;
270         }
271 
272         /** Returns the file path of the MPEG-2 TS file. */
getFilepath()273         public String getFilepath() {
274             return mFilepath;
275         }
276 
277         /** Adds a pid for filtering from the MPEG-2 TS file. */
addPidFilter(int pid)278         public void addPidFilter(int pid) {
279             mPids.put(pid, true);
280         }
281 
282         /** Returns whether the current pid filter is empty or not. */
isFilterEmpty()283         public boolean isFilterEmpty() {
284             return mPids.size() == 0;
285         }
286 
287         /** Clears the current pid filter. */
clearPidFilter()288         public void clearPidFilter() {
289             mPids.clear();
290         }
291 
292         /**
293          * Returns whether a pid is in the pid filter or not.
294          *
295          * @param pid the pid to check
296          */
isInFilter(int pid)297         public boolean isInFilter(int pid) {
298             return mPids.get(pid);
299         }
300 
301         /**
302          * Reads from the MPEG-2 TS file to buffer.
303          *
304          * @param inputBuffer to read
305          * @return the number of read bytes
306          */
read(byte[] inputBuffer)307         private int read(byte[] inputBuffer) {
308             int readSize = readInternal();
309             if (readSize <= 0) {
310                 // Reached the end of stream. Restart from the beginning.
311                 close();
312                 open(mFilepath);
313                 if (mInputStream == null) {
314                     return -1;
315                 }
316                 readSize = readInternal();
317             }
318 
319             if (mPreBuffer[0] != TS_SYNC_BYTE) {
320                 Log.e(TAG, "Error reading input stream - no TS sync found");
321                 return -1;
322             }
323             int filteredSize = 0;
324             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
325                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
326                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
327                     if (mPids.get(pid)) {
328                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
329                         destPos += TS_PACKET_SIZE;
330                         filteredSize += TS_PACKET_SIZE;
331                     }
332                 }
333             }
334             return filteredSize;
335         }
336 
readInternal()337         private int readInternal() {
338             int readSize;
339             try {
340                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
341             } catch (IOException e) {
342                 Log.e(TAG, "Error reading input stream", e);
343                 return -1;
344             }
345             return readSize;
346         }
347 
close()348         private void close() {
349             try {
350                 mInputStream.close();
351             } catch (IOException e) {
352                 Log.e(TAG, "Error closing input stream:", e);
353             }
354             mInputStream = null;
355         }
356     }
357 
358     /**
359      * Reads data from internal buffer.
360      *
361      * @param pos the position to read from
362      * @param buffer to read
363      * @param offset start position of the read buffer
364      * @param amount number of bytes to read
365      * @return number of read bytes when successful, {@code -1} otherwise
366      * @throws IOException
367      */
readAt(long pos, byte[] buffer, int offset, int amount)368     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
369         synchronized (mCircularBufferMonitor) {
370             long initialBytesFetched = mBytesFetched;
371             while (mBytesFetched < pos + amount && mStreaming) {
372                 try {
373                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
374                 } catch (InterruptedException e) {
375                     // Wait again.
376                     Thread.currentThread().interrupt();
377                 }
378                 if (initialBytesFetched == mBytesFetched) {
379                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
380 
381                     // Returning -1 will make demux report EOS so that the input service can retry
382                     // the playback.
383                     return -1;
384                 }
385             }
386             if (!mStreaming) {
387                 Log.w(TAG, "Stream is already stopped.");
388                 return -1;
389             }
390             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
391                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
392                 return -1;
393             }
394             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
395             int bytesToCopyInFirstPass = amount;
396             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
397                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
398             }
399             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
400             if (bytesToCopyInFirstPass < amount) {
401                 System.arraycopy(
402                         mCircularBuffer,
403                         0,
404                         buffer,
405                         offset + bytesToCopyInFirstPass,
406                         amount - bytesToCopyInFirstPass);
407             }
408             mLastReadPosition = pos + amount;
409             mCircularBufferMonitor.notify();
410             return amount;
411         }
412     }
413 
414     /**
415      * Adds {@link ScanChannel} instance for local files.
416      *
417      * @param output a list of channels where the results will be placed in
418      */
addLocalStreamFiles(List<ScanChannel> output)419     public static void addLocalStreamFiles(List<ScanChannel> output) {
420         File dir = new File(FILE_DIR);
421         if (!dir.exists()) return;
422 
423         File[] tsFiles = dir.listFiles();
424         if (tsFiles == null) return;
425         int freq = FileTsStreamer.FREQ_BASE;
426         for (File file : tsFiles) {
427             if (!file.isFile()) continue;
428             output.add(ScanChannel.forFile(freq, file.getName()));
429             freq += 100;
430         }
431     }
432 
433     /**
434      * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps
435      * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started
436      * and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
437      */
438     private class StreamingThread extends Thread {
439         @Override
run()440         public void run() {
441             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
442 
443             synchronized (mCircularBufferMonitor) {
444                 mBytesFetched = 0;
445                 mLastReadPosition = 0;
446             }
447 
448             while (true) {
449                 synchronized (mCircularBufferMonitor) {
450                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
451                             && mStreaming) {
452                         try {
453                             mCircularBufferMonitor.wait();
454                         } catch (InterruptedException e) {
455                             // Wait again.
456                             Thread.currentThread().interrupt();
457                         }
458                     }
459                     if (!mStreaming) {
460                         break;
461                     }
462                 }
463 
464                 int bytesWritten = mSource.read(dataBuffer);
465                 if (bytesWritten <= 0) {
466                     try {
467                         // When buffer is underrun, we sleep for short time to prevent
468                         // unnecessary CPU draining.
469                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
470                     } catch (InterruptedException e) {
471                         Thread.currentThread().interrupt();
472                     }
473                     continue;
474                 }
475 
476                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
477 
478                 synchronized (mCircularBufferMonitor) {
479                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
480                     int bytesToCopyInFirstPass = bytesWritten;
481                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
482                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
483                     }
484                     System.arraycopy(
485                             dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass);
486                     if (bytesToCopyInFirstPass < bytesWritten) {
487                         System.arraycopy(
488                                 dataBuffer,
489                                 bytesToCopyInFirstPass,
490                                 mCircularBuffer,
491                                 0,
492                                 bytesWritten - bytesToCopyInFirstPass);
493                     }
494                     mBytesFetched += bytesWritten;
495                     mCircularBufferMonitor.notify();
496                 }
497             }
498 
499             Log.i(TAG, "Streaming stopped");
500             mSource.close();
501         }
502     }
503 }
504