1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22 
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30 
31 #include <datasource/DataURISource.h>
32 #include <media/stagefright/foundation/ABitReader.h>
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/ByteUtils.h>
36 #include <media/stagefright/foundation/MediaKeys.h>
37 #include <media/stagefright/foundation/avc_utils.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42 #include <media/stagefright/FoundationUtils.h>
43 
44 #include <ctype.h>
45 #include <inttypes.h>
46 
// Verbose-log helpers that prefix every message with this fetcher's id.
// They must be expanded where `mFetcherID` is in scope.
#define FLOGV(fmt, ...) \
        ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)

// Like FLOGV, but additionally prefixes the name of |stream| as reported by
// LiveSession::getNameForStream().
#define FSLOGV(stream, fmt, ...) \
        FLOGV("[%s] " fmt, LiveSession::getNameForStream(stream), ##__VA_ARGS__)
50 
51 namespace android {
52 
53 // static
54 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
55 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
56 // LCM of 188 (size of a TS packet) & 1k works well
57 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
58 
59 struct PlaylistFetcher::DownloadState : public RefBase {
60     DownloadState();
61     void resetState();
62     bool hasSavedState() const;
63     void restoreState(
64             AString &uri,
65             sp<AMessage> &itemMeta,
66             sp<ABuffer> &buffer,
67             sp<ABuffer> &tsBuffer,
68             int32_t &firstSeqNumberInPlaylist,
69             int32_t &lastSeqNumberInPlaylist);
70     void saveState(
71             AString &uri,
72             sp<AMessage> &itemMeta,
73             sp<ABuffer> &buffer,
74             sp<ABuffer> &tsBuffer,
75             int32_t &firstSeqNumberInPlaylist,
76             int32_t &lastSeqNumberInPlaylist);
77 
78 private:
79     bool mHasSavedState;
80     AString mUri;
81     sp<AMessage> mItemMeta;
82     sp<ABuffer> mBuffer;
83     sp<ABuffer> mTsBuffer;
84     int32_t mFirstSeqNumberInPlaylist;
85     int32_t mLastSeqNumberInPlaylist;
86 };
87 
DownloadState()88 PlaylistFetcher::DownloadState::DownloadState() {
89     resetState();
90 }
91 
// Returns true if saveState() has stored a snapshot that has not yet been
// consumed by restoreState() or dropped by resetState().
bool PlaylistFetcher::DownloadState::hasSavedState() const {
    return mHasSavedState;
}
95 
resetState()96 void PlaylistFetcher::DownloadState::resetState() {
97     mHasSavedState = false;
98 
99     mUri.clear();
100     mItemMeta = NULL;
101     mBuffer = NULL;
102     mTsBuffer = NULL;
103     mFirstSeqNumberInPlaylist = 0;
104     mLastSeqNumberInPlaylist = 0;
105 }
106 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)107 void PlaylistFetcher::DownloadState::restoreState(
108         AString &uri,
109         sp<AMessage> &itemMeta,
110         sp<ABuffer> &buffer,
111         sp<ABuffer> &tsBuffer,
112         int32_t &firstSeqNumberInPlaylist,
113         int32_t &lastSeqNumberInPlaylist) {
114     if (!mHasSavedState) {
115         return;
116     }
117 
118     uri = mUri;
119     itemMeta = mItemMeta;
120     buffer = mBuffer;
121     tsBuffer = mTsBuffer;
122     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
123     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
124 
125     resetState();
126 }
127 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)128 void PlaylistFetcher::DownloadState::saveState(
129         AString &uri,
130         sp<AMessage> &itemMeta,
131         sp<ABuffer> &buffer,
132         sp<ABuffer> &tsBuffer,
133         int32_t &firstSeqNumberInPlaylist,
134         int32_t &lastSeqNumberInPlaylist) {
135     mHasSavedState = true;
136 
137     mUri = uri;
138     mItemMeta = itemMeta;
139     mBuffer = buffer;
140     mTsBuffer = tsBuffer;
141     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
142     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
143 }
144 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)145 PlaylistFetcher::PlaylistFetcher(
146         const sp<AMessage> &notify,
147         const sp<LiveSession> &session,
148         const char *uri,
149         int32_t id,
150         int32_t subtitleGeneration)
151     : mNotify(notify),
152       mSession(session),
153       mURI(uri),
154       mFetcherID(id),
155       mStreamTypeMask(0),
156       mStartTimeUs(-1LL),
157       mSegmentStartTimeUs(-1LL),
158       mDiscontinuitySeq(-1LL),
159       mStartTimeUsRelative(false),
160       mLastPlaylistFetchTimeUs(-1LL),
161       mPlaylistTimeUs(-1LL),
162       mSeqNumber(-1),
163       mNumRetries(0),
164       mNumRetriesForMonitorQueue(0),
165       mStartup(true),
166       mIDRFound(false),
167       mSeekMode(LiveSession::kSeekModeExactPosition),
168       mTimeChangeSignaled(false),
169       mNextPTSTimeUs(-1LL),
170       mMonitorQueueGeneration(0),
171       mSubtitleGeneration(subtitleGeneration),
172       mLastDiscontinuitySeq(-1LL),
173       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
174       mFirstPTSValid(false),
175       mFirstTimeUs(-1LL),
176       mVideoBuffer(new AnotherPacketSource(NULL)),
177       mSampleAesKeyItemChanged(false),
178       mThresholdRatio(-1.0f),
179       mDownloadState(new DownloadState()),
180       mHasMetadata(false) {
181     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
182     mHTTPDownloader = mSession->getHTTPDownloader();
183 
184     memset(mKeyData, 0, sizeof(mKeyData));
185     memset(mAESInitVec, 0, sizeof(mAESInitVec));
186 }
187 
// Nothing to release explicitly here; members clean up after themselves.
PlaylistFetcher::~PlaylistFetcher() {
}
190 
// Returns the id that was passed to the constructor.
int32_t PlaylistFetcher::getFetcherID() const {
    return mFetcherID;
}
194 
getSegmentStartTimeUs(int32_t seqNumber) const195 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
196     CHECK(mPlaylist != NULL);
197 
198     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
199     mPlaylist->getSeqNumberRange(
200             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
201 
202     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
203     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
204 
205     int64_t segmentStartUs = 0LL;
206     for (int32_t index = 0;
207             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
208         sp<AMessage> itemMeta;
209         CHECK(mPlaylist->itemAt(
210                     index, NULL /* uri */, &itemMeta));
211 
212         int64_t itemDurationUs;
213         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
214 
215         segmentStartUs += itemDurationUs;
216     }
217 
218     return segmentStartUs;
219 }
220 
getSegmentDurationUs(int32_t seqNumber) const221 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
222     CHECK(mPlaylist != NULL);
223 
224     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
225     mPlaylist->getSeqNumberRange(
226             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
227 
228     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
229     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
230 
231     int32_t index = seqNumber - firstSeqNumberInPlaylist;
232     sp<AMessage> itemMeta;
233     CHECK(mPlaylist->itemAt(
234                 index, NULL /* uri */, &itemMeta));
235 
236     int64_t itemDurationUs;
237     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
238 
239     return itemDurationUs;
240 }
241 
delayUsToRefreshPlaylist() const242 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
243     int64_t nowUs = ALooper::GetNowUs();
244 
245     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
246         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
247         return 0LL;
248     }
249 
250     if (mPlaylist->isComplete()) {
251         return (~0LLU >> 1);
252     }
253 
254     int64_t targetDurationUs = mPlaylist->getTargetDuration();
255 
256     int64_t minPlaylistAgeUs;
257 
258     switch (mRefreshState) {
259         case INITIAL_MINIMUM_RELOAD_DELAY:
260         {
261             size_t n = mPlaylist->size();
262             if (n > 0) {
263                 sp<AMessage> itemMeta;
264                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
265 
266                 int64_t itemDurationUs;
267                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
268 
269                 minPlaylistAgeUs = itemDurationUs;
270                 break;
271             }
272 
273             FALLTHROUGH_INTENDED;
274         }
275 
276         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
277         {
278             minPlaylistAgeUs = targetDurationUs / 2;
279             break;
280         }
281 
282         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
283         {
284             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
285             break;
286         }
287 
288         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
289         {
290             minPlaylistAgeUs = targetDurationUs * 3;
291             break;
292         }
293 
294         default:
295             TRESPASS();
296             break;
297     }
298 
299     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
300     return delayUs > 0LL ? delayUs : 0LL;
301 }
302 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)303 status_t PlaylistFetcher::decryptBuffer(
304         size_t playlistIndex, const sp<ABuffer> &buffer,
305         bool first) {
306     sp<AMessage> itemMeta;
307     bool found = false;
308     AString method;
309 
310     for (ssize_t i = playlistIndex; i >= 0; --i) {
311         AString uri;
312         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
313 
314         if (itemMeta->findString("cipher-method", &method)) {
315             found = true;
316             break;
317         }
318     }
319 
320     // TODO: Revise this when we add support for KEYFORMAT
321     // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
322     if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
323         ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
324                 mSampleAesKeyItem.get(), method.c_str());
325         mSampleAesKeyItem = NULL;
326         mSampleAesKeyItemChanged = true;
327     }
328 
329     if (!found) {
330         method = "NONE";
331     }
332     buffer->meta()->setString("cipher-method", method.c_str());
333 
334     if (method == "NONE") {
335         return OK;
336     } else if (method == "SAMPLE-AES") {
337         ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
338     } else if (!(method == "AES-128")) {
339         ALOGE("Unsupported cipher method '%s'", method.c_str());
340         return ERROR_UNSUPPORTED;
341     }
342 
343     AString keyURI;
344     if (!itemMeta->findString("cipher-uri", &keyURI)) {
345         ALOGE("Missing key uri");
346         return ERROR_MALFORMED;
347     }
348 
349     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
350 
351     sp<ABuffer> key;
352     if (index >= 0) {
353         key = mAESKeyForURI.valueAt(index);
354     } else if (keyURI.startsWith("data:")) {
355         sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
356         off64_t keyLen;
357         if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
358             ALOGE("Malformed cipher key data uri.");
359             return ERROR_MALFORMED;
360         }
361         key = new ABuffer(keyLen);
362         keySrc->readAt(0, key->data(), keyLen);
363         key->setRange(0, keyLen);
364     } else {
365         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
366 
367         if (err == ERROR_NOT_CONNECTED) {
368             return ERROR_NOT_CONNECTED;
369         } else if (err < 0) {
370             ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
371             return ERROR_IO;
372         } else if (key->size() != 16) {
373             ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
374             return ERROR_MALFORMED;
375         }
376 
377         mAESKeyForURI.add(keyURI, key);
378     }
379 
380     if (first) {
381         // If decrypting the first block in a file, read the iv from the manifest
382         // or derive the iv from the file's sequence number.
383 
384         unsigned char AESInitVec[AES_BLOCK_SIZE];
385         AString iv;
386         if (itemMeta->findString("cipher-iv", &iv)) {
387             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
388                     || iv.size() > 16 * 2 + 2) {
389                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
390                 return ERROR_MALFORMED;
391             }
392 
393             while (iv.size() < 16 * 2 + 2) {
394                 iv.insert("0", 1, 2);
395             }
396 
397             memset(AESInitVec, 0, sizeof(AESInitVec));
398             for (size_t i = 0; i < 16; ++i) {
399                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
400                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
401                 if (!isxdigit(c1) || !isxdigit(c2)) {
402                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
403                     return ERROR_MALFORMED;
404                 }
405                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
406                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
407 
408                 AESInitVec[i] = nibble1 << 4 | nibble2;
409             }
410         } else {
411             memset(AESInitVec, 0, sizeof(AESInitVec));
412             AESInitVec[15] = mSeqNumber & 0xff;
413             AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
414             AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
415             AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
416         }
417 
418         bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
419         bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
420         bool newSampleAesKeyItem = newKey || newInitVec;
421         ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
422                 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
423 
424         if (newSampleAesKeyItem) {
425             memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
426             memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
427 
428             if (method == "SAMPLE-AES") {
429                 mSampleAesKeyItemChanged = true;
430 
431                 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
432                 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
433 
434                 // always allocating a new one rather than updating the old message
435                 // lower layer might still have a reference to the old message
436                 mSampleAesKeyItem = new AMessage();
437                 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
438                 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
439 
440                 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s  IV: %s",
441                         HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
442                         HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
443             } // SAMPLE-AES
444         } // newSampleAesKeyItem
445     } // first
446 
447     if (method == "SAMPLE-AES") {
448         ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
449         return OK;
450     }
451 
452 
453     AES_KEY aes_key;
454     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
455         ALOGE("failed to set AES decryption key.");
456         return UNKNOWN_ERROR;
457     }
458 
459     size_t n = buffer->size();
460     if (!n) {
461         return OK;
462     }
463 
464     if (n < 16 || n % 16) {
465         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
466         return ERROR_MALFORMED;
467     }
468 
469     AES_cbc_encrypt(
470             buffer->data(), buffer->data(), buffer->size(),
471             &aes_key, mAESInitVec, AES_DECRYPT);
472 
473     return OK;
474 }
475 
checkDecryptPadding(const sp<ABuffer> & buffer)476 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
477     AString method;
478     CHECK(buffer->meta()->findString("cipher-method", &method));
479     if (method == "NONE" || method == "SAMPLE-AES") {
480         return OK;
481     }
482 
483     uint8_t padding = 0;
484     if (buffer->size() > 0) {
485         padding = buffer->data()[buffer->size() - 1];
486     }
487 
488     if (padding > 16) {
489         return ERROR_MALFORMED;
490     }
491 
492     for (size_t i = buffer->size() - padding; i < padding; i++) {
493         if (buffer->data()[i] != padding) {
494             return ERROR_MALFORMED;
495         }
496     }
497 
498     buffer->setRange(buffer->offset(), buffer->size() - padding);
499     return OK;
500 }
501 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)502 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
503     int64_t maxDelayUs = delayUsToRefreshPlaylist();
504     if (maxDelayUs < minDelayUs) {
505         maxDelayUs = minDelayUs;
506     }
507     if (delayUs > maxDelayUs) {
508         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
509         delayUs = maxDelayUs;
510     }
511     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
512     msg->setInt32("generation", mMonitorQueueGeneration);
513     msg->post(delayUs);
514 }
515 
// Invalidates all pending kWhatMonitorQueue / kWhatDownloadNext messages:
// onMessageReceived() drops any message whose "generation" no longer matches
// mMonitorQueueGeneration.
void PlaylistFetcher::cancelMonitorQueue() {
    ++mMonitorQueueGeneration;
}
519 
setStoppingThreshold(float thresholdRatio,bool disconnect)520 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
521     {
522         AutoMutex _l(mThresholdLock);
523         mThresholdRatio = thresholdRatio;
524     }
525     if (disconnect) {
526         mHTTPDownloader->disconnect();
527     }
528 }
529 
resetStoppingThreshold(bool disconnect)530 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
531     {
532         AutoMutex _l(mThresholdLock);
533         mThresholdRatio = -1.0f;
534     }
535     if (disconnect) {
536         mHTTPDownloader->disconnect();
537     } else {
538         // allow reconnect
539         mHTTPDownloader->reconnect();
540     }
541 }
542 
// Returns the current stopping threshold, read under mThresholdLock.
float PlaylistFetcher::getStoppingThreshold() {
    AutoMutex _l(mThresholdLock);
    return mThresholdRatio;
}
547 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)548 void PlaylistFetcher::startAsync(
549         const sp<AnotherPacketSource> &audioSource,
550         const sp<AnotherPacketSource> &videoSource,
551         const sp<AnotherPacketSource> &subtitleSource,
552         const sp<AnotherPacketSource> &metadataSource,
553         int64_t startTimeUs,
554         int64_t segmentStartTimeUs,
555         int32_t startDiscontinuitySeq,
556         LiveSession::SeekMode seekMode) {
557     sp<AMessage> msg = new AMessage(kWhatStart, this);
558 
559     uint32_t streamTypeMask = 0ul;
560 
561     if (audioSource != NULL) {
562         msg->setPointer("audioSource", audioSource.get());
563         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
564     }
565 
566     if (videoSource != NULL) {
567         msg->setPointer("videoSource", videoSource.get());
568         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
569     }
570 
571     if (subtitleSource != NULL) {
572         msg->setPointer("subtitleSource", subtitleSource.get());
573         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
574     }
575 
576     if (metadataSource != NULL) {
577         msg->setPointer("metadataSource", metadataSource.get());
578         // metadataSource does not affect streamTypeMask.
579     }
580 
581     msg->setInt32("streamTypeMask", streamTypeMask);
582     msg->setInt64("startTimeUs", startTimeUs);
583     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
584     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
585     msg->setInt32("seekMode", seekMode);
586     msg->post();
587 }
588 
589 /*
590  * pauseAsync
591  *
592  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
593  *           -1.0f - pause after finishing current segment
594  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
595  */
pauseAsync(float thresholdRatio,bool disconnect)596 void PlaylistFetcher::pauseAsync(
597         float thresholdRatio, bool disconnect) {
598     setStoppingThreshold(thresholdRatio, disconnect);
599 
600     (new AMessage(kWhatPause, this))->post();
601 }
602 
stopAsync(bool clear)603 void PlaylistFetcher::stopAsync(bool clear) {
604     setStoppingThreshold(0.0f, true /* disconncect */);
605 
606     sp<AMessage> msg = new AMessage(kWhatStop, this);
607     msg->setInt32("clear", clear);
608     msg->post();
609 }
610 
resumeUntilAsync(const sp<AMessage> & params)611 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
612     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
613 
614     AMessage* msg = new AMessage(kWhatResumeUntil, this);
615     msg->setMessage("params", params);
616     msg->post();
617 }
618 
fetchPlaylistAsync()619 void PlaylistFetcher::fetchPlaylistAsync() {
620     (new AMessage(kWhatFetchPlaylist, this))->post();
621 }
622 
onMessageReceived(const sp<AMessage> & msg)623 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
624     switch (msg->what()) {
625         case kWhatStart:
626         {
627             status_t err = onStart(msg);
628 
629             sp<AMessage> notify = mNotify->dup();
630             notify->setInt32("what", kWhatStarted);
631             notify->setInt32("err", err);
632             notify->post();
633             break;
634         }
635 
636         case kWhatPause:
637         {
638             onPause();
639 
640             sp<AMessage> notify = mNotify->dup();
641             notify->setInt32("what", kWhatPaused);
642             notify->setInt32("seekMode",
643                     mDownloadState->hasSavedState()
644                     ? LiveSession::kSeekModeNextSample
645                     : LiveSession::kSeekModeNextSegment);
646             notify->post();
647             break;
648         }
649 
650         case kWhatStop:
651         {
652             onStop(msg);
653 
654             sp<AMessage> notify = mNotify->dup();
655             notify->setInt32("what", kWhatStopped);
656             notify->post();
657             break;
658         }
659 
660         case kWhatFetchPlaylist:
661         {
662             bool unchanged;
663             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
664                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
665 
666             sp<AMessage> notify = mNotify->dup();
667             notify->setInt32("what", kWhatPlaylistFetched);
668             notify->setObject("playlist", playlist);
669             notify->post();
670             break;
671         }
672 
673         case kWhatMonitorQueue:
674         case kWhatDownloadNext:
675         {
676             int32_t generation;
677             CHECK(msg->findInt32("generation", &generation));
678 
679             if (generation != mMonitorQueueGeneration) {
680                 // Stale event
681                 break;
682             }
683 
684             if (msg->what() == kWhatMonitorQueue) {
685                 onMonitorQueue();
686             } else {
687                 onDownloadNext();
688             }
689             break;
690         }
691 
692         case kWhatResumeUntil:
693         {
694             onResumeUntil(msg);
695             break;
696         }
697 
698         default:
699             TRESPASS();
700     }
701 }
702 
onStart(const sp<AMessage> & msg)703 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
704     mPacketSources.clear();
705     mStopParams.clear();
706     mStartTimeUsNotify = mNotify->dup();
707     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
708     mStartTimeUsNotify->setString("uri", mURI);
709 
710     uint32_t streamTypeMask;
711     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
712 
713     int64_t startTimeUs;
714     int64_t segmentStartTimeUs;
715     int32_t startDiscontinuitySeq;
716     int32_t seekMode;
717     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
718     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
719     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
720     CHECK(msg->findInt32("seekMode", &seekMode));
721 
722     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
723         void *ptr;
724         CHECK(msg->findPointer("audioSource", &ptr));
725 
726         mPacketSources.add(
727                 LiveSession::STREAMTYPE_AUDIO,
728                 static_cast<AnotherPacketSource *>(ptr));
729     }
730 
731     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
732         void *ptr;
733         CHECK(msg->findPointer("videoSource", &ptr));
734 
735         mPacketSources.add(
736                 LiveSession::STREAMTYPE_VIDEO,
737                 static_cast<AnotherPacketSource *>(ptr));
738     }
739 
740     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
741         void *ptr;
742         CHECK(msg->findPointer("subtitleSource", &ptr));
743 
744         mPacketSources.add(
745                 LiveSession::STREAMTYPE_SUBTITLES,
746                 static_cast<AnotherPacketSource *>(ptr));
747     }
748 
749     void *ptr;
750     // metadataSource is not part of streamTypeMask
751     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
752             && msg->findPointer("metadataSource", &ptr)) {
753         mPacketSources.add(
754                 LiveSession::STREAMTYPE_METADATA,
755                 static_cast<AnotherPacketSource *>(ptr));
756     }
757 
758     mStreamTypeMask = streamTypeMask;
759 
760     mSegmentStartTimeUs = segmentStartTimeUs;
761 
762     if (startDiscontinuitySeq >= 0) {
763         mDiscontinuitySeq = startDiscontinuitySeq;
764     }
765 
766     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
767     mSeekMode = (LiveSession::SeekMode) seekMode;
768 
769     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
770         mStartup = true;
771         mIDRFound = false;
772         mVideoBuffer->clear();
773     }
774 
775     if (startTimeUs >= 0) {
776         mStartTimeUs = startTimeUs;
777         mFirstPTSValid = false;
778         mSeqNumber = -1;
779         mTimeChangeSignaled = false;
780         mDownloadState->resetState();
781     }
782 
783     postMonitorQueue();
784 
785     return OK;
786 }
787 
onPause()788 void PlaylistFetcher::onPause() {
789     cancelMonitorQueue();
790     mLastDiscontinuitySeq = mDiscontinuitySeq;
791 
792     resetStoppingThreshold(false /* disconnect */);
793 }
794 
onStop(const sp<AMessage> & msg)795 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
796     cancelMonitorQueue();
797 
798     int32_t clear;
799     CHECK(msg->findInt32("clear", &clear));
800     if (clear) {
801         for (size_t i = 0; i < mPacketSources.size(); i++) {
802             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
803             packetSource->clear();
804         }
805     }
806 
807     mDownloadState->resetState();
808     mPacketSources.clear();
809     mStreamTypeMask = 0;
810 
811     resetStoppingThreshold(true /* disconnect */);
812 }
813 
814 // Resume until we have reached the boundary timestamps listed in `msg`; when
815 // the remaining time is too short (within a resume threshold) stop immediately
816 // instead.
onResumeUntil(const sp<AMessage> & msg)817 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
818     sp<AMessage> params;
819     CHECK(msg->findMessage("params", &params));
820 
821     mStopParams = params;
822     onDownloadNext();
823 
824     return OK;
825 }
826 
notifyStopReached()827 void PlaylistFetcher::notifyStopReached() {
828     sp<AMessage> notify = mNotify->dup();
829     notify->setInt32("what", kWhatStopReached);
830     notify->post();
831 }
832 
notifyError(status_t err)833 void PlaylistFetcher::notifyError(status_t err) {
834     sp<AMessage> notify = mNotify->dup();
835     notify->setInt32("what", kWhatError);
836     notify->setInt32("err", err);
837     notify->post();
838 }
839 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)840 void PlaylistFetcher::queueDiscontinuity(
841         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
842     for (size_t i = 0; i < mPacketSources.size(); ++i) {
843         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
844         // (seek will discard buffer by abandoning old fetchers)
845         mPacketSources.valueAt(i)->queueDiscontinuity(
846                 type, extra, false /* discard */);
847     }
848 }
849 
onMonitorQueue()850 void PlaylistFetcher::onMonitorQueue() {
851     // in the middle of an unfinished download, delay
852     // playlist refresh as it'll change seq numbers
853     if (!mDownloadState->hasSavedState()) {
854         status_t err = refreshPlaylist();
855         if (err != OK) {
856             if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
857                 ++mNumRetriesForMonitorQueue;
858             } else {
859                 notifyError(err);
860             }
861             return;
862         } else {
863             mNumRetriesForMonitorQueue = 0;
864         }
865     }
866 
867     int64_t targetDurationUs = kMinBufferedDurationUs;
868     if (mPlaylist != NULL) {
869         targetDurationUs = mPlaylist->getTargetDuration();
870     }
871 
872     int64_t bufferedDurationUs = 0LL;
873     status_t finalResult = OK;
874     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
875         sp<AnotherPacketSource> packetSource =
876             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
877 
878         bufferedDurationUs =
879                 packetSource->getBufferedDurationUs(&finalResult);
880     } else {
881         // Use min stream duration, but ignore streams that never have any packet
882         // enqueued to prevent us from waiting on a non-existent stream;
883         // when we cannot make out from the manifest what streams are included in
884         // a playlist we might assume extra streams.
885         bufferedDurationUs = -1LL;
886         for (size_t i = 0; i < mPacketSources.size(); ++i) {
887             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
888                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
889                 continue;
890             }
891 
892             int64_t bufferedStreamDurationUs =
893                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
894 
895             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
896 
897             if (bufferedDurationUs == -1LL
898                  || bufferedStreamDurationUs < bufferedDurationUs) {
899                 bufferedDurationUs = bufferedStreamDurationUs;
900             }
901         }
902         if (bufferedDurationUs == -1LL) {
903             bufferedDurationUs = 0LL;
904         }
905     }
906 
907     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
908         FLOGV("monitoring, buffered=%lld < %lld",
909                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
910 
911         // delay the next download slightly; hopefully this gives other concurrent fetchers
912         // a better chance to run.
913         // onDownloadNext();
914         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
915         msg->setInt32("generation", mMonitorQueueGeneration);
916         msg->post(1000L);
917     } else {
918         // We'd like to maintain buffering above durationToBufferUs, so try
919         // again when buffer just about to go below durationToBufferUs
920         // (or after targetDurationUs / 2, whichever is smaller).
921         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
922         if (delayUs > targetDurationUs / 2) {
923             delayUs = targetDurationUs / 2;
924         }
925 
926         FLOGV("pausing for %lld, buffered=%lld > %lld",
927                 (long long)delayUs,
928                 (long long)bufferedDurationUs,
929                 (long long)kMinBufferedDurationUs);
930 
931         postMonitorQueue(delayUs);
932     }
933 }
934 
refreshPlaylist()935 status_t PlaylistFetcher::refreshPlaylist() {
936     if (delayUsToRefreshPlaylist() <= 0) {
937         bool unchanged;
938         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
939                 mURI.c_str(), mPlaylistHash, &unchanged);
940 
941         if (playlist == NULL) {
942             if (unchanged) {
943                 // We succeeded in fetching the playlist, but it was
944                 // unchanged from the last time we tried.
945 
946                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
947                     mRefreshState = (RefreshState)(mRefreshState + 1);
948                 }
949             } else {
950                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
951                 return ERROR_IO;
952             }
953         } else {
954             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
955             mPlaylist = playlist;
956 
957             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
958                 updateDuration();
959             }
960             // Notify LiveSession to use target-duration based buffering level
961             // for up/down switch. Default LiveSession::kUpSwitchMark may not
962             // be reachable for live streams, as our max buffering amount is
963             // limited to 3 segments.
964             if (!mPlaylist->isComplete()) {
965                 updateTargetDuration();
966             }
967             mPlaylistTimeUs = ALooper::GetNowUs();
968         }
969 
970         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
971     }
972     return OK;
973 }
974 
975 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)976 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
977     return buffer->size() > 0 && buffer->data()[0] == 0x47;
978 }
979 
shouldPauseDownload()980 bool PlaylistFetcher::shouldPauseDownload() {
981     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
982         // doesn't apply to subtitles
983         return false;
984     }
985 
986     // Calculate threshold to abort current download
987     float thresholdRatio = getStoppingThreshold();
988 
989     if (thresholdRatio < 0.0f) {
990         // never abort
991         return false;
992     } else if (thresholdRatio == 0.0f) {
993         // immediately abort
994         return true;
995     }
996 
997     // now we have a positive thresholdUs, abort if remaining
998     // portion to download is over that threshold.
999     if (mSegmentFirstPTS < 0) {
1000         // this means we haven't even find the first access unit,
1001         // abort now as we must be very far away from the end.
1002         return true;
1003     }
1004     int64_t lastEnqueueUs = mSegmentFirstPTS;
1005     for (size_t i = 0; i < mPacketSources.size(); ++i) {
1006         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1007             continue;
1008         }
1009         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1010         int32_t type;
1011         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1012             continue;
1013         }
1014         int64_t tmpUs;
1015         CHECK(meta->findInt64("timeUs", &tmpUs));
1016         if (tmpUs > lastEnqueueUs) {
1017             lastEnqueueUs = tmpUs;
1018         }
1019     }
1020     lastEnqueueUs -= mSegmentFirstPTS;
1021 
1022     int64_t targetDurationUs = mPlaylist->getTargetDuration();
1023     int64_t thresholdUs = thresholdRatio * targetDurationUs;
1024 
1025     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1026             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1027             (long long)thresholdUs,
1028             (long long)(targetDurationUs - lastEnqueueUs));
1029 
1030     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1031         return true;
1032     }
1033     return false;
1034 }
1035 
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1036 void PlaylistFetcher::initSeqNumberForLiveStream(
1037         int32_t &firstSeqNumberInPlaylist,
1038         int32_t &lastSeqNumberInPlaylist) {
1039     // start at least 3 target durations from the end.
1040     int64_t timeFromEnd = 0;
1041     size_t index = mPlaylist->size();
1042     sp<AMessage> itemMeta;
1043     int64_t itemDurationUs;
1044     int32_t targetDuration;
1045     if (mPlaylist->meta() != NULL
1046             && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1047         do {
1048             --index;
1049             if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1050                     || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1051                 ALOGW("item or itemDurationUs missing");
1052                 mSeqNumber = lastSeqNumberInPlaylist - 3;
1053                 break;
1054             }
1055 
1056             timeFromEnd += itemDurationUs;
1057             mSeqNumber = firstSeqNumberInPlaylist + index;
1058         } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1059     } else {
1060         ALOGW("target-duration missing");
1061         mSeqNumber = lastSeqNumberInPlaylist - 3;
1062     }
1063 
1064     if (mSeqNumber < firstSeqNumberInPlaylist) {
1065         mSeqNumber = firstSeqNumberInPlaylist;
1066     }
1067 }
1068 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1069 bool PlaylistFetcher::initDownloadState(
1070         AString &uri,
1071         sp<AMessage> &itemMeta,
1072         int32_t &firstSeqNumberInPlaylist,
1073         int32_t &lastSeqNumberInPlaylist) {
1074     status_t err = refreshPlaylist();
1075     firstSeqNumberInPlaylist = 0;
1076     lastSeqNumberInPlaylist = 0;
1077     bool discontinuity = false;
1078 
1079     if (mPlaylist != NULL) {
1080         mPlaylist->getSeqNumberRange(
1081                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1082 
1083         if (mDiscontinuitySeq < 0) {
1084             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1085         }
1086     }
1087 
1088     mSegmentFirstPTS = -1LL;
1089 
1090     if (mPlaylist != NULL && mSeqNumber < 0) {
1091         CHECK_GE(mStartTimeUs, 0LL);
1092 
1093         if (mSegmentStartTimeUs < 0) {
1094             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1095                 // this is a live session
1096                 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1097             } else {
1098                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1099                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1100                 // and relative position inside a segment
1101                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1102                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1103             }
1104             mStartTimeUsRelative = true;
1105             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1106                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1107                     lastSeqNumberInPlaylist);
1108         } else {
1109             // When adapting or track switching, mSegmentStartTimeUs (relative
1110             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1111             // timestamps coming from the media container) is used to determine the position
1112             // inside a segments.
1113             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1114                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1115                 // avoid double fetch/decode
1116                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1117                 // for the starting segment in new variant.
1118                 // If the two variants' segments are aligned, this gives the
1119                 // next segment. If they're not aligned, this gives the segment
1120                 // that overlaps no more than 1/2 * targetDurationUs.
1121                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1122                         + mPlaylist->getTargetDuration() / 2);
1123             } else {
1124                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1125             }
1126             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1127             if (mSeqNumber < minSeq) {
1128                 mSeqNumber = minSeq;
1129             }
1130 
1131             if (mSeqNumber < firstSeqNumberInPlaylist) {
1132                 mSeqNumber = firstSeqNumberInPlaylist;
1133             }
1134 
1135             if (mSeqNumber > lastSeqNumberInPlaylist) {
1136                 mSeqNumber = lastSeqNumberInPlaylist;
1137             }
1138             FLOGV("Initial sequence number is %d from (%d .. %d)",
1139                     mSeqNumber, firstSeqNumberInPlaylist,
1140                     lastSeqNumberInPlaylist);
1141         }
1142     }
1143 
1144     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1145     if (mSeqNumber < firstSeqNumberInPlaylist
1146             || mSeqNumber > lastSeqNumberInPlaylist
1147             || err != OK) {
1148         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1149             ++mNumRetries;
1150 
1151             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1152                 // make sure we reach this retry logic on refresh failures
1153                 // by adding an err != OK clause to all enclosing if's.
1154 
1155                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1156                 // playlist's target duration or 3 seconds, whichever is less
1157                 int64_t delayUs = kMaxMonitorDelayUs;
1158                 if (mPlaylist != NULL) {
1159                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1160                             / (1 + mNumRetries);
1161                 }
1162                 if (delayUs > kMaxMonitorDelayUs) {
1163                     delayUs = kMaxMonitorDelayUs;
1164                 }
1165                 FLOGV("sequence number high: %d from (%d .. %d), "
1166                       "monitor in %lld (retry=%d)",
1167                         mSeqNumber, firstSeqNumberInPlaylist,
1168                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1169                 postMonitorQueue(delayUs);
1170                 return false;
1171             }
1172 
1173             if (err != OK) {
1174                 notifyError(err);
1175                 return false;
1176             }
1177 
1178             // we've missed the boat, let's start 3 segments prior to the latest sequence
1179             // number available and signal a discontinuity.
1180 
1181             ALOGI("We've missed the boat, restarting playback."
1182                   "  mStartup=%d, was  looking for %d in %d-%d",
1183                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1184                     lastSeqNumberInPlaylist);
1185             if (mStopParams != NULL) {
1186                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1187                 // but since the segments we are supposed to fetch have already rolled off
1188                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1189                 // skip.
1190                 notifyStopReached();
1191                 return false;
1192             }
1193             mSeqNumber = lastSeqNumberInPlaylist - 3;
1194             if (mSeqNumber < firstSeqNumberInPlaylist) {
1195                 mSeqNumber = firstSeqNumberInPlaylist;
1196             }
1197             discontinuity = true;
1198 
1199             // fall through
1200         } else {
1201             if (mPlaylist != NULL) {
1202                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1203                         && !mPlaylist->isComplete()) {
1204                     // Live playlists
1205                     ALOGW("sequence number %d not yet available", mSeqNumber);
1206                     postMonitorQueue(delayUsToRefreshPlaylist());
1207                     return false;
1208                 }
1209                 ALOGE("Cannot find sequence number %d in playlist "
1210                      "(contains %d - %d)",
1211                      mSeqNumber, firstSeqNumberInPlaylist,
1212                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1213 
1214                 if (mTSParser != NULL) {
1215                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1216                     // Use an empty buffer; we don't have any new data, just want to extract
1217                     // potential new access units after flush.  Reset mSeqNumber to
1218                     // lastSeqNumberInPlaylist such that we set the correct access unit
1219                     // properties in extractAndQueueAccessUnitsFromTs.
1220                     sp<ABuffer> buffer = new ABuffer(0);
1221                     mSeqNumber = lastSeqNumberInPlaylist;
1222                     extractAndQueueAccessUnitsFromTs(buffer);
1223                 }
1224                 notifyError(ERROR_END_OF_STREAM);
1225             } else {
1226                 // It's possible that we were never able to download the playlist.
1227                 // In this case we should notify error, instead of EOS, as EOS during
1228                 // prepare means we succeeded in downloading everything.
1229                 ALOGE("Failed to download playlist!");
1230                 notifyError(ERROR_IO);
1231             }
1232 
1233             return false;
1234         }
1235     }
1236 
1237     mNumRetries = 0;
1238 
1239     CHECK(mPlaylist->itemAt(
1240                 mSeqNumber - firstSeqNumberInPlaylist,
1241                 &uri,
1242                 &itemMeta));
1243 
1244     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1245 
1246     int32_t val;
1247     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1248         discontinuity = true;
1249     } else if (mLastDiscontinuitySeq >= 0
1250             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1251         // Seek jumped to a new discontinuity sequence. We need to signal
1252         // a format change to decoder. Decoder needs to shutdown and be
1253         // created again if seamless format change is unsupported.
1254         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1255                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1256                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1257         discontinuity = true;
1258     }
1259     mLastDiscontinuitySeq = -1;
1260 
1261     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1262     // this avoids interleaved connections to the key and segment file.
1263     {
1264         sp<ABuffer> junk = new ABuffer(16);
1265         junk->setRange(0, 16);
1266         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1267                 true /* first */);
1268         if (err == ERROR_NOT_CONNECTED) {
1269             return false;
1270         } else if (err != OK) {
1271             notifyError(err);
1272             return false;
1273         }
1274     }
1275 
1276     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1277         // We need to signal a time discontinuity to ATSParser on the
1278         // first segment after start, or on a discontinuity segment.
1279         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1280         // to send the time discontinuity.
1281         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1282             // If this was a live event this made no sense since
1283             // we don't have access to all the segment before the current
1284             // one.
1285             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1286         }
1287 
1288         // Setting mTimeChangeSignaled to true, so that if start time
1289         // searching goes into 2nd segment (without a discontinuity),
1290         // we don't reset time again. It causes corruption when pending
1291         // data in ATSParser is cleared.
1292         mTimeChangeSignaled = true;
1293     }
1294 
1295     if (discontinuity) {
1296         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1297 
1298         // Signal a format discontinuity to ATSParser to clear partial data
1299         // from previous streams. Not doing this causes bitstream corruption.
1300         if (mTSParser != NULL) {
1301             mTSParser.clear();
1302         }
1303 
1304         queueDiscontinuity(
1305                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1306                 NULL /* extra */);
1307 
1308         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1309             // This means we guessed mStartTimeUs to be in the previous
1310             // segment (likely very close to the end), but either video or
1311             // audio has not found start by the end of that segment.
1312             //
1313             // If this new segment is not a discontinuity, keep searching.
1314             //
1315             // If this new segment even got a discontinuity marker, just
1316             // set mStartTimeUs=0, and take all samples from now on.
1317             mStartTimeUs = 0;
1318             mFirstPTSValid = false;
1319             mIDRFound = false;
1320             mVideoBuffer->clear();
1321         }
1322     }
1323 
1324     FLOGV("fetching segment %d from (%d .. %d)",
1325             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1326     return true;
1327 }
1328 
onDownloadNext()1329 void PlaylistFetcher::onDownloadNext() {
1330     AString uri;
1331     sp<AMessage> itemMeta;
1332     sp<ABuffer> buffer;
1333     sp<ABuffer> tsBuffer;
1334     int32_t firstSeqNumberInPlaylist = 0;
1335     int32_t lastSeqNumberInPlaylist = 0;
1336     bool connectHTTP = true;
1337 
1338     if (mDownloadState->hasSavedState()) {
1339         mDownloadState->restoreState(
1340                 uri,
1341                 itemMeta,
1342                 buffer,
1343                 tsBuffer,
1344                 firstSeqNumberInPlaylist,
1345                 lastSeqNumberInPlaylist);
1346         connectHTTP = false;
1347         FLOGV("resuming: '%s'", uri.c_str());
1348     } else {
1349         if (!initDownloadState(
1350                 uri,
1351                 itemMeta,
1352                 firstSeqNumberInPlaylist,
1353                 lastSeqNumberInPlaylist)) {
1354             return;
1355         }
1356         FLOGV("fetching: '%s'", uri.c_str());
1357     }
1358 
1359     int64_t range_offset, range_length;
1360     if (!itemMeta->findInt64("range-offset", &range_offset)
1361             || !itemMeta->findInt64("range-length", &range_length)) {
1362         range_offset = 0;
1363         range_length = -1;
1364     }
1365 
1366     // block-wise download
1367     bool shouldPause = false;
1368     ssize_t bytesRead;
1369     do {
1370         int64_t startUs = ALooper::GetNowUs();
1371         bytesRead = mHTTPDownloader->fetchBlock(
1372                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1373                 NULL /* actualURL */, connectHTTP);
1374         int64_t delayUs = ALooper::GetNowUs() - startUs;
1375 
1376         if (bytesRead == ERROR_NOT_CONNECTED) {
1377             return;
1378         }
1379         if (bytesRead < 0) {
1380             status_t err = bytesRead;
1381             ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
1382             notifyError(err);
1383             return;
1384         }
1385 
1386         // add sample for bandwidth estimation, excluding samples from subtitles (as
1387         // its too small), or during startup/resumeUntil (when we could have more than
1388         // one connection open which affects bandwidth)
1389         if (!mStartup && mStopParams == NULL && bytesRead > 0
1390                 && (mStreamTypeMask
1391                         & (LiveSession::STREAMTYPE_AUDIO
1392                         | LiveSession::STREAMTYPE_VIDEO))) {
1393             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1394             if (delayUs > 2000000LL) {
1395                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1396                         bytesRead, (double)delayUs / 1.0e6);
1397             }
1398         }
1399 
1400         connectHTTP = false;
1401 
1402         CHECK(buffer != NULL);
1403 
1404         size_t size = buffer->size();
1405         // Set decryption range.
1406         buffer->setRange(size - bytesRead, bytesRead);
1407         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1408                 buffer->offset() == 0 /* first */);
1409         // Unset decryption range.
1410         buffer->setRange(0, size);
1411 
1412         if (err != OK) {
1413             ALOGE("decryptBuffer failed w/ error %d", err);
1414 
1415             notifyError(err);
1416             return;
1417         }
1418 
1419         bool startUp = mStartup; // save current start up state
1420 
1421         err = OK;
1422         if (bufferStartsWithTsSyncByte(buffer)) {
1423             // Incremental extraction is only supported for MPEG2 transport streams.
1424             if (tsBuffer == NULL) {
1425                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1426                 tsBuffer->setRange(0, 0);
1427             } else if (tsBuffer->capacity() != buffer->capacity()) {
1428                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1429                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1430                 tsBuffer->setRange(tsOff, tsSize);
1431             }
1432             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1433             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1434         }
1435 
1436         if (err == -EAGAIN) {
1437             // starting sequence number too low/high
1438             mTSParser.clear();
1439             for (size_t i = 0; i < mPacketSources.size(); i++) {
1440                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1441                 packetSource->clear();
1442             }
1443             postMonitorQueue();
1444             return;
1445         } else if (err == ERROR_OUT_OF_RANGE) {
1446             // reached stopping point
1447             notifyStopReached();
1448             return;
1449         } else if (err != OK) {
1450             notifyError(err);
1451             return;
1452         }
1453         // If we're switching, post start notification
1454         // this should only be posted when the last chunk is full processed by TSParser
1455         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1456             CHECK(mStartTimeUsNotify != NULL);
1457             mStartTimeUsNotify->post();
1458             mStartTimeUsNotify.clear();
1459             shouldPause = true;
1460         }
1461         if (shouldPause || shouldPauseDownload()) {
1462             // save state and return if this is not the last chunk,
1463             // leaving the fetcher in paused state.
1464             if (bytesRead != 0) {
1465                 mDownloadState->saveState(
1466                         uri,
1467                         itemMeta,
1468                         buffer,
1469                         tsBuffer,
1470                         firstSeqNumberInPlaylist,
1471                         lastSeqNumberInPlaylist);
1472                 return;
1473             }
1474             shouldPause = true;
1475         }
1476     } while (bytesRead != 0);
1477 
1478     if (bufferStartsWithTsSyncByte(buffer)) {
1479         // If we don't see a stream in the program table after fetching a full ts segment
1480         // mark it as nonexistent.
1481         ATSParser::SourceType srcTypes[] =
1482                 { ATSParser::VIDEO, ATSParser::AUDIO };
1483         LiveSession::StreamType streamTypes[] =
1484                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1485         const size_t kNumTypes = NELEM(srcTypes);
1486 
1487         for (size_t i = 0; i < kNumTypes; i++) {
1488             ATSParser::SourceType srcType = srcTypes[i];
1489             LiveSession::StreamType streamType = streamTypes[i];
1490 
1491             sp<AnotherPacketSource> source =
1492                 static_cast<AnotherPacketSource *>(
1493                     mTSParser->getSource(srcType).get());
1494 
1495             if (!mTSParser->hasSource(srcType)) {
1496                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1497                       srcType == ATSParser::VIDEO ? "video" : "audio");
1498 
1499                 mStreamTypeMask &= ~streamType;
1500                 mPacketSources.removeItem(streamType);
1501             }
1502         }
1503 
1504     }
1505 
1506     if (checkDecryptPadding(buffer) != OK) {
1507         ALOGE("Incorrect padding bytes after decryption.");
1508         notifyError(ERROR_MALFORMED);
1509         return;
1510     }
1511 
1512     if (tsBuffer != NULL) {
1513         AString method;
1514         CHECK(buffer->meta()->findString("cipher-method", &method));
1515         if ((tsBuffer->size() > 0 && method == "NONE")
1516                 || tsBuffer->size() > 16) {
1517             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1518                     "bytes in length.");
1519             notifyError(ERROR_MALFORMED);
1520             return;
1521         }
1522     }
1523 
1524     // bulk extract non-ts files
1525     bool startUp = mStartup;
1526     if (tsBuffer == NULL) {
1527         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1528         if (err == -EAGAIN) {
1529             // starting sequence number too low/high
1530             postMonitorQueue();
1531             return;
1532         } else if (err == ERROR_OUT_OF_RANGE) {
1533             // reached stopping point
1534             notifyStopReached();
1535             return;
1536         } else if (err != OK) {
1537             notifyError(err);
1538             return;
1539         }
1540     }
1541 
1542     ++mSeqNumber;
1543 
1544     // if adapting, pause after found the next starting point
1545     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1546         CHECK(mStartTimeUsNotify != NULL);
1547         mStartTimeUsNotify->post();
1548         mStartTimeUsNotify.clear();
1549         shouldPause = true;
1550     }
1551 
1552     if (!shouldPause) {
1553         postMonitorQueue();
1554     }
1555 }
1556 
1557 /*
1558  * returns true if we need to adjust mSeqNumber
1559  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1560 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1561     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1562 
1563     int64_t minDiffUs, maxDiffUs;
1564     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1565         // if the previous fetcher paused in the middle of a segment, we
1566         // want to start at a segment that overlaps the last sample
1567         minDiffUs = -mPlaylist->getTargetDuration();
1568         maxDiffUs = 0LL;
1569     } else {
1570         // if the previous fetcher paused at the end of a segment, ideally
1571         // we want to start at the segment that's roughly aligned with its
1572         // next segment, but if the two variants are not well aligned we
1573         // adjust the diff to within (-T/2, T/2)
1574         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1575         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1576     }
1577 
1578     int32_t oldSeqNumber = mSeqNumber;
1579     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1580 
1581     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1582     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1583     if (diffUs > maxDiffUs) {
1584         while (index > 0 && diffUs > maxDiffUs) {
1585             --index;
1586 
1587             sp<AMessage> itemMeta;
1588             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1589 
1590             int64_t itemDurationUs;
1591             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1592 
1593             diffUs -= itemDurationUs;
1594         }
1595     } else if (diffUs < minDiffUs) {
1596         while (index + 1 < (ssize_t) mPlaylist->size()
1597                 && diffUs < minDiffUs) {
1598             ++index;
1599 
1600             sp<AMessage> itemMeta;
1601             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1602 
1603             int64_t itemDurationUs;
1604             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1605 
1606             diffUs += itemDurationUs;
1607         }
1608     }
1609 
1610     mSeqNumber = firstSeqNumberInPlaylist + index;
1611 
1612     if (mSeqNumber != oldSeqNumber) {
1613         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1614                 (long long) anchorTimeUs - mStartTimeUs,
1615                 (long long) minDiffUs,
1616                 (long long) maxDiffUs);
1617         return true;
1618     }
1619     return false;
1620 }
1621 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1622 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1623     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1624 
1625     size_t index = 0;
1626     while (index < mPlaylist->size()) {
1627         sp<AMessage> itemMeta;
1628         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1629         size_t curDiscontinuitySeq;
1630         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1631         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1632         if (curDiscontinuitySeq == discontinuitySeq) {
1633             return seqNumber;
1634         } else if (curDiscontinuitySeq > discontinuitySeq) {
1635             return seqNumber <= 0 ? 0 : seqNumber - 1;
1636         }
1637 
1638         ++index;
1639     }
1640 
1641     return firstSeqNumberInPlaylist + mPlaylist->size();
1642 }
1643 
getSeqNumberForTime(int64_t timeUs) const1644 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1645     size_t index = 0;
1646     int64_t segmentStartUs = 0;
1647     while (index < mPlaylist->size()) {
1648         sp<AMessage> itemMeta;
1649         CHECK(mPlaylist->itemAt(
1650                     index, NULL /* uri */, &itemMeta));
1651 
1652         int64_t itemDurationUs;
1653         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1654 
1655         if (timeUs < segmentStartUs + itemDurationUs) {
1656             break;
1657         }
1658 
1659         segmentStartUs += itemDurationUs;
1660         ++index;
1661     }
1662 
1663     if (index >= mPlaylist->size()) {
1664         index = mPlaylist->size() - 1;
1665     }
1666 
1667     return mPlaylist->getFirstSeqNumber() + index;
1668 }
1669 
// Stamps an access unit's meta with fetcher state before it is queued:
// the source's format (when available), an optional "discard" flag, the
// current discontinuity sequence, segment start / first-PTS / duration for
// mSeqNumber, and, when the playlist is neither complete nor an event
// playlist, the playlist time.
//
// Returns the same accessUnit reference that was passed in.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    const sp<MetaData> format = source->getFormat();
    const sp<AMessage> unitMeta = accessUnit->meta();

    if (format != NULL) {
        // for simplicity, store a reference to the format in each unit
        unitMeta->setObject("format", format);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool playlistIsStatic = mPlaylist->isComplete() || mPlaylist->isEvent();
    if (!playlistIsStatic) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }

    return accessUnit;
}
1691 
isStartTimeReached(int64_t timeUs)1692 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1693     if (!mFirstPTSValid) {
1694         mFirstTimeUs = timeUs;
1695         mFirstPTSValid = true;
1696     }
1697     bool startTimeReached = true;
1698     if (mStartTimeUsRelative) {
1699         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1700                 (long long)timeUs,
1701                 (long long)mFirstTimeUs,
1702                 (long long)(timeUs - mFirstTimeUs));
1703         timeUs -= mFirstTimeUs;
1704         if (timeUs < 0) {
1705             FLOGV("clamp negative timeUs to 0");
1706             timeUs = 0;
1707         }
1708         startTimeReached = (timeUs >= mStartTimeUs);
1709     }
1710     return startTimeReached;
1711 }
1712 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1713 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1714     if (mTSParser == NULL) {
1715         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1716         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1717     }
1718 
1719     if (mNextPTSTimeUs >= 0LL) {
1720         sp<AMessage> extra = new AMessage;
1721         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1722         // ATSParser from skewing the timestamps of access units.
1723         extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1724 
1725         // When adapting, signal a recent media time to the parser,
1726         // so that PTS wrap around is handled for the new variant.
1727         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1728             extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1729         }
1730 
1731         mTSParser->signalDiscontinuity(
1732                 ATSParser::DISCONTINUITY_TIME, extra);
1733 
1734         mNextPTSTimeUs = -1LL;
1735     }
1736 
1737     if (mSampleAesKeyItemChanged) {
1738         mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1739         mSampleAesKeyItemChanged = false;
1740     }
1741 
1742     size_t offset = 0;
1743     while (offset + 188 <= buffer->size()) {
1744         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1745 
1746         if (err != OK) {
1747             return err;
1748         }
1749 
1750         offset += 188;
1751     }
1752     // setRange to indicate consumed bytes.
1753     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1754 
1755     if (mSegmentFirstPTS < 0LL) {
1756         // get the smallest first PTS from all streams present in this parser
1757         for (size_t i = mPacketSources.size(); i > 0;) {
1758             i--;
1759             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1760             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1761                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1762                 return ERROR_MALFORMED;
1763             }
1764             if (stream == LiveSession::STREAMTYPE_METADATA) {
1765                 continue;
1766             }
1767             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1768             sp<AnotherPacketSource> source =
1769                 static_cast<AnotherPacketSource *>(
1770                         mTSParser->getSource(type).get());
1771 
1772             if (source == NULL) {
1773                 continue;
1774             }
1775             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1776             if (meta != NULL) {
1777                 int64_t timeUs;
1778                 CHECK(meta->findInt64("timeUs", &timeUs));
1779                 if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
1780                     mSegmentFirstPTS = timeUs;
1781                 }
1782             }
1783         }
1784         if (mSegmentFirstPTS < 0LL) {
1785             // didn't find any TS packet, can return early
1786             return OK;
1787         }
1788         if (!mStartTimeUsRelative) {
1789             // mStartup
1790             //   mStartup is true until we have queued a packet for all the streams
1791             //   we are fetching. We queue packets whose timestamps are greater than
1792             //   mStartTimeUs.
1793             // mSegmentStartTimeUs >= 0
1794             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1795             // adjustSeqNumberWithAnchorTime(timeUs) == true
1796             //   we guessed a seq number that's either too large or too small.
1797             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1798             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1799             // to -1 so that we don't enter this chunk next time.
1800             if (mStartup && mSegmentStartTimeUs >= 0
1801                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1802                 mStartTimeUsNotify = mNotify->dup();
1803                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1804                 mStartTimeUsNotify->setString("uri", mURI);
1805                 mIDRFound = false;
1806                 mSegmentStartTimeUs = -1;
1807                 return -EAGAIN;
1808             }
1809         }
1810     }
1811 
1812     status_t err = OK;
1813     for (size_t i = mPacketSources.size(); i > 0;) {
1814         i--;
1815         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1816 
1817         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1818         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1819             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1820             return ERROR_MALFORMED;
1821         }
1822 
1823         const char *key = LiveSession::getKeyForStream(stream);
1824         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1825 
1826         sp<AnotherPacketSource> source =
1827             static_cast<AnotherPacketSource *>(
1828                     mTSParser->getSource(type).get());
1829 
1830         if (source == NULL) {
1831             continue;
1832         }
1833 
1834         const char *mime;
1835         sp<MetaData> format  = source->getFormat();
1836         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1837                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1838 
1839         sp<ABuffer> accessUnit;
1840         status_t finalResult;
1841         while (source->hasBufferAvailable(&finalResult)
1842                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1843 
1844             int64_t timeUs;
1845             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1846 
1847             if (mStartup) {
1848                 bool startTimeReached = isStartTimeReached(timeUs);
1849 
1850                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1851                     // buffer up to the closest preceding IDR frame in the next segement,
1852                     // or the closest succeeding IDR frame after the exact position
1853                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1854                             (long long)timeUs,
1855                             (long long)mStartTimeUs,
1856                             (long long)timeUs - mStartTimeUs,
1857                             mIDRFound);
1858                     if (isAvc) {
1859                         if (IsIDR(accessUnit->data(), accessUnit->size())) {
1860                             mVideoBuffer->clear();
1861                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1862                             mIDRFound = true;
1863                         }
1864                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1865                             mVideoBuffer->queueAccessUnit(accessUnit);
1866                             FSLOGV(stream, "saving AVC video AccessUnit");
1867                         }
1868                     }
1869                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1870                         continue;
1871                     }
1872                 }
1873             }
1874 
1875             if (mStartTimeUsNotify != NULL) {
1876                 uint32_t streamMask = 0;
1877                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1878                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1879                         && !(streamMask & mPacketSources.keyAt(i))) {
1880                     streamMask |= mPacketSources.keyAt(i);
1881                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1882                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1883                             (long long)timeUs, streamMask);
1884 
1885                     if (streamMask == mStreamTypeMask) {
1886                         FLOGV("found start point for all streams");
1887                         mStartup = false;
1888                     }
1889                 }
1890             }
1891 
1892             if (mStopParams != NULL) {
1893                 int32_t discontinuitySeq;
1894                 int64_t stopTimeUs;
1895                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1896                         || discontinuitySeq > mDiscontinuitySeq
1897                         || !mStopParams->findInt64(key, &stopTimeUs)
1898                         || (discontinuitySeq == mDiscontinuitySeq
1899                                 && timeUs >= stopTimeUs)) {
1900                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1901                     mStreamTypeMask &= ~stream;
1902                     mPacketSources.removeItemsAt(i);
1903                     break;
1904                 }
1905             }
1906 
1907             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1908                 const bool discard = true;
1909                 status_t status;
1910                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1911                     sp<ABuffer> videoBuffer;
1912                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1913                     setAccessUnitProperties(videoBuffer, source, discard);
1914                     packetSource->queueAccessUnit(videoBuffer);
1915                     int64_t bufferTimeUs;
1916                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1917                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1918                             (long long)bufferTimeUs);
1919                 }
1920             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1921                 mHasMetadata = true;
1922                 sp<AMessage> notify = mNotify->dup();
1923                 notify->setInt32("what", kWhatMetadataDetected);
1924                 notify->post();
1925             }
1926 
1927             setAccessUnitProperties(accessUnit, source);
1928             packetSource->queueAccessUnit(accessUnit);
1929             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1930         }
1931 
1932         if (err != OK) {
1933             break;
1934         }
1935     }
1936 
1937     if (err != OK) {
1938         for (size_t i = mPacketSources.size(); i > 0;) {
1939             i--;
1940             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1941             packetSource->clear();
1942         }
1943         return err;
1944     }
1945 
1946     if (!mStreamTypeMask) {
1947         // Signal gap is filled between original and new stream.
1948         FLOGV("reached stop point for all streams");
1949         return ERROR_OUT_OF_RANGE;
1950     }
1951 
1952     return OK;
1953 }
1954 
1955 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1956 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1957         const sp<ABuffer> &buffer) {
1958     size_t pos = 0;
1959 
1960     // skip possible BOM
1961     if (buffer->size() >= pos + 3 &&
1962             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1963         pos += 3;
1964     }
1965 
1966     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1967     if (buffer->size() < pos + 6 ||
1968             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1969         return false;
1970     }
1971     pos += 6;
1972 
1973     if (buffer->size() == pos) {
1974         return true;
1975     }
1976 
1977     uint8_t sep = buffer->data()[pos];
1978     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1979 }
1980 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1981 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1982         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1983     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1984         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1985             ALOGE("This stream only contains subtitles.");
1986             return ERROR_MALFORMED;
1987         }
1988 
1989         const sp<AnotherPacketSource> packetSource =
1990             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1991 
1992         int64_t durationUs;
1993         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1994         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1995         buffer->meta()->setInt64("durationUs", durationUs);
1996         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1997         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1998         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1999         packetSource->queueAccessUnit(buffer);
2000         return OK;
2001     }
2002 
2003     if (mNextPTSTimeUs >= 0LL) {
2004         mNextPTSTimeUs = -1LL;
2005     }
2006 
2007     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2008     // stream prefixed by an ID3 tag.
2009 
2010     bool firstID3Tag = true;
2011     uint64_t PTS = 0;
2012 
2013     for (;;) {
2014         // Make sure to skip all ID3 tags preceding the audio data.
2015         // At least one must be present to provide the PTS timestamp.
2016 
2017         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2018         if (!id3.isValid()) {
2019             if (firstID3Tag) {
2020                 ALOGE("Unable to parse ID3 tag.");
2021                 return ERROR_MALFORMED;
2022             } else {
2023                 break;
2024             }
2025         }
2026 
2027         if (firstID3Tag) {
2028             bool found = false;
2029 
2030             ID3::Iterator it(id3, "PRIV");
2031             while (!it.done()) {
2032                 size_t length;
2033                 const uint8_t *data = it.getData(&length);
2034                 if (!data) {
2035                     return ERROR_MALFORMED;
2036                 }
2037 
2038                 static const char *kMatchName =
2039                     "com.apple.streaming.transportStreamTimestamp";
2040                 static const size_t kMatchNameLen = strlen(kMatchName);
2041 
2042                 if (length == kMatchNameLen + 1 + 8
2043                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2044                     found = true;
2045                     PTS = U64_AT(&data[kMatchNameLen + 1]);
2046                 }
2047 
2048                 it.next();
2049             }
2050 
2051             if (!found) {
2052                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2053                 return ERROR_MALFORMED;
2054             }
2055         }
2056 
2057         // skip the ID3 tag
2058         buffer->setRange(
2059                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2060 
2061         firstID3Tag = false;
2062     }
2063 
2064     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2065         ALOGW("This stream only contains audio data!");
2066 
2067         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2068 
2069         if (mStreamTypeMask == 0) {
2070             return OK;
2071         }
2072     }
2073 
2074     sp<AnotherPacketSource> packetSource =
2075         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2076 
2077     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2078         ABitReader bits(buffer->data(), buffer->size());
2079 
2080         // adts_fixed_header
2081 
2082         CHECK_EQ(bits.getBits(12), 0xfffu);
2083         bits.skipBits(3);  // ID, layer
2084         bool protection_absent __unused = bits.getBits(1) != 0;
2085 
2086         unsigned profile = bits.getBits(2);
2087         CHECK_NE(profile, 3u);
2088         unsigned sampling_freq_index = bits.getBits(4);
2089         bits.getBits(1);  // private_bit
2090         unsigned channel_configuration = bits.getBits(3);
2091         CHECK_NE(channel_configuration, 0u);
2092         bits.skipBits(2);  // original_copy, home
2093 
2094         sp<MetaData> meta = new MetaData();
2095         MakeAACCodecSpecificData(*meta,
2096                 profile, sampling_freq_index, channel_configuration);
2097 
2098         meta->setInt32(kKeyIsADTS, true);
2099 
2100         packetSource->setFormat(meta);
2101     }
2102 
2103     int64_t numSamples = 0LL;
2104     int32_t sampleRate;
2105     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2106 
2107     int64_t timeUs = (PTS * 100LL) / 9LL;
2108     if (mStartup && !mFirstPTSValid) {
2109         mFirstPTSValid = true;
2110         mFirstTimeUs = timeUs;
2111     }
2112 
2113     if (mSegmentFirstPTS < 0LL) {
2114         mSegmentFirstPTS = timeUs;
2115         if (!mStartTimeUsRelative) {
2116             // Duplicated logic from how we handle .ts playlists.
2117             if (mStartup && mSegmentStartTimeUs >= 0
2118                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2119                 mSegmentStartTimeUs = -1;
2120                 return -EAGAIN;
2121             }
2122         }
2123     }
2124 
2125     sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2126     if (mSampleAesKeyItem != NULL) {
2127         ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s  IV: %s",
2128                 mSeqNumber,
2129                 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2130                 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2131 
2132         sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2133     }
2134 
2135     int frameId = 0;
2136 
2137     size_t offset = 0;
2138     while (offset < buffer->size()) {
2139         const uint8_t *adtsHeader = buffer->data() + offset;
2140         if (buffer->size() <= offset+5) {
2141             ALOGV("buffer does not contain a complete header");
2142             return ERROR_MALFORMED;
2143         }
2144         // non-const pointer for decryption if needed
2145         uint8_t *adtsFrame = buffer->data() + offset;
2146 
2147         unsigned aac_frame_length =
2148             ((adtsHeader[3] & 3) << 11)
2149             | (adtsHeader[4] << 3)
2150             | (adtsHeader[5] >> 5);
2151 
2152         if (aac_frame_length == 0) {
2153             const uint8_t *id3Header = adtsHeader;
2154             if (!memcmp(id3Header, "ID3", 3)) {
2155                 ID3 id3(id3Header, buffer->size() - offset, true);
2156                 if (id3.isValid()) {
2157                     offset += id3.rawSize();
2158                     continue;
2159                 };
2160             }
2161             return ERROR_MALFORMED;
2162         }
2163 
2164         CHECK_LE(offset + aac_frame_length, buffer->size());
2165 
2166         int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2167         offset += aac_frame_length;
2168 
2169         // Each AAC frame encodes 1024 samples.
2170         numSamples += 1024;
2171 
2172         if (mStartup) {
2173             int64_t startTimeUs = unitTimeUs;
2174             if (mStartTimeUsRelative) {
2175                 startTimeUs -= mFirstTimeUs;
2176                 if (startTimeUs  < 0) {
2177                     startTimeUs = 0;
2178                 }
2179             }
2180             if (startTimeUs < mStartTimeUs) {
2181                 continue;
2182             }
2183 
2184             if (mStartTimeUsNotify != NULL) {
2185                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2186                 mStartup = false;
2187             }
2188         }
2189 
2190         if (mStopParams != NULL) {
2191             int32_t discontinuitySeq;
2192             int64_t stopTimeUs;
2193             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2194                     || discontinuitySeq > mDiscontinuitySeq
2195                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2196                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2197                 mStreamTypeMask = 0;
2198                 mPacketSources.clear();
2199                 return ERROR_OUT_OF_RANGE;
2200             }
2201         }
2202 
2203         if (sampleDecryptor != NULL) {
2204             bool protection_absent = (adtsHeader[1] & 0x1);
2205             size_t headerSize = protection_absent ? 7 : 9;
2206             if (frameId == 0) {
2207                 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2208                         mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2209             }
2210 
2211             sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2212         }
2213         frameId++;
2214 
2215         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2216         memcpy(unit->data(), adtsHeader, aac_frame_length);
2217 
2218         unit->meta()->setInt64("timeUs", unitTimeUs);
2219         setAccessUnitProperties(unit, packetSource);
2220         packetSource->queueAccessUnit(unit);
2221     }
2222 
2223     return OK;
2224 }
2225 
updateDuration()2226 void PlaylistFetcher::updateDuration() {
2227     int64_t durationUs = 0LL;
2228     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2229         sp<AMessage> itemMeta;
2230         CHECK(mPlaylist->itemAt(
2231                     index, NULL /* uri */, &itemMeta));
2232 
2233         int64_t itemDurationUs;
2234         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2235 
2236         durationUs += itemDurationUs;
2237     }
2238 
2239     sp<AMessage> msg = mNotify->dup();
2240     msg->setInt32("what", kWhatDurationUpdate);
2241     msg->setInt64("durationUs", durationUs);
2242     msg->post();
2243 }
2244 
updateTargetDuration()2245 void PlaylistFetcher::updateTargetDuration() {
2246     sp<AMessage> msg = mNotify->dup();
2247     msg->setInt32("what", kWhatTargetDurationUpdate);
2248     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2249     msg->post();
2250 }
2251 
2252 }  // namespace android
2253