1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30
31 #include <datasource/DataURISource.h>
32 #include <media/stagefright/foundation/ABitReader.h>
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/ByteUtils.h>
36 #include <media/stagefright/foundation/MediaKeys.h>
37 #include <media/stagefright/foundation/avc_utils.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42 #include <media/stagefright/FoundationUtils.h>
43
44 #include <ctype.h>
45 #include <inttypes.h>
46
// Verbose-log helpers that prefix every line with this fetcher's id (and, for
// FSLOGV, the name of the stream). Both expand `mFetcherID`, so they can only
// be used where that member is in scope.
#define FLOGV(format, ...) \
        ALOGV("[fetcher-%d] " format, mFetcherID, ##__VA_ARGS__)
#define FSLOGV(streamType, format, ...) \
        ALOGV("[fetcher-%d] [%s] " format, mFetcherID, \
              LiveSession::getNameForStream(streamType), ##__VA_ARGS__)
50
51 namespace android {
52
53 // static
54 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
55 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
56 // LCM of 188 (size of a TS packet) & 1k works well
57 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
58
59 struct PlaylistFetcher::DownloadState : public RefBase {
60 DownloadState();
61 void resetState();
62 bool hasSavedState() const;
63 void restoreState(
64 AString &uri,
65 sp<AMessage> &itemMeta,
66 sp<ABuffer> &buffer,
67 sp<ABuffer> &tsBuffer,
68 int32_t &firstSeqNumberInPlaylist,
69 int32_t &lastSeqNumberInPlaylist);
70 void saveState(
71 AString &uri,
72 sp<AMessage> &itemMeta,
73 sp<ABuffer> &buffer,
74 sp<ABuffer> &tsBuffer,
75 int32_t &firstSeqNumberInPlaylist,
76 int32_t &lastSeqNumberInPlaylist);
77
78 private:
79 bool mHasSavedState;
80 AString mUri;
81 sp<AMessage> mItemMeta;
82 sp<ABuffer> mBuffer;
83 sp<ABuffer> mTsBuffer;
84 int32_t mFirstSeqNumberInPlaylist;
85 int32_t mLastSeqNumberInPlaylist;
86 };
87
DownloadState()88 PlaylistFetcher::DownloadState::DownloadState() {
89 resetState();
90 }
91
hasSavedState() const92 bool PlaylistFetcher::DownloadState::hasSavedState() const {
93 return mHasSavedState;
94 }
95
resetState()96 void PlaylistFetcher::DownloadState::resetState() {
97 mHasSavedState = false;
98
99 mUri.clear();
100 mItemMeta = NULL;
101 mBuffer = NULL;
102 mTsBuffer = NULL;
103 mFirstSeqNumberInPlaylist = 0;
104 mLastSeqNumberInPlaylist = 0;
105 }
106
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)107 void PlaylistFetcher::DownloadState::restoreState(
108 AString &uri,
109 sp<AMessage> &itemMeta,
110 sp<ABuffer> &buffer,
111 sp<ABuffer> &tsBuffer,
112 int32_t &firstSeqNumberInPlaylist,
113 int32_t &lastSeqNumberInPlaylist) {
114 if (!mHasSavedState) {
115 return;
116 }
117
118 uri = mUri;
119 itemMeta = mItemMeta;
120 buffer = mBuffer;
121 tsBuffer = mTsBuffer;
122 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
123 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
124
125 resetState();
126 }
127
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)128 void PlaylistFetcher::DownloadState::saveState(
129 AString &uri,
130 sp<AMessage> &itemMeta,
131 sp<ABuffer> &buffer,
132 sp<ABuffer> &tsBuffer,
133 int32_t &firstSeqNumberInPlaylist,
134 int32_t &lastSeqNumberInPlaylist) {
135 mHasSavedState = true;
136
137 mUri = uri;
138 mItemMeta = itemMeta;
139 mBuffer = buffer;
140 mTsBuffer = tsBuffer;
141 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
142 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
143 }
144
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)145 PlaylistFetcher::PlaylistFetcher(
146 const sp<AMessage> ¬ify,
147 const sp<LiveSession> &session,
148 const char *uri,
149 int32_t id,
150 int32_t subtitleGeneration)
151 : mNotify(notify),
152 mSession(session),
153 mURI(uri),
154 mFetcherID(id),
155 mStreamTypeMask(0),
156 mStartTimeUs(-1LL),
157 mSegmentStartTimeUs(-1LL),
158 mDiscontinuitySeq(-1LL),
159 mStartTimeUsRelative(false),
160 mLastPlaylistFetchTimeUs(-1LL),
161 mPlaylistTimeUs(-1LL),
162 mSeqNumber(-1),
163 mNumRetries(0),
164 mNumRetriesForMonitorQueue(0),
165 mStartup(true),
166 mIDRFound(false),
167 mSeekMode(LiveSession::kSeekModeExactPosition),
168 mTimeChangeSignaled(false),
169 mNextPTSTimeUs(-1LL),
170 mMonitorQueueGeneration(0),
171 mSubtitleGeneration(subtitleGeneration),
172 mLastDiscontinuitySeq(-1LL),
173 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
174 mFirstPTSValid(false),
175 mFirstTimeUs(-1LL),
176 mVideoBuffer(new AnotherPacketSource(NULL)),
177 mSampleAesKeyItemChanged(false),
178 mThresholdRatio(-1.0f),
179 mDownloadState(new DownloadState()),
180 mHasMetadata(false) {
181 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
182 mHTTPDownloader = mSession->getHTTPDownloader();
183
184 memset(mKeyData, 0, sizeof(mKeyData));
185 memset(mAESInitVec, 0, sizeof(mAESInitVec));
186 }
187
~PlaylistFetcher()188 PlaylistFetcher::~PlaylistFetcher() {
189 }
190
getFetcherID() const191 int32_t PlaylistFetcher::getFetcherID() const {
192 return mFetcherID;
193 }
194
getSegmentStartTimeUs(int32_t seqNumber) const195 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
196 CHECK(mPlaylist != NULL);
197
198 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
199 mPlaylist->getSeqNumberRange(
200 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
201
202 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
203 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
204
205 int64_t segmentStartUs = 0LL;
206 for (int32_t index = 0;
207 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
208 sp<AMessage> itemMeta;
209 CHECK(mPlaylist->itemAt(
210 index, NULL /* uri */, &itemMeta));
211
212 int64_t itemDurationUs;
213 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
214
215 segmentStartUs += itemDurationUs;
216 }
217
218 return segmentStartUs;
219 }
220
getSegmentDurationUs(int32_t seqNumber) const221 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
222 CHECK(mPlaylist != NULL);
223
224 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
225 mPlaylist->getSeqNumberRange(
226 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
227
228 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
229 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
230
231 int32_t index = seqNumber - firstSeqNumberInPlaylist;
232 sp<AMessage> itemMeta;
233 CHECK(mPlaylist->itemAt(
234 index, NULL /* uri */, &itemMeta));
235
236 int64_t itemDurationUs;
237 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
238
239 return itemDurationUs;
240 }
241
delayUsToRefreshPlaylist() const242 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
243 int64_t nowUs = ALooper::GetNowUs();
244
245 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
246 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
247 return 0LL;
248 }
249
250 if (mPlaylist->isComplete()) {
251 return (~0LLU >> 1);
252 }
253
254 int64_t targetDurationUs = mPlaylist->getTargetDuration();
255
256 int64_t minPlaylistAgeUs;
257
258 switch (mRefreshState) {
259 case INITIAL_MINIMUM_RELOAD_DELAY:
260 {
261 size_t n = mPlaylist->size();
262 if (n > 0) {
263 sp<AMessage> itemMeta;
264 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
265
266 int64_t itemDurationUs;
267 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
268
269 minPlaylistAgeUs = itemDurationUs;
270 break;
271 }
272
273 FALLTHROUGH_INTENDED;
274 }
275
276 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
277 {
278 minPlaylistAgeUs = targetDurationUs / 2;
279 break;
280 }
281
282 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
283 {
284 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
285 break;
286 }
287
288 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
289 {
290 minPlaylistAgeUs = targetDurationUs * 3;
291 break;
292 }
293
294 default:
295 TRESPASS();
296 break;
297 }
298
299 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
300 return delayUs > 0LL ? delayUs : 0LL;
301 }
302
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)303 status_t PlaylistFetcher::decryptBuffer(
304 size_t playlistIndex, const sp<ABuffer> &buffer,
305 bool first) {
306 sp<AMessage> itemMeta;
307 bool found = false;
308 AString method;
309
310 for (ssize_t i = playlistIndex; i >= 0; --i) {
311 AString uri;
312 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
313
314 if (itemMeta->findString("cipher-method", &method)) {
315 found = true;
316 break;
317 }
318 }
319
320 // TODO: Revise this when we add support for KEYFORMAT
321 // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
322 if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
323 ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
324 mSampleAesKeyItem.get(), method.c_str());
325 mSampleAesKeyItem = NULL;
326 mSampleAesKeyItemChanged = true;
327 }
328
329 if (!found) {
330 method = "NONE";
331 }
332 buffer->meta()->setString("cipher-method", method.c_str());
333
334 if (method == "NONE") {
335 return OK;
336 } else if (method == "SAMPLE-AES") {
337 ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
338 } else if (!(method == "AES-128")) {
339 ALOGE("Unsupported cipher method '%s'", method.c_str());
340 return ERROR_UNSUPPORTED;
341 }
342
343 AString keyURI;
344 if (!itemMeta->findString("cipher-uri", &keyURI)) {
345 ALOGE("Missing key uri");
346 return ERROR_MALFORMED;
347 }
348
349 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
350
351 sp<ABuffer> key;
352 if (index >= 0) {
353 key = mAESKeyForURI.valueAt(index);
354 } else if (keyURI.startsWith("data:")) {
355 sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
356 off64_t keyLen;
357 if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
358 ALOGE("Malformed cipher key data uri.");
359 return ERROR_MALFORMED;
360 }
361 key = new ABuffer(keyLen);
362 keySrc->readAt(0, key->data(), keyLen);
363 key->setRange(0, keyLen);
364 } else {
365 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
366
367 if (err == ERROR_NOT_CONNECTED) {
368 return ERROR_NOT_CONNECTED;
369 } else if (err < 0) {
370 ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
371 return ERROR_IO;
372 } else if (key->size() != 16) {
373 ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
374 return ERROR_MALFORMED;
375 }
376
377 mAESKeyForURI.add(keyURI, key);
378 }
379
380 if (first) {
381 // If decrypting the first block in a file, read the iv from the manifest
382 // or derive the iv from the file's sequence number.
383
384 unsigned char AESInitVec[AES_BLOCK_SIZE];
385 AString iv;
386 if (itemMeta->findString("cipher-iv", &iv)) {
387 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
388 || iv.size() > 16 * 2 + 2) {
389 ALOGE("malformed cipher IV '%s'.", iv.c_str());
390 return ERROR_MALFORMED;
391 }
392
393 while (iv.size() < 16 * 2 + 2) {
394 iv.insert("0", 1, 2);
395 }
396
397 memset(AESInitVec, 0, sizeof(AESInitVec));
398 for (size_t i = 0; i < 16; ++i) {
399 char c1 = tolower(iv.c_str()[2 + 2 * i]);
400 char c2 = tolower(iv.c_str()[3 + 2 * i]);
401 if (!isxdigit(c1) || !isxdigit(c2)) {
402 ALOGE("malformed cipher IV '%s'.", iv.c_str());
403 return ERROR_MALFORMED;
404 }
405 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
406 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
407
408 AESInitVec[i] = nibble1 << 4 | nibble2;
409 }
410 } else {
411 memset(AESInitVec, 0, sizeof(AESInitVec));
412 AESInitVec[15] = mSeqNumber & 0xff;
413 AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
414 AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
415 AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
416 }
417
418 bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
419 bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
420 bool newSampleAesKeyItem = newKey || newInitVec;
421 ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
422 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
423
424 if (newSampleAesKeyItem) {
425 memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
426 memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
427
428 if (method == "SAMPLE-AES") {
429 mSampleAesKeyItemChanged = true;
430
431 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
432 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
433
434 // always allocating a new one rather than updating the old message
435 // lower layer might still have a reference to the old message
436 mSampleAesKeyItem = new AMessage();
437 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
438 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
439
440 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s IV: %s",
441 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
442 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
443 } // SAMPLE-AES
444 } // newSampleAesKeyItem
445 } // first
446
447 if (method == "SAMPLE-AES") {
448 ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
449 return OK;
450 }
451
452
453 AES_KEY aes_key;
454 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
455 ALOGE("failed to set AES decryption key.");
456 return UNKNOWN_ERROR;
457 }
458
459 size_t n = buffer->size();
460 if (!n) {
461 return OK;
462 }
463
464 if (n < 16 || n % 16) {
465 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
466 return ERROR_MALFORMED;
467 }
468
469 AES_cbc_encrypt(
470 buffer->data(), buffer->data(), buffer->size(),
471 &aes_key, mAESInitVec, AES_DECRYPT);
472
473 return OK;
474 }
475
checkDecryptPadding(const sp<ABuffer> & buffer)476 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
477 AString method;
478 CHECK(buffer->meta()->findString("cipher-method", &method));
479 if (method == "NONE" || method == "SAMPLE-AES") {
480 return OK;
481 }
482
483 uint8_t padding = 0;
484 if (buffer->size() > 0) {
485 padding = buffer->data()[buffer->size() - 1];
486 }
487
488 if (padding > 16) {
489 return ERROR_MALFORMED;
490 }
491
492 for (size_t i = buffer->size() - padding; i < padding; i++) {
493 if (buffer->data()[i] != padding) {
494 return ERROR_MALFORMED;
495 }
496 }
497
498 buffer->setRange(buffer->offset(), buffer->size() - padding);
499 return OK;
500 }
501
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)502 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
503 int64_t maxDelayUs = delayUsToRefreshPlaylist();
504 if (maxDelayUs < minDelayUs) {
505 maxDelayUs = minDelayUs;
506 }
507 if (delayUs > maxDelayUs) {
508 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
509 delayUs = maxDelayUs;
510 }
511 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
512 msg->setInt32("generation", mMonitorQueueGeneration);
513 msg->post(delayUs);
514 }
515
cancelMonitorQueue()516 void PlaylistFetcher::cancelMonitorQueue() {
517 ++mMonitorQueueGeneration;
518 }
519
setStoppingThreshold(float thresholdRatio,bool disconnect)520 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
521 {
522 AutoMutex _l(mThresholdLock);
523 mThresholdRatio = thresholdRatio;
524 }
525 if (disconnect) {
526 mHTTPDownloader->disconnect();
527 }
528 }
529
resetStoppingThreshold(bool disconnect)530 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
531 {
532 AutoMutex _l(mThresholdLock);
533 mThresholdRatio = -1.0f;
534 }
535 if (disconnect) {
536 mHTTPDownloader->disconnect();
537 } else {
538 // allow reconnect
539 mHTTPDownloader->reconnect();
540 }
541 }
542
getStoppingThreshold()543 float PlaylistFetcher::getStoppingThreshold() {
544 AutoMutex _l(mThresholdLock);
545 return mThresholdRatio;
546 }
547
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)548 void PlaylistFetcher::startAsync(
549 const sp<AnotherPacketSource> &audioSource,
550 const sp<AnotherPacketSource> &videoSource,
551 const sp<AnotherPacketSource> &subtitleSource,
552 const sp<AnotherPacketSource> &metadataSource,
553 int64_t startTimeUs,
554 int64_t segmentStartTimeUs,
555 int32_t startDiscontinuitySeq,
556 LiveSession::SeekMode seekMode) {
557 sp<AMessage> msg = new AMessage(kWhatStart, this);
558
559 uint32_t streamTypeMask = 0ul;
560
561 if (audioSource != NULL) {
562 msg->setPointer("audioSource", audioSource.get());
563 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
564 }
565
566 if (videoSource != NULL) {
567 msg->setPointer("videoSource", videoSource.get());
568 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
569 }
570
571 if (subtitleSource != NULL) {
572 msg->setPointer("subtitleSource", subtitleSource.get());
573 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
574 }
575
576 if (metadataSource != NULL) {
577 msg->setPointer("metadataSource", metadataSource.get());
578 // metadataSource does not affect streamTypeMask.
579 }
580
581 msg->setInt32("streamTypeMask", streamTypeMask);
582 msg->setInt64("startTimeUs", startTimeUs);
583 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
584 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
585 msg->setInt32("seekMode", seekMode);
586 msg->post();
587 }
588
589 /*
590 * pauseAsync
591 *
592 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
593 * -1.0f - pause after finishing current segment
594 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
595 */
pauseAsync(float thresholdRatio,bool disconnect)596 void PlaylistFetcher::pauseAsync(
597 float thresholdRatio, bool disconnect) {
598 setStoppingThreshold(thresholdRatio, disconnect);
599
600 (new AMessage(kWhatPause, this))->post();
601 }
602
stopAsync(bool clear)603 void PlaylistFetcher::stopAsync(bool clear) {
604 setStoppingThreshold(0.0f, true /* disconncect */);
605
606 sp<AMessage> msg = new AMessage(kWhatStop, this);
607 msg->setInt32("clear", clear);
608 msg->post();
609 }
610
resumeUntilAsync(const sp<AMessage> & params)611 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
612 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
613
614 AMessage* msg = new AMessage(kWhatResumeUntil, this);
615 msg->setMessage("params", params);
616 msg->post();
617 }
618
fetchPlaylistAsync()619 void PlaylistFetcher::fetchPlaylistAsync() {
620 (new AMessage(kWhatFetchPlaylist, this))->post();
621 }
622
onMessageReceived(const sp<AMessage> & msg)623 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
624 switch (msg->what()) {
625 case kWhatStart:
626 {
627 status_t err = onStart(msg);
628
629 sp<AMessage> notify = mNotify->dup();
630 notify->setInt32("what", kWhatStarted);
631 notify->setInt32("err", err);
632 notify->post();
633 break;
634 }
635
636 case kWhatPause:
637 {
638 onPause();
639
640 sp<AMessage> notify = mNotify->dup();
641 notify->setInt32("what", kWhatPaused);
642 notify->setInt32("seekMode",
643 mDownloadState->hasSavedState()
644 ? LiveSession::kSeekModeNextSample
645 : LiveSession::kSeekModeNextSegment);
646 notify->post();
647 break;
648 }
649
650 case kWhatStop:
651 {
652 onStop(msg);
653
654 sp<AMessage> notify = mNotify->dup();
655 notify->setInt32("what", kWhatStopped);
656 notify->post();
657 break;
658 }
659
660 case kWhatFetchPlaylist:
661 {
662 bool unchanged;
663 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
664 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
665
666 sp<AMessage> notify = mNotify->dup();
667 notify->setInt32("what", kWhatPlaylistFetched);
668 notify->setObject("playlist", playlist);
669 notify->post();
670 break;
671 }
672
673 case kWhatMonitorQueue:
674 case kWhatDownloadNext:
675 {
676 int32_t generation;
677 CHECK(msg->findInt32("generation", &generation));
678
679 if (generation != mMonitorQueueGeneration) {
680 // Stale event
681 break;
682 }
683
684 if (msg->what() == kWhatMonitorQueue) {
685 onMonitorQueue();
686 } else {
687 onDownloadNext();
688 }
689 break;
690 }
691
692 case kWhatResumeUntil:
693 {
694 onResumeUntil(msg);
695 break;
696 }
697
698 default:
699 TRESPASS();
700 }
701 }
702
onStart(const sp<AMessage> & msg)703 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
704 mPacketSources.clear();
705 mStopParams.clear();
706 mStartTimeUsNotify = mNotify->dup();
707 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
708 mStartTimeUsNotify->setString("uri", mURI);
709
710 uint32_t streamTypeMask;
711 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
712
713 int64_t startTimeUs;
714 int64_t segmentStartTimeUs;
715 int32_t startDiscontinuitySeq;
716 int32_t seekMode;
717 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
718 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
719 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
720 CHECK(msg->findInt32("seekMode", &seekMode));
721
722 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
723 void *ptr;
724 CHECK(msg->findPointer("audioSource", &ptr));
725
726 mPacketSources.add(
727 LiveSession::STREAMTYPE_AUDIO,
728 static_cast<AnotherPacketSource *>(ptr));
729 }
730
731 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
732 void *ptr;
733 CHECK(msg->findPointer("videoSource", &ptr));
734
735 mPacketSources.add(
736 LiveSession::STREAMTYPE_VIDEO,
737 static_cast<AnotherPacketSource *>(ptr));
738 }
739
740 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
741 void *ptr;
742 CHECK(msg->findPointer("subtitleSource", &ptr));
743
744 mPacketSources.add(
745 LiveSession::STREAMTYPE_SUBTITLES,
746 static_cast<AnotherPacketSource *>(ptr));
747 }
748
749 void *ptr;
750 // metadataSource is not part of streamTypeMask
751 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
752 && msg->findPointer("metadataSource", &ptr)) {
753 mPacketSources.add(
754 LiveSession::STREAMTYPE_METADATA,
755 static_cast<AnotherPacketSource *>(ptr));
756 }
757
758 mStreamTypeMask = streamTypeMask;
759
760 mSegmentStartTimeUs = segmentStartTimeUs;
761
762 if (startDiscontinuitySeq >= 0) {
763 mDiscontinuitySeq = startDiscontinuitySeq;
764 }
765
766 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
767 mSeekMode = (LiveSession::SeekMode) seekMode;
768
769 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
770 mStartup = true;
771 mIDRFound = false;
772 mVideoBuffer->clear();
773 }
774
775 if (startTimeUs >= 0) {
776 mStartTimeUs = startTimeUs;
777 mFirstPTSValid = false;
778 mSeqNumber = -1;
779 mTimeChangeSignaled = false;
780 mDownloadState->resetState();
781 }
782
783 postMonitorQueue();
784
785 return OK;
786 }
787
onPause()788 void PlaylistFetcher::onPause() {
789 cancelMonitorQueue();
790 mLastDiscontinuitySeq = mDiscontinuitySeq;
791
792 resetStoppingThreshold(false /* disconnect */);
793 }
794
onStop(const sp<AMessage> & msg)795 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
796 cancelMonitorQueue();
797
798 int32_t clear;
799 CHECK(msg->findInt32("clear", &clear));
800 if (clear) {
801 for (size_t i = 0; i < mPacketSources.size(); i++) {
802 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
803 packetSource->clear();
804 }
805 }
806
807 mDownloadState->resetState();
808 mPacketSources.clear();
809 mStreamTypeMask = 0;
810
811 resetStoppingThreshold(true /* disconnect */);
812 }
813
814 // Resume until we have reached the boundary timestamps listed in `msg`; when
815 // the remaining time is too short (within a resume threshold) stop immediately
816 // instead.
onResumeUntil(const sp<AMessage> & msg)817 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
818 sp<AMessage> params;
819 CHECK(msg->findMessage("params", ¶ms));
820
821 mStopParams = params;
822 onDownloadNext();
823
824 return OK;
825 }
826
notifyStopReached()827 void PlaylistFetcher::notifyStopReached() {
828 sp<AMessage> notify = mNotify->dup();
829 notify->setInt32("what", kWhatStopReached);
830 notify->post();
831 }
832
notifyError(status_t err)833 void PlaylistFetcher::notifyError(status_t err) {
834 sp<AMessage> notify = mNotify->dup();
835 notify->setInt32("what", kWhatError);
836 notify->setInt32("err", err);
837 notify->post();
838 }
839
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)840 void PlaylistFetcher::queueDiscontinuity(
841 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
842 for (size_t i = 0; i < mPacketSources.size(); ++i) {
843 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
844 // (seek will discard buffer by abandoning old fetchers)
845 mPacketSources.valueAt(i)->queueDiscontinuity(
846 type, extra, false /* discard */);
847 }
848 }
849
onMonitorQueue()850 void PlaylistFetcher::onMonitorQueue() {
851 // in the middle of an unfinished download, delay
852 // playlist refresh as it'll change seq numbers
853 if (!mDownloadState->hasSavedState()) {
854 status_t err = refreshPlaylist();
855 if (err != OK) {
856 if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
857 ++mNumRetriesForMonitorQueue;
858 } else {
859 notifyError(err);
860 }
861 return;
862 } else {
863 mNumRetriesForMonitorQueue = 0;
864 }
865 }
866
867 int64_t targetDurationUs = kMinBufferedDurationUs;
868 if (mPlaylist != NULL) {
869 targetDurationUs = mPlaylist->getTargetDuration();
870 }
871
872 int64_t bufferedDurationUs = 0LL;
873 status_t finalResult = OK;
874 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
875 sp<AnotherPacketSource> packetSource =
876 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
877
878 bufferedDurationUs =
879 packetSource->getBufferedDurationUs(&finalResult);
880 } else {
881 // Use min stream duration, but ignore streams that never have any packet
882 // enqueued to prevent us from waiting on a non-existent stream;
883 // when we cannot make out from the manifest what streams are included in
884 // a playlist we might assume extra streams.
885 bufferedDurationUs = -1LL;
886 for (size_t i = 0; i < mPacketSources.size(); ++i) {
887 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
888 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
889 continue;
890 }
891
892 int64_t bufferedStreamDurationUs =
893 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
894
895 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
896
897 if (bufferedDurationUs == -1LL
898 || bufferedStreamDurationUs < bufferedDurationUs) {
899 bufferedDurationUs = bufferedStreamDurationUs;
900 }
901 }
902 if (bufferedDurationUs == -1LL) {
903 bufferedDurationUs = 0LL;
904 }
905 }
906
907 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
908 FLOGV("monitoring, buffered=%lld < %lld",
909 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
910
911 // delay the next download slightly; hopefully this gives other concurrent fetchers
912 // a better chance to run.
913 // onDownloadNext();
914 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
915 msg->setInt32("generation", mMonitorQueueGeneration);
916 msg->post(1000L);
917 } else {
918 // We'd like to maintain buffering above durationToBufferUs, so try
919 // again when buffer just about to go below durationToBufferUs
920 // (or after targetDurationUs / 2, whichever is smaller).
921 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
922 if (delayUs > targetDurationUs / 2) {
923 delayUs = targetDurationUs / 2;
924 }
925
926 FLOGV("pausing for %lld, buffered=%lld > %lld",
927 (long long)delayUs,
928 (long long)bufferedDurationUs,
929 (long long)kMinBufferedDurationUs);
930
931 postMonitorQueue(delayUs);
932 }
933 }
934
refreshPlaylist()935 status_t PlaylistFetcher::refreshPlaylist() {
936 if (delayUsToRefreshPlaylist() <= 0) {
937 bool unchanged;
938 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
939 mURI.c_str(), mPlaylistHash, &unchanged);
940
941 if (playlist == NULL) {
942 if (unchanged) {
943 // We succeeded in fetching the playlist, but it was
944 // unchanged from the last time we tried.
945
946 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
947 mRefreshState = (RefreshState)(mRefreshState + 1);
948 }
949 } else {
950 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
951 return ERROR_IO;
952 }
953 } else {
954 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
955 mPlaylist = playlist;
956
957 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
958 updateDuration();
959 }
960 // Notify LiveSession to use target-duration based buffering level
961 // for up/down switch. Default LiveSession::kUpSwitchMark may not
962 // be reachable for live streams, as our max buffering amount is
963 // limited to 3 segments.
964 if (!mPlaylist->isComplete()) {
965 updateTargetDuration();
966 }
967 mPlaylistTimeUs = ALooper::GetNowUs();
968 }
969
970 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
971 }
972 return OK;
973 }
974
975 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)976 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
977 return buffer->size() > 0 && buffer->data()[0] == 0x47;
978 }
979
shouldPauseDownload()980 bool PlaylistFetcher::shouldPauseDownload() {
981 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
982 // doesn't apply to subtitles
983 return false;
984 }
985
986 // Calculate threshold to abort current download
987 float thresholdRatio = getStoppingThreshold();
988
989 if (thresholdRatio < 0.0f) {
990 // never abort
991 return false;
992 } else if (thresholdRatio == 0.0f) {
993 // immediately abort
994 return true;
995 }
996
997 // now we have a positive thresholdUs, abort if remaining
998 // portion to download is over that threshold.
999 if (mSegmentFirstPTS < 0) {
1000 // this means we haven't even find the first access unit,
1001 // abort now as we must be very far away from the end.
1002 return true;
1003 }
1004 int64_t lastEnqueueUs = mSegmentFirstPTS;
1005 for (size_t i = 0; i < mPacketSources.size(); ++i) {
1006 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1007 continue;
1008 }
1009 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1010 int32_t type;
1011 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1012 continue;
1013 }
1014 int64_t tmpUs;
1015 CHECK(meta->findInt64("timeUs", &tmpUs));
1016 if (tmpUs > lastEnqueueUs) {
1017 lastEnqueueUs = tmpUs;
1018 }
1019 }
1020 lastEnqueueUs -= mSegmentFirstPTS;
1021
1022 int64_t targetDurationUs = mPlaylist->getTargetDuration();
1023 int64_t thresholdUs = thresholdRatio * targetDurationUs;
1024
1025 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1026 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1027 (long long)thresholdUs,
1028 (long long)(targetDurationUs - lastEnqueueUs));
1029
1030 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1031 return true;
1032 }
1033 return false;
1034 }
1035
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1036 void PlaylistFetcher::initSeqNumberForLiveStream(
1037 int32_t &firstSeqNumberInPlaylist,
1038 int32_t &lastSeqNumberInPlaylist) {
1039 // start at least 3 target durations from the end.
1040 int64_t timeFromEnd = 0;
1041 size_t index = mPlaylist->size();
1042 sp<AMessage> itemMeta;
1043 int64_t itemDurationUs;
1044 int32_t targetDuration;
1045 if (mPlaylist->meta() != NULL
1046 && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1047 do {
1048 --index;
1049 if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1050 || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1051 ALOGW("item or itemDurationUs missing");
1052 mSeqNumber = lastSeqNumberInPlaylist - 3;
1053 break;
1054 }
1055
1056 timeFromEnd += itemDurationUs;
1057 mSeqNumber = firstSeqNumberInPlaylist + index;
1058 } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1059 } else {
1060 ALOGW("target-duration missing");
1061 mSeqNumber = lastSeqNumberInPlaylist - 3;
1062 }
1063
1064 if (mSeqNumber < firstSeqNumberInPlaylist) {
1065 mSeqNumber = firstSeqNumberInPlaylist;
1066 }
1067 }
1068
/**
 * Prepares everything needed to start downloading the next segment.
 *
 * Steps, in order:
 *  - refresh the playlist and read its sequence number range;
 *  - if no sequence number has been chosen yet (mSeqNumber < 0), pick one
 *    from the live edge, from mStartTimeUs, or from mSegmentStartTimeUs;
 *  - if mSeqNumber is outside the playlist range or the refresh failed,
 *    either schedule a retry, report an error/EOS, notify that the stop
 *    point was reached, or restart near the end of the playlist with a
 *    discontinuity;
 *  - look up the segment's URI and metadata and track discontinuity state;
 *  - decrypt a throw-away buffer so that the key is fetched up front;
 *  - arrange for time/format discontinuities to be signalled when needed.
 *
 * @param uri                      [out] URI of the segment to fetch.
 * @param itemMeta                 [out] playlist metadata of that segment.
 * @param firstSeqNumberInPlaylist [out] first sequence number in the playlist
 *                                 (0 if there is no playlist).
 * @param lastSeqNumberInPlaylist  [out] last sequence number in the playlist
 *                                 (0 if there is no playlist).
 * @return true if the caller may go ahead and download the segment; false if
 *         nothing should be downloaded now (a retry was scheduled, an error
 *         or stop was notified, or key prefetch returned
 *         ERROR_NOT_CONNECTED).
 */
