1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-InputBufferManager"
19 #include <android-base/logging.h>
20
21 #include <codec2/hidl/1.0/InputBufferManager.h>
22 #include <codec2/hidl/1.0/types.h>
23
24 #include <android/hardware/media/c2/1.0/IComponentListener.h>
25 #include <android-base/logging.h>
26
27 #include <C2Buffer.h>
28 #include <C2Work.h>
29
30 #include <chrono>
31
32 namespace android {
33 namespace hardware {
34 namespace media {
35 namespace c2 {
36 namespace V1_0 {
37 namespace utils {
38
39 using namespace ::android;
40
registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)41 void InputBufferManager::registerFrameData(
42 const sp<IComponentListener>& listener,
43 const C2FrameData& input) {
44 getInstance()._registerFrameData(listener, input);
45 }
46
unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)47 void InputBufferManager::unregisterFrameData(
48 const wp<IComponentListener>& listener,
49 const C2FrameData& input) {
50 getInstance()._unregisterFrameData(listener, input);
51 }
52
unregisterFrameData(const wp<IComponentListener> & listener)53 void InputBufferManager::unregisterFrameData(
54 const wp<IComponentListener>& listener) {
55 getInstance()._unregisterFrameData(listener);
56 }
57
setNotificationInterval(nsecs_t notificationIntervalNs)58 void InputBufferManager::setNotificationInterval(
59 nsecs_t notificationIntervalNs) {
60 getInstance()._setNotificationInterval(notificationIntervalNs);
61 }
62
_registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)63 void InputBufferManager::_registerFrameData(
64 const sp<IComponentListener>& listener,
65 const C2FrameData& input) {
66 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
67 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- called with "
68 << "listener @ 0x" << std::hex << listener.get()
69 << ", frameIndex = " << std::dec << frameIndex
70 << ".";
71 std::lock_guard<std::mutex> lock(mMutex);
72
73 std::set<TrackedBuffer> &bufferIds =
74 mTrackedBuffersMap[listener][frameIndex];
75
76 for (size_t i = 0; i < input.buffers.size(); ++i) {
77 if (!input.buffers[i]) {
78 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- "
79 << "Input buffer at index " << i << " is null.";
80 continue;
81 }
82 const TrackedBuffer &bufferId =
83 *bufferIds.emplace(listener, frameIndex, i, input.buffers[i]).
84 first;
85
86 c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
87 onBufferDestroyed,
88 const_cast<void*>(reinterpret_cast<const void*>(&bufferId)));
89 if (status != C2_OK) {
90 LOG(DEBUG) << "InputBufferManager::_registerFrameData -- "
91 << "registerOnDestroyNotify() failed "
92 << "(listener @ 0x" << std::hex << listener.get()
93 << ", frameIndex = " << std::dec << frameIndex
94 << ", bufferIndex = " << i
95 << ") => status = " << status
96 << ".";
97 }
98 }
99
100 mDeathNotifications.emplace(
101 listener,
102 DeathNotifications(
103 mNotificationIntervalNs.load(std::memory_order_relaxed)));
104 }
105
106 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
107 // mDeathNotifications. This implies all bufferIndices are removed.
108 //
109 // This is called from onWorkDone() and flush().
_unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)110 void InputBufferManager::_unregisterFrameData(
111 const wp<IComponentListener>& listener,
112 const C2FrameData& input) {
113 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
114 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
115 << "listener @ 0x" << std::hex << listener.unsafe_get()
116 << ", frameIndex = " << std::dec << frameIndex
117 << ".";
118 std::lock_guard<std::mutex> lock(mMutex);
119
120 auto findListener = mTrackedBuffersMap.find(listener);
121 if (findListener != mTrackedBuffersMap.end()) {
122 std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
123 = findListener->second;
124 auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
125 if (findFrameIndex != frameIndex2BufferIds.end()) {
126 std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
127 for (const TrackedBuffer& bufferId : bufferIds) {
128 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
129 if (buffer) {
130 c2_status_t status = buffer->unregisterOnDestroyNotify(
131 onBufferDestroyed,
132 const_cast<void*>(
133 reinterpret_cast<const void*>(&bufferId)));
134 if (status != C2_OK) {
135 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
136 << "-- unregisterOnDestroyNotify() failed "
137 << "(listener @ 0x"
138 << std::hex
139 << bufferId.listener.unsafe_get()
140 << ", frameIndex = "
141 << std::dec << bufferId.frameIndex
142 << ", bufferIndex = " << bufferId.bufferIndex
143 << ") => status = " << status
144 << ".";
145 }
146 }
147 }
148
149 frameIndex2BufferIds.erase(findFrameIndex);
150 if (frameIndex2BufferIds.empty()) {
151 mTrackedBuffersMap.erase(findListener);
152 }
153 }
154 }
155
156 auto findListenerD = mDeathNotifications.find(listener);
157 if (findListenerD != mDeathNotifications.end()) {
158 DeathNotifications &deathNotifications = findListenerD->second;
159 auto findFrameIndex = deathNotifications.indices.find(frameIndex);
160 if (findFrameIndex != deathNotifications.indices.end()) {
161 std::vector<size_t> &bufferIndices = findFrameIndex->second;
162 deathNotifications.count -= bufferIndices.size();
163 deathNotifications.indices.erase(findFrameIndex);
164 }
165 }
166 }
167
168 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
169 // all frameIndices and bufferIndices are removed.
170 //
171 // This is called when the component cleans up all input buffers, i.e., when
172 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const wp<IComponentListener> & listener)173 void InputBufferManager::_unregisterFrameData(
174 const wp<IComponentListener>& listener) {
175 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
176 << "listener @ 0x" << std::hex << listener.unsafe_get()
177 << std::dec << ".";
178 std::lock_guard<std::mutex> lock(mMutex);
179
180 auto findListener = mTrackedBuffersMap.find(listener);
181 if (findListener != mTrackedBuffersMap.end()) {
182 std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds =
183 findListener->second;
184 for (auto findFrameIndex = frameIndex2BufferIds.begin();
185 findFrameIndex != frameIndex2BufferIds.end();
186 ++findFrameIndex) {
187 std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
188 for (const TrackedBuffer& bufferId : bufferIds) {
189 std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
190 if (buffer) {
191 c2_status_t status = buffer->unregisterOnDestroyNotify(
192 onBufferDestroyed,
193 const_cast<void*>(
194 reinterpret_cast<const void*>(&bufferId)));
195 if (status != C2_OK) {
196 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
197 << "-- unregisterOnDestroyNotify() failed "
198 << "(listener @ 0x"
199 << std::hex
200 << bufferId.listener.unsafe_get()
201 << ", frameIndex = "
202 << std::dec << bufferId.frameIndex
203 << ", bufferIndex = " << bufferId.bufferIndex
204 << ") => status = " << status
205 << ".";
206 }
207 }
208 }
209 }
210 mTrackedBuffersMap.erase(findListener);
211 }
212
213 mDeathNotifications.erase(listener);
214 }
215
216 // Set mNotificationIntervalNs.
_setNotificationInterval(nsecs_t notificationIntervalNs)217 void InputBufferManager::_setNotificationInterval(
218 nsecs_t notificationIntervalNs) {
219 mNotificationIntervalNs.store(
220 notificationIntervalNs,
221 std::memory_order_relaxed);
222 }
223
224 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
225 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)226 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
227 getInstance()._onBufferDestroyed(buf, arg);
228 }
229
_onBufferDestroyed(const C2Buffer * buf,void * arg)230 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
231 if (!buf || !arg) {
232 LOG(WARNING) << "InputBufferManager::_onBufferDestroyed -- called with "
233 << "null argument (s): "
234 << "buf @ 0x" << std::hex << buf
235 << ", arg @ 0x" << std::hex << arg
236 << std::dec << ".";
237 return;
238 }
239 TrackedBuffer id(*reinterpret_cast<TrackedBuffer*>(arg));
240 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
241 << "buf @ 0x" << std::hex << buf
242 << ", arg @ 0x" << std::hex << arg
243 << std::dec << " -- "
244 << "listener @ 0x" << std::hex << id.listener.unsafe_get()
245 << ", frameIndex = " << std::dec << id.frameIndex
246 << ", bufferIndex = " << id.bufferIndex
247 << ".";
248
249 std::lock_guard<std::mutex> lock(mMutex);
250
251 auto findListener = mTrackedBuffersMap.find(id.listener);
252 if (findListener == mTrackedBuffersMap.end()) {
253 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
254 << "received invalid listener: "
255 << "listener @ 0x" << std::hex << id.listener.unsafe_get()
256 << " (frameIndex = " << std::dec << id.frameIndex
257 << ", bufferIndex = " << id.bufferIndex
258 << ").";
259 return;
260 }
261
262 std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
263 = findListener->second;
264 auto findFrameIndex = frameIndex2BufferIds.find(id.frameIndex);
265 if (findFrameIndex == frameIndex2BufferIds.end()) {
266 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
267 << "received invalid frame index: "
268 << "frameIndex = " << id.frameIndex
269 << " (listener @ 0x" << std::hex << id.listener.unsafe_get()
270 << ", bufferIndex = " << std::dec << id.bufferIndex
271 << ").";
272 return;
273 }
274
275 std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
276 auto findBufferId = bufferIds.find(id);
277 if (findBufferId == bufferIds.end()) {
278 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
279 << "received invalid buffer index: "
280 << "bufferIndex = " << id.bufferIndex
281 << " (frameIndex = " << id.frameIndex
282 << ", listener @ 0x" << std::hex << id.listener.unsafe_get()
283 << std::dec << ").";
284 return;
285 }
286
287 bufferIds.erase(findBufferId);
288 if (bufferIds.empty()) {
289 frameIndex2BufferIds.erase(findFrameIndex);
290 if (frameIndex2BufferIds.empty()) {
291 mTrackedBuffersMap.erase(findListener);
292 }
293 }
294
295 DeathNotifications &deathNotifications = mDeathNotifications[id.listener];
296 deathNotifications.indices[id.frameIndex].emplace_back(id.bufferIndex);
297 ++deathNotifications.count;
298 mOnBufferDestroyed.notify_one();
299 }
300
301 // Notify the clients about buffer destructions.
302 // Return false if all destructions have been notified.
303 // Return true and set timeToRetry to the time point to wait for before
304 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)305 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
306
307 struct Notification {
308 sp<IComponentListener> listener;
309 hidl_vec<IComponentListener::InputBuffer> inputBuffers;
310 Notification(const sp<IComponentListener>& l, size_t s)
311 : listener(l), inputBuffers(s) {}
312 };
313 std::list<Notification> notifications;
314 nsecs_t notificationIntervalNs =
315 mNotificationIntervalNs.load(std::memory_order_relaxed);
316
317 bool retry = false;
318 {
319 std::lock_guard<std::mutex> lock(mMutex);
320 *timeToRetryNs = notificationIntervalNs;
321 nsecs_t timeNowNs = systemTime();
322 for (auto it = mDeathNotifications.begin();
323 it != mDeathNotifications.end(); ) {
324 sp<IComponentListener> listener = it->first.promote();
325 if (!listener) {
326 ++it;
327 continue;
328 }
329 DeathNotifications &deathNotifications = it->second;
330
331 nsecs_t timeSinceLastNotifiedNs =
332 timeNowNs - deathNotifications.lastSentNs;
333 // If not enough time has passed since the last callback, leave the
334 // notifications for this listener untouched for now and retry
335 // later.
336 if (timeSinceLastNotifiedNs < notificationIntervalNs) {
337 retry = true;
338 *timeToRetryNs = std::min(*timeToRetryNs,
339 notificationIntervalNs - timeSinceLastNotifiedNs);
340 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
341 << "Notifications for listener @ "
342 << std::hex << listener.get()
343 << " will be postponed.";
344 ++it;
345 continue;
346 }
347
348 // If enough time has passed since the last notification to this
349 // listener but there are currently no pending notifications, the
350 // listener can be removed from mDeathNotifications---there is no
351 // need to keep track of the last notification time anymore.
352 if (deathNotifications.count == 0) {
353 it = mDeathNotifications.erase(it);
354 continue;
355 }
356
357 // Create the argument for the callback.
358 notifications.emplace_back(listener, deathNotifications.count);
359 hidl_vec<IComponentListener::InputBuffer> &inputBuffers =
360 notifications.back().inputBuffers;
361 size_t i = 0;
362 for (std::pair<const uint64_t, std::vector<size_t>>& p :
363 deathNotifications.indices) {
364 uint64_t frameIndex = p.first;
365 const std::vector<size_t> &bufferIndices = p.second;
366 for (const size_t& bufferIndex : bufferIndices) {
367 IComponentListener::InputBuffer &inputBuffer
368 = inputBuffers[i++];
369 inputBuffer.arrayIndex = bufferIndex;
370 inputBuffer.frameIndex = frameIndex;
371 }
372 }
373
374 // Clear deathNotifications for this listener and set retry to true
375 // so processNotifications will be called again. This will
376 // guarantee that a listener with no pending notifications will
377 // eventually be removed from mDeathNotifications after
378 // mNotificationIntervalNs nanoseconds has passed.
379 retry = true;
380 deathNotifications.indices.clear();
381 deathNotifications.count = 0;
382 deathNotifications.lastSentNs = timeNowNs;
383 ++it;
384 }
385 }
386
387 // Call onInputBuffersReleased() outside the lock to avoid deadlock.
388 for (const Notification& notification : notifications) {
389 if (!notification.listener->onInputBuffersReleased(
390 notification.inputBuffers).isOk()) {
391 // This may trigger if the client has died.
392 LOG(DEBUG) << "InputBufferManager::processNotifications -- "
393 << "failed to send death notifications to "
394 << "listener @ 0x" << std::hex
395 << notification.listener.get()
396 << std::dec << ".";
397 } else {
398 #if LOG_NDEBUG == 0
399 std::stringstream inputBufferLog;
400 for (const IComponentListener::InputBuffer& inputBuffer :
401 notification.inputBuffers) {
402 inputBufferLog << " (" << inputBuffer.frameIndex
403 << ", " << inputBuffer.arrayIndex
404 << ")";
405 }
406 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
407 << "death notifications sent to "
408 << "listener @ 0x" << std::hex
409 << notification.listener.get()
410 << std::dec
411 << " with these (frameIndex, bufferIndex) pairs:"
412 << inputBufferLog.str();
413 #endif
414 }
415 }
416 #if LOG_NDEBUG == 0
417 if (retry) {
418 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
419 << "will retry again in " << *timeToRetryNs << "ns.";
420 } else {
421 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
422 << "no pending death notifications.";
423 }
424 #endif
425 return retry;
426 }
427
main()428 void InputBufferManager::main() {
429 LOG(VERBOSE) << "InputBufferManager main -- started.";
430 nsecs_t timeToRetryNs;
431 while (true) {
432 std::unique_lock<std::mutex> lock(mMutex);
433 while (mDeathNotifications.empty()) {
434 mOnBufferDestroyed.wait(lock);
435 }
436 lock.unlock();
437 while (processNotifications(&timeToRetryNs)) {
438 std::this_thread::sleep_for(
439 std::chrono::nanoseconds(timeToRetryNs));
440 }
441 }
442 }
443
InputBufferManager()444 InputBufferManager::InputBufferManager()
445 : mMainThread{&InputBufferManager::main, this} {
446 }
447
getInstance()448 InputBufferManager& InputBufferManager::getInstance() {
449 static InputBufferManager instance{};
450 return instance;
451 }
452
453 } // namespace utils
454 } // namespace V1_0
455 } // namespace c2
456 } // namespace media
457 } // namespace hardware
458 } // namespace android
459
460
461
462