1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-Component"
19 #include <android-base/logging.h>
20 
21 #include <C2PlatformSupport.h>
22 #include <codec2/hidl/1.0/Component.h>
23 #include <codec2/hidl/1.0/ComponentStore.h>
24 #include <codec2/hidl/1.0/types.h>
25 
26 #include <hidl/HidlBinderSupport.h>
27 #include <utils/Timers.h>
28 
29 #include <C2BqBufferPriv.h>
30 #include <C2Debug.h>
31 #include <C2PlatformSupport.h>
32 
33 #include <chrono>
34 #include <thread>
35 
36 namespace hardware {
37 namespace google {
38 namespace media {
39 namespace c2 {
40 namespace V1_0 {
41 namespace utils {
42 
43 using namespace ::android;
44 
45 namespace /* unnamed */ {
46 
47 // Implementation of ConfigurableC2Intf based on C2ComponentInterface
48 struct CompIntf : public ConfigurableC2Intf {
CompIntfhardware::google::media::c2::V1_0::utils::__anonac6205bf0111::CompIntf49     CompIntf(const std::shared_ptr<C2ComponentInterface>& intf) :
50         ConfigurableC2Intf(intf->getName()),
51         mIntf(intf) {
52     }
53 
confighardware::google::media::c2::V1_0::utils::__anonac6205bf0111::CompIntf54     virtual c2_status_t config(
55             const std::vector<C2Param*>& params,
56             c2_blocking_t mayBlock,
57             std::vector<std::unique_ptr<C2SettingResult>>* const failures
58             ) override {
59         ALOGV("config");
60         return mIntf->config_vb(params, mayBlock, failures);
61     }
62 
queryhardware::google::media::c2::V1_0::utils::__anonac6205bf0111::CompIntf63     virtual c2_status_t query(
64             const std::vector<C2Param::Index>& indices,
65             c2_blocking_t mayBlock,
66             std::vector<std::unique_ptr<C2Param>>* const params
67             ) const override {
68         ALOGV("query");
69         return mIntf->query_vb({}, indices, mayBlock, params);
70     }
71 
querySupportedParamshardware::google::media::c2::V1_0::utils::__anonac6205bf0111::CompIntf72     virtual c2_status_t querySupportedParams(
73             std::vector<std::shared_ptr<C2ParamDescriptor>>* const params
74             ) const override {
75         ALOGV("querySupportedParams");
76         return mIntf->querySupportedParams_nb(params);
77     }
78 
querySupportedValueshardware::google::media::c2::V1_0::utils::__anonac6205bf0111::CompIntf79     virtual c2_status_t querySupportedValues(
80             std::vector<C2FieldSupportedValuesQuery>& fields,
81             c2_blocking_t mayBlock) const override {
82         ALOGV("querySupportedValues");
83         return mIntf->querySupportedValues_vb(fields, mayBlock);
84     }
85 
86 protected:
87     std::shared_ptr<C2ComponentInterface> mIntf;
88 };
89 
90 } // unnamed namespace
91 
92 // InputBufferManager
93 // ==================
94 //
95 // InputBufferManager presents a way to track and untrack input buffers in this
96 // (codec) process and send a notification to a listener, possibly in a
97 // different process, when a tracked buffer no longer has any references in this
98 // process. (In fact, this class would work for listeners in the same process
99 // too, but the optimization discussed below will not be beneficial.)
100 //
101 // InputBufferManager holds a collection of records representing tracked buffers
102 // and their callback listeners. Conceptually, one record is a triple (listener,
103 // frameIndex, bufferIndex) where
104 //
105 // - (frameIndex, bufferIndex) is a pair of indices used to identify the buffer.
106 // - listener is of type IComponentListener. Its onFramesRendered() function
107 //   will be called after the associated buffer dies. The argument of
108 //   onFramesRendered() is a list of RenderedFrame objects, each of which has
109 //   the following members:
110 //
111 //     uint64_t bufferQueueId
112 //     int32_t  slotId
113 //     int64_t  timestampNs
114 //
115 // When a tracked buffer associated to the triple (listener, frameIndex,
116 // bufferIndex) goes out of scope, listener->onFramesRendered() will be called
117 // with a RenderedFrame object whose members are set as follows:
118 //
119 //     bufferQueueId = frameIndex
120 //     slotId        = ~bufferIndex
121 //     timestampNs   = systemTime() at the time of notification
122 //
123 // The reason for the bitwise negation of bufferIndex is that onFramesRendered()
124 // may be used for a different purpose when slotId is non-negative (which is a
125 // more general use case).
126 //
127 // IPC Optimization
128 // ----------------
129 //
130 // Since onFramesRendered() generally is an IPC call, InputBufferManager tries
131 // not to call it too often. There is a mechanism to guarantee that any two
132 // calls to the same listener are at least kNotificationPeriodNs nanoseconds
133 // apart.
134 //
135 struct InputBufferManager {
136     // The minimum time period between IPC calls to notify the client about the
137     // destruction of input buffers.
138     static constexpr nsecs_t kNotificationPeriodNs = 1000000;
139 
140     // Track all buffers in a C2FrameData object.
141     //
142     // input (C2FrameData) has the following two members that are of interest:
143     //
144     //   C2WorkOrdinal                ordinal
145     //   vector<shared_ptr<C2Buffer>> buffers
146     //
147     // Calling registerFrameData(listener, input) will register multiple
148     // triples (, frameIndex, bufferIndex) where frameIndex is equal to
149     // input.ordinal.frameIndex and bufferIndex runs through the indices of
150     // input.buffers such that input.buffers[bufferIndex] is not null.
151     //
152     // This should be called from queue().
153     static void registerFrameData(
154             const sp<IComponentListener>& listener,
155             const C2FrameData& input);
156 
157     // Untrack all buffers in a C2FrameData object.
158     //
159     // Calling unregisterFrameData(listener, input) will unregister and remove
160     // pending notifications for all triples (l, fi, bufferIndex) such that
161     // l = listener and fi = input.ordinal.frameIndex.
162     //
163     // This should be called from onWorkDone() and flush().
164     static void unregisterFrameData(
165             const wp<IComponentListener>& listener,
166             const C2FrameData& input);
167 
168     // Untrack all buffers associated to a given listener.
169     //
170     // Calling unregisterFrameData(listener) will unregister and remove
171     // pending notifications for all triples (l, frameIndex, bufferIndex) such
172     // that l = listener.
173     //
174     // This should be called when the component cleans up all input buffers,
175     // i.e., when reset(), release(), stop() or ~Component() is called.
176     static void unregisterFrameData(
177             const wp<IComponentListener>& listener);
178 
179 private:
180     void _registerFrameData(
181             const sp<IComponentListener>& listener,
182             const C2FrameData& input);
183     void _unregisterFrameData(
184             const wp<IComponentListener>& listener,
185             const C2FrameData& input);
186     void _unregisterFrameData(
187             const wp<IComponentListener>& listener);
188 
189     // The callback function tied to C2Buffer objects.
190     //
191     // Note: This function assumes that sInstance is the only instance of this
192     //       class.
193     static void onBufferDestroyed(const C2Buffer* buf, void* arg);
194     void _onBufferDestroyed(const C2Buffer* buf, void* arg);
195 
196     // Comparison operator for weak pointers.
197     struct CompareWeakComponentListener {
operator ()hardware::google::media::c2::V1_0::utils::InputBufferManager::CompareWeakComponentListener198         constexpr bool operator()(
199                 const wp<IComponentListener>& x,
200                 const wp<IComponentListener>& y) const {
201             return x.get_refs() < y.get_refs();
202         }
203     };
204 
205     // Persistent data to be passed as "arg" in onBufferDestroyed().
206     // This is essentially the triple (listener, frameIndex, bufferIndex) plus a
207     // weak pointer to the C2Buffer object.
208     //
209     // Note that the "key" is bufferIndex according to operator<(). This is
210     // designed to work with TrackedBuffersMap defined below.
211     struct TrackedBuffer {
212         wp<IComponentListener> listener;
213         uint64_t frameIndex;
214         size_t bufferIndex;
215         std::weak_ptr<C2Buffer> buffer;
TrackedBufferhardware::google::media::c2::V1_0::utils::InputBufferManager::TrackedBuffer216         TrackedBuffer(const wp<IComponentListener>& listener,
217                       uint64_t frameIndex,
218                       size_t bufferIndex,
219                       const std::shared_ptr<C2Buffer>& buffer)
220               : listener(listener),
221                 frameIndex(frameIndex),
222                 bufferIndex(bufferIndex),
223                 buffer(buffer) {}
224         TrackedBuffer(const TrackedBuffer&) = default;
operator <hardware::google::media::c2::V1_0::utils::InputBufferManager::TrackedBuffer225         bool operator<(const TrackedBuffer& other) const {
226             return bufferIndex < other.bufferIndex;
227         }
228     };
229 
230     // Map: listener -> frameIndex -> set<TrackedBuffer>.
231     // Essentially, this is used to store triples (listener, frameIndex,
232     // bufferIndex) that's searchable by listener and (listener, frameIndex).
233     // However, the value of the innermost map is TrackedBuffer, which also
234     // contains an extra copy of listener and frameIndex. This is needed
235     // because onBufferDestroyed() needs to know listener and frameIndex too.
236     typedef std::map<wp<IComponentListener>,
237                      std::map<uint64_t,
238                               std::set<TrackedBuffer>>,
239                      CompareWeakComponentListener> TrackedBuffersMap;
240 
241     // Storage for pending (unsent) death notifications for one listener.
242     // Each pair in member named "indices" are (frameIndex, bufferIndex) from
243     // the (listener, frameIndex, bufferIndex) triple.
244     struct DeathNotifications {
245 
246         // The number of pending notifications for this listener.
247         // count may be 0, in which case the DeathNotifications object will
248         // remain valid for only a small period (kNotificationPeriodNs
249         // nanoseconds).
250         size_t count;
251 
252         // The timestamp of the most recent callback on this listener. This is
253         // used to guarantee that callbacks do not occur too frequently, and
254         // also to trigger expiration of a DeathNotifications object that has
255         // count = 0.
256         nsecs_t lastSentNs;
257 
258         // Map: frameIndex -> vector of bufferIndices
259         // This is essentially a collection of (framdeIndex, bufferIndex).
260         std::map<uint64_t, std::vector<size_t>> indices;
261 
DeathNotificationshardware::google::media::c2::V1_0::utils::InputBufferManager::DeathNotifications262         DeathNotifications()
263               : count(0),
264                 lastSentNs(systemTime() - kNotificationPeriodNs),
265                 indices() {}
266     };
267 
268     // Mutex for the management of all input buffers.
269     std::mutex mMutex;
270 
271     // Tracked input buffers.
272     TrackedBuffersMap mTrackedBuffersMap;
273 
274     // Death notifications to be sent.
275     //
276     // A DeathNotifications object is associated to each listener. An entry in
277     // this map will be removed if its associated DeathNotifications has count =
278     // 0 and lastSentNs < systemTime() - kNotificationPeriodNs.
279     std::map<wp<IComponentListener>, DeathNotifications> mDeathNotifications;
280 
281     // Condition variable signaled when an entry is added to mDeathNotifications.
282     std::condition_variable mOnBufferDestroyed;
283 
284     // Notify the clients about buffer destructions.
285     // Return false if all destructions have been notified.
286     // Return true and set timeToRetry to the duration to wait for before
287     // retrying if some destructions have not been notified.
288     bool processNotifications(nsecs_t* timeToRetryNs);
289 
290     // Main function for the input buffer manager thread.
291     void main();
292 
293     // The thread that manages notifications.
294     //
295     // Note: This variable is declared last so its initialization will happen
296     // after all other member variables have been initialized.
297     std::thread mMainThread;
298 
299     // Private constructor.
300     InputBufferManager();
301 
302     // The only instance of this class.
303     static InputBufferManager& getInstance();
304 
305 };
306 
307 // ComponentInterface
ComponentInterface(const std::shared_ptr<C2ComponentInterface> & intf,const sp<ComponentStore> & store)308 ComponentInterface::ComponentInterface(
309         const std::shared_ptr<C2ComponentInterface>& intf,
310         const sp<ComponentStore>& store) :
311     Configurable(new CachedConfigurable(std::make_unique<CompIntf>(intf))),
312     mInterface(intf) {
313     mInit = init(store.get());
314 }
315 
status() const316 c2_status_t ComponentInterface::status() const {
317     return mInit;
318 }
319 
320 // ComponentListener wrapper
321 struct Component::Listener : public C2Component::Listener {
322 
Listenerhardware::google::media::c2::V1_0::utils::Component::Listener323     Listener(const sp<Component>& component) :
324         mComponent(component),
325         mListener(component->mListener) {
326     }
327 
onError_nbhardware::google::media::c2::V1_0::utils::Component::Listener328     virtual void onError_nb(
329             std::weak_ptr<C2Component> /* c2component */,
330             uint32_t errorCode) override {
331         ALOGV("onError");
332         sp<IComponentListener> listener = mListener.promote();
333         if (listener) {
334             Return<void> transStatus = listener->onError(Status::OK, errorCode);
335             if (!transStatus.isOk()) {
336                 ALOGE("onError -- transaction failed.");
337             }
338         }
339     }
340 
onTripped_nbhardware::google::media::c2::V1_0::utils::Component::Listener341     virtual void onTripped_nb(
342             std::weak_ptr<C2Component> /* c2component */,
343             std::vector<std::shared_ptr<C2SettingResult>> c2settingResult
344             ) override {
345         ALOGV("onTripped");
346         sp<IComponentListener> listener = mListener.promote();
347         if (listener) {
348             hidl_vec<SettingResult> settingResults(c2settingResult.size());
349             size_t ix = 0;
350             for (const std::shared_ptr<C2SettingResult> &c2result :
351                     c2settingResult) {
352                 if (c2result) {
353                     if (objcpy(&settingResults[ix++], *c2result) !=
354                             Status::OK) {
355                         break;
356                     }
357                 }
358             }
359             settingResults.resize(ix);
360             Return<void> transStatus = listener->onTripped(settingResults);
361             if (!transStatus.isOk()) {
362                 ALOGE("onTripped -- transaction failed.");
363             }
364         }
365     }
366 
onWorkDone_nbhardware::google::media::c2::V1_0::utils::Component::Listener367     virtual void onWorkDone_nb(
368             std::weak_ptr<C2Component> /* c2component */,
369             std::list<std::unique_ptr<C2Work>> c2workItems) override {
370         ALOGV("onWorkDone");
371         for (const std::unique_ptr<C2Work>& work : c2workItems) {
372             if (work) {
373                 if (work->worklets.empty()
374                         || !work->worklets.back()
375                         || (work->worklets.back()->output.flags &
376                             C2FrameData::FLAG_INCOMPLETE) == 0) {
377                     InputBufferManager::
378                             unregisterFrameData(mListener, work->input);
379                 }
380             }
381         }
382 
383         sp<IComponentListener> listener = mListener.promote();
384         if (listener) {
385             WorkBundle workBundle;
386 
387             sp<Component> strongComponent = mComponent.promote();
388             if (objcpy(&workBundle, c2workItems, strongComponent ?
389                     &strongComponent->mBufferPoolSender : nullptr)
390                     != Status::OK) {
391                 ALOGE("onWorkDone() received corrupted work items.");
392                 return;
393             }
394             Return<void> transStatus = listener->onWorkDone(workBundle);
395             if (!transStatus.isOk()) {
396                 ALOGE("onWorkDone -- transaction failed.");
397                 return;
398             }
399             yieldBufferQueueBlocks(c2workItems, true);
400         }
401     }
402 
403 protected:
404     wp<Component> mComponent;
405     wp<IComponentListener> mListener;
406 };
407 
408 // Component
Component(const std::shared_ptr<C2Component> & component,const sp<IComponentListener> & listener,const sp<ComponentStore> & store,const sp<::android::hardware::media::bufferpool::V1_0::IClientManager> & clientPoolManager)409 Component::Component(
410         const std::shared_ptr<C2Component>& component,
411         const sp<IComponentListener>& listener,
412         const sp<ComponentStore>& store,
413         const sp<::android::hardware::media::bufferpool::V1_0::
414         IClientManager>& clientPoolManager) :
415     Configurable(new CachedConfigurable(
416             std::make_unique<CompIntf>(component->intf()))),
417     mComponent(component),
418     mInterface(component->intf()),
419     mListener(listener),
420     mStore(store),
421     mBufferPoolSender(clientPoolManager) {
422     // Retrieve supported parameters from store
423     // TODO: We could cache this per component/interface type
424     mInit = init(store.get());
425 }
426 
status() const427 c2_status_t Component::status() const {
428     return mInit;
429 }
430 
431 // Methods from ::android::hardware::media::c2::V1_0::IComponent
queue(const WorkBundle & workBundle)432 Return<Status> Component::queue(const WorkBundle& workBundle) {
433     ALOGV("queue -- converting input");
434     std::list<std::unique_ptr<C2Work>> c2works;
435 
436     if (objcpy(&c2works, workBundle) != C2_OK) {
437         ALOGV("queue -- corrupted");
438         return Status::CORRUPTED;
439     }
440 
441     // Register input buffers.
442     for (const std::unique_ptr<C2Work>& work : c2works) {
443         if (work) {
444             InputBufferManager::
445                     registerFrameData(mListener, work->input);
446         }
447     }
448 
449     ALOGV("queue -- calling");
450     return static_cast<Status>(mComponent->queue_nb(&c2works));
451 }
452 
flush(flush_cb _hidl_cb)453 Return<void> Component::flush(flush_cb _hidl_cb) {
454     std::list<std::unique_ptr<C2Work>> c2flushedWorks;
455     ALOGV("flush -- calling");
456     c2_status_t c2res = mComponent->flush_sm(
457             C2Component::FLUSH_COMPONENT,
458             &c2flushedWorks);
459 
460     // Unregister input buffers.
461     for (const std::unique_ptr<C2Work>& work : c2flushedWorks) {
462         if (work) {
463             if (work->worklets.empty()
464                     || !work->worklets.back()
465                     || (work->worklets.back()->output.flags &
466                         C2FrameData::FLAG_INCOMPLETE) == 0) {
467                 InputBufferManager::
468                         unregisterFrameData(mListener, work->input);
469             }
470         }
471     }
472 
473     WorkBundle flushedWorkBundle;
474     Status res = static_cast<Status>(c2res);
475     if (c2res == C2_OK) {
476         ALOGV("flush -- converting output");
477         res = objcpy(&flushedWorkBundle, c2flushedWorks, &mBufferPoolSender);
478     }
479     _hidl_cb(res, flushedWorkBundle);
480     yieldBufferQueueBlocks(c2flushedWorks, true);
481     return Void();
482 }
483 
drain(bool withEos)484 Return<Status> Component::drain(bool withEos) {
485     ALOGV("drain");
486     return static_cast<Status>(mComponent->drain_nb(withEos ?
487             C2Component::DRAIN_COMPONENT_WITH_EOS :
488             C2Component::DRAIN_COMPONENT_NO_EOS));
489 }
490 
setOutputSurface(uint64_t blockPoolId,const sp<HGraphicBufferProducer> & surface)491 Return<Status> Component::setOutputSurface(
492         uint64_t blockPoolId,
493         const sp<HGraphicBufferProducer>& surface) {
494     std::shared_ptr<C2BlockPool> pool;
495     GetCodec2BlockPool(blockPoolId, mComponent, &pool);
496     if (pool && pool->getAllocatorId() == C2PlatformAllocatorStore::BUFFERQUEUE) {
497         std::shared_ptr<C2BufferQueueBlockPool> bqPool =
498                 std::static_pointer_cast<C2BufferQueueBlockPool>(pool);
499         C2BufferQueueBlockPool::OnRenderCallback cb =
500             [this](uint64_t producer, int32_t slot, int64_t nsecs) {
501                 // TODO: batch this
502                 hidl_vec<IComponentListener::RenderedFrame> rendered;
503                 rendered.resize(1);
504                 rendered[0] = { producer, slot, nsecs };
505                 (void)mListener->onFramesRendered(rendered).isOk();
506         };
507         if (bqPool) {
508             bqPool->setRenderCallback(cb);
509             bqPool->configureProducer(surface);
510         }
511     }
512     return Status::OK;
513 }
514 
connectToOmxInputSurface(const sp<HGraphicBufferProducer> & producer,const sp<::android::hardware::media::omx::V1_0::IGraphicBufferSource> & source)515 Return<Status> Component::connectToOmxInputSurface(
516         const sp<HGraphicBufferProducer>& producer,
517         const sp<::android::hardware::media::omx::V1_0::
518         IGraphicBufferSource>& source) {
519     // TODO implement
520     (void)producer;
521     (void)source;
522     return Status::OMITTED;
523 }
524 
disconnectFromInputSurface()525 Return<Status> Component::disconnectFromInputSurface() {
526     // TODO implement
527     return Status::OK;
528 }
529 
530 namespace /* unnamed */ {
531 
532 struct BlockPoolIntf : public ConfigurableC2Intf {
BlockPoolIntfhardware::google::media::c2::V1_0::utils::__anonac6205bf0311::BlockPoolIntf533     BlockPoolIntf(const std::shared_ptr<C2BlockPool>& pool) :
534         ConfigurableC2Intf("C2BlockPool:" +
535                            (pool ? std::to_string(pool->getLocalId()) :
536                            "null")),
537         mPool(pool) {
538     }
539 
confighardware::google::media::c2::V1_0::utils::__anonac6205bf0311::BlockPoolIntf540     virtual c2_status_t config(
541             const std::vector<C2Param*>& params,
542             c2_blocking_t mayBlock,
543             std::vector<std::unique_ptr<C2SettingResult>>* const failures
544             ) override {
545         (void)params;
546         (void)mayBlock;
547         (void)failures;
548         return C2_OK;
549     }
550 
queryhardware::google::media::c2::V1_0::utils::__anonac6205bf0311::BlockPoolIntf551     virtual c2_status_t query(
552             const std::vector<C2Param::Index>& indices,
553             c2_blocking_t mayBlock,
554             std::vector<std::unique_ptr<C2Param>>* const params
555             ) const override {
556         (void)indices;
557         (void)mayBlock;
558         (void)params;
559         return C2_OK;
560     }
561 
querySupportedParamshardware::google::media::c2::V1_0::utils::__anonac6205bf0311::BlockPoolIntf562     virtual c2_status_t querySupportedParams(
563             std::vector<std::shared_ptr<C2ParamDescriptor>>* const params
564             ) const override {
565         (void)params;
566         return C2_OK;
567     }
568 
querySupportedValueshardware::google::media::c2::V1_0::utils::__anonac6205bf0311::BlockPoolIntf569     virtual c2_status_t querySupportedValues(
570             std::vector<C2FieldSupportedValuesQuery>& fields,
571             c2_blocking_t mayBlock) const override {
572         (void)fields;
573         (void)mayBlock;
574         return C2_OK;
575     }
576 
577 protected:
578     std::shared_ptr<C2BlockPool> mPool;
579 };
580 
581 } // unnamed namespace
582 
createBlockPool(uint32_t allocatorId,createBlockPool_cb _hidl_cb)583 Return<void> Component::createBlockPool(
584         uint32_t allocatorId,
585         createBlockPool_cb _hidl_cb) {
586     std::shared_ptr<C2BlockPool> blockPool;
587     c2_status_t status = CreateCodec2BlockPool(
588             static_cast<C2PlatformAllocatorStore::id_t>(allocatorId),
589             mComponent,
590             &blockPool);
591     if (status != C2_OK) {
592         blockPool = nullptr;
593     }
594     if (blockPool) {
595         mBlockPoolsMutex.lock();
596         mBlockPools.emplace(blockPool->getLocalId(), blockPool);
597         mBlockPoolsMutex.unlock();
598     } else if (status == C2_OK) {
599         status = C2_CORRUPTED;
600     }
601 
602     _hidl_cb(static_cast<Status>(status),
603             blockPool ? blockPool->getLocalId() : 0,
604             new CachedConfigurable(
605             std::make_unique<BlockPoolIntf>(blockPool)));
606     return Void();
607 }
608 
destroyBlockPool(uint64_t blockPoolId)609 Return<Status> Component::destroyBlockPool(uint64_t blockPoolId) {
610     std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
611     return mBlockPools.erase(blockPoolId) == 1 ?
612             Status::OK : Status::CORRUPTED;
613 }
614 
start()615 Return<Status> Component::start() {
616     ALOGV("start");
617     return static_cast<Status>(mComponent->start());
618 }
619 
stop()620 Return<Status> Component::stop() {
621     ALOGV("stop");
622     InputBufferManager::unregisterFrameData(mListener);
623     return static_cast<Status>(mComponent->stop());
624 }
625 
reset()626 Return<Status> Component::reset() {
627     ALOGV("reset");
628     Status status = static_cast<Status>(mComponent->reset());
629     {
630         std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
631         mBlockPools.clear();
632     }
633     InputBufferManager::unregisterFrameData(mListener);
634     return status;
635 }
636 
release()637 Return<Status> Component::release() {
638     ALOGV("release");
639     Status status = static_cast<Status>(mComponent->release());
640     {
641         std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
642         mBlockPools.clear();
643     }
644     InputBufferManager::unregisterFrameData(mListener);
645     return status;
646 }
647 
setLocalId(const Component::LocalId & localId)648 void Component::setLocalId(const Component::LocalId& localId) {
649     mLocalId = localId;
650 }
651 
initListener(const sp<Component> & self)652 void Component::initListener(const sp<Component>& self) {
653     std::shared_ptr<C2Component::Listener> c2listener =
654             std::make_shared<Listener>(self);
655     c2_status_t res = mComponent->setListener_vb(c2listener, C2_DONT_BLOCK);
656     if (res != C2_OK) {
657         mInit = res;
658     }
659 }
660 
~Component()661 Component::~Component() {
662     InputBufferManager::unregisterFrameData(mListener);
663     mStore->reportComponentDeath(mLocalId);
664 }
665 
InterfaceKey(const sp<IComponent> & component)666 Component::InterfaceKey::InterfaceKey(const sp<IComponent>& component) {
667     isRemote = component->isRemote();
668     if (isRemote) {
669         remote = ::android::hardware::toBinder(component);
670     } else {
671         local = component;
672     }
673 }
674 
675 // InputBufferManager implementation
676 
677 constexpr nsecs_t InputBufferManager::kNotificationPeriodNs;
678 
registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)679 void InputBufferManager::registerFrameData(
680         const sp<IComponentListener>& listener,
681         const C2FrameData& input) {
682     getInstance()._registerFrameData(listener, input);
683 }
684 
unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)685 void InputBufferManager::unregisterFrameData(
686         const wp<IComponentListener>& listener,
687         const C2FrameData& input) {
688     getInstance()._unregisterFrameData(listener, input);
689 }
690 
unregisterFrameData(const wp<IComponentListener> & listener)691 void InputBufferManager::unregisterFrameData(
692         const wp<IComponentListener>& listener) {
693     getInstance()._unregisterFrameData(listener);
694 }
695 
_registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)696 void InputBufferManager::_registerFrameData(
697         const sp<IComponentListener>& listener,
698         const C2FrameData& input) {
699     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
700     ALOGV("InputBufferManager::_registerFrameData called "
701           "(listener @ %p, frameIndex = %llu)",
702           listener.get(),
703           static_cast<long long unsigned>(frameIndex));
704     std::lock_guard<std::mutex> lock(mMutex);
705 
706     std::set<TrackedBuffer> &bufferIds =
707             mTrackedBuffersMap[listener][frameIndex];
708 
709     for (size_t i = 0; i < input.buffers.size(); ++i) {
710         if (!input.buffers[i]) {
711             ALOGV("InputBufferManager::_registerFrameData: "
712                   "Input buffer at index %zu is null", i);
713             continue;
714         }
715         const TrackedBuffer &bufferId =
716                 *bufferIds.emplace(listener, frameIndex, i, input.buffers[i]).
717                 first;
718 
719         c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
720                 onBufferDestroyed,
721                 const_cast<void*>(reinterpret_cast<const void*>(&bufferId)));
722         if (status != C2_OK) {
723             ALOGD("InputBufferManager: registerOnDestroyNotify failed "
724                   "(listener @ %p, frameIndex = %llu, bufferIndex = %zu) "
725                   "=> %s (%d)",
726                   listener.get(),
727                   static_cast<unsigned long long>(frameIndex),
728                   i,
729                   asString(status), static_cast<int>(status));
730         }
731     }
732 
733     mDeathNotifications.emplace(listener, DeathNotifications());
734 }
735 
736 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
737 // mDeathNotifications. This implies all bufferIndices are removed.
738 //
739 // This is called from onWorkDone() and flush().
_unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)740 void InputBufferManager::_unregisterFrameData(
741         const wp<IComponentListener>& listener,
742         const C2FrameData& input) {
743     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
744     ALOGV("InputBufferManager::_unregisterFrameData called "
745           "(listener @ %p, frameIndex = %llu)",
746           listener.unsafe_get(),
747           static_cast<long long unsigned>(frameIndex));
748     std::lock_guard<std::mutex> lock(mMutex);
749 
750     auto findListener = mTrackedBuffersMap.find(listener);
751     if (findListener != mTrackedBuffersMap.end()) {
752         std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
753                 = findListener->second;
754         auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
755         if (findFrameIndex != frameIndex2BufferIds.end()) {
756             std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
757             for (const TrackedBuffer& bufferId : bufferIds) {
758                 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
759                 if (buffer) {
760                     c2_status_t status = buffer->unregisterOnDestroyNotify(
761                             onBufferDestroyed,
762                             const_cast<void*>(
763                             reinterpret_cast<const void*>(&bufferId)));
764                     if (status != C2_OK) {
765                         ALOGD("InputBufferManager: "
766                               "unregisterOnDestroyNotify failed "
767                               "(listener @ %p, "
768                               "frameIndex = %llu, "
769                               "bufferIndex = %zu) "
770                               "=> %s (%d)",
771                               bufferId.listener.unsafe_get(),
772                               static_cast<unsigned long long>(
773                                   bufferId.frameIndex),
774                               bufferId.bufferIndex,
775                               asString(status), static_cast<int>(status));
776                     }
777                 }
778             }
779 
780             frameIndex2BufferIds.erase(findFrameIndex);
781             if (frameIndex2BufferIds.empty()) {
782                 mTrackedBuffersMap.erase(findListener);
783             }
784         }
785     }
786 
787     auto findListenerD = mDeathNotifications.find(listener);
788     if (findListenerD != mDeathNotifications.end()) {
789         DeathNotifications &deathNotifications = findListenerD->second;
790         auto findFrameIndex = deathNotifications.indices.find(frameIndex);
791         if (findFrameIndex != deathNotifications.indices.end()) {
792             std::vector<size_t> &bufferIndices = findFrameIndex->second;
793             deathNotifications.count -= bufferIndices.size();
794             deathNotifications.indices.erase(findFrameIndex);
795         }
796     }
797 }
798 
799 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
800 // all frameIndices and bufferIndices are removed.
801 //
802 // This is called when the component cleans up all input buffers, i.e., when
803 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const wp<IComponentListener> & listener)804 void InputBufferManager::_unregisterFrameData(
805         const wp<IComponentListener>& listener) {
806     ALOGV("InputBufferManager::_unregisterFrameData called (listener @ %p)",
807             listener.unsafe_get());
808     std::lock_guard<std::mutex> lock(mMutex);
809 
810     auto findListener = mTrackedBuffersMap.find(listener);
811     if (findListener != mTrackedBuffersMap.end()) {
812         std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds =
813                 findListener->second;
814         for (auto findFrameIndex = frameIndex2BufferIds.begin();
815                 findFrameIndex != frameIndex2BufferIds.end();
816                 ++findFrameIndex) {
817             std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
818             for (const TrackedBuffer& bufferId : bufferIds) {
819                 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
820                 if (buffer) {
821                     c2_status_t status = buffer->unregisterOnDestroyNotify(
822                             onBufferDestroyed,
823                             const_cast<void*>(
824                             reinterpret_cast<const void*>(&bufferId)));
825                     if (status != C2_OK) {
826                         ALOGD("InputBufferManager: "
827                               "unregisterOnDestroyNotify failed "
828                               "(listener @ %p, "
829                               "frameIndex = %llu, "
830                               "bufferIndex = %zu) "
831                               "=> %s (%d)",
832                               bufferId.listener.unsafe_get(),
833                               static_cast<unsigned long long>(bufferId.frameIndex),
834                               bufferId.bufferIndex,
835                               asString(status), static_cast<int>(status));
836                     }
837                 }
838             }
839         }
840         mTrackedBuffersMap.erase(findListener);
841     }
842 
843     mDeathNotifications.erase(listener);
844 }
845 
846 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
847 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)848 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
849     getInstance()._onBufferDestroyed(buf, arg);
850 }
851 
_onBufferDestroyed(const C2Buffer * buf,void * arg)852 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
853     if (!buf || !arg) {
854         ALOGW("InputBufferManager::_onBufferDestroyed called "
855               "with null argument(s) (buf @ %p, arg @ %p)",
856               buf, arg);
857         return;
858     }
859     TrackedBuffer id(*reinterpret_cast<TrackedBuffer*>(arg));
860     ALOGV("InputBufferManager::_onBufferDestroyed called "
861           "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
862           id.listener.unsafe_get(),
863           static_cast<unsigned long long>(id.frameIndex),
864           id.bufferIndex);
865 
866     std::lock_guard<std::mutex> lock(mMutex);
867 
868     auto findListener = mTrackedBuffersMap.find(id.listener);
869     if (findListener == mTrackedBuffersMap.end()) {
870         ALOGD("InputBufferManager::_onBufferDestroyed received "
871               "invalid listener "
872               "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
873               id.listener.unsafe_get(),
874               static_cast<unsigned long long>(id.frameIndex),
875               id.bufferIndex);
876         return;
877     }
878 
879     std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
880             = findListener->second;
881     auto findFrameIndex = frameIndex2BufferIds.find(id.frameIndex);
882     if (findFrameIndex == frameIndex2BufferIds.end()) {
883         ALOGD("InputBufferManager::_onBufferDestroyed received "
884               "invalid frame index "
885               "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
886               id.listener.unsafe_get(),
887               static_cast<unsigned long long>(id.frameIndex),
888               id.bufferIndex);
889         return;
890     }
891 
892     std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
893     auto findBufferId = bufferIds.find(id);
894     if (findBufferId == bufferIds.end()) {
895         ALOGD("InputBufferManager::_onBufferDestroyed received "
896               "invalid buffer index: "
897               "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
898               id.listener.unsafe_get(),
899               static_cast<unsigned long long>(id.frameIndex),
900               id.bufferIndex);
901     }
902 
903     bufferIds.erase(findBufferId);
904     if (bufferIds.empty()) {
905         frameIndex2BufferIds.erase(findFrameIndex);
906         if (frameIndex2BufferIds.empty()) {
907             mTrackedBuffersMap.erase(findListener);
908         }
909     }
910 
911     DeathNotifications &deathNotifications = mDeathNotifications[id.listener];
912     deathNotifications.indices[id.frameIndex].emplace_back(id.bufferIndex);
913     ++deathNotifications.count;
914     mOnBufferDestroyed.notify_one();
915 }
916 
917 // Notify the clients about buffer destructions.
918 // Return false if all destructions have been notified.
919 // Return true and set timeToRetry to the time point to wait for before
920 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)921 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
922 
923     struct Notification {
924         sp<IComponentListener> listener;
925         hidl_vec<IComponentListener::RenderedFrame> renderedFrames;
926         Notification(const sp<IComponentListener>& l, size_t s)
927               : listener(l), renderedFrames(s) {}
928     };
929     std::list<Notification> notifications;
930 
931     bool retry = false;
932     {
933         std::lock_guard<std::mutex> lock(mMutex);
934         *timeToRetryNs = kNotificationPeriodNs;
935         nsecs_t timeNowNs = systemTime();
936         for (auto it = mDeathNotifications.begin();
937                 it != mDeathNotifications.end(); ) {
938             sp<IComponentListener> listener = it->first.promote();
939             if (!listener) {
940                 ++it;
941                 continue;
942             }
943             DeathNotifications &deathNotifications = it->second;
944 
945             nsecs_t timeSinceLastNotifiedNs =
946                     timeNowNs - deathNotifications.lastSentNs;
947             // If not enough time has passed since the last callback, leave the
948             // notifications for this listener untouched for now and retry
949             // later.
950             if (timeSinceLastNotifiedNs < kNotificationPeriodNs) {
951                 retry = true;
952                 *timeToRetryNs = std::min(*timeToRetryNs,
953                         kNotificationPeriodNs - timeSinceLastNotifiedNs);
954                 ALOGV("InputBufferManager: Notifications for "
955                       "listener @ %p will be postponed.",
956                       listener.get());
957                 ++it;
958                 continue;
959             }
960 
961             // If enough time has passed since the last notification to this
962             // listener but there are currently no pending notifications, the
963             // listener can be removed from mDeathNotifications---there is no
964             // need to keep track of the last notification time anymore.
965             if (deathNotifications.count == 0) {
966                 it = mDeathNotifications.erase(it);
967                 continue;
968             }
969 
970             // Create the argument for the callback.
971             notifications.emplace_back(listener, deathNotifications.count);
972             hidl_vec<IComponentListener::RenderedFrame>& renderedFrames =
973                     notifications.back().renderedFrames;
974             size_t i = 0;
975             for (std::pair<const uint64_t, std::vector<size_t>>& p :
976                     deathNotifications.indices) {
977                 uint64_t frameIndex = p.first;
978                 const std::vector<size_t> &bufferIndices = p.second;
979                 for (const size_t& bufferIndex : bufferIndices) {
980                     IComponentListener::RenderedFrame &renderedFrame
981                             = renderedFrames[i++];
982                     renderedFrame.slotId = ~bufferIndex;
983                     renderedFrame.bufferQueueId = frameIndex;
984                     renderedFrame.timestampNs = timeNowNs;
985                     ALOGV("InputBufferManager: "
986                           "Sending death notification (listener @ %p, "
987                           "frameIndex = %llu, bufferIndex = %zu)",
988                           listener.get(),
989                           static_cast<long long unsigned>(frameIndex),
990                           bufferIndex);
991                 }
992             }
993 
994             // Clear deathNotifications for this listener and set retry to true
995             // so processNotifications will be called again. This will
996             // guarantee that a listener with no pending notifications will
997             // eventually be removed from mDeathNotifications after
998             // kNotificationPeriodNs nanoseconds has passed.
999             retry = true;
1000             deathNotifications.indices.clear();
1001             deathNotifications.count = 0;
1002             deathNotifications.lastSentNs = timeNowNs;
1003             ++it;
1004         }
1005     }
1006 
1007     // Call onFramesRendered outside the lock to avoid deadlock.
1008     for (const Notification& notification : notifications) {
1009         if (!notification.listener->onFramesRendered(
1010                 notification.renderedFrames).isOk()) {
1011             // This may trigger if the client has died.
1012             ALOGD("InputBufferManager: onFramesRendered transaction failed "
1013                   "(listener @ %p)",
1014                   notification.listener.get());
1015         }
1016     }
1017     if (retry) {
1018         ALOGV("InputBufferManager: Pending death notifications"
1019               "will be sent in %lldns.",
1020               static_cast<long long>(*timeToRetryNs));
1021     }
1022     return retry;
1023 }
1024 
main()1025 void InputBufferManager::main() {
1026     ALOGV("InputBufferManager: Starting main thread");
1027     nsecs_t timeToRetryNs;
1028     while (true) {
1029         std::unique_lock<std::mutex> lock(mMutex);
1030         while (mDeathNotifications.empty()) {
1031             ALOGV("InputBufferManager: Waiting for buffer deaths");
1032             mOnBufferDestroyed.wait(lock);
1033         }
1034         lock.unlock();
1035         ALOGV("InputBufferManager: Sending buffer death notifications");
1036         while (processNotifications(&timeToRetryNs)) {
1037             std::this_thread::sleep_for(
1038                     std::chrono::nanoseconds(timeToRetryNs));
1039             ALOGV("InputBufferManager: Sending pending death notifications");
1040         }
1041         ALOGV("InputBufferManager: No pending death notifications");
1042     }
1043 }
1044 
InputBufferManager()1045 InputBufferManager::InputBufferManager()
1046       : mMainThread(&InputBufferManager::main, this) {
1047 }
1048 
getInstance()1049 InputBufferManager& InputBufferManager::getInstance() {
1050     static InputBufferManager instance{};
1051     return instance;
1052 }
1053 
1054 }  // namespace utils
1055 }  // namespace V1_0
1056 }  // namespace c2
1057 }  // namespace media
1058 }  // namespace google
1059 }  // namespace hardware
1060