1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23 
24 #include <inttypes.h>
25 
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30 
31 namespace android {
32 
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35     mQueue.pop_front();
36     return work;
37 }
38 
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40     mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42 
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44     return mQueue.empty();
45 }
46 
clear()47 void SimpleC2Component::WorkQueue::clear() {
48     mQueue.clear();
49 }
50 
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52     return mQueue.front().drainMode;
53 }
54 
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56     mQueue.push_back({ nullptr, drainMode });
57 }
58 
59 ////////////////////////////////////////////////////////////////////////////////
60 
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62 
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64         const std::shared_ptr<SimpleC2Component> &thiz) {
65     mThiz = thiz;
66 }
67 
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69     sp<AReplyToken> replyId;
70     CHECK(msg->senderAwaitsResponse(&replyId));
71     sp<AMessage> reply = new AMessage;
72     if (err) {
73         reply->setInt32("err", *err);
74     }
75     reply->postReply(replyId);
76 }
77 
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79     std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80     if (!thiz) {
81         ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82         sp<AReplyToken> replyId;
83         if (msg->senderAwaitsResponse(&replyId)) {
84             sp<AMessage> reply = new AMessage;
85             reply->setInt32("err", C2_CORRUPTED);
86             reply->postReply(replyId);
87         }
88         return;
89     }
90 
91     switch (msg->what()) {
92         case kWhatProcess: {
93             if (mRunning) {
94                 if (thiz->processQueue()) {
95                     (new AMessage(kWhatProcess, this))->post();
96                 }
97             } else {
98                 ALOGV("Ignore process message as we're not running");
99             }
100             break;
101         }
102         case kWhatInit: {
103             int32_t err = thiz->onInit();
104             Reply(msg, &err);
105             [[fallthrough]];
106         }
107         case kWhatStart: {
108             mRunning = true;
109             break;
110         }
111         case kWhatStop: {
112             int32_t err = thiz->onStop();
113             Reply(msg, &err);
114             break;
115         }
116         case kWhatReset: {
117             thiz->onReset();
118             mRunning = false;
119             Reply(msg);
120             break;
121         }
122         case kWhatRelease: {
123             thiz->onRelease();
124             mRunning = false;
125             Reply(msg);
126             break;
127         }
128         default: {
129             ALOGD("Unrecognized msg: %d", msg->what());
130             break;
131         }
132     }
133 }
134 
135 ////////////////////////////////////////////////////////////////////////////////
136 
137 namespace {
138 
139 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon98cc350a0111::DummyReadView140     DummyReadView() : C2ReadView(C2_NO_INIT) {}
141 };
142 
143 }  // namespace
144 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)145 SimpleC2Component::SimpleC2Component(
146         const std::shared_ptr<C2ComponentInterface> &intf)
147     : mDummyReadView(DummyReadView()),
148       mIntf(intf),
149       mLooper(new ALooper),
150       mHandler(new WorkHandler) {
151     mLooper->setName(intf->getName().c_str());
152     (void)mLooper->registerHandler(mHandler);
153     mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
154 }
155 
~SimpleC2Component()156 SimpleC2Component::~SimpleC2Component() {
157     mLooper->unregisterHandler(mHandler->id());
158     (void)mLooper->stop();
159 }
160 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)161 c2_status_t SimpleC2Component::setListener_vb(
162         const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
163     mHandler->setComponent(shared_from_this());
164 
165     Mutexed<ExecState>::Locked state(mExecState);
166     if (state->mState == RUNNING) {
167         if (listener) {
168             return C2_BAD_STATE;
169         } else if (!mayBlock) {
170             return C2_BLOCKING;
171         }
172     }
173     state->mListener = listener;
174     // TODO: wait for listener change to have taken place before returning
175     // (e.g. if there is an ongoing listener callback)
176     return C2_OK;
177 }
178 
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)179 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
180     {
181         Mutexed<ExecState>::Locked state(mExecState);
182         if (state->mState != RUNNING) {
183             return C2_BAD_STATE;
184         }
185     }
186     bool queueWasEmpty = false;
187     {
188         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
189         queueWasEmpty = queue->empty();
190         while (!items->empty()) {
191             queue->push_back(std::move(items->front()));
192             items->pop_front();
193         }
194     }
195     if (queueWasEmpty) {
196         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
197     }
198     return C2_OK;
199 }
200 
announce_nb(const std::vector<C2WorkOutline> & items)201 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
202     (void)items;
203     return C2_OMITTED;
204 }
205 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)206 c2_status_t SimpleC2Component::flush_sm(
207         flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
208     (void)flushMode;
209     {
210         Mutexed<ExecState>::Locked state(mExecState);
211         if (state->mState != RUNNING) {
212             return C2_BAD_STATE;
213         }
214     }
215     {
216         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
217         queue->incGeneration();
218         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
219         while (!queue->empty()) {
220             std::unique_ptr<C2Work> work = queue->pop_front();
221             if (work) {
222                 flushedWork->push_back(std::move(work));
223             }
224         }
225     }
226     {
227         Mutexed<PendingWork>::Locked pending(mPendingWork);
228         while (!pending->empty()) {
229             flushedWork->push_back(std::move(pending->begin()->second));
230             pending->erase(pending->begin());
231         }
232     }
233 
234     return C2_OK;
235 }
236 
drain_nb(drain_mode_t drainMode)237 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
238     if (drainMode == DRAIN_CHAIN) {
239         return C2_OMITTED;
240     }
241     {
242         Mutexed<ExecState>::Locked state(mExecState);
243         if (state->mState != RUNNING) {
244             return C2_BAD_STATE;
245         }
246     }
247     bool queueWasEmpty = false;
248     {
249         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
250         queueWasEmpty = queue->empty();
251         queue->markDrain(drainMode);
252     }
253     if (queueWasEmpty) {
254         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
255     }
256 
257     return C2_OK;
258 }
259 
start()260 c2_status_t SimpleC2Component::start() {
261     Mutexed<ExecState>::Locked state(mExecState);
262     if (state->mState == RUNNING) {
263         return C2_BAD_STATE;
264     }
265     bool needsInit = (state->mState == UNINITIALIZED);
266     state.unlock();
267     if (needsInit) {
268         sp<AMessage> reply;
269         (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
270         int32_t err;
271         CHECK(reply->findInt32("err", &err));
272         if (err != C2_OK) {
273             return (c2_status_t)err;
274         }
275     } else {
276         (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
277     }
278     state.lock();
279     state->mState = RUNNING;
280     return C2_OK;
281 }
282 
stop()283 c2_status_t SimpleC2Component::stop() {
284     ALOGV("stop");
285     {
286         Mutexed<ExecState>::Locked state(mExecState);
287         if (state->mState != RUNNING) {
288             return C2_BAD_STATE;
289         }
290         state->mState = STOPPED;
291     }
292     {
293         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
294         queue->clear();
295     }
296     {
297         Mutexed<PendingWork>::Locked pending(mPendingWork);
298         pending->clear();
299     }
300     sp<AMessage> reply;
301     (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
302     int32_t err;
303     CHECK(reply->findInt32("err", &err));
304     if (err != C2_OK) {
305         return (c2_status_t)err;
306     }
307     return C2_OK;
308 }
309 
reset()310 c2_status_t SimpleC2Component::reset() {
311     ALOGV("reset");
312     {
313         Mutexed<ExecState>::Locked state(mExecState);
314         state->mState = UNINITIALIZED;
315     }
316     {
317         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
318         queue->clear();
319     }
320     {
321         Mutexed<PendingWork>::Locked pending(mPendingWork);
322         pending->clear();
323     }
324     sp<AMessage> reply;
325     (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
326     return C2_OK;
327 }
328 
release()329 c2_status_t SimpleC2Component::release() {
330     ALOGV("release");
331     sp<AMessage> reply;
332     (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
333     return C2_OK;
334 }
335 
intf()336 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
337     return mIntf;
338 }
339 
340 namespace {
341 
vec(std::unique_ptr<C2Work> & work)342 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
343     std::list<std::unique_ptr<C2Work>> ret;
344     ret.push_back(std::move(work));
345     return ret;
346 }
347 
348 }  // namespace
349 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)350 void SimpleC2Component::finish(
351         uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
352     std::unique_ptr<C2Work> work;
353     {
354         Mutexed<PendingWork>::Locked pending(mPendingWork);
355         if (pending->count(frameIndex) == 0) {
356             ALOGW("unknown frame index: %" PRIu64, frameIndex);
357             return;
358         }
359         work = std::move(pending->at(frameIndex));
360         pending->erase(frameIndex);
361     }
362     if (work) {
363         fillWork(work);
364         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
365         listener->onWorkDone_nb(shared_from_this(), vec(work));
366         ALOGV("returning pending work");
367     }
368 }
369 
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)370 void SimpleC2Component::cloneAndSend(
371         uint64_t frameIndex,
372         const std::unique_ptr<C2Work> &currentWork,
373         std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
374     std::unique_ptr<C2Work> work(new C2Work);
375     if (currentWork->input.ordinal.frameIndex == frameIndex) {
376         work->input.flags = currentWork->input.flags;
377         work->input.ordinal = currentWork->input.ordinal;
378     } else {
379         Mutexed<PendingWork>::Locked pending(mPendingWork);
380         if (pending->count(frameIndex) == 0) {
381             ALOGW("unknown frame index: %" PRIu64, frameIndex);
382             return;
383         }
384         work->input.flags = pending->at(frameIndex)->input.flags;
385         work->input.ordinal = pending->at(frameIndex)->input.ordinal;
386     }
387     work->worklets.emplace_back(new C2Worklet);
388     if (work) {
389         fillWork(work);
390         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
391         listener->onWorkDone_nb(shared_from_this(), vec(work));
392         ALOGV("cloned and sending work");
393     }
394 }
395 
processQueue()396 bool SimpleC2Component::processQueue() {
397     std::unique_ptr<C2Work> work;
398     uint64_t generation;
399     int32_t drainMode;
400     bool isFlushPending = false;
401     bool hasQueuedWork = false;
402     {
403         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
404         if (queue->empty()) {
405             return false;
406         }
407 
408         generation = queue->generation();
409         drainMode = queue->drainMode();
410         isFlushPending = queue->popPendingFlush();
411         work = queue->pop_front();
412         hasQueuedWork = !queue->empty();
413     }
414     if (isFlushPending) {
415         ALOGV("processing pending flush");
416         c2_status_t err = onFlush_sm();
417         if (err != C2_OK) {
418             ALOGD("flush err: %d", err);
419             // TODO: error
420         }
421     }
422 
423     if (!mOutputBlockPool) {
424         c2_status_t err = [this] {
425             // TODO: don't use query_vb
426             C2StreamFormatConfig::output outputFormat(0u);
427             std::vector<std::unique_ptr<C2Param>> params;
428             c2_status_t err = intf()->query_vb(
429                     { &outputFormat },
430                     { C2PortBlockPoolsTuning::output::PARAM_TYPE },
431                     C2_DONT_BLOCK,
432                     &params);
433             if (err != C2_OK && err != C2_BAD_INDEX) {
434                 ALOGD("query err = %d", err);
435                 return err;
436             }
437             C2BlockPool::local_id_t poolId =
438                 outputFormat.value == C2FormatVideo
439                         ? C2BlockPool::BASIC_GRAPHIC
440                         : C2BlockPool::BASIC_LINEAR;
441             if (params.size()) {
442                 C2PortBlockPoolsTuning::output *outputPools =
443                     C2PortBlockPoolsTuning::output::From(params[0].get());
444                 if (outputPools && outputPools->flexCount() >= 1) {
445                     poolId = outputPools->m.values[0];
446                 }
447             }
448 
449             err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
450             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
451                     (unsigned long long)poolId,
452                     (unsigned long long)(
453                             mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
454                     err);
455             return err;
456         }();
457         if (err != C2_OK) {
458             Mutexed<ExecState>::Locked state(mExecState);
459             std::shared_ptr<C2Component::Listener> listener = state->mListener;
460             state.unlock();
461             listener->onError_nb(shared_from_this(), err);
462             return hasQueuedWork;
463         }
464     }
465 
466     if (!work) {
467         c2_status_t err = drain(drainMode, mOutputBlockPool);
468         if (err != C2_OK) {
469             Mutexed<ExecState>::Locked state(mExecState);
470             std::shared_ptr<C2Component::Listener> listener = state->mListener;
471             state.unlock();
472             listener->onError_nb(shared_from_this(), err);
473         }
474         return hasQueuedWork;
475     }
476 
477     {
478         std::vector<C2Param *> updates;
479         for (const std::unique_ptr<C2Param> &param: work->input.configUpdate) {
480             if (param) {
481                 updates.emplace_back(param.get());
482             }
483         }
484         if (!updates.empty()) {
485             std::vector<std::unique_ptr<C2SettingResult>> failures;
486             c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
487             ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
488         }
489     }
490 
491     ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
492     // If input buffer list is not empty, it means we have some input to process on.
493     // However, input could be a null buffer. In such case, clear the buffer list
494     // before making call to process().
495     if (!work->input.buffers.empty() && !work->input.buffers[0]) {
496         ALOGD("Encountered null input buffer. Clearing the input buffer");
497         work->input.buffers.clear();
498     }
499     process(work, mOutputBlockPool);
500     ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
501     {
502         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
503         if (queue->generation() != generation) {
504             ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
505                     queue->generation(), generation);
506             work->result = C2_NOT_FOUND;
507             queue.unlock();
508             {
509                 Mutexed<ExecState>::Locked state(mExecState);
510                 std::shared_ptr<C2Component::Listener> listener = state->mListener;
511                 state.unlock();
512                 listener->onWorkDone_nb(shared_from_this(), vec(work));
513             }
514             queue.lock();
515             return hasQueuedWork;
516         }
517     }
518     if (work->workletsProcessed != 0u) {
519         Mutexed<ExecState>::Locked state(mExecState);
520         ALOGV("returning this work");
521         std::shared_ptr<C2Component::Listener> listener = state->mListener;
522         state.unlock();
523         listener->onWorkDone_nb(shared_from_this(), vec(work));
524     } else {
525         ALOGV("queue pending work");
526         work->input.buffers.clear();
527         std::unique_ptr<C2Work> unexpected;
528         {
529             Mutexed<PendingWork>::Locked pending(mPendingWork);
530             uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
531             if (pending->count(frameIndex) != 0) {
532                 unexpected = std::move(pending->at(frameIndex));
533                 pending->erase(frameIndex);
534             }
535             (void)pending->insert({ frameIndex, std::move(work) });
536         }
537         if (unexpected) {
538             ALOGD("unexpected pending work");
539             unexpected->result = C2_CORRUPTED;
540             Mutexed<ExecState>::Locked state(mExecState);
541             std::shared_ptr<C2Component::Listener> listener = state->mListener;
542             state.unlock();
543             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
544         }
545     }
546     return hasQueuedWork;
547 }
548 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)549 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
550         const std::shared_ptr<C2LinearBlock> &block) {
551     return createLinearBuffer(block, block->offset(), block->size());
552 }
553 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)554 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
555         const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
556     return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
557 }
558 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)559 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
560         const std::shared_ptr<C2GraphicBlock> &block) {
561     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
562 }
563 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)564 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
565         const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
566     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
567 }
568 
569 } // namespace android
570