1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-InputBufferManager"
19 #include <android-base/logging.h>
20 
21 #include <codec2/hidl/1.0/InputBufferManager.h>
22 #include <codec2/hidl/1.0/types.h>
23 
24 #include <android/hardware/media/c2/1.0/IComponentListener.h>
25 #include <android-base/logging.h>
26 
27 #include <C2Buffer.h>
28 #include <C2Work.h>
29 
30 #include <chrono>
31 
32 namespace android {
33 namespace hardware {
34 namespace media {
35 namespace c2 {
36 namespace V1_0 {
37 namespace utils {
38 
39 using namespace ::android;
40 
registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)41 void InputBufferManager::registerFrameData(
42         const sp<IComponentListener>& listener,
43         const C2FrameData& input) {
44     getInstance()._registerFrameData(listener, input);
45 }
46 
unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)47 void InputBufferManager::unregisterFrameData(
48         const wp<IComponentListener>& listener,
49         const C2FrameData& input) {
50     getInstance()._unregisterFrameData(listener, input);
51 }
52 
unregisterFrameData(const wp<IComponentListener> & listener)53 void InputBufferManager::unregisterFrameData(
54         const wp<IComponentListener>& listener) {
55     getInstance()._unregisterFrameData(listener);
56 }
57 
setNotificationInterval(nsecs_t notificationIntervalNs)58 void InputBufferManager::setNotificationInterval(
59         nsecs_t notificationIntervalNs) {
60     getInstance()._setNotificationInterval(notificationIntervalNs);
61 }
62 
_registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)63 void InputBufferManager::_registerFrameData(
64         const sp<IComponentListener>& listener,
65         const C2FrameData& input) {
66     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
67     LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- called with "
68                  << "listener @ 0x" << std::hex << listener.get()
69                  << ", frameIndex = " << std::dec << frameIndex
70                  << ".";
71     std::lock_guard<std::mutex> lock(mMutex);
72 
73     std::set<TrackedBuffer> &bufferIds =
74             mTrackedBuffersMap[listener][frameIndex];
75 
76     for (size_t i = 0; i < input.buffers.size(); ++i) {
77         if (!input.buffers[i]) {
78             LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- "
79                          << "Input buffer at index " << i << " is null.";
80             continue;
81         }
82         const TrackedBuffer &bufferId =
83                 *bufferIds.emplace(listener, frameIndex, i, input.buffers[i]).
84                 first;
85 
86         c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
87                 onBufferDestroyed,
88                 const_cast<void*>(reinterpret_cast<const void*>(&bufferId)));
89         if (status != C2_OK) {
90             LOG(DEBUG) << "InputBufferManager::_registerFrameData -- "
91                        << "registerOnDestroyNotify() failed "
92                        << "(listener @ 0x" << std::hex << listener.get()
93                        << ", frameIndex = " << std::dec << frameIndex
94                        << ", bufferIndex = " << i
95                        << ") => status = " << status
96                        << ".";
97         }
98     }
99 
100     mDeathNotifications.emplace(
101             listener,
102             DeathNotifications(
103                 mNotificationIntervalNs.load(std::memory_order_relaxed)));
104 }
105 
106 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
107 // mDeathNotifications. This implies all bufferIndices are removed.
108 //
109 // This is called from onWorkDone() and flush().
_unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)110 void InputBufferManager::_unregisterFrameData(
111         const wp<IComponentListener>& listener,
112         const C2FrameData& input) {
113     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
114     LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
115                  << "listener @ 0x" << std::hex << listener.unsafe_get()
116                  << ", frameIndex = " << std::dec << frameIndex
117                  << ".";
118     std::lock_guard<std::mutex> lock(mMutex);
119 
120     auto findListener = mTrackedBuffersMap.find(listener);
121     if (findListener != mTrackedBuffersMap.end()) {
122         std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
123                 = findListener->second;
124         auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
125         if (findFrameIndex != frameIndex2BufferIds.end()) {
126             std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
127             for (const TrackedBuffer& bufferId : bufferIds) {
128                 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
129                 if (buffer) {
130                     c2_status_t status = buffer->unregisterOnDestroyNotify(
131                             onBufferDestroyed,
132                             const_cast<void*>(
133                             reinterpret_cast<const void*>(&bufferId)));
134                     if (status != C2_OK) {
135                         LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
136                                    << "-- unregisterOnDestroyNotify() failed "
137                                    << "(listener @ 0x"
138                                         << std::hex
139                                         << bufferId.listener.unsafe_get()
140                                    << ", frameIndex = "
141                                         << std::dec << bufferId.frameIndex
142                                    << ", bufferIndex = " << bufferId.bufferIndex
143                                    << ") => status = " << status
144                                    << ".";
145                     }
146                 }
147             }
148 
149             frameIndex2BufferIds.erase(findFrameIndex);
150             if (frameIndex2BufferIds.empty()) {
151                 mTrackedBuffersMap.erase(findListener);
152             }
153         }
154     }
155 
156     auto findListenerD = mDeathNotifications.find(listener);
157     if (findListenerD != mDeathNotifications.end()) {
158         DeathNotifications &deathNotifications = findListenerD->second;
159         auto findFrameIndex = deathNotifications.indices.find(frameIndex);
160         if (findFrameIndex != deathNotifications.indices.end()) {
161             std::vector<size_t> &bufferIndices = findFrameIndex->second;
162             deathNotifications.count -= bufferIndices.size();
163             deathNotifications.indices.erase(findFrameIndex);
164         }
165     }
166 }
167 
168 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
169 // all frameIndices and bufferIndices are removed.
170 //
171 // This is called when the component cleans up all input buffers, i.e., when
172 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const wp<IComponentListener> & listener)173 void InputBufferManager::_unregisterFrameData(
174         const wp<IComponentListener>& listener) {
175     LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
176                  << "listener @ 0x" << std::hex << listener.unsafe_get()
177                  << std::dec << ".";
178     std::lock_guard<std::mutex> lock(mMutex);
179 
180     auto findListener = mTrackedBuffersMap.find(listener);
181     if (findListener != mTrackedBuffersMap.end()) {
182         std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds =
183                 findListener->second;
184         for (auto findFrameIndex = frameIndex2BufferIds.begin();
185                 findFrameIndex != frameIndex2BufferIds.end();
186                 ++findFrameIndex) {
187             std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
188             for (const TrackedBuffer& bufferId : bufferIds) {
189                 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
190                 if (buffer) {
191                     c2_status_t status = buffer->unregisterOnDestroyNotify(
192                             onBufferDestroyed,
193                             const_cast<void*>(
194                             reinterpret_cast<const void*>(&bufferId)));
195                     if (status != C2_OK) {
196                         LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
197                                    << "-- unregisterOnDestroyNotify() failed "
198                                    << "(listener @ 0x"
199                                         << std::hex
200                                         << bufferId.listener.unsafe_get()
201                                    << ", frameIndex = "
202                                         << std::dec << bufferId.frameIndex
203                                    << ", bufferIndex = " << bufferId.bufferIndex
204                                    << ") => status = " << status
205                                    << ".";
206                     }
207                 }
208             }
209         }
210         mTrackedBuffersMap.erase(findListener);
211     }
212 
213     mDeathNotifications.erase(listener);
214 }
215 
216 // Set mNotificationIntervalNs.
_setNotificationInterval(nsecs_t notificationIntervalNs)217 void InputBufferManager::_setNotificationInterval(
218         nsecs_t notificationIntervalNs) {
219     mNotificationIntervalNs.store(
220             notificationIntervalNs,
221             std::memory_order_relaxed);
222 }
223 
224 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
225 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)226 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
227     getInstance()._onBufferDestroyed(buf, arg);
228 }
229 
_onBufferDestroyed(const C2Buffer * buf,void * arg)230 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
231     if (!buf || !arg) {
232         LOG(WARNING) << "InputBufferManager::_onBufferDestroyed -- called with "
233                      << "null argument (s): "
234                      << "buf @ 0x" << std::hex << buf
235                      << ", arg @ 0x" << std::hex << arg
236                      << std::dec << ".";
237         return;
238     }
239     TrackedBuffer id(*reinterpret_cast<TrackedBuffer*>(arg));
240     LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
241                  << "buf @ 0x" << std::hex << buf
242                  << ", arg @ 0x" << std::hex << arg
243                  << std::dec << " -- "
244                  << "listener @ 0x" << std::hex << id.listener.unsafe_get()
245                  << ", frameIndex = " << std::dec << id.frameIndex
246                  << ", bufferIndex = " << id.bufferIndex
247                  << ".";
248 
249     std::lock_guard<std::mutex> lock(mMutex);
250 
251     auto findListener = mTrackedBuffersMap.find(id.listener);
252     if (findListener == mTrackedBuffersMap.end()) {
253         LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
254                    << "received invalid listener: "
255                    << "listener @ 0x" << std::hex << id.listener.unsafe_get()
256                    << " (frameIndex = " << std::dec << id.frameIndex
257                    << ", bufferIndex = " << id.bufferIndex
258                    << ").";
259         return;
260     }
261 
262     std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
263             = findListener->second;
264     auto findFrameIndex = frameIndex2BufferIds.find(id.frameIndex);
265     if (findFrameIndex == frameIndex2BufferIds.end()) {
266         LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
267                    << "received invalid frame index: "
268                    << "frameIndex = " << id.frameIndex
269                    << " (listener @ 0x" << std::hex << id.listener.unsafe_get()
270                    << ", bufferIndex = " << std::dec << id.bufferIndex
271                    << ").";
272         return;
273     }
274 
275     std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
276     auto findBufferId = bufferIds.find(id);
277     if (findBufferId == bufferIds.end()) {
278         LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
279                    << "received invalid buffer index: "
280                    << "bufferIndex = " << id.bufferIndex
281                    << " (frameIndex = " << id.frameIndex
282                    << ", listener @ 0x" << std::hex << id.listener.unsafe_get()
283                    << std::dec << ").";
284         return;
285     }
286 
287     bufferIds.erase(findBufferId);
288     if (bufferIds.empty()) {
289         frameIndex2BufferIds.erase(findFrameIndex);
290         if (frameIndex2BufferIds.empty()) {
291             mTrackedBuffersMap.erase(findListener);
292         }
293     }
294 
295     DeathNotifications &deathNotifications = mDeathNotifications[id.listener];
296     deathNotifications.indices[id.frameIndex].emplace_back(id.bufferIndex);
297     ++deathNotifications.count;
298     mOnBufferDestroyed.notify_one();
299 }
300 
301 // Notify the clients about buffer destructions.
302 // Return false if all destructions have been notified.
303 // Return true and set timeToRetry to the time point to wait for before
304 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)305 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
306 
307     struct Notification {
308         sp<IComponentListener> listener;
309         hidl_vec<IComponentListener::InputBuffer> inputBuffers;
310         Notification(const sp<IComponentListener>& l, size_t s)
311               : listener(l), inputBuffers(s) {}
312     };
313     std::list<Notification> notifications;
314     nsecs_t notificationIntervalNs =
315             mNotificationIntervalNs.load(std::memory_order_relaxed);
316 
317     bool retry = false;
318     {
319         std::lock_guard<std::mutex> lock(mMutex);
320         *timeToRetryNs = notificationIntervalNs;
321         nsecs_t timeNowNs = systemTime();
322         for (auto it = mDeathNotifications.begin();
323                 it != mDeathNotifications.end(); ) {
324             sp<IComponentListener> listener = it->first.promote();
325             if (!listener) {
326                 ++it;
327                 continue;
328             }
329             DeathNotifications &deathNotifications = it->second;
330 
331             nsecs_t timeSinceLastNotifiedNs =
332                     timeNowNs - deathNotifications.lastSentNs;
333             // If not enough time has passed since the last callback, leave the
334             // notifications for this listener untouched for now and retry
335             // later.
336             if (timeSinceLastNotifiedNs < notificationIntervalNs) {
337                 retry = true;
338                 *timeToRetryNs = std::min(*timeToRetryNs,
339                         notificationIntervalNs - timeSinceLastNotifiedNs);
340                 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
341                              << "Notifications for listener @ "
342                                  << std::hex << listener.get()
343                              << " will be postponed.";
344                 ++it;
345                 continue;
346             }
347 
348             // If enough time has passed since the last notification to this
349             // listener but there are currently no pending notifications, the
350             // listener can be removed from mDeathNotifications---there is no
351             // need to keep track of the last notification time anymore.
352             if (deathNotifications.count == 0) {
353                 it = mDeathNotifications.erase(it);
354                 continue;
355             }
356 
357             // Create the argument for the callback.
358             notifications.emplace_back(listener, deathNotifications.count);
359             hidl_vec<IComponentListener::InputBuffer> &inputBuffers =
360                     notifications.back().inputBuffers;
361             size_t i = 0;
362             for (std::pair<const uint64_t, std::vector<size_t>>& p :
363                     deathNotifications.indices) {
364                 uint64_t frameIndex = p.first;
365                 const std::vector<size_t> &bufferIndices = p.second;
366                 for (const size_t& bufferIndex : bufferIndices) {
367                     IComponentListener::InputBuffer &inputBuffer
368                             = inputBuffers[i++];
369                     inputBuffer.arrayIndex = bufferIndex;
370                     inputBuffer.frameIndex = frameIndex;
371                 }
372             }
373 
374             // Clear deathNotifications for this listener and set retry to true
375             // so processNotifications will be called again. This will
376             // guarantee that a listener with no pending notifications will
377             // eventually be removed from mDeathNotifications after
378             // mNotificationIntervalNs nanoseconds has passed.
379             retry = true;
380             deathNotifications.indices.clear();
381             deathNotifications.count = 0;
382             deathNotifications.lastSentNs = timeNowNs;
383             ++it;
384         }
385     }
386 
387     // Call onInputBuffersReleased() outside the lock to avoid deadlock.
388     for (const Notification& notification : notifications) {
389         if (!notification.listener->onInputBuffersReleased(
390                 notification.inputBuffers).isOk()) {
391             // This may trigger if the client has died.
392             LOG(DEBUG) << "InputBufferManager::processNotifications -- "
393                        << "failed to send death notifications to "
394                        << "listener @ 0x" << std::hex
395                                           << notification.listener.get()
396                        << std::dec << ".";
397         } else {
398 #if LOG_NDEBUG == 0
399             std::stringstream inputBufferLog;
400             for (const IComponentListener::InputBuffer& inputBuffer :
401                     notification.inputBuffers) {
402                 inputBufferLog << " (" << inputBuffer.frameIndex
403                                << ", " << inputBuffer.arrayIndex
404                                << ")";
405             }
406             LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
407                          << "death notifications sent to "
408                          << "listener @ 0x" << std::hex
409                                             << notification.listener.get()
410                                             << std::dec
411                          << " with these (frameIndex, bufferIndex) pairs:"
412                          << inputBufferLog.str();
413 #endif
414         }
415     }
416 #if LOG_NDEBUG == 0
417     if (retry) {
418         LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
419                      << "will retry again in " << *timeToRetryNs << "ns.";
420     } else {
421         LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
422                      << "no pending death notifications.";
423     }
424 #endif
425     return retry;
426 }
427 
main()428 void InputBufferManager::main() {
429     LOG(VERBOSE) << "InputBufferManager main -- started.";
430     nsecs_t timeToRetryNs;
431     while (true) {
432         std::unique_lock<std::mutex> lock(mMutex);
433         while (mDeathNotifications.empty()) {
434             mOnBufferDestroyed.wait(lock);
435         }
436         lock.unlock();
437         while (processNotifications(&timeToRetryNs)) {
438             std::this_thread::sleep_for(
439                     std::chrono::nanoseconds(timeToRetryNs));
440         }
441     }
442 }
443 
InputBufferManager()444 InputBufferManager::InputBufferManager()
445       : mMainThread{&InputBufferManager::main, this} {
446 }
447 
getInstance()448 InputBufferManager& InputBufferManager::getInstance() {
449     static InputBufferManager instance{};
450     return instance;
451 }
452 
453 }  // namespace utils
454 }  // namespace V1_0
455 }  // namespace c2
456 }  // namespace media
457 }  // namespace hardware
458 }  // namespace android
459 
460 
461 
462