1 /*
2  * Copyright 2014, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "MediaCodecSource"
19 #define DEBUG_DRIFT_TIME 0
20 
21 #include <inttypes.h>
22 
23 #include <gui/IGraphicBufferProducer.h>
24 #include <gui/Surface.h>
25 #include <mediadrm/ICrypto.h>
26 #include <media/MediaBufferHolder.h>
27 #include <media/MediaCodecBuffer.h>
28 #include <media/MediaSource.h>
29 #include <media/stagefright/foundation/ABuffer.h>
30 #include <media/stagefright/foundation/ADebug.h>
31 #include <media/stagefright/foundation/ALooper.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/MediaBuffer.h>
34 #include <media/stagefright/MediaCodec.h>
35 #include <media/stagefright/MediaCodecConstants.h>
36 #include <media/stagefright/MediaCodecList.h>
37 #include <media/stagefright/MediaCodecSource.h>
38 #include <media/stagefright/MediaErrors.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/Utils.h>
41 
42 namespace android {
43 
// Pixel format assigned to mEncoderFormat when the encoder's input format
// reports "using-sw-read-often" (i.e. a software encoder).
const int32_t kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
// Pixel format assigned to mEncoderFormat otherwise.
const int32_t kDefaultHwVideoEncoderFormat = HAL_PIXEL_FORMAT_IMPLEMENTATION_DEFINED;
// Dataspace used when the encoder's input format has no "android._dataspace".
const int32_t kDefaultVideoEncoderDataSpace = HAL_DATASPACE_V0_BT709;

// allow 300 ms (300000 us) for shutting down encoder
// NOTE(review): this comment used to say "1 sec", which does not match the
// value; confirm which of the two is intended.
const int kStopTimeoutUs = 300000;
// allow maximum 1 sec for stop time offset. This limits the delay in the
// input source.
const int kMaxStopTimeOffsetUs = 1000000;
52 
// Handler that reads buffers from a MediaSource on its own looper
// ("pull_looper") and stores them in a mutex-protected queue, from which the
// owner retrieves them with readBuffer().
struct MediaCodecSource::Puller : public AHandler {
    explicit Puller(const sp<MediaSource> &source);

    // Calls stop() on the source directly from the calling thread.
    void interruptSource();
    // Starts the looper, registers this handler and synchronously starts the
    // source with |meta|. |notify| is posted after every queued buffer, and a
    // copy with "eos" set is posted when pulling ends.
    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    // Clears the pulling flag, flushes queued buffers, and interrupts the
    // source if a read has been pending for more than one second.
    void stop();
    // Synchronously stops the source on the puller's looper.
    void stopSource();
    // Sets the paused flag; buffers read while it is set are discarded.
    void pause();
    // Clears the paused flag.
    void resume();
    // Synchronously forwards |stopTimeUs| to the source on the puller's looper.
    status_t setStopTimeUs(int64_t stopTimeUs);
    // Pops the oldest queued buffer into *|buffer|; returns false (and sets
    // *|buffer| to NULL) when the queue is empty.
    bool readBuffer(MediaBufferBase **buffer);

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();

private:
    enum {
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
        kWhatSetStopTimeUs,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;
    sp<ALooper> mLooper;
    bool mIsAudio;

    // State shared between the pull looper and callers; always accessed
    // through the Mutexed<> wrapper below.
    struct Queue {
        Queue()
            : mReadPendingSince(0),
              mPaused(false),
              mPulling(false) { }
        // Time (ALooper::GetNowUs) at which the current source read was
        // issued; reset to 0 once the read returns.
        int64_t mReadPendingSince;
        bool mPaused;
        bool mPulling;
        Vector<MediaBufferBase *> mReadBuffers;

        // release and remove every queued buffer
        void flush();
        // if queue is empty, return false and set *|buffer| to NULL . Otherwise, pop
        // buffer from front of the queue, place it into *|buffer| and return true.
        bool readBuffer(MediaBufferBase **buffer);
        // add a buffer to the back of the queue
        void pushBuffer(MediaBufferBase *mbuf);
    };
    Mutexed<Queue> mQueue;

    // Posts |msg|, waits for the reply and returns its "err" value (OK if the
    // reply carries none), or the posting error.
    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    // Posts a kWhatPull message to this handler.
    void schedulePull();
    // Posts a copy of mNotify with "eos" set to 1.
    void handleEOS();

    DISALLOW_EVIL_CONSTRUCTORS(Puller);
};
107 
Puller(const sp<MediaSource> & source)108 MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
109     : mSource(source),
110       mLooper(new ALooper()),
111       mIsAudio(false)
112 {
113     sp<MetaData> meta = source->getFormat();
114     const char *mime;
115     CHECK(meta->findCString(kKeyMIMEType, &mime));
116 
117     mIsAudio = !strncasecmp(mime, "audio/", 6);
118 
119     mLooper->setName("pull_looper");
120 }
121 
~Puller()122 MediaCodecSource::Puller::~Puller() {
123     mLooper->unregisterHandler(id());
124     mLooper->stop();
125 }
126 
pushBuffer(MediaBufferBase * mbuf)127 void MediaCodecSource::Puller::Queue::pushBuffer(MediaBufferBase *mbuf) {
128     mReadBuffers.push_back(mbuf);
129 }
130 
readBuffer(MediaBufferBase ** mbuf)131 bool MediaCodecSource::Puller::Queue::readBuffer(MediaBufferBase **mbuf) {
132     if (mReadBuffers.empty()) {
133         *mbuf = NULL;
134         return false;
135     }
136     *mbuf = *mReadBuffers.begin();
137     mReadBuffers.erase(mReadBuffers.begin());
138     return true;
139 }
140 
flush()141 void MediaCodecSource::Puller::Queue::flush() {
142     MediaBufferBase *mbuf;
143     while (readBuffer(&mbuf)) {
144         // there are no null buffers in the queue
145         mbuf->release();
146     }
147 }
148 
readBuffer(MediaBufferBase ** mbuf)149 bool MediaCodecSource::Puller::readBuffer(MediaBufferBase **mbuf) {
150     Mutexed<Queue>::Locked queue(mQueue);
151     return queue->readBuffer(mbuf);
152 }
153 
postSynchronouslyAndReturnError(const sp<AMessage> & msg)154 status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
155         const sp<AMessage> &msg) {
156     sp<AMessage> response;
157     status_t err = msg->postAndAwaitResponse(&response);
158 
159     if (err != OK) {
160         return err;
161     }
162 
163     if (!response->findInt32("err", &err)) {
164         err = OK;
165     }
166 
167     return err;
168 }
169 
setStopTimeUs(int64_t stopTimeUs)170 status_t MediaCodecSource::Puller::setStopTimeUs(int64_t stopTimeUs) {
171     sp<AMessage> msg = new AMessage(kWhatSetStopTimeUs, this);
172     msg->setInt64("stop-time-us", stopTimeUs);
173     return postSynchronouslyAndReturnError(msg);
174 }
175 
start(const sp<MetaData> & meta,const sp<AMessage> & notify)176 status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
177     ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
178     mLooper->start(
179             false /* runOnCallingThread */,
180             false /* canCallJava */,
181             PRIORITY_AUDIO);
182     mLooper->registerHandler(this);
183     mNotify = notify;
184 
185     sp<AMessage> msg = new AMessage(kWhatStart, this);
186     msg->setObject("meta", meta);
187     return postSynchronouslyAndReturnError(msg);
188 }
189 
stop()190 void MediaCodecSource::Puller::stop() {
191     bool interrupt = false;
192     {
193         // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
194         // stop.
195         Mutexed<Queue>::Locked queue(mQueue);
196         queue->mPulling = false;
197         interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
198         queue->flush(); // flush any unprocessed pulled buffers
199     }
200 
201     if (interrupt) {
202         interruptSource();
203     }
204 }
205 
interruptSource()206 void MediaCodecSource::Puller::interruptSource() {
207     // call source->stop if read has been pending for over a second
208     // We have to call this outside the looper as looper is pending on the read.
209     mSource->stop();
210 }
211 
stopSource()212 void MediaCodecSource::Puller::stopSource() {
213     sp<AMessage> msg = new AMessage(kWhatStop, this);
214     (void)postSynchronouslyAndReturnError(msg);
215 }
216 
pause()217 void MediaCodecSource::Puller::pause() {
218     Mutexed<Queue>::Locked queue(mQueue);
219     queue->mPaused = true;
220 }
221 
resume()222 void MediaCodecSource::Puller::resume() {
223     Mutexed<Queue>::Locked queue(mQueue);
224     queue->mPaused = false;
225 }
226 
schedulePull()227 void MediaCodecSource::Puller::schedulePull() {
228     (new AMessage(kWhatPull, this))->post();
229 }
230 
handleEOS()231 void MediaCodecSource::Puller::handleEOS() {
232     ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
233     sp<AMessage> msg = mNotify->dup();
234     msg->setInt32("eos", 1);
235     msg->post();
236 }
237 
// Dispatches the messages posted to the puller: start/stop of the source,
// stop-time updates, and the self-reposting pull loop that reads buffers from
// the source into the queue. Any other message id aborts via TRESPASS().
void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatStart:
        {
            sp<RefBase> obj;
            CHECK(msg->findObject("meta", &obj));

            {
                // mark pulling active before the source is started
                Mutexed<Queue>::Locked queue(mQueue);
                queue->mPulling = true;
            }

            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));

            if (err == OK) {
                // kick off the pull loop
                schedulePull();
            }

            // report the start result back to the blocked sender
            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatSetStopTimeUs:
        {
            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            int64_t stopTimeUs;
            CHECK(msg->findInt64("stop-time-us", &stopTimeUs));
            // forward to the source and return its result to the sender
            status_t err = mSource->setStopTimeUs(stopTimeUs);

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);
            response->postReply(replyID);
            break;
        }

        case kWhatStop:
        {
            // the source's stop() result is not propagated; the reply is always OK
            mSource->stop();

            sp<AMessage> response = new AMessage;
            response->setInt32("err", OK);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatPull:
        {
            Mutexed<Queue>::Locked queue(mQueue);
            // timestamp the read so stop() can detect one that is stuck
            // NOTE(review): on the early break below this timestamp is not
            // reset to 0 -- confirm that a later stop() seeing it is intended.
            queue->mReadPendingSince = ALooper::GetNowUs();
            if (!queue->mPulling) {
                handleEOS();
                break;
            }

            // do not hold the queue lock across the source read
            queue.unlock();
            MediaBufferBase *mbuf = NULL;
            status_t err = mSource->read(&mbuf);
            queue.lock();

            queue->mReadPendingSince = 0;
            // if we need to discard buffer
            if (!queue->mPulling || queue->mPaused || err != OK) {
                if (mbuf != NULL) {
                    mbuf->release();
                    mbuf = NULL;
                }
                if (queue->mPulling && err == OK) {
                    msg->post(); // if simply paused, keep pulling source
                    break;
                } else if (err == ERROR_END_OF_STREAM) {
                    // mbuf has already been cleared above, so this logs NULL
                    ALOGV("stream ended, mbuf %p", mbuf);
                } else if (err != OK) {
                    ALOGE("error %d reading stream.", err);
                }
            }

            if (mbuf != NULL) {
                queue->pushBuffer(mbuf);
            }

            queue.unlock();

            if (mbuf != NULL) {
                // tell the owner a buffer is available, then pull the next one
                mNotify->post();
                msg->post();
            } else {
                // nothing was queued (stopped, or the read failed): signal EOS
                handleEOS();
            }
            break;
        }

        default:
            TRESPASS();
    }
}
342 
Output()343 MediaCodecSource::Output::Output()
344     : mEncoderReachedEOS(false),
345       mErrorCode(OK) {
346 }
347 
348 // static
Create(const sp<ALooper> & looper,const sp<AMessage> & format,const sp<MediaSource> & source,const sp<PersistentSurface> & persistentSurface,uint32_t flags)349 sp<MediaCodecSource> MediaCodecSource::Create(
350         const sp<ALooper> &looper,
351         const sp<AMessage> &format,
352         const sp<MediaSource> &source,
353         const sp<PersistentSurface> &persistentSurface,
354         uint32_t flags) {
355     sp<MediaCodecSource> mediaSource = new MediaCodecSource(
356             looper, format, source, persistentSurface, flags);
357 
358     if (mediaSource->init() == OK) {
359         return mediaSource;
360     }
361     return NULL;
362 }
363 
setInputBufferTimeOffset(int64_t timeOffsetUs)364 status_t MediaCodecSource::setInputBufferTimeOffset(int64_t timeOffsetUs) {
365     sp<AMessage> msg = new AMessage(kWhatSetInputBufferTimeOffset, mReflector);
366     msg->setInt64(PARAMETER_KEY_OFFSET_TIME, timeOffsetUs);
367     return postSynchronouslyAndReturnError(msg);
368 }
369 
getFirstSampleSystemTimeUs()370 int64_t MediaCodecSource::getFirstSampleSystemTimeUs() {
371     sp<AMessage> msg = new AMessage(kWhatGetFirstSampleSystemTimeUs, mReflector);
372     sp<AMessage> response;
373     msg->postAndAwaitResponse(&response);
374     int64_t timeUs;
375     if (!response->findInt64("time-us", &timeUs)) {
376         timeUs = -1LL;
377     }
378     return timeUs;
379 }
380 
start(MetaData * params)381 status_t MediaCodecSource::start(MetaData* params) {
382     sp<AMessage> msg = new AMessage(kWhatStart, mReflector);
383     msg->setObject("meta", params);
384     return postSynchronouslyAndReturnError(msg);
385 }
386 
stop()387 status_t MediaCodecSource::stop() {
388     sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
389     return postSynchronouslyAndReturnError(msg);
390 }
391 
392 
setStopTimeUs(int64_t stopTimeUs)393 status_t MediaCodecSource::setStopTimeUs(int64_t stopTimeUs) {
394     sp<AMessage> msg = new AMessage(kWhatSetStopTimeUs, mReflector);
395     msg->setInt64("stop-time-us", stopTimeUs);
396     return postSynchronouslyAndReturnError(msg);
397 }
398 
pause(MetaData * params)399 status_t MediaCodecSource::pause(MetaData* params) {
400     sp<AMessage> msg = new AMessage(kWhatPause, mReflector);
401     msg->setObject("meta", params);
402     msg->post();
403     return OK;
404 }
405 
getFormat()406 sp<MetaData> MediaCodecSource::getFormat() {
407     Mutexed<sp<MetaData>>::Locked meta(mMeta);
408     return *meta;
409 }
410 
getGraphicBufferProducer()411 sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {
412     CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
413     return mGraphicBufferProducer;
414 }
415 
read(MediaBufferBase ** buffer,const ReadOptions *)416 status_t MediaCodecSource::read(
417         MediaBufferBase** buffer, const ReadOptions* /* options */) {
418     Mutexed<Output>::Locked output(mOutput);
419 
420     *buffer = NULL;
421     while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
422         output.waitForCondition(output->mCond);
423     }
424     if (!output->mEncoderReachedEOS) {
425         *buffer = *output->mBufferQueue.begin();
426         output->mBufferQueue.erase(output->mBufferQueue.begin());
427         return OK;
428     }
429     return output->mErrorCode;
430 }
431 
signalBufferReturned(MediaBufferBase * buffer)432 void MediaCodecSource::signalBufferReturned(MediaBufferBase *buffer) {
433     buffer->setObserver(0);
434     buffer->release();
435 }
436 
MediaCodecSource(const sp<ALooper> & looper,const sp<AMessage> & outputFormat,const sp<MediaSource> & source,const sp<PersistentSurface> & persistentSurface,uint32_t flags)437 MediaCodecSource::MediaCodecSource(
438         const sp<ALooper> &looper,
439         const sp<AMessage> &outputFormat,
440         const sp<MediaSource> &source,
441         const sp<PersistentSurface> &persistentSurface,
442         uint32_t flags)
443     : mLooper(looper),
444       mOutputFormat(outputFormat),
445       mMeta(new MetaData),
446       mFlags(flags),
447       mIsVideo(false),
448       mStarted(false),
449       mStopping(false),
450       mDoMoreWorkPending(false),
451       mSetEncoderFormat(false),
452       mEncoderFormat(0),
453       mEncoderDataSpace(0),
454       mPersistentSurface(persistentSurface),
455       mInputBufferTimeOffsetUs(0),
456       mFirstSampleSystemTimeUs(-1LL),
457       mPausePending(false),
458       mFirstSampleTimeUs(-1LL),
459       mGeneration(0) {
460     CHECK(mLooper != NULL);
461 
462     if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
463         mPuller = new Puller(source);
464     }
465 }
466 
~MediaCodecSource()467 MediaCodecSource::~MediaCodecSource() {
468     releaseEncoder();
469 
470     mCodecLooper->stop();
471     mLooper->unregisterHandler(mReflector->id());
472 }
473 
init()474 status_t MediaCodecSource::init() {
475     status_t err = initEncoder();
476 
477     if (err != OK) {
478         releaseEncoder();
479     }
480 
481     return err;
482 }
483 
initEncoder()484 status_t MediaCodecSource::initEncoder() {
485 
486     mReflector = new AHandlerReflector<MediaCodecSource>(this);
487     mLooper->registerHandler(mReflector);
488 
489     mCodecLooper = new ALooper;
490     mCodecLooper->setName("codec_looper");
491     mCodecLooper->start();
492 
493     if (mFlags & FLAG_USE_SURFACE_INPUT) {
494         mOutputFormat->setInt32(KEY_CREATE_INPUT_SURFACE_SUSPENDED, 1);
495     }
496 
497     AString outputMIME;
498     CHECK(mOutputFormat->findString("mime", &outputMIME));
499     mIsVideo = outputMIME.startsWithIgnoreCase("video/");
500 
501     AString name;
502     status_t err = NO_INIT;
503     if (mOutputFormat->findString("testing-name", &name)) {
504         mEncoder = MediaCodec::CreateByComponentName(mCodecLooper, name);
505 
506         mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
507         mEncoder->setCallback(mEncoderActivityNotify);
508 
509         err = mEncoder->configure(
510                     mOutputFormat,
511                     NULL /* nativeWindow */,
512                     NULL /* crypto */,
513                     MediaCodec::CONFIGURE_FLAG_ENCODE);
514     } else {
515         Vector<AString> matchingCodecs;
516         MediaCodecList::findMatchingCodecs(
517                 outputMIME.c_str(), true /* encoder */,
518                 ((mFlags & FLAG_PREFER_SOFTWARE_CODEC) ? MediaCodecList::kPreferSoftwareCodecs : 0),
519                 &matchingCodecs);
520 
521         for (size_t ix = 0; ix < matchingCodecs.size(); ++ix) {
522             mEncoder = MediaCodec::CreateByComponentName(
523                     mCodecLooper, matchingCodecs[ix]);
524 
525             if (mEncoder == NULL) {
526                 continue;
527             }
528 
529             ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str());
530 
531             mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
532             mEncoder->setCallback(mEncoderActivityNotify);
533 
534             err = mEncoder->configure(
535                         mOutputFormat,
536                         NULL /* nativeWindow */,
537                         NULL /* crypto */,
538                         MediaCodec::CONFIGURE_FLAG_ENCODE);
539 
540             if (err == OK) {
541                 break;
542             }
543             mEncoder->release();
544             mEncoder = NULL;
545         }
546     }
547 
548     if (err != OK) {
549         return err;
550     }
551 
552     mEncoder->getOutputFormat(&mOutputFormat);
553     sp<MetaData> meta = new MetaData;
554     convertMessageToMetaData(mOutputFormat, meta);
555     mMeta.lock().set(meta);
556 
557     if (mFlags & FLAG_USE_SURFACE_INPUT) {
558         CHECK(mIsVideo);
559 
560         if (mPersistentSurface != NULL) {
561             // When using persistent surface, we are only interested in the
562             // consumer, but have to use PersistentSurface as a wrapper to
563             // pass consumer over messages (similar to BufferProducerWrapper)
564             err = mEncoder->setInputSurface(mPersistentSurface);
565         } else {
566             err = mEncoder->createInputSurface(&mGraphicBufferProducer);
567         }
568 
569         if (err != OK) {
570             return err;
571         }
572     }
573 
574     sp<AMessage> inputFormat;
575     int32_t usingSwReadOften;
576     mSetEncoderFormat = false;
577     if (mEncoder->getInputFormat(&inputFormat) == OK) {
578         mSetEncoderFormat = true;
579         if (inputFormat->findInt32("using-sw-read-often", &usingSwReadOften)
580                 && usingSwReadOften) {
581             // this is a SW encoder; signal source to allocate SW readable buffers
582             mEncoderFormat = kDefaultSwVideoEncoderFormat;
583         } else {
584             mEncoderFormat = kDefaultHwVideoEncoderFormat;
585         }
586         if (!inputFormat->findInt32("android._dataspace", &mEncoderDataSpace)) {
587             mEncoderDataSpace = kDefaultVideoEncoderDataSpace;
588         }
589         ALOGV("setting dataspace %#x, format %#x", mEncoderDataSpace, mEncoderFormat);
590     }
591 
592     err = mEncoder->start();
593 
594     if (err != OK) {
595         return err;
596     }
597 
598     {
599         Mutexed<Output>::Locked output(mOutput);
600         output->mEncoderReachedEOS = false;
601         output->mErrorCode = OK;
602     }
603 
604     return OK;
605 }
606 
releaseEncoder()607 void MediaCodecSource::releaseEncoder() {
608     if (mEncoder == NULL) {
609         return;
610     }
611 
612     mEncoder->release();
613     mEncoder.clear();
614 }
615 
postSynchronouslyAndReturnError(const sp<AMessage> & msg)616 status_t MediaCodecSource::postSynchronouslyAndReturnError(
617         const sp<AMessage> &msg) {
618     sp<AMessage> response;
619     status_t err = msg->postAndAwaitResponse(&response);
620 
621     if (err != OK) {
622         return err;
623     }
624 
625     if (!response->findInt32("err", &err)) {
626         err = OK;
627     }
628 
629     return err;
630 }
631 
// Marks the output as having reached EOS with error code |err| (first call
// only): releases all unread output buffers, stops the puller when not using
// surface input, wakes up a reader blocked on mCond and releases the encoder.
// If a stop is in progress, it then stops the source, replies to every
// pending stop request and bumps mGeneration.
void MediaCodecSource::signalEOS(status_t err) {
    bool reachedEOS = false;
    {
        Mutexed<Output>::Locked output(mOutput);
        reachedEOS = output->mEncoderReachedEOS;
        if (!reachedEOS) {
            ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
            // release all unread media buffers
            for (List<MediaBufferBase*>::iterator it = output->mBufferQueue.begin();
                    it != output->mBufferQueue.end(); it++) {
                (*it)->release();
            }
            output->mBufferQueue.clear();
            output->mEncoderReachedEOS = true;
            output->mErrorCode = err;
            if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
                // EOS on a pulled source also initiates stopping; note that
                // the puller is stopped while the output lock is still held
                mStopping = true;
                mPuller->stop();
            }
            output->mCond.signal();

            reachedEOS = true;
            // drop the output lock before releasing the encoder
            output.unlock();
            releaseEncoder();
        }
    }

    if (mStopping && reachedEOS) {
        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
        if (mPuller != NULL) {
            mPuller->stopSource();
        }
        ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
        // posting reply to everyone that's waiting
        List<sp<AReplyToken>>::iterator it;
        for (it = mStopReplyIDQueue.begin();
                it != mStopReplyIDQueue.end(); it++) {
            (new AMessage)->postReply(*it);
        }
        mStopReplyIDQueue.clear();
        mStopping = false;
        ++mGeneration;
    }
}
676 
resume(int64_t resumeStartTimeUs)677 void MediaCodecSource::resume(int64_t resumeStartTimeUs) {
678     CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
679     if (mEncoder != NULL) {
680         sp<AMessage> params = new AMessage;
681         params->setInt32(PARAMETER_KEY_SUSPEND, false);
682         if (resumeStartTimeUs > 0) {
683             params->setInt64(PARAMETER_KEY_SUSPEND_TIME, resumeStartTimeUs);
684         }
685         mEncoder->setParameters(params);
686     }
687 }
688 
feedEncoderInputBuffers()689 status_t MediaCodecSource::feedEncoderInputBuffers() {
690     MediaBufferBase* mbuf = NULL;
691     while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
692         size_t bufferIndex = *mAvailEncoderInputIndices.begin();
693         mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());
694 
695         int64_t timeUs = 0LL;
696         uint32_t flags = 0;
697         size_t size = 0;
698 
699         if (mbuf != NULL) {
700             CHECK(mbuf->meta_data().findInt64(kKeyTime, &timeUs));
701             if (mFirstSampleSystemTimeUs < 0LL) {
702                 mFirstSampleSystemTimeUs = systemTime() / 1000;
703                 if (mPausePending) {
704                     mPausePending = false;
705                     onPause(mFirstSampleSystemTimeUs);
706                     mbuf->release();
707                     mAvailEncoderInputIndices.push_back(bufferIndex);
708                     return OK;
709                 }
710             }
711 
712             timeUs += mInputBufferTimeOffsetUs;
713 
714             // push decoding time for video, or drift time for audio
715             if (mIsVideo) {
716                 mDecodingTimeQueue.push_back(timeUs);
717             } else {
718 #if DEBUG_DRIFT_TIME
719                 if (mFirstSampleTimeUs < 0ll) {
720                     mFirstSampleTimeUs = timeUs;
721                 }
722                 int64_t driftTimeUs = 0;
723                 if (mbuf->meta_data().findInt64(kKeyDriftTime, &driftTimeUs)
724                         && driftTimeUs) {
725                     driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs;
726                 }
727                 mDriftTimeQueue.push_back(driftTimeUs);
728 #endif // DEBUG_DRIFT_TIME
729             }
730 
731             sp<MediaCodecBuffer> inbuf;
732             status_t err = mEncoder->getInputBuffer(bufferIndex, &inbuf);
733 
734             if (err != OK || inbuf == NULL || inbuf->data() == NULL
735                     || mbuf->data() == NULL || mbuf->size() == 0) {
736                 mbuf->release();
737                 signalEOS();
738                 break;
739             }
740 
741             size = mbuf->size();
742 
743             memcpy(inbuf->data(), mbuf->data(), size);
744 
745             if (mIsVideo) {
746                 // video encoder will release MediaBuffer when done
747                 // with underlying data.
748                 inbuf->meta()->setObject("mediaBufferHolder", new MediaBufferHolder(mbuf));
749                 mbuf->release();
750             } else {
751                 mbuf->release();
752             }
753         } else {
754             flags = MediaCodec::BUFFER_FLAG_EOS;
755         }
756 
757         status_t err = mEncoder->queueInputBuffer(
758                 bufferIndex, 0, size, timeUs, flags);
759 
760         if (err != OK) {
761             return err;
762         }
763     }
764 
765     return OK;
766 }
767 
onStart(MetaData * params)768 status_t MediaCodecSource::onStart(MetaData *params) {
769     if (mStopping || mOutput.lock()->mEncoderReachedEOS) {
770         ALOGE("Failed to start while we're stopping or encoder already stopped due to EOS error");
771         return INVALID_OPERATION;
772     }
773     int64_t startTimeUs;
774     if (params == NULL || !params->findInt64(kKeyTime, &startTimeUs)) {
775         startTimeUs = -1LL;
776     }
777 
778     if (mStarted) {
779         ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio");
780         if (mPausePending) {
781             mPausePending = false;
782             return OK;
783         }
784         if (mIsVideo) {
785             mEncoder->requestIDRFrame();
786         }
787         if (mFlags & FLAG_USE_SURFACE_INPUT) {
788             resume(startTimeUs);
789         } else {
790             CHECK(mPuller != NULL);
791             mPuller->resume();
792         }
793         return OK;
794     }
795 
796     ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio");
797 
798     status_t err = OK;
799 
800     if (mFlags & FLAG_USE_SURFACE_INPUT) {
801         if (mEncoder != NULL) {
802             sp<AMessage> params = new AMessage;
803             params->setInt32(PARAMETER_KEY_SUSPEND, false);
804             if (startTimeUs >= 0) {
805                 params->setInt64("skip-frames-before", startTimeUs);
806             }
807             mEncoder->setParameters(params);
808         }
809     } else {
810         CHECK(mPuller != NULL);
811         sp<MetaData> meta = params;
812         if (mSetEncoderFormat) {
813             if (meta == NULL) {
814                 meta = new MetaData;
815             }
816             meta->setInt32(kKeyPixelFormat, mEncoderFormat);
817             meta->setInt32(kKeyColorSpace, mEncoderDataSpace);
818         }
819 
820         sp<AMessage> notify = new AMessage(kWhatPullerNotify, mReflector);
821         err = mPuller->start(meta.get(), notify);
822         if (err != OK) {
823             return err;
824         }
825     }
826 
827     ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio");
828 
829     mStarted = true;
830     return OK;
831 }
832 
onPause(int64_t pauseStartTimeUs)833 void MediaCodecSource::onPause(int64_t pauseStartTimeUs) {
834     if (mStopping || mOutput.lock()->mEncoderReachedEOS) {
835         // Nothing to do
836     } else if ((mFlags & FLAG_USE_SURFACE_INPUT) && (mEncoder != NULL)) {
837         sp<AMessage> params = new AMessage;
838         params->setInt32(PARAMETER_KEY_SUSPEND, true);
839         params->setInt64(PARAMETER_KEY_SUSPEND_TIME, pauseStartTimeUs);
840         mEncoder->setParameters(params);
841     } else {
842         CHECK(mPuller != NULL);
843         mPuller->pause();
844     }
845 }
846 
onMessageReceived(const sp<AMessage> & msg)847 void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
848     switch (msg->what()) {
849     case kWhatPullerNotify:
850     {
851         int32_t eos = 0;
852         if (msg->findInt32("eos", &eos) && eos) {
853             ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
854             signalEOS();
855             break;
856         }
857 
858         if (mEncoder == NULL) {
859             ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
860             break;
861         }
862 
863         feedEncoderInputBuffers();
864         break;
865     }
866     case kWhatEncoderActivity:
867     {
868         if (mEncoder == NULL) {
869             break;
870         }
871 
872         int32_t cbID;
873         CHECK(msg->findInt32("callbackID", &cbID));
874         if (cbID == MediaCodec::CB_INPUT_AVAILABLE) {
875             int32_t index;
876             CHECK(msg->findInt32("index", &index));
877 
878             mAvailEncoderInputIndices.push_back(index);
879             feedEncoderInputBuffers();
880         } else if (cbID == MediaCodec::CB_OUTPUT_FORMAT_CHANGED) {
881             status_t err = mEncoder->getOutputFormat(&mOutputFormat);
882             if (err != OK) {
883                 signalEOS(err);
884                 break;
885             }
886             sp<MetaData> meta = new MetaData;
887             convertMessageToMetaData(mOutputFormat, meta);
888             mMeta.lock().set(meta);
889         } else if (cbID == MediaCodec::CB_OUTPUT_AVAILABLE) {
890             int32_t index;
891             size_t offset;
892             size_t size;
893             int64_t timeUs;
894             int32_t flags;
895 
896             CHECK(msg->findInt32("index", &index));
897             CHECK(msg->findSize("offset", &offset));
898             CHECK(msg->findSize("size", &size));
899             CHECK(msg->findInt64("timeUs", &timeUs));
900             CHECK(msg->findInt32("flags", &flags));
901 
902             if (flags & MediaCodec::BUFFER_FLAG_EOS) {
903                 mEncoder->releaseOutputBuffer(index);
904                 signalEOS();
905                 break;
906             }
907 
908             sp<MediaCodecBuffer> outbuf;
909             status_t err = mEncoder->getOutputBuffer(index, &outbuf);
910             if (err != OK || outbuf == NULL || outbuf->data() == NULL
911                 || outbuf->size() == 0) {
912                 signalEOS();
913                 break;
914             }
915 
916             MediaBufferBase *mbuf = new MediaBuffer(outbuf->size());
917             mbuf->setObserver(this);
918             mbuf->add_ref();
919 
920             if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) {
921                 if (mIsVideo) {
922                     int64_t decodingTimeUs;
923                     if (mFlags & FLAG_USE_SURFACE_INPUT) {
924                         if (mFirstSampleSystemTimeUs < 0LL) {
925                             mFirstSampleSystemTimeUs = systemTime() / 1000;
926                             if (mPausePending) {
927                                 mPausePending = false;
928                                 onPause(mFirstSampleSystemTimeUs);
929                                 mbuf->release();
930                                 break;
931                             }
932                         }
933                         // Timestamp offset is already adjusted in GraphicBufferSource.
934                         // GraphicBufferSource is supposed to discard samples
935                         // queued before start, and offset timeUs by start time
936                         CHECK_GE(timeUs, 0LL);
937                         // TODO:
938                         // Decoding time for surface source is unavailable,
939                         // use presentation time for now. May need to move
940                         // this logic into MediaCodec.
941                         decodingTimeUs = timeUs;
942                     } else {
943                         CHECK(!mDecodingTimeQueue.empty());
944                         decodingTimeUs = *(mDecodingTimeQueue.begin());
945                         mDecodingTimeQueue.erase(mDecodingTimeQueue.begin());
946                     }
947                     mbuf->meta_data().setInt64(kKeyDecodingTime, decodingTimeUs);
948 
949                     ALOGV("[video] time %" PRId64 " us (%.2f secs), dts/pts diff %" PRId64,
950                             timeUs, timeUs / 1E6, decodingTimeUs - timeUs);
951                 } else {
952                     int64_t driftTimeUs = 0;
953 #if DEBUG_DRIFT_TIME
954                     CHECK(!mDriftTimeQueue.empty());
955                     driftTimeUs = *(mDriftTimeQueue.begin());
956                     mDriftTimeQueue.erase(mDriftTimeQueue.begin());
957                     mbuf->meta_data().setInt64(kKeyDriftTime, driftTimeUs);
958 #endif // DEBUG_DRIFT_TIME
959                     ALOGV("[audio] time %" PRId64 " us (%.2f secs), drift %" PRId64,
960                             timeUs, timeUs / 1E6, driftTimeUs);
961                 }
962                 mbuf->meta_data().setInt64(kKeyTime, timeUs);
963             } else {
964                 mbuf->meta_data().setInt64(kKeyTime, 0LL);
965                 mbuf->meta_data().setInt32(kKeyIsCodecConfig, true);
966             }
967             if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) {
968                 mbuf->meta_data().setInt32(kKeyIsSyncFrame, true);
969             }
970             memcpy(mbuf->data(), outbuf->data(), outbuf->size());
971 
972             {
973                 Mutexed<Output>::Locked output(mOutput);
974                 output->mBufferQueue.push_back(mbuf);
975                 output->mCond.signal();
976             }
977 
978             mEncoder->releaseOutputBuffer(index);
979        } else if (cbID == MediaCodec::CB_ERROR) {
980             status_t err;
981             CHECK(msg->findInt32("err", &err));
982             ALOGE("Encoder (%s) reported error : 0x%x",
983                     mIsVideo ? "video" : "audio", err);
984             if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
985                 mStopping = true;
986                 mPuller->stop();
987             }
988             signalEOS();
989        }
990        break;
991     }
992     case kWhatStart:
993     {
994         sp<AReplyToken> replyID;
995         CHECK(msg->senderAwaitsResponse(&replyID));
996 
997         sp<RefBase> obj;
998         CHECK(msg->findObject("meta", &obj));
999         MetaData *params = static_cast<MetaData *>(obj.get());
1000 
1001         sp<AMessage> response = new AMessage;
1002         response->setInt32("err", onStart(params));
1003         response->postReply(replyID);
1004         break;
1005     }
1006     case kWhatStop:
1007     {
1008         ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio");
1009 
1010         sp<AReplyToken> replyID;
1011         CHECK(msg->senderAwaitsResponse(&replyID));
1012 
1013         if (mOutput.lock()->mEncoderReachedEOS) {
1014             // if we already reached EOS, reply and return now
1015             ALOGI("encoder (%s) already stopped",
1016                     mIsVideo ? "video" : "audio");
1017             (new AMessage)->postReply(replyID);
1018             break;
1019         }
1020 
1021         mStopReplyIDQueue.push_back(replyID);
1022         if (mStopping) {
1023             // nothing to do if we're already stopping, reply will be posted
1024             // to all when we're stopped.
1025             break;
1026         }
1027 
1028         mStopping = true;
1029 
1030         int64_t timeoutUs = kStopTimeoutUs;
1031         // if using surface, signal source EOS and wait for EOS to come back.
1032         // otherwise, stop puller (which also clears the input buffer queue)
1033         // and wait for the EOS message. We cannot call source->stop() because
1034         // the encoder may still be processing input buffers.
1035         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1036             mEncoder->signalEndOfInputStream();
1037             // Increase the timeout if there is delay in the GraphicBufferSource
1038             sp<AMessage> inputFormat;
1039             int64_t stopTimeOffsetUs;
1040             if (mEncoder->getInputFormat(&inputFormat) == OK &&
1041                     inputFormat->findInt64("android._stop-time-offset-us", &stopTimeOffsetUs) &&
1042                     stopTimeOffsetUs > 0) {
1043                 if (stopTimeOffsetUs > kMaxStopTimeOffsetUs) {
1044                     ALOGW("Source stopTimeOffsetUs %lld too large, limit at %lld us",
1045                         (long long)stopTimeOffsetUs, (long long)kMaxStopTimeOffsetUs);
1046                     stopTimeOffsetUs = kMaxStopTimeOffsetUs;
1047                 }
1048                 timeoutUs += stopTimeOffsetUs;
1049             } else {
1050                 // Use kMaxStopTimeOffsetUs if stop time offset is not provided by input source
1051                 timeoutUs = kMaxStopTimeOffsetUs;
1052             }
1053         } else {
1054             mPuller->stop();
1055         }
1056 
1057         // complete stop even if encoder/puller stalled
1058         sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
1059         timeoutMsg->setInt32("generation", mGeneration);
1060         timeoutMsg->post(timeoutUs);
1061         break;
1062     }
1063 
1064     case kWhatStopStalled:
1065     {
1066         int32_t generation;
1067         CHECK(msg->findInt32("generation", &generation));
1068         if (generation != mGeneration) {
1069              break;
1070         }
1071 
1072         if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
1073             ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
1074             mPuller->interruptSource();
1075             ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
1076         }
1077         signalEOS();
1078         break;
1079     }
1080 
1081     case kWhatPause:
1082     {
1083         if (mFirstSampleSystemTimeUs < 0) {
1084             mPausePending = true;
1085         } else {
1086             sp<RefBase> obj;
1087             CHECK(msg->findObject("meta", &obj));
1088             MetaData *params = static_cast<MetaData *>(obj.get());
1089             int64_t pauseStartTimeUs = -1;
1090             if (params == NULL || !params->findInt64(kKeyTime, &pauseStartTimeUs)) {
1091                 pauseStartTimeUs = -1LL;
1092             }
1093             onPause(pauseStartTimeUs);
1094         }
1095         break;
1096     }
1097     case kWhatSetInputBufferTimeOffset:
1098     {
1099         sp<AReplyToken> replyID;
1100         CHECK(msg->senderAwaitsResponse(&replyID));
1101         status_t err = OK;
1102         CHECK(msg->findInt64(PARAMETER_KEY_OFFSET_TIME, &mInputBufferTimeOffsetUs));
1103 
1104         // Propagate the timestamp offset to GraphicBufferSource.
1105         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1106             sp<AMessage> params = new AMessage;
1107             params->setInt64(PARAMETER_KEY_OFFSET_TIME, mInputBufferTimeOffsetUs);
1108             err = mEncoder->setParameters(params);
1109         }
1110 
1111         sp<AMessage> response = new AMessage;
1112         response->setInt32("err", err);
1113         response->postReply(replyID);
1114         break;
1115     }
1116     case kWhatSetStopTimeUs:
1117     {
1118         sp<AReplyToken> replyID;
1119         CHECK(msg->senderAwaitsResponse(&replyID));
1120         status_t err = OK;
1121         int64_t stopTimeUs;
1122         CHECK(msg->findInt64("stop-time-us", &stopTimeUs));
1123 
1124         // Propagate the stop time to GraphicBufferSource.
1125         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1126             sp<AMessage> params = new AMessage;
1127             params->setInt64("stop-time-us", stopTimeUs);
1128             err = mEncoder->setParameters(params);
1129         } else {
1130             err = mPuller->setStopTimeUs(stopTimeUs);
1131         }
1132 
1133         sp<AMessage> response = new AMessage;
1134         response->setInt32("err", err);
1135         response->postReply(replyID);
1136         break;
1137     }
1138     case kWhatGetFirstSampleSystemTimeUs:
1139     {
1140         sp<AReplyToken> replyID;
1141         CHECK(msg->senderAwaitsResponse(&replyID));
1142 
1143         sp<AMessage> response = new AMessage;
1144         response->setInt64("time-us", mFirstSampleSystemTimeUs);
1145         response->postReply(replyID);
1146         break;
1147     }
1148     default:
1149         TRESPASS();
1150     }
1151 }
1152 
1153 } // namespace android
1154