1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include <thread>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V2_0 {
33 namespace implementation {
34
35 namespace {
36 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
37 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
38
39 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
40 static constexpr size_t kMinBufferCountForEviction = 25;
41 }
42
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
51 bool mInvalidated;
52
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer53 InternalBuffer(
54 BufferId id,
55 const std::shared_ptr<BufferPoolAllocation> &alloc,
56 const size_t allocSize,
57 const std::vector<uint8_t> &allocConfig)
58 : mId(id), mOwnerCount(0), mTransactionCount(0),
59 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60 mInvalidated(false) {}
61
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer62 const native_handle_t *handle() {
63 return mAllocation->handle();
64 }
65
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer66 void invalidate() {
67 mInvalidated = true;
68 }
69 };
70
71 struct TransactionStatus {
72 TransactionId mId;
73 BufferId mBufferId;
74 ConnectionId mSender;
75 ConnectionId mReceiver;
76 BufferStatus mStatus;
77 int64_t mTimestampUs;
78 bool mSenderValidated;
79
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus80 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81 mId = message.transactionId;
82 mBufferId = message.bufferId;
83 mStatus = message.newStatus;
84 mTimestampUs = timestampUs;
85 if (mStatus == BufferStatus::TRANSFER_TO) {
86 mSender = message.connectionId;
87 mReceiver = message.targetConnectionId;
88 mSenderValidated = true;
89 } else {
90 mSender = -1LL;
91 mReceiver = message.connectionId;
92 mSenderValidated = false;
93 }
94 }
95 };
96
97 // Helper template methods for handling map of set.
98 template<class T, class U>
insert(std::map<T,std::set<U>> * mapOfSet,T key,U value)99 bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
100 auto iter = mapOfSet->find(key);
101 if (iter == mapOfSet->end()) {
102 std::set<U> valueSet{value};
103 mapOfSet->insert(std::make_pair(key, valueSet));
104 return true;
105 } else if (iter->second.find(value) == iter->second.end()) {
106 iter->second.insert(value);
107 return true;
108 }
109 return false;
110 }
111
112 template<class T, class U>
erase(std::map<T,std::set<U>> * mapOfSet,T key,U value)113 bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
114 bool ret = false;
115 auto iter = mapOfSet->find(key);
116 if (iter != mapOfSet->end()) {
117 if (iter->second.erase(value) > 0) {
118 ret = true;
119 }
120 if (iter->second.size() == 0) {
121 mapOfSet->erase(iter);
122 }
123 }
124 return ret;
125 }
126
127 template<class T, class U>
contains(std::map<T,std::set<U>> * mapOfSet,T key,U value)128 bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
129 auto iter = mapOfSet->find(key);
130 if (iter != mapOfSet->end()) {
131 auto setIter = iter->second.find(value);
132 return setIter != iter->second.end();
133 }
134 return false;
135 }
136
137 int32_t Accessor::Impl::sPid = getpid();
138 uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)140 Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
~Impl()144 Accessor::Impl::~Impl() {
145 }
146
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)147 ResultStatus Accessor::Impl::connect(
148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
159 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
160 status = mBufferPool.mObserver.open(id, statusDescPtr);
161 if (status == ResultStatus::OK) {
162 newConnection->initialize(accessor, id);
163 *connection = newConnection;
164 *pConnectionId = id;
165 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166 mBufferPool.mConnectionIds.insert(id);
167 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
168 mBufferPool.mInvalidation.onConnect(id, observer);
169 ++sSeqId;
170 }
171
172 }
173 mBufferPool.processStatusMessages();
174 mBufferPool.cleanUp();
175 }
176 return status;
177 }
178
close(ConnectionId connectionId)179 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
180 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
181 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
182 mBufferPool.processStatusMessages();
183 mBufferPool.handleClose(connectionId);
184 mBufferPool.mObserver.close(connectionId);
185 mBufferPool.mInvalidation.onClose(connectionId);
186 // Since close# will be called after all works are finished, it is OK to
187 // evict unused buffers.
188 mBufferPool.cleanUp(true);
189 return ResultStatus::OK;
190 }
191
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)192 ResultStatus Accessor::Impl::allocate(
193 ConnectionId connectionId, const std::vector<uint8_t>& params,
194 BufferId *bufferId, const native_handle_t** handle) {
195 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
196 mBufferPool.processStatusMessages();
197 ResultStatus status = ResultStatus::OK;
198 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
199 lock.unlock();
200 std::shared_ptr<BufferPoolAllocation> alloc;
201 size_t allocSize;
202 status = mAllocator->allocate(params, &alloc, &allocSize);
203 lock.lock();
204 if (status == ResultStatus::OK) {
205 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
206 }
207 ALOGV("create a buffer %d : %u %p",
208 status == ResultStatus::OK, *bufferId, *handle);
209 }
210 if (status == ResultStatus::OK) {
211 // TODO: handle ownBuffer failure
212 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
213 }
214 mBufferPool.cleanUp();
215 return status;
216 }
217
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)218 ResultStatus Accessor::Impl::fetch(
219 ConnectionId connectionId, TransactionId transactionId,
220 BufferId bufferId, const native_handle_t** handle) {
221 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
222 mBufferPool.processStatusMessages();
223 auto found = mBufferPool.mTransactions.find(transactionId);
224 if (found != mBufferPool.mTransactions.end() &&
225 contains(&mBufferPool.mPendingTransactions,
226 connectionId, transactionId)) {
227 if (found->second->mSenderValidated &&
228 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
229 found->second->mBufferId == bufferId) {
230 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
231 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
232 if (bufferIt != mBufferPool.mBuffers.end()) {
233 mBufferPool.mStats.onBufferFetched();
234 *handle = bufferIt->second->handle();
235 return ResultStatus::OK;
236 }
237 }
238 }
239 mBufferPool.cleanUp();
240 return ResultStatus::CRITICAL_ERROR;
241 }
242
cleanUp(bool clearCache)243 void Accessor::Impl::cleanUp(bool clearCache) {
244 // transaction timeout, buffer cacheing TTL handling
245 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
246 mBufferPool.processStatusMessages();
247 mBufferPool.cleanUp(clearCache);
248 }
249
flush()250 void Accessor::Impl::flush() {
251 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
252 mBufferPool.processStatusMessages();
253 mBufferPool.flush(shared_from_this());
254 }
255
handleInvalidateAck()256 void Accessor::Impl::handleInvalidateAck() {
257 std::map<ConnectionId, const sp<IObserver>> observers;
258 uint32_t invalidationId;
259 {
260 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
261 mBufferPool.processStatusMessages();
262 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
263 }
264 // Do not hold lock for send invalidations
265 size_t deadClients = 0;
266 for (auto it = observers.begin(); it != observers.end(); ++it) {
267 const sp<IObserver> observer = it->second;
268 if (observer) {
269 Return<void> transResult = observer->onMessage(it->first, invalidationId);
270 if (!transResult.isOk()) {
271 ++deadClients;
272 }
273 }
274 }
275 if (deadClients > 0) {
276 ALOGD("During invalidation found %zu dead clients", deadClients);
277 }
278 }
279
isValid()280 bool Accessor::Impl::isValid() {
281 return mBufferPool.isValid();
282 }
283
BufferPool()284 Accessor::Impl::Impl::BufferPool::BufferPool()
285 : mTimestampUs(getTimestampNow()),
286 mLastCleanUpUs(mTimestampUs),
287 mLastLogUs(mTimestampUs),
288 mSeq(0),
289 mStartSeq(0) {
290 mValid = mInvalidationChannel.isValid();
291 }
292
293
294 // Statistics helper
295 template<typename T, typename S>
percentage(T base,S total)296 int percentage(T base, S total) {
297 return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
298 }
299
300 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
301
~BufferPool()302 Accessor::Impl::Impl::BufferPool::~BufferPool() {
303 std::lock_guard<std::mutex> lock(mMutex);
304 ALOGD("Destruction - bufferpool2 %p "
305 "cached: %zu/%zuM, %zu/%d%% in use; "
306 "allocs: %zu, %d%% recycled; "
307 "transfers: %zu, %d%% unfetced",
308 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
309 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
310 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
311 mStats.mTotalTransfers,
312 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
313 }
314
onConnect(ConnectionId conId,const sp<IObserver> & observer)315 void Accessor::Impl::BufferPool::Invalidation::onConnect(
316 ConnectionId conId, const sp<IObserver>& observer) {
317 mAcks[conId] = mInvalidationId; // starts from current invalidationId
318 mObservers.insert(std::make_pair(conId, observer));
319 }
320
onClose(ConnectionId conId)321 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
322 mAcks.erase(conId);
323 mObservers.erase(conId);
324 }
325
onAck(ConnectionId conId,uint32_t msgId)326 void Accessor::Impl::BufferPool::Invalidation::onAck(
327 ConnectionId conId,
328 uint32_t msgId) {
329 auto it = mAcks.find(conId);
330 if (it == mAcks.end()) {
331 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
332 return;
333 }
334 if (isMessageLater(msgId, it->second)) {
335 mAcks[conId] = msgId;
336 }
337 }
338
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)339 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
340 BufferId bufferId,
341 BufferInvalidationChannel &channel) {
342 for (auto it = mPendings.begin(); it != mPendings.end();) {
343 if (it->isInvalidated(bufferId)) {
344 uint32_t msgId = 0;
345 if (it->mNeedsAck) {
346 msgId = ++mInvalidationId;
347 if (msgId == 0) {
348 // wrap happens
349 msgId = ++mInvalidationId;
350 }
351 }
352 channel.postInvalidation(msgId, it->mFrom, it->mTo);
353 it = mPendings.erase(it);
354 continue;
355 }
356 ++it;
357 }
358 }
359
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)360 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
361 bool needsAck,
362 uint32_t from,
363 uint32_t to,
364 size_t left,
365 BufferInvalidationChannel &channel,
366 const std::shared_ptr<Accessor::Impl> &impl) {
367 uint32_t msgId = 0;
368 if (needsAck) {
369 msgId = ++mInvalidationId;
370 if (msgId == 0) {
371 // wrap happens
372 msgId = ++mInvalidationId;
373 }
374 }
375 ALOGV("bufferpool2 invalidation requested and queued");
376 if (left == 0) {
377 channel.postInvalidation(msgId, from, to);
378 } else {
379 // TODO: sending hint message?
380 ALOGV("bufferpoo2 invalidation requested and pending");
381 Pending pending(needsAck, from, to, left, impl);
382 mPendings.push_back(pending);
383 }
384 sInvalidator->addAccessor(mId, impl);
385 }
386
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)387 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
388 std::map<ConnectionId, const sp<IObserver>> *observers,
389 uint32_t *invalidationId) {
390 if (mInvalidationId != 0) {
391 *invalidationId = mInvalidationId;
392 std::set<int> deads;
393 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
394 if (it->second != mInvalidationId) {
395 const sp<IObserver> observer = mObservers[it->first];
396 if (observer) {
397 observers->emplace(it->first, observer);
398 ALOGV("connection %lld will call observer (%u: %u)",
399 (long long)it->first, it->second, mInvalidationId);
400 // N.B: onMessage will be called later. ignore possibility of
401 // onMessage# oneway call being lost.
402 it->second = mInvalidationId;
403 } else {
404 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
405 deads.insert(it->first);
406 }
407 }
408 }
409 if (deads.size() > 0) {
410 for (auto it = deads.begin(); it != deads.end(); ++it) {
411 onClose(*it);
412 }
413 }
414 }
415 if (mPendings.size() == 0) {
416 // All invalidation Ids are synced and no more pending invalidations.
417 sInvalidator->delAccessor(mId);
418 }
419 }
420
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)421 bool Accessor::Impl::BufferPool::handleOwnBuffer(
422 ConnectionId connectionId, BufferId bufferId) {
423
424 bool added = insert(&mUsingBuffers, connectionId, bufferId);
425 if (added) {
426 auto iter = mBuffers.find(bufferId);
427 iter->second->mOwnerCount++;
428 }
429 insert(&mUsingConnections, bufferId, connectionId);
430 return added;
431 }
432
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)433 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
434 ConnectionId connectionId, BufferId bufferId) {
435 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
436 if (deleted) {
437 auto iter = mBuffers.find(bufferId);
438 iter->second->mOwnerCount--;
439 if (iter->second->mOwnerCount == 0 &&
440 iter->second->mTransactionCount == 0) {
441 if (!iter->second->mInvalidated) {
442 mStats.onBufferUnused(iter->second->mAllocSize);
443 mFreeBuffers.insert(bufferId);
444 } else {
445 mStats.onBufferUnused(iter->second->mAllocSize);
446 mStats.onBufferEvicted(iter->second->mAllocSize);
447 mBuffers.erase(iter);
448 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
449 }
450 }
451 }
452 erase(&mUsingConnections, bufferId, connectionId);
453 ALOGV("release buffer %u : %d", bufferId, deleted);
454 return deleted;
455 }
456
handleTransferTo(const BufferStatusMessage & message)457 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
458 auto completed = mCompletedTransactions.find(
459 message.transactionId);
460 if (completed != mCompletedTransactions.end()) {
461 // already completed
462 mCompletedTransactions.erase(completed);
463 return true;
464 }
465 // the buffer should exist and be owned.
466 auto bufferIter = mBuffers.find(message.bufferId);
467 if (bufferIter == mBuffers.end() ||
468 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
469 return false;
470 }
471 auto found = mTransactions.find(message.transactionId);
472 if (found != mTransactions.end()) {
473 // transfer_from was received earlier.
474 found->second->mSender = message.connectionId;
475 found->second->mSenderValidated = true;
476 return true;
477 }
478 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
479 // N.B: it could be fake or receive connection already closed.
480 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
481 this, (long long)message.targetConnectionId);
482 return false;
483 }
484 mStats.onBufferSent();
485 mTransactions.insert(std::make_pair(
486 message.transactionId,
487 std::make_unique<TransactionStatus>(message, mTimestampUs)));
488 insert(&mPendingTransactions, message.targetConnectionId,
489 message.transactionId);
490 bufferIter->second->mTransactionCount++;
491 return true;
492 }
493
handleTransferFrom(const BufferStatusMessage & message)494 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
495 auto found = mTransactions.find(message.transactionId);
496 if (found == mTransactions.end()) {
497 // TODO: is it feasible to check ownership here?
498 mStats.onBufferSent();
499 mTransactions.insert(std::make_pair(
500 message.transactionId,
501 std::make_unique<TransactionStatus>(message, mTimestampUs)));
502 insert(&mPendingTransactions, message.connectionId,
503 message.transactionId);
504 auto bufferIter = mBuffers.find(message.bufferId);
505 bufferIter->second->mTransactionCount++;
506 } else {
507 if (message.connectionId == found->second->mReceiver) {
508 found->second->mStatus = BufferStatus::TRANSFER_FROM;
509 }
510 }
511 return true;
512 }
513
handleTransferResult(const BufferStatusMessage & message)514 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
515 auto found = mTransactions.find(message.transactionId);
516 if (found != mTransactions.end()) {
517 bool deleted = erase(&mPendingTransactions, message.connectionId,
518 message.transactionId);
519 if (deleted) {
520 if (!found->second->mSenderValidated) {
521 mCompletedTransactions.insert(message.transactionId);
522 }
523 auto bufferIter = mBuffers.find(message.bufferId);
524 if (message.newStatus == BufferStatus::TRANSFER_OK) {
525 handleOwnBuffer(message.connectionId, message.bufferId);
526 }
527 bufferIter->second->mTransactionCount--;
528 if (bufferIter->second->mOwnerCount == 0
529 && bufferIter->second->mTransactionCount == 0) {
530 if (!bufferIter->second->mInvalidated) {
531 mStats.onBufferUnused(bufferIter->second->mAllocSize);
532 mFreeBuffers.insert(message.bufferId);
533 } else {
534 mStats.onBufferUnused(bufferIter->second->mAllocSize);
535 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
536 mBuffers.erase(bufferIter);
537 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
538 }
539 }
540 mTransactions.erase(found);
541 }
542 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
543 message.bufferId, deleted);
544 return deleted;
545 }
546 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
547 message.bufferId);
548 return false;
549 }
550
processStatusMessages()551 void Accessor::Impl::BufferPool::processStatusMessages() {
552 std::vector<BufferStatusMessage> messages;
553 mObserver.getBufferStatusChanges(messages);
554 mTimestampUs = getTimestampNow();
555 for (BufferStatusMessage& message: messages) {
556 bool ret = false;
557 switch (message.newStatus) {
558 case BufferStatus::NOT_USED:
559 ret = handleReleaseBuffer(
560 message.connectionId, message.bufferId);
561 break;
562 case BufferStatus::USED:
563 // not happening
564 break;
565 case BufferStatus::TRANSFER_TO:
566 ret = handleTransferTo(message);
567 break;
568 case BufferStatus::TRANSFER_FROM:
569 ret = handleTransferFrom(message);
570 break;
571 case BufferStatus::TRANSFER_TIMEOUT:
572 // TODO
573 break;
574 case BufferStatus::TRANSFER_LOST:
575 // TODO
576 break;
577 case BufferStatus::TRANSFER_FETCH:
578 // not happening
579 break;
580 case BufferStatus::TRANSFER_OK:
581 case BufferStatus::TRANSFER_ERROR:
582 ret = handleTransferResult(message);
583 break;
584 case BufferStatus::INVALIDATION_ACK:
585 mInvalidation.onAck(message.connectionId, message.bufferId);
586 ret = true;
587 break;
588 }
589 if (ret == false) {
590 ALOGW("buffer status message processing failure - message : %d connection : %lld",
591 message.newStatus, (long long)message.connectionId);
592 }
593 }
594 messages.clear();
595 }
596
handleClose(ConnectionId connectionId)597 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
598 // Cleaning buffers
599 auto buffers = mUsingBuffers.find(connectionId);
600 if (buffers != mUsingBuffers.end()) {
601 for (const BufferId& bufferId : buffers->second) {
602 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
603 if (deleted) {
604 auto bufferIter = mBuffers.find(bufferId);
605 bufferIter->second->mOwnerCount--;
606 if (bufferIter->second->mOwnerCount == 0 &&
607 bufferIter->second->mTransactionCount == 0) {
608 // TODO: handle freebuffer insert fail
609 if (!bufferIter->second->mInvalidated) {
610 mStats.onBufferUnused(bufferIter->second->mAllocSize);
611 mFreeBuffers.insert(bufferId);
612 } else {
613 mStats.onBufferUnused(bufferIter->second->mAllocSize);
614 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
615 mBuffers.erase(bufferIter);
616 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
617 }
618 }
619 }
620 }
621 mUsingBuffers.erase(buffers);
622 }
623
624 // Cleaning transactions
625 auto pending = mPendingTransactions.find(connectionId);
626 if (pending != mPendingTransactions.end()) {
627 for (const TransactionId& transactionId : pending->second) {
628 auto iter = mTransactions.find(transactionId);
629 if (iter != mTransactions.end()) {
630 if (!iter->second->mSenderValidated) {
631 mCompletedTransactions.insert(transactionId);
632 }
633 BufferId bufferId = iter->second->mBufferId;
634 auto bufferIter = mBuffers.find(bufferId);
635 bufferIter->second->mTransactionCount--;
636 if (bufferIter->second->mOwnerCount == 0 &&
637 bufferIter->second->mTransactionCount == 0) {
638 // TODO: handle freebuffer insert fail
639 if (!bufferIter->second->mInvalidated) {
640 mStats.onBufferUnused(bufferIter->second->mAllocSize);
641 mFreeBuffers.insert(bufferId);
642 } else {
643 mStats.onBufferUnused(bufferIter->second->mAllocSize);
644 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
645 mBuffers.erase(bufferIter);
646 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
647 }
648 }
649 mTransactions.erase(iter);
650 }
651 }
652 }
653 mConnectionIds.erase(connectionId);
654 return true;
655 }
656
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)657 bool Accessor::Impl::BufferPool::getFreeBuffer(
658 const std::shared_ptr<BufferPoolAllocator> &allocator,
659 const std::vector<uint8_t> ¶ms, BufferId *pId,
660 const native_handle_t** handle) {
661 auto bufferIt = mFreeBuffers.begin();
662 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
663 BufferId bufferId = *bufferIt;
664 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
665 break;
666 }
667 }
668 if (bufferIt != mFreeBuffers.end()) {
669 BufferId id = *bufferIt;
670 mFreeBuffers.erase(bufferIt);
671 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
672 *handle = mBuffers[id]->handle();
673 *pId = id;
674 ALOGV("recycle a buffer %u %p", id, *handle);
675 return true;
676 }
677 return false;
678 }
679
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)680 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
681 const std::shared_ptr<BufferPoolAllocation> &alloc,
682 const size_t allocSize,
683 const std::vector<uint8_t> ¶ms,
684 BufferId *pId,
685 const native_handle_t** handle) {
686
687 BufferId bufferId = mSeq++;
688 if (mSeq == Connection::SYNC_BUFFERID) {
689 mSeq = 0;
690 }
691 std::unique_ptr<InternalBuffer> buffer =
692 std::make_unique<InternalBuffer>(
693 bufferId, alloc, allocSize, params);
694 if (buffer) {
695 auto res = mBuffers.insert(std::make_pair(
696 bufferId, std::move(buffer)));
697 if (res.second) {
698 mStats.onBufferAllocated(allocSize);
699 *handle = alloc->handle();
700 *pId = bufferId;
701 return ResultStatus::OK;
702 }
703 }
704 return ResultStatus::NO_MEMORY;
705 }
706
cleanUp(bool clearCache)707 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
708 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
709 mLastCleanUpUs = mTimestampUs;
710 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
711 mLastLogUs = mTimestampUs;
712 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
713 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
714 "%zu/%zu (fetch/transfer)",
715 this, mStats.mBuffersCached, mStats.mSizeCached,
716 mStats.mBuffersInUse, mStats.mSizeInUse,
717 mStats.mTotalRecycles, mStats.mTotalAllocations,
718 mStats.mTotalFetches, mStats.mTotalTransfers);
719 }
720 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
721 if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
722 || mBuffers.size() < kMinBufferCountForEviction)) {
723 break;
724 }
725 auto it = mBuffers.find(*freeIt);
726 if (it != mBuffers.end() &&
727 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
728 mStats.onBufferEvicted(it->second->mAllocSize);
729 mBuffers.erase(it);
730 freeIt = mFreeBuffers.erase(freeIt);
731 } else {
732 ++freeIt;
733 ALOGW("bufferpool2 inconsistent!");
734 }
735 }
736 }
737 }
738
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)739 void Accessor::Impl::BufferPool::invalidate(
740 bool needsAck, BufferId from, BufferId to,
741 const std::shared_ptr<Accessor::Impl> &impl) {
742 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
743 if (isBufferInRange(from, to, *freeIt)) {
744 auto it = mBuffers.find(*freeIt);
745 if (it != mBuffers.end() &&
746 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
747 mStats.onBufferEvicted(it->second->mAllocSize);
748 mBuffers.erase(it);
749 freeIt = mFreeBuffers.erase(freeIt);
750 continue;
751 } else {
752 ALOGW("bufferpool2 inconsistent!");
753 }
754 }
755 ++freeIt;
756 }
757
758 size_t left = 0;
759 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
760 if (isBufferInRange(from, to, it->first)) {
761 it->second->invalidate();
762 ++left;
763 }
764 }
765 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
766 }
767
flush(const std::shared_ptr<Accessor::Impl> & impl)768 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
769 BufferId from = mStartSeq;
770 BufferId to = mSeq;
771 mStartSeq = mSeq;
772 // TODO: needsAck params
773 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
774 if (from != to) {
775 invalidate(true, from, to, impl);
776 }
777 }
778
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)779 void Accessor::Impl::invalidatorThread(
780 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
781 std::mutex &mutex,
782 std::condition_variable &cv,
783 bool &ready) {
784 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
785 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
786 constexpr useconds_t MAX_SLEEP_US = 10000;
787 uint32_t numSpin = 0;
788 useconds_t sleepUs = 1;
789
790 while(true) {
791 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
792 {
793 std::unique_lock<std::mutex> lock(mutex);
794 if (!ready) {
795 numSpin = 0;
796 sleepUs = 1;
797 cv.wait(lock);
798 }
799 copied.insert(accessors.begin(), accessors.end());
800 }
801 std::list<ConnectionId> erased;
802 for (auto it = copied.begin(); it != copied.end(); ++it) {
803 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
804 if (!impl) {
805 erased.push_back(it->first);
806 } else {
807 impl->handleInvalidateAck();
808 }
809 }
810 {
811 std::unique_lock<std::mutex> lock(mutex);
812 for (auto it = erased.begin(); it != erased.end(); ++it) {
813 accessors.erase(*it);
814 }
815 if (accessors.size() == 0) {
816 ready = false;
817 } else {
818 // TODO Use an efficient way to wait over FMQ.
819 // N.B. Since there is not a efficient way to wait over FMQ,
820 // polling over the FMQ is the current way to prevent draining
821 // CPU.
822 lock.unlock();
823 ++numSpin;
824 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
825 sleepUs < MAX_SLEEP_US) {
826 sleepUs *= 10;
827 }
828 if (numSpin % NUM_SPIN_TO_LOG == 0) {
829 ALOGW("invalidator thread spinning");
830 }
831 ::usleep(sleepUs);
832 }
833 }
834 }
835 }
836
AccessorInvalidator()837 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
838 std::thread invalidator(
839 invalidatorThread,
840 std::ref(mAccessors),
841 std::ref(mMutex),
842 std::ref(mCv),
843 std::ref(mReady));
844 invalidator.detach();
845 }
846
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)847 void Accessor::Impl::AccessorInvalidator::addAccessor(
848 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
849 bool notify = false;
850 std::unique_lock<std::mutex> lock(mMutex);
851 if (mAccessors.find(accessorId) == mAccessors.end()) {
852 if (!mReady) {
853 mReady = true;
854 notify = true;
855 }
856 mAccessors.insert(std::make_pair(accessorId, impl));
857 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
858 }
859 lock.unlock();
860 if (notify) {
861 mCv.notify_one();
862 }
863 }
864
delAccessor(uint32_t accessorId)865 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
866 std::lock_guard<std::mutex> lock(mMutex);
867 mAccessors.erase(accessorId);
868 ALOGV("buffer invalidation deleted bp:%u", accessorId);
869 if (mAccessors.size() == 0) {
870 mReady = false;
871 }
872 }
873
874 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
875
createInvalidator()876 void Accessor::Impl::createInvalidator() {
877 if (!sInvalidator) {
878 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
879 }
880 }
881
882 } // namespace implementation
883 } // namespace V2_0
884 } // namespace bufferpool
885 } // namespace media
886 } // namespace hardware
887 } // namespace android
888