bool PlaylistFetcher::initDownloadState(
        AString &uri,
        sp<AMessage> &itemMeta,
        int32_t &firstSeqNumberInPlaylist,
        int32_t &lastSeqNumberInPlaylist) {
    // The refresh result is not acted upon right away; it is folded into the
    // range check further below so that failures go through the retry logic.
    status_t err = refreshPlaylist();
    firstSeqNumberInPlaylist = 0;
    lastSeqNumberInPlaylist = 0;
    bool discontinuity = false;

    if (mPlaylist != NULL) {
        mPlaylist->getSeqNumberRange(
                &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);

        if (mDiscontinuitySeq < 0) {
            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
        }
    }

    // A new segment is about to start; its first PTS is not known yet.
    mSegmentFirstPTS = -1LL;

    if (mPlaylist != NULL && mSeqNumber < 0) {
        CHECK_GE(mStartTimeUs, 0LL);

        if (mSegmentStartTimeUs < 0) {
            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
                // this is a live session
                initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
            } else {
                // When seeking, mSegmentStartTimeUs is unavailable (< 0), so we
                // use mStartTimeUs (client supplied timestamp) to determine both
                // the start segment and the relative position inside that segment.
                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
            }
            mStartTimeUsRelative = true;
            FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
                    (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        } else {
            // When adapting or track switching, mSegmentStartTimeUs (relative
            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
            // timestamps coming from the media container) is used to determine the position
            // inside a segment.
            if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
                    && mSeekMode != LiveSession::kSeekModeNextSample) {
                // avoid double fetch/decode
                // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
                // for the starting segment in new variant.
                // If the two variants' segments are aligned, this gives the
                // next segment. If they're not aligned, this gives the segment
                // that overlaps no more than 1/2 * targetDurationUs.
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
                        + mPlaylist->getTargetDuration() / 2);
            } else {
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
            }
            // Do not start before the discontinuity sequence we are in.
            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
            if (mSeqNumber < minSeq) {
                mSeqNumber = minSeq;
            }

            // Clamp into the range currently listed by the playlist.
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }

            if (mSeqNumber > lastSeqNumberInPlaylist) {
                mSeqNumber = lastSeqNumberInPlaylist;
            }
            FLOGV("Initial sequence number is %d from (%d .. %d)",
                    mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        }
    }

    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
    if (mSeqNumber < firstSeqNumberInPlaylist
            || mSeqNumber > lastSeqNumberInPlaylist
            || err != OK) {
        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
            ++mNumRetries;

            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
                // make sure we reach this retry logic on refresh failures
                // by adding an err != OK clause to all enclosing if's.

                // refresh in increasing fraction (1/2, 1/3, ...) of the
                // playlist's target duration or 3 seconds, whichever is less
                // NOTE(review): the expression below scales by
                // size() * targetDuration rather than targetDuration alone;
                // confirm which one the comment above is meant to describe.
                int64_t delayUs = kMaxMonitorDelayUs;
                if (mPlaylist != NULL) {
                    delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
                            / (1 + mNumRetries);
                }
                if (delayUs > kMaxMonitorDelayUs) {
                    delayUs = kMaxMonitorDelayUs;
                }
                FLOGV("sequence number high: %d from (%d .. %d), "
                      "monitor in %lld (retry=%d)",
                        mSeqNumber, firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
                postMonitorQueue(delayUs);
                return false;
            }

            // Only reachable with err == OK given the branch above returns
            // whenever err != OK; kept as a defensive check.
            if (err != OK) {
                notifyError(err);
                return false;
            }

            // we've missed the boat, let's start 3 segments prior to the latest sequence
            // number available and signal a discontinuity.

            ALOGI("We've missed the boat, restarting playback."
                  "  mStartup=%d, was  looking for %d in %d-%d",
                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
            if (mStopParams != NULL) {
                // we should have kept on fetching until we hit the boundaries in mStopParams,
                // but since the segments we are supposed to fetch have already rolled off
                // the playlist, i.e. we have already missed the boat, we inevitably have to
                // skip.
                notifyStopReached();
                return false;
            }
            mSeqNumber = lastSeqNumberInPlaylist - 3;
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }
            discontinuity = true;

            // fall through
        } else {
            // Either the retry budget is used up, or the refresh succeeded on
            // a complete playlist that simply does not contain mSeqNumber.
            if (mPlaylist != NULL) {
                if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
                        && !mPlaylist->isComplete()) {
                    // Live playlists
                    ALOGW("sequence number %d not yet available", mSeqNumber);
                    postMonitorQueue(delayUsToRefreshPlaylist());
                    return false;
                }
                ALOGE("Cannot find sequence number %d in playlist "
                     "(contains %d - %d)",
                     mSeqNumber, firstSeqNumberInPlaylist,
                      firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);

                if (mTSParser != NULL) {
                    mTSParser->signalEOS(ERROR_END_OF_STREAM);
                    // Use an empty buffer; we don't have any new data, just want to extract
                    // potential new access units after flush.  Reset mSeqNumber to
                    // lastSeqNumberInPlaylist such that we set the correct access unit
                    // properties in extractAndQueueAccessUnitsFromTs.
                    sp<ABuffer> buffer = new ABuffer(0);
                    mSeqNumber = lastSeqNumberInPlaylist;
                    extractAndQueueAccessUnitsFromTs(buffer);
                }
                notifyError(ERROR_END_OF_STREAM);
            } else {
                // It's possible that we were never able to download the playlist.
                // In this case we should notify error, instead of EOS, as EOS during
                // prepare means we succeeded in downloading everything.
                ALOGE("Failed to download playlist!");
                notifyError(ERROR_IO);
            }

            return false;
        }
    }

    // mSeqNumber is now known to be inside the playlist.
    mNumRetries = 0;

    CHECK(mPlaylist->itemAt(
                mSeqNumber - firstSeqNumberInPlaylist,
                &uri,
                &itemMeta));

    CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));

    int32_t val;
    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
        // the segment itself is flagged as a discontinuity
        discontinuity = true;
    } else if (mLastDiscontinuitySeq >= 0
            && mDiscontinuitySeq != mLastDiscontinuitySeq) {
        // Seek jumped to a new discontinuity sequence. We need to signal
        // a format change to decoder. Decoder needs to shutdown and be
        // created again if seamless format change is unsupported.
        FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
                "mDiscontinuitySeq %d, mStartTimeUs %lld",
                mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
        discontinuity = true;
    }
    mLastDiscontinuitySeq = -1;

    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    // this avoids interleaved connections to the key and segment file.
    {
        sp<ABuffer> junk = new ABuffer(16);
        junk->setRange(0, 16);
        // note: this |err| is local to the scope and shadows the refresh result
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                true /* first */);
        if (err == ERROR_NOT_CONNECTED) {
            // bail out quietly, without notifying an error
            return false;
        } else if (err != OK) {
            notifyError(err);
            return false;
        }
    }

    if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
        // We need to signal a time discontinuity to ATSParser on the
        // first segment after start, or on a discontinuity segment.
        // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
        // to send the time discontinuity.
        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
            // If this was a live event this made no sense since
            // we don't have access to all the segment before the current
            // one.
            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
        }

        // Setting mTimeChangeSignaled to true, so that if start time
        // searching goes into 2nd segment (without a discontinuity),
        // we don't reset time again. It causes corruption when pending
        // data in ATSParser is cleared.
        mTimeChangeSignaled = true;
    }

    if (discontinuity) {
        // NOTE(review): |discontinuity| is always true here, so the logged
        // "explicit" value is always 1.
        ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

        // Signal a format discontinuity to ATSParser to clear partial data
        // from previous streams. Not doing this causes bitstream corruption.
        if (mTSParser != NULL) {
            mTSParser.clear();
        }

        queueDiscontinuity(
                ATSParser::DISCONTINUITY_FORMAT_ONLY,
                NULL /* extra */);

        if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
            // This means we guessed mStartTimeUs to be in the previous
            // segment (likely very close to the end), but either video or
            // audio has not found start by the end of that segment.
            //
            // If this new segment is not a discontinuity, keep searching.
            //
            // If this new segment even got a discontinuity marker, just
            // set mStartTimeUs=0, and take all samples from now on.
            mStartTimeUs = 0;
            mFirstPTSValid = false;
            mIDRFound = false;
            mVideoBuffer->clear();
        }
    }

    FLOGV("fetching segment %d from (%d .. %d)",
            mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
    return true;
}
1328
/**
 * Downloads the next segment, or resumes a previously paused download, and
 * hands the data to the extractors.
 *
 * The segment is fetched block by block through mHTTPDownloader->fetchBlock().
 * Each new block is decrypted in place; if the buffer starts with the TS sync
 * byte it is parsed incrementally by extractAndQueueAccessUnitsFromTs().
 * Between blocks the download may be paused, in which case the local state is
 * stored in mDownloadState and picked up again on the next call. After the
 * last block (bytesRead == 0) the decrypt padding is verified, non-TS content
 * is extracted in one go via extractAndQueueAccessUnits(), mSeqNumber is
 * advanced and, unless a pause was requested, the monitor queue is posted.
 *
 * Errors are reported through notifyError(); reaching the stop point is
 * reported through notifyStopReached(). In both cases the function returns
 * without advancing mSeqNumber.
 */
