1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23 
24 #include <inttypes.h>
25 
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30 
31 namespace android {
32 
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35     mQueue.pop_front();
36     return work;
37 }
38 
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40     mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42 
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44     return mQueue.empty();
45 }
46 
clear()47 void SimpleC2Component::WorkQueue::clear() {
48     mQueue.clear();
49 }
50 
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52     return mQueue.front().drainMode;
53 }
54 
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56     mQueue.push_back({ nullptr, drainMode });
57 }
58 
59 ////////////////////////////////////////////////////////////////////////////////
60 
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62 
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64         const std::shared_ptr<SimpleC2Component> &thiz) {
65     mThiz = thiz;
66 }
67 
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69     sp<AReplyToken> replyId;
70     CHECK(msg->senderAwaitsResponse(&replyId));
71     sp<AMessage> reply = new AMessage;
72     if (err) {
73         reply->setInt32("err", *err);
74     }
75     reply->postReply(replyId);
76 }
77 
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79     std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80     if (!thiz) {
81         ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82         sp<AReplyToken> replyId;
83         if (msg->senderAwaitsResponse(&replyId)) {
84             sp<AMessage> reply = new AMessage;
85             reply->setInt32("err", C2_CORRUPTED);
86             reply->postReply(replyId);
87         }
88         return;
89     }
90 
91     switch (msg->what()) {
92         case kWhatProcess: {
93             if (mRunning) {
94                 if (thiz->processQueue()) {
95                     (new AMessage(kWhatProcess, this))->post();
96                 }
97             } else {
98                 ALOGV("Ignore process message as we're not running");
99             }
100             break;
101         }
102         case kWhatInit: {
103             int32_t err = thiz->onInit();
104             Reply(msg, &err);
105             [[fallthrough]];
106         }
107         case kWhatStart: {
108             mRunning = true;
109             break;
110         }
111         case kWhatStop: {
112             int32_t err = thiz->onStop();
113             Reply(msg, &err);
114             break;
115         }
116         case kWhatReset: {
117             thiz->onReset();
118             mRunning = false;
119             Reply(msg);
120             break;
121         }
122         case kWhatRelease: {
123             thiz->onRelease();
124             mRunning = false;
125             Reply(msg);
126             break;
127         }
128         default: {
129             ALOGD("Unrecognized msg: %d", msg->what());
130             break;
131         }
132     }
133 }
134 
135 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
136 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)137     BlockingBlockPool(const std::shared_ptr<C2BlockPool>& base): mBase{base} {}
138 
getLocalId() const139     virtual local_id_t getLocalId() const override {
140         return mBase->getLocalId();
141     }
142 
getAllocatorId() const143     virtual C2Allocator::id_t getAllocatorId() const override {
144         return mBase->getAllocatorId();
145     }
146 
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)147     virtual c2_status_t fetchLinearBlock(
148             uint32_t capacity,
149             C2MemoryUsage usage,
150             std::shared_ptr<C2LinearBlock>* block) {
151         c2_status_t status;
152         do {
153             status = mBase->fetchLinearBlock(capacity, usage, block);
154         } while (status == C2_BLOCKING);
155         return status;
156     }
157 
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)158     virtual c2_status_t fetchCircularBlock(
159             uint32_t capacity,
160             C2MemoryUsage usage,
161             std::shared_ptr<C2CircularBlock>* block) {
162         c2_status_t status;
163         do {
164             status = mBase->fetchCircularBlock(capacity, usage, block);
165         } while (status == C2_BLOCKING);
166         return status;
167     }
168 
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)169     virtual c2_status_t fetchGraphicBlock(
170             uint32_t width, uint32_t height, uint32_t format,
171             C2MemoryUsage usage,
172             std::shared_ptr<C2GraphicBlock>* block) {
173         c2_status_t status;
174         do {
175             status = mBase->fetchGraphicBlock(width, height, format, usage,
176                                               block);
177         } while (status == C2_BLOCKING);
178         return status;
179     }
180 
181 private:
182     std::shared_ptr<C2BlockPool> mBase;
183 };
184 
185 ////////////////////////////////////////////////////////////////////////////////
186 
187 namespace {
188 
189 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon4064c6a50111::DummyReadView190     DummyReadView() : C2ReadView(C2_NO_INIT) {}
191 };
192 
193 }  // namespace
194 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)195 SimpleC2Component::SimpleC2Component(
196         const std::shared_ptr<C2ComponentInterface> &intf)
197     : mDummyReadView(DummyReadView()),
198       mIntf(intf),
199       mLooper(new ALooper),
200       mHandler(new WorkHandler) {
201     mLooper->setName(intf->getName().c_str());
202     (void)mLooper->registerHandler(mHandler);
203     mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
204 }
205 
~SimpleC2Component()206 SimpleC2Component::~SimpleC2Component() {
207     mLooper->unregisterHandler(mHandler->id());
208     (void)mLooper->stop();
209 }
210 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)211 c2_status_t SimpleC2Component::setListener_vb(
212         const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
213     mHandler->setComponent(shared_from_this());
214 
215     Mutexed<ExecState>::Locked state(mExecState);
216     if (state->mState == RUNNING) {
217         if (listener) {
218             return C2_BAD_STATE;
219         } else if (!mayBlock) {
220             return C2_BLOCKING;
221         }
222     }
223     state->mListener = listener;
224     // TODO: wait for listener change to have taken place before returning
225     // (e.g. if there is an ongoing listener callback)
226     return C2_OK;
227 }
228 
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)229 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
230     {
231         Mutexed<ExecState>::Locked state(mExecState);
232         if (state->mState != RUNNING) {
233             return C2_BAD_STATE;
234         }
235     }
236     bool queueWasEmpty = false;
237     {
238         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
239         queueWasEmpty = queue->empty();
240         while (!items->empty()) {
241             queue->push_back(std::move(items->front()));
242             items->pop_front();
243         }
244     }
245     if (queueWasEmpty) {
246         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
247     }
248     return C2_OK;
249 }
250 
announce_nb(const std::vector<C2WorkOutline> & items)251 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
252     (void)items;
253     return C2_OMITTED;
254 }
255 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)256 c2_status_t SimpleC2Component::flush_sm(
257         flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
258     (void)flushMode;
259     {
260         Mutexed<ExecState>::Locked state(mExecState);
261         if (state->mState != RUNNING) {
262             return C2_BAD_STATE;
263         }
264     }
265     {
266         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
267         queue->incGeneration();
268         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
269         while (!queue->empty()) {
270             std::unique_ptr<C2Work> work = queue->pop_front();
271             if (work) {
272                 flushedWork->push_back(std::move(work));
273             }
274         }
275         while (!queue->pending().empty()) {
276             flushedWork->push_back(std::move(queue->pending().begin()->second));
277             queue->pending().erase(queue->pending().begin());
278         }
279     }
280 
281     return C2_OK;
282 }
283 
drain_nb(drain_mode_t drainMode)284 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
285     if (drainMode == DRAIN_CHAIN) {
286         return C2_OMITTED;
287     }
288     {
289         Mutexed<ExecState>::Locked state(mExecState);
290         if (state->mState != RUNNING) {
291             return C2_BAD_STATE;
292         }
293     }
294     bool queueWasEmpty = false;
295     {
296         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
297         queueWasEmpty = queue->empty();
298         queue->markDrain(drainMode);
299     }
300     if (queueWasEmpty) {
301         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
302     }
303 
304     return C2_OK;
305 }
306 
start()307 c2_status_t SimpleC2Component::start() {
308     Mutexed<ExecState>::Locked state(mExecState);
309     if (state->mState == RUNNING) {
310         return C2_BAD_STATE;
311     }
312     bool needsInit = (state->mState == UNINITIALIZED);
313     state.unlock();
314     if (needsInit) {
315         sp<AMessage> reply;
316         (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
317         int32_t err;
318         CHECK(reply->findInt32("err", &err));
319         if (err != C2_OK) {
320             return (c2_status_t)err;
321         }
322     } else {
323         (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
324     }
325     state.lock();
326     state->mState = RUNNING;
327     return C2_OK;
328 }
329 
stop()330 c2_status_t SimpleC2Component::stop() {
331     ALOGV("stop");
332     {
333         Mutexed<ExecState>::Locked state(mExecState);
334         if (state->mState != RUNNING) {
335             return C2_BAD_STATE;
336         }
337         state->mState = STOPPED;
338     }
339     {
340         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
341         queue->clear();
342         queue->pending().clear();
343     }
344     sp<AMessage> reply;
345     (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
346     int32_t err;
347     CHECK(reply->findInt32("err", &err));
348     if (err != C2_OK) {
349         return (c2_status_t)err;
350     }
351     return C2_OK;
352 }
353 
reset()354 c2_status_t SimpleC2Component::reset() {
355     ALOGV("reset");
356     {
357         Mutexed<ExecState>::Locked state(mExecState);
358         state->mState = UNINITIALIZED;
359     }
360     {
361         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
362         queue->clear();
363         queue->pending().clear();
364     }
365     sp<AMessage> reply;
366     (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
367     return C2_OK;
368 }
369 
release()370 c2_status_t SimpleC2Component::release() {
371     ALOGV("release");
372     sp<AMessage> reply;
373     (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
374     return C2_OK;
375 }
376 
intf()377 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
378     return mIntf;
379 }
380 
381 namespace {
382 
vec(std::unique_ptr<C2Work> & work)383 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
384     std::list<std::unique_ptr<C2Work>> ret;
385     ret.push_back(std::move(work));
386     return ret;
387 }
388 
389 }  // namespace
390 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)391 void SimpleC2Component::finish(
392         uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
393     std::unique_ptr<C2Work> work;
394     {
395         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
396         if (queue->pending().count(frameIndex) == 0) {
397             ALOGW("unknown frame index: %" PRIu64, frameIndex);
398             return;
399         }
400         work = std::move(queue->pending().at(frameIndex));
401         queue->pending().erase(frameIndex);
402     }
403     if (work) {
404         fillWork(work);
405         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
406         listener->onWorkDone_nb(shared_from_this(), vec(work));
407         ALOGV("returning pending work");
408     }
409 }
410 
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)411 void SimpleC2Component::cloneAndSend(
412         uint64_t frameIndex,
413         const std::unique_ptr<C2Work> &currentWork,
414         std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
415     std::unique_ptr<C2Work> work(new C2Work);
416     if (currentWork->input.ordinal.frameIndex == frameIndex) {
417         work->input.flags = currentWork->input.flags;
418         work->input.ordinal = currentWork->input.ordinal;
419     } else {
420         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
421         if (queue->pending().count(frameIndex) == 0) {
422             ALOGW("unknown frame index: %" PRIu64, frameIndex);
423             return;
424         }
425         work->input.flags = queue->pending().at(frameIndex)->input.flags;
426         work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
427     }
428     work->worklets.emplace_back(new C2Worklet);
429     if (work) {
430         fillWork(work);
431         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
432         listener->onWorkDone_nb(shared_from_this(), vec(work));
433         ALOGV("cloned and sending work");
434     }
435 }
436 
processQueue()437 bool SimpleC2Component::processQueue() {
438     std::unique_ptr<C2Work> work;
439     uint64_t generation;
440     int32_t drainMode;
441     bool isFlushPending = false;
442     bool hasQueuedWork = false;
443     {
444         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
445         if (queue->empty()) {
446             return false;
447         }
448 
449         generation = queue->generation();
450         drainMode = queue->drainMode();
451         isFlushPending = queue->popPendingFlush();
452         work = queue->pop_front();
453         hasQueuedWork = !queue->empty();
454     }
455     if (isFlushPending) {
456         ALOGV("processing pending flush");
457         c2_status_t err = onFlush_sm();
458         if (err != C2_OK) {
459             ALOGD("flush err: %d", err);
460             // TODO: error
461         }
462     }
463 
464     if (!mOutputBlockPool) {
465         c2_status_t err = [this] {
466             // TODO: don't use query_vb
467             C2StreamBufferTypeSetting::output outputFormat(0u);
468             std::vector<std::unique_ptr<C2Param>> params;
469             c2_status_t err = intf()->query_vb(
470                     { &outputFormat },
471                     { C2PortBlockPoolsTuning::output::PARAM_TYPE },
472                     C2_DONT_BLOCK,
473                     &params);
474             if (err != C2_OK && err != C2_BAD_INDEX) {
475                 ALOGD("query err = %d", err);
476                 return err;
477             }
478             C2BlockPool::local_id_t poolId =
479                 outputFormat.value == C2BufferData::GRAPHIC
480                         ? C2BlockPool::BASIC_GRAPHIC
481                         : C2BlockPool::BASIC_LINEAR;
482             if (params.size()) {
483                 C2PortBlockPoolsTuning::output *outputPools =
484                     C2PortBlockPoolsTuning::output::From(params[0].get());
485                 if (outputPools && outputPools->flexCount() >= 1) {
486                     poolId = outputPools->m.values[0];
487                 }
488             }
489 
490             std::shared_ptr<C2BlockPool> blockPool;
491             err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
492             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
493                     (unsigned long long)poolId,
494                     (unsigned long long)(
495                             blockPool ? blockPool->getLocalId() : 111000111),
496                     err);
497             if (err == C2_OK) {
498                 mOutputBlockPool = std::make_shared<BlockingBlockPool>(blockPool);
499             }
500             return err;
501         }();
502         if (err != C2_OK) {
503             Mutexed<ExecState>::Locked state(mExecState);
504             std::shared_ptr<C2Component::Listener> listener = state->mListener;
505             state.unlock();
506             listener->onError_nb(shared_from_this(), err);
507             return hasQueuedWork;
508         }
509     }
510 
511     if (!work) {
512         c2_status_t err = drain(drainMode, mOutputBlockPool);
513         if (err != C2_OK) {
514             Mutexed<ExecState>::Locked state(mExecState);
515             std::shared_ptr<C2Component::Listener> listener = state->mListener;
516             state.unlock();
517             listener->onError_nb(shared_from_this(), err);
518         }
519         return hasQueuedWork;
520     }
521 
522     {
523         std::vector<C2Param *> updates;
524         for (const std::unique_ptr<C2Param> &param: work->input.configUpdate) {
525             if (param) {
526                 updates.emplace_back(param.get());
527             }
528         }
529         if (!updates.empty()) {
530             std::vector<std::unique_ptr<C2SettingResult>> failures;
531             c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
532             ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
533         }
534     }
535 
536     ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
537     // If input buffer list is not empty, it means we have some input to process on.
538     // However, input could be a null buffer. In such case, clear the buffer list
539     // before making call to process().
540     if (!work->input.buffers.empty() && !work->input.buffers[0]) {
541         ALOGD("Encountered null input buffer. Clearing the input buffer");
542         work->input.buffers.clear();
543     }
544     process(work, mOutputBlockPool);
545     ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
546     Mutexed<WorkQueue>::Locked queue(mWorkQueue);
547     if (queue->generation() != generation) {
548         ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
549                 queue->generation(), generation);
550         work->result = C2_NOT_FOUND;
551         queue.unlock();
552 
553         Mutexed<ExecState>::Locked state(mExecState);
554         std::shared_ptr<C2Component::Listener> listener = state->mListener;
555         state.unlock();
556         listener->onWorkDone_nb(shared_from_this(), vec(work));
557         return hasQueuedWork;
558     }
559     if (work->workletsProcessed != 0u) {
560         queue.unlock();
561         Mutexed<ExecState>::Locked state(mExecState);
562         ALOGV("returning this work");
563         std::shared_ptr<C2Component::Listener> listener = state->mListener;
564         state.unlock();
565         listener->onWorkDone_nb(shared_from_this(), vec(work));
566     } else {
567         ALOGV("queue pending work");
568         work->input.buffers.clear();
569         std::unique_ptr<C2Work> unexpected;
570 
571         uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
572         if (queue->pending().count(frameIndex) != 0) {
573             unexpected = std::move(queue->pending().at(frameIndex));
574             queue->pending().erase(frameIndex);
575         }
576         (void)queue->pending().insert({ frameIndex, std::move(work) });
577 
578         queue.unlock();
579         if (unexpected) {
580             ALOGD("unexpected pending work");
581             unexpected->result = C2_CORRUPTED;
582             Mutexed<ExecState>::Locked state(mExecState);
583             std::shared_ptr<C2Component::Listener> listener = state->mListener;
584             state.unlock();
585             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
586         }
587     }
588     return hasQueuedWork;
589 }
590 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)591 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
592         const std::shared_ptr<C2LinearBlock> &block) {
593     return createLinearBuffer(block, block->offset(), block->size());
594 }
595 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)596 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
597         const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
598     return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
599 }
600 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)601 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
602         const std::shared_ptr<C2GraphicBlock> &block) {
603     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
604 }
605 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)606 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
607         const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
608     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
609 }
610 
611 } // namespace android
612