void PlaylistFetcher::onDownloadNext() {
    AString uri;
    sp<AMessage> itemMeta;
    sp<ABuffer> buffer;
    sp<ABuffer> tsBuffer;
    int32_t firstSeqNumberInPlaylist = 0;
    int32_t lastSeqNumberInPlaylist = 0;
    bool connectHTTP = true;

    if (mDownloadState->hasSavedState()) {
        // Continue a download that was paused mid-segment.
        mDownloadState->restoreState(
                uri,
                itemMeta,
                buffer,
                tsBuffer,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist);
        connectHTTP = false;
        FLOGV("resuming: '%s'", uri.c_str());
    } else {
        // Fresh segment: initDownloadState() has already notified/scheduled
        // whatever is needed when it returns false.
        if (!initDownloadState(
                uri,
                itemMeta,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist)) {
            return;
        }
        FLOGV("fetching: '%s'", uri.c_str());
    }

    // Byte range of the segment inside the resource; defaults to offset 0 and
    // length -1 when the playlist item does not carry both range keys.
    int64_t range_offset, range_length;
    if (!itemMeta->findInt64("range-offset", &range_offset)
            || !itemMeta->findInt64("range-length", &range_length)) {
        range_offset = 0;
        range_length = -1;
    }

    // block-wise download
    bool shouldPause = false;
    ssize_t bytesRead;
    do {
        // Time the fetch so it can be used as a bandwidth sample below.
        int64_t startUs = ALooper::GetNowUs();
        bytesRead = mHTTPDownloader->fetchBlock(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
                NULL /* actualURL */, connectHTTP);
        int64_t delayUs = ALooper::GetNowUs() - startUs;

        if (bytesRead == ERROR_NOT_CONNECTED) {
            // bail out quietly, without notifying an error
            return;
        }
        if (bytesRead < 0) {
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
            notifyError(err);
            return;
        }

        // add sample for bandwidth estimation, excluding samples from subtitles (as
        // they are too small), or during startup/resumeUntil (when we could have more than
        // one connection open which affects bandwidth)
        if (!mStartup && mStopParams == NULL && bytesRead > 0
                && (mStreamTypeMask
                        & (LiveSession::STREAMTYPE_AUDIO
                        | LiveSession::STREAMTYPE_VIDEO))) {
            mSession->addBandwidthMeasurement(bytesRead, delayUs);
            if (delayUs > 2000000LL) {
                FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
                        bytesRead, (double)delayUs / 1.0e6);
            }
        }

        // Subsequent blocks of this segment are fetched with connectHTTP false.
        connectHTTP = false;

        CHECK(buffer != NULL);

        size_t size = buffer->size();
        // Set decryption range.
        // (only the bytes appended by this fetchBlock() call)
        buffer->setRange(size - bytesRead, bytesRead);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
                buffer->offset() == 0 /* first */);
        // Unset decryption range.
        buffer->setRange(0, size);

        if (err != OK) {
            ALOGE("decryptBuffer failed w/ error %d", err);

            notifyError(err);
            return;
        }

        bool startUp = mStartup; // save current start up state

        err = OK;
        if (bufferStartsWithTsSyncByte(buffer)) {
            // Incremental extraction is only supported for MPEG2 transport streams.
            // tsBuffer is built over buffer's data pointer and capacity
            // (presumably sharing the storage rather than copying - confirm
            // against ABuffer); its range marks the bytes not yet parsed.
            if (tsBuffer == NULL) {
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(0, 0);
            } else if (tsBuffer->capacity() != buffer->capacity()) {
                // buffer's capacity changed: rebuild tsBuffer over the current
                // data pointer, keeping the unparsed range.
                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(tsOff, tsSize);
            }
            // expose the newly downloaded bytes to the parser
            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
        }

        if (err == -EAGAIN) {
            // starting sequence number too low/high
            // drop everything parsed so far and start over from the monitor queue
            mTSParser.clear();
            for (size_t i = 0; i < mPacketSources.size(); i++) {
                sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
                packetSource->clear();
            }
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            notifyStopReached();
            return;
        } else if (err != OK) {
            notifyError(err);
            return;
        }
        // If we're switching, post start notification
        // this should only be posted when the last chunk is fully processed by TSParser
        if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
            CHECK(mStartTimeUsNotify != NULL);
            mStartTimeUsNotify->post();
            mStartTimeUsNotify.clear();
            shouldPause = true;
        }
        if (shouldPause || shouldPauseDownload()) {
            // save state and return if this is not the last chunk,
            // leaving the fetcher in paused state.
            if (bytesRead != 0) {
                mDownloadState->saveState(
                        uri,
                        itemMeta,
                        buffer,
                        tsBuffer,
                        firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist);
                return;
            }
            // Last chunk: finish the segment below, but do not post the
            // monitor queue at the end.
            shouldPause = true;
        }
    } while (bytesRead != 0);

    if (bufferStartsWithTsSyncByte(buffer)) {
        // If we don't see a stream in the program table after fetching a full ts segment
        // mark it as nonexistent.
        ATSParser::SourceType srcTypes[] =
                { ATSParser::VIDEO, ATSParser::AUDIO };
        LiveSession::StreamType streamTypes[] =
                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
        const size_t kNumTypes = NELEM(srcTypes);

        for (size_t i = 0; i < kNumTypes; i++) {
            ATSParser::SourceType srcType = srcTypes[i];
            LiveSession::StreamType streamType = streamTypes[i];

            // NOTE(review): |source| is not used below; presumably a leftover.
            // Confirm getSource() has no required side effect before removing.
            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(srcType).get());

            if (!mTSParser->hasSource(srcType)) {
                ALOGW("MPEG2 Transport stream does not contain %s data.",
                      srcType == ATSParser::VIDEO ? "video" : "audio");

                mStreamTypeMask &= ~streamType;
                mPacketSources.removeItem(streamType);
            }
        }

    }

    if (checkDecryptPadding(buffer) != OK) {
        ALOGE("Incorrect padding bytes after decryption.");
        notifyError(ERROR_MALFORMED);
        return;
    }

    if (tsBuffer != NULL) {
        // Whatever is left in tsBuffer was not consumed as whole 188-byte
        // packets. No leftover is accepted for cipher method "NONE"; otherwise
        // up to 16 bytes are tolerated (presumably cipher padding - confirm).
        AString method;
        CHECK(buffer->meta()->findString("cipher-method", &method));
        if ((tsBuffer->size() > 0 && method == "NONE")
                || tsBuffer->size() > 16) {
            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
                    "bytes in length.");
            notifyError(ERROR_MALFORMED);
            return;
        }
    }

    // bulk extract non-ts files
    bool startUp = mStartup;
    if (tsBuffer == NULL) {
        status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
        if (err == -EAGAIN) {
            // starting sequence number too low/high
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            notifyStopReached();
            return;
        } else if (err != OK) {
            notifyError(err);
            return;
        }
    }

    // The segment has been fully processed; move on to the next one.
    ++mSeqNumber;

    // if adapting, pause after found the next starting point
    if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
        CHECK(mStartTimeUsNotify != NULL);
        mStartTimeUsNotify->post();
        mStartTimeUsNotify.clear();
        shouldPause = true;
    }

    if (!shouldPause) {
        postMonitorQueue();
    }
}
1556
1557 /*
1558 * returns true if we need to adjust mSeqNumber
1559 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1560 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1561 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1562
1563 int64_t minDiffUs, maxDiffUs;
1564 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1565 // if the previous fetcher paused in the middle of a segment, we
1566 // want to start at a segment that overlaps the last sample
1567 minDiffUs = -mPlaylist->getTargetDuration();
1568 maxDiffUs = 0LL;
1569 } else {
1570 // if the previous fetcher paused at the end of a segment, ideally
1571 // we want to start at the segment that's roughly aligned with its
1572 // next segment, but if the two variants are not well aligned we
1573 // adjust the diff to within (-T/2, T/2)
1574 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1575 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1576 }
1577
1578 int32_t oldSeqNumber = mSeqNumber;
1579 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1580
1581 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1582 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1583 if (diffUs > maxDiffUs) {
1584 while (index > 0 && diffUs > maxDiffUs) {
1585 --index;
1586
1587 sp<AMessage> itemMeta;
1588 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1589
1590 int64_t itemDurationUs;
1591 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1592
1593 diffUs -= itemDurationUs;
1594 }
1595 } else if (diffUs < minDiffUs) {
1596 while (index + 1 < (ssize_t) mPlaylist->size()
1597 && diffUs < minDiffUs) {
1598 ++index;
1599
1600 sp<AMessage> itemMeta;
1601 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1602
1603 int64_t itemDurationUs;
1604 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1605
1606 diffUs += itemDurationUs;
1607 }
1608 }
1609
1610 mSeqNumber = firstSeqNumberInPlaylist + index;
1611
1612 if (mSeqNumber != oldSeqNumber) {
1613 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1614 (long long) anchorTimeUs - mStartTimeUs,
1615 (long long) minDiffUs,
1616 (long long) maxDiffUs);
1617 return true;
1618 }
1619 return false;
1620 }
1621
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1622 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1623 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1624
1625 size_t index = 0;
1626 while (index < mPlaylist->size()) {
1627 sp<AMessage> itemMeta;
1628 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1629 size_t curDiscontinuitySeq;
1630 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1631 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1632 if (curDiscontinuitySeq == discontinuitySeq) {
1633 return seqNumber;
1634 } else if (curDiscontinuitySeq > discontinuitySeq) {
1635 return seqNumber <= 0 ? 0 : seqNumber - 1;
1636 }
1637
1638 ++index;
1639 }
1640
1641 return firstSeqNumberInPlaylist + mPlaylist->size();
1642 }
1643
getSeqNumberForTime(int64_t timeUs) const1644 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1645 size_t index = 0;
1646 int64_t segmentStartUs = 0;
1647 while (index < mPlaylist->size()) {
1648 sp<AMessage> itemMeta;
1649 CHECK(mPlaylist->itemAt(
1650 index, NULL /* uri */, &itemMeta));
1651
1652 int64_t itemDurationUs;
1653 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1654
1655 if (timeUs < segmentStartUs + itemDurationUs) {
1656 break;
1657 }
1658
1659 segmentStartUs += itemDurationUs;
1660 ++index;
1661 }
1662
1663 if (index >= mPlaylist->size()) {
1664 index = mPlaylist->size() - 1;
1665 }
1666
1667 return mPlaylist->getFirstSeqNumber() + index;
1668 }
1669
/**
 * Stamps an access unit's metadata with the fetcher's current segment state.
 *
 * Sets "format" (when |source| has one), "discard" (only when |discard| is
 * true), "discontinuitySeq", "segmentStartTimeUs", "segmentFirstTimeUs" and
 * "segmentDurationUs". For playlists that are neither complete nor an event,
 * "playlistTimeUs" is set as well.
 *
 * @param accessUnit unit whose meta() is updated.
 * @param source     packet source whose format is attached to the unit.
 * @param discard    when true, the unit is flagged with "discard".
 * @return |accessUnit|, to allow chaining.
 */
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    sp<MetaData> format = source->getFormat();
    const sp<AMessage> unitMeta = accessUnit->meta();

    if (format != NULL) {
        // for simplicity, every unit keeps a reference to its format
        unitMeta->setObject("format", format);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool isLive = !mPlaylist->isComplete() && !mPlaylist->isEvent();
    if (isLive) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }
    return accessUnit;
}
1691
isStartTimeReached(int64_t timeUs)1692 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1693 if (!mFirstPTSValid) {
1694 mFirstTimeUs = timeUs;
1695 mFirstPTSValid = true;
1696 }
1697 bool startTimeReached = true;
1698 if (mStartTimeUsRelative) {
1699 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1700 (long long)timeUs,
1701 (long long)mFirstTimeUs,
1702 (long long)(timeUs - mFirstTimeUs));
1703 timeUs -= mFirstTimeUs;
1704 if (timeUs < 0) {
1705 FLOGV("clamp negative timeUs to 0");
1706 timeUs = 0;
1707 }
1708 startTimeReached = (timeUs >= mStartTimeUs);
1709 }
1710 return startTimeReached;
1711 }
1712
/**
 * Feeds the whole 188-byte packets found in |buffer| to mTSParser and moves
 * the access units it produces into the fetcher's packet sources.
 *
 * On return |buffer|'s range has been advanced past the packets that were
 * consumed; any trailing partial packet stays in the range. While mStartup
 * is set, units before the start time (and AVC units before the first IDR)
 * are held back or dropped; once mStopParams applies, streams that reached
 * their stop point are removed from mPacketSources and mStreamTypeMask.
 *
 * @param buffer TS data to parse; its range is updated as described above.
 * @return OK on success (including when no timestamp could be found yet);
 *         the error from feedTSPacket() if parsing fails;
 *         ERROR_MALFORMED if a subtitle stream is among the packet sources;
 *         -EAGAIN if mSeqNumber had to be adjusted and fetching must restart;
 *         ERROR_OUT_OF_RANGE once every stream has reached its stop point.
 */
status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
    if (mTSParser == NULL) {
        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
    }

    // A non-negative mNextPTSTimeUs is a one-shot request to signal a time
    // discontinuity to the parser.
    if (mNextPTSTimeUs >= 0LL) {
        sp<AMessage> extra = new AMessage;
        // Since we are using absolute timestamps, signal an offset of 0 to prevent
        // ATSParser from skewing the timestamps of access units.
        extra->setInt64(kATSParserKeyMediaTimeUs, 0);

        // When adapting, signal a recent media time to the parser,
        // so that PTS wrap around is handled for the new variant.
        if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
            extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
        }

        mTSParser->signalDiscontinuity(
                ATSParser::DISCONTINUITY_TIME, extra);

        mNextPTSTimeUs = -1LL;
    }

    if (mSampleAesKeyItemChanged) {
        mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
        mSampleAesKeyItemChanged = false;
    }

    // Feed as many complete 188-byte packets as the buffer holds.
    size_t offset = 0;
    while (offset + 188 <= buffer->size()) {
        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);

        if (err != OK) {
            return err;
        }

        offset += 188;
    }
    // setRange to indicate consumed bytes.
    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);

    if (mSegmentFirstPTS < 0LL) {
        // get the smallest first PTS from all streams present in this parser
        for (size_t i = mPacketSources.size(); i > 0;) {
            i--;
            const LiveSession::StreamType stream = mPacketSources.keyAt(i);
            if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
                ALOGE("MPEG2 Transport streams do not contain subtitles.");
                return ERROR_MALFORMED;
            }
            if (stream == LiveSession::STREAMTYPE_METADATA) {
                // metadata does not take part in finding the first PTS
                continue;
            }
            ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                        mTSParser->getSource(type).get());

            if (source == NULL) {
                continue;
            }
            sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
            if (meta != NULL) {
                int64_t timeUs;
                CHECK(meta->findInt64("timeUs", &timeUs));
                if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
                    mSegmentFirstPTS = timeUs;
                }
            }
        }
        if (mSegmentFirstPTS < 0LL) {
            // didn't find any TS packet, can return early
            return OK;
        }
        if (!mStartTimeUsRelative) {
            // mStartup
            //   mStartup is true until we have queued a packet for all the streams
            //   we are fetching. We queue packets whose timestamps are greater than
            //   mStartTimeUs.
            // mSegmentStartTimeUs >= 0
            //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
            // adjustSeqNumberWithAnchorTime(timeUs) == true
            //   we guessed a seq number that's either too large or too small.
            // If this happens, we'll adjust mSeqNumber and restart fetching from new
            // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
            // to -1 so that we don't enter this chunk next time.
            if (mStartup && mSegmentStartTimeUs >= 0
                    && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
                mStartTimeUsNotify = mNotify->dup();
                mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
                mStartTimeUsNotify->setString("uri", mURI);
                mIDRFound = false;
                mSegmentStartTimeUs = -1;
                return -EAGAIN;
            }
        }
    }

    // NOTE(review): |err| is never assigned after this initialization, so the
    // "err != OK" checks further down look unreachable - confirm before
    // removing them.
    status_t err = OK;
    // Iterate backwards so that removing entry |i| does not disturb the
    // entries still to be visited.
    for (size_t i = mPacketSources.size(); i > 0;) {
        i--;
        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);

        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
        if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
            ALOGE("MPEG2 Transport streams do not contain subtitles.");
            return ERROR_MALFORMED;
        }

        const char *key = LiveSession::getKeyForStream(stream);
        ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);

        // |source| is the parser-side queue; |packetSource| is the
        // fetcher-side queue that units are moved into.
        sp<AnotherPacketSource> source =
            static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(type).get());

        if (source == NULL) {
            continue;
        }

        const char *mime;
        sp<MetaData> format = source->getFormat();
        bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);

        sp<ABuffer> accessUnit;
        status_t finalResult;
        while (source->hasBufferAvailable(&finalResult)
                && source->dequeueAccessUnit(&accessUnit) == OK) {

            int64_t timeUs;
            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));

            if (mStartup) {
                bool startTimeReached = isStartTimeReached(timeUs);

                if (!startTimeReached || (isAvc && !mIDRFound)) {
                    // buffer up to the closest preceding IDR frame in the next segment,
                    // or the closest succeeding IDR frame after the exact position
                    FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
                            (long long)timeUs,
                            (long long)mStartTimeUs,
                            (long long)timeUs - mStartTimeUs,
                            mIDRFound);
                    if (isAvc) {
                        if (IsIDR(accessUnit->data(), accessUnit->size())) {
                            mVideoBuffer->clear();
                            FSLOGV(stream, "found IDR, clear mVideoBuffer");
                            mIDRFound = true;
                        }
                        if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
                            mVideoBuffer->queueAccessUnit(accessUnit);
                            FSLOGV(stream, "saving AVC video AccessUnit");
                        }
                    }
                    // re-test: mIDRFound may have just been set above
                    if (!startTimeReached || (isAvc && !mIDRFound)) {
                        continue;
                    }
                }
            }

            if (mStartTimeUsNotify != NULL) {
                // Record that this stream has found its start point; once all
                // fetched streams have, startup is over.
                uint32_t streamMask = 0;
                mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
                if ((mStreamTypeMask & mPacketSources.keyAt(i))
                        && !(streamMask & mPacketSources.keyAt(i))) {
                    streamMask |= mPacketSources.keyAt(i);
                    mStartTimeUsNotify->setInt32("streamMask", streamMask);
                    FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
                            (long long)timeUs, streamMask);

                    if (streamMask == mStreamTypeMask) {
                        FLOGV("found start point for all streams");
                        mStartup = false;
                    }
                }
            }

            if (mStopParams != NULL) {
                // The stream is done when the stop parameters carry no
                // discontinuity sequence or no stop time for it, when they
                // refer to a later discontinuity sequence than the current
                // one, or when the stop time in the same discontinuity
                // sequence has been reached.
                int32_t discontinuitySeq;
                int64_t stopTimeUs;
                if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
                        || discontinuitySeq > mDiscontinuitySeq
                        || !mStopParams->findInt64(key, &stopTimeUs)
                        || (discontinuitySeq == mDiscontinuitySeq
                                && timeUs >= stopTimeUs)) {
                    FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
                    mStreamTypeMask &= ~stream;
                    mPacketSources.removeItemsAt(i);
                    break;
                }
            }

            if (stream == LiveSession::STREAMTYPE_VIDEO) {
                // Flush the units saved during startup ahead of the current
                // one; they are flagged "discard".
                const bool discard = true;
                status_t status;
                while (mVideoBuffer->hasBufferAvailable(&status)) {
                    sp<ABuffer> videoBuffer;
                    mVideoBuffer->dequeueAccessUnit(&videoBuffer);
                    setAccessUnitProperties(videoBuffer, source, discard);
                    packetSource->queueAccessUnit(videoBuffer);
                    int64_t bufferTimeUs;
                    CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
                    FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
                            (long long)bufferTimeUs);
                }
            } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
                // announce the presence of metadata once
                mHasMetadata = true;
                sp<AMessage> notify = mNotify->dup();
                notify->setInt32("what", kWhatMetadataDetected);
                notify->post();
            }

            setAccessUnitProperties(accessUnit, source);
            packetSource->queueAccessUnit(accessUnit);
            FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
        }

        if (err != OK) {
            break;
        }
    }

    if (err != OK) {
        for (size_t i = mPacketSources.size(); i > 0;) {
            i--;
            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
            packetSource->clear();
        }
        return err;
    }

    if (!mStreamTypeMask) {
        // Signal gap is filled between original and new stream.
        FLOGV("reached stop point for all streams");
        return ERROR_OUT_OF_RANGE;
    }

    return OK;
}
1954
1955 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1956 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1957 const sp<ABuffer> &buffer) {
1958 size_t pos = 0;
1959
1960 // skip possible BOM
1961 if (buffer->size() >= pos + 3 &&
1962 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1963 pos += 3;
1964 }
1965
1966 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1967 if (buffer->size() < pos + 6 ||
1968 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1969 return false;
1970 }
1971 pos += 6;
1972
1973 if (buffer->size() == pos) {
1974 return true;
1975 }
1976
1977 uint8_t sep = buffer->data()[pos];
1978 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1979 }
1980
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1981 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1982 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1983 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1984 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1985 ALOGE("This stream only contains subtitles.");
1986 return ERROR_MALFORMED;
1987 }
1988
1989 const sp<AnotherPacketSource> packetSource =
1990 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1991
1992 int64_t durationUs;
1993 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1994 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1995 buffer->meta()->setInt64("durationUs", durationUs);
1996 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1997 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1998 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1999 packetSource->queueAccessUnit(buffer);
2000 return OK;
2001 }
2002
2003 if (mNextPTSTimeUs >= 0LL) {
2004 mNextPTSTimeUs = -1LL;
2005 }
2006
2007 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2008 // stream prefixed by an ID3 tag.
2009
2010 bool firstID3Tag = true;
2011 uint64_t PTS = 0;
2012
2013 for (;;) {
2014 // Make sure to skip all ID3 tags preceding the audio data.
2015 // At least one must be present to provide the PTS timestamp.
2016
2017 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2018 if (!id3.isValid()) {
2019 if (firstID3Tag) {
2020 ALOGE("Unable to parse ID3 tag.");
2021 return ERROR_MALFORMED;
2022 } else {
2023 break;
2024 }
2025 }
2026
2027 if (firstID3Tag) {
2028 bool found = false;
2029
2030 ID3::Iterator it(id3, "PRIV");
2031 while (!it.done()) {
2032 size_t length;
2033 const uint8_t *data = it.getData(&length);
2034 if (!data) {
2035 return ERROR_MALFORMED;
2036 }
2037
2038 static const char *kMatchName =
2039 "com.apple.streaming.transportStreamTimestamp";
2040 static const size_t kMatchNameLen = strlen(kMatchName);
2041
2042 if (length == kMatchNameLen + 1 + 8
2043 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2044 found = true;
2045 PTS = U64_AT(&data[kMatchNameLen + 1]);
2046 }
2047
2048 it.next();
2049 }
2050
2051 if (!found) {
2052 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2053 return ERROR_MALFORMED;
2054 }
2055 }
2056
2057 // skip the ID3 tag
2058 buffer->setRange(
2059 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2060
2061 firstID3Tag = false;
2062 }
2063
2064 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2065 ALOGW("This stream only contains audio data!");
2066
2067 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2068
2069 if (mStreamTypeMask == 0) {
2070 return OK;
2071 }
2072 }
2073
2074 sp<AnotherPacketSource> packetSource =
2075 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2076
2077 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2078 ABitReader bits(buffer->data(), buffer->size());
2079
2080 // adts_fixed_header
2081
2082 CHECK_EQ(bits.getBits(12), 0xfffu);
2083 bits.skipBits(3); // ID, layer
2084 bool protection_absent __unused = bits.getBits(1) != 0;
2085
2086 unsigned profile = bits.getBits(2);
2087 CHECK_NE(profile, 3u);
2088 unsigned sampling_freq_index = bits.getBits(4);
2089 bits.getBits(1); // private_bit
2090 unsigned channel_configuration = bits.getBits(3);
2091 CHECK_NE(channel_configuration, 0u);
2092 bits.skipBits(2); // original_copy, home
2093
2094 sp<MetaData> meta = new MetaData();
2095 MakeAACCodecSpecificData(*meta,
2096 profile, sampling_freq_index, channel_configuration);
2097
2098 meta->setInt32(kKeyIsADTS, true);
2099
2100 packetSource->setFormat(meta);
2101 }
2102
2103 int64_t numSamples = 0LL;
2104 int32_t sampleRate;
2105 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2106
2107 int64_t timeUs = (PTS * 100LL) / 9LL;
2108 if (mStartup && !mFirstPTSValid) {
2109 mFirstPTSValid = true;
2110 mFirstTimeUs = timeUs;
2111 }
2112
2113 if (mSegmentFirstPTS < 0LL) {
2114 mSegmentFirstPTS = timeUs;
2115 if (!mStartTimeUsRelative) {
2116 // Duplicated logic from how we handle .ts playlists.
2117 if (mStartup && mSegmentStartTimeUs >= 0
2118 && adjustSeqNumberWithAnchorTime(timeUs)) {
2119 mSegmentStartTimeUs = -1;
2120 return -EAGAIN;
2121 }
2122 }
2123 }
2124
2125 sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2126 if (mSampleAesKeyItem != NULL) {
2127 ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s IV: %s",
2128 mSeqNumber,
2129 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2130 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2131
2132 sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2133 }
2134
2135 int frameId = 0;
2136
2137 size_t offset = 0;
2138 while (offset < buffer->size()) {
2139 const uint8_t *adtsHeader = buffer->data() + offset;
2140 if (buffer->size() <= offset+5) {
2141 ALOGV("buffer does not contain a complete header");
2142 return ERROR_MALFORMED;
2143 }
2144 // non-const pointer for decryption if needed
2145 uint8_t *adtsFrame = buffer->data() + offset;
2146
2147 unsigned aac_frame_length =
2148 ((adtsHeader[3] & 3) << 11)
2149 | (adtsHeader[4] << 3)
2150 | (adtsHeader[5] >> 5);
2151
2152 if (aac_frame_length == 0) {
2153 const uint8_t *id3Header = adtsHeader;
2154 if (!memcmp(id3Header, "ID3", 3)) {
2155 ID3 id3(id3Header, buffer->size() - offset, true);
2156 if (id3.isValid()) {
2157 offset += id3.rawSize();
2158 continue;
2159 };
2160 }
2161 return ERROR_MALFORMED;
2162 }
2163
2164 CHECK_LE(offset + aac_frame_length, buffer->size());
2165
2166 int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2167 offset += aac_frame_length;
2168
2169 // Each AAC frame encodes 1024 samples.
2170 numSamples += 1024;
2171
2172 if (mStartup) {
2173 int64_t startTimeUs = unitTimeUs;
2174 if (mStartTimeUsRelative) {
2175 startTimeUs -= mFirstTimeUs;
2176 if (startTimeUs < 0) {
2177 startTimeUs = 0;
2178 }
2179 }
2180 if (startTimeUs < mStartTimeUs) {
2181 continue;
2182 }
2183
2184 if (mStartTimeUsNotify != NULL) {
2185 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2186 mStartup = false;
2187 }
2188 }
2189
2190 if (mStopParams != NULL) {
2191 int32_t discontinuitySeq;
2192 int64_t stopTimeUs;
2193 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2194 || discontinuitySeq > mDiscontinuitySeq
2195 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2196 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2197 mStreamTypeMask = 0;
2198 mPacketSources.clear();
2199 return ERROR_OUT_OF_RANGE;
2200 }
2201 }
2202
2203 if (sampleDecryptor != NULL) {
2204 bool protection_absent = (adtsHeader[1] & 0x1);
2205 size_t headerSize = protection_absent ? 7 : 9;
2206 if (frameId == 0) {
2207 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2208 mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2209 }
2210
2211 sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2212 }
2213 frameId++;
2214
2215 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2216 memcpy(unit->data(), adtsHeader, aac_frame_length);
2217
2218 unit->meta()->setInt64("timeUs", unitTimeUs);
2219 setAccessUnitProperties(unit, packetSource);
2220 packetSource->queueAccessUnit(unit);
2221 }
2222
2223 return OK;
2224 }
2225
updateDuration()2226 void PlaylistFetcher::updateDuration() {
2227 int64_t durationUs = 0LL;
2228 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2229 sp<AMessage> itemMeta;
2230 CHECK(mPlaylist->itemAt(
2231 index, NULL /* uri */, &itemMeta));
2232
2233 int64_t itemDurationUs;
2234 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2235
2236 durationUs += itemDurationUs;
2237 }
2238
2239 sp<AMessage> msg = mNotify->dup();
2240 msg->setInt32("what", kWhatDurationUpdate);
2241 msg->setInt64("durationUs", durationUs);
2242 msg->post();
2243 }
2244
updateTargetDuration()2245 void PlaylistFetcher::updateTargetDuration() {
2246 sp<AMessage> msg = mNotify->dup();
2247 msg->setInt32("what", kWhatTargetDurationUpdate);
2248 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2249 msg->post();
2250 }
2251
2252 } // namespace android
2253