1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include <thread>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27 
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V2_0 {
33 namespace implementation {
34 
namespace {
    // Minimum interval between two clean-up passes, in microseconds
    // (0.5 sec). TODO: tune.
    constexpr int64_t kCleanUpDurationUs = 500000;
    // Minimum interval between two statistics log lines, in microseconds
    // (5 secs).
    constexpr int64_t kLogDurationUs = 5000000;

    // The periodic clean-up stops evicting free buffers while the cached
    // size or the buffer count is below these thresholds.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 25;
}
42 
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45     BufferId mId;
46     size_t mOwnerCount;
47     size_t mTransactionCount;
48     const std::shared_ptr<BufferPoolAllocation> mAllocation;
49     const size_t mAllocSize;
50     const std::vector<uint8_t> mConfig;
51     bool mInvalidated;
52 
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer53     InternalBuffer(
54             BufferId id,
55             const std::shared_ptr<BufferPoolAllocation> &alloc,
56             const size_t allocSize,
57             const std::vector<uint8_t> &allocConfig)
58             : mId(id), mOwnerCount(0), mTransactionCount(0),
59             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60             mInvalidated(false) {}
61 
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer62     const native_handle_t *handle() {
63         return mAllocation->handle();
64     }
65 
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer66     void invalidate() {
67         mInvalidated = true;
68     }
69 };
70 
71 struct TransactionStatus {
72     TransactionId mId;
73     BufferId mBufferId;
74     ConnectionId mSender;
75     ConnectionId mReceiver;
76     BufferStatus mStatus;
77     int64_t mTimestampUs;
78     bool mSenderValidated;
79 
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus80     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81         mId = message.transactionId;
82         mBufferId = message.bufferId;
83         mStatus = message.newStatus;
84         mTimestampUs = timestampUs;
85         if (mStatus == BufferStatus::TRANSFER_TO) {
86             mSender = message.connectionId;
87             mReceiver = message.targetConnectionId;
88             mSenderValidated = true;
89         } else {
90             mSender = -1LL;
91             mReceiver = message.connectionId;
92             mSenderValidated = false;
93         }
94     }
95 };
96 
97 // Helper template methods for handling map of set.
// Adds |value| to the set stored under |key| in |mapOfSet|, creating the set
// when |key| is not present yet.
// Returns true when |value| was newly added, false when it was already there.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates the empty set on demand and set::insert reports
    // whether the element is new, so one map lookup is enough and no
    // temporary set has to be built and copied.
    return (*mapOfSet)[key].insert(value).second;
}
111 
// Removes |value| from the set stored under |key| in |mapOfSet|. When the
// set is empty afterwards, the key itself is removed from the map.
// Returns true when |value| was present and has been removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) > 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
126 
// Returns true when |mapOfSet| has a set under |key| and that set holds
// |value|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) > 0;
}
136 
137 int32_t Accessor::Impl::sPid = getpid();
138 uint32_t Accessor::Impl::sSeqId = time(nullptr);
139 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)140 Accessor::Impl::Impl(
141         const std::shared_ptr<BufferPoolAllocator> &allocator)
142         : mAllocator(allocator) {}
143 
~Impl()144 Accessor::Impl::~Impl() {
145 }
146 
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)147 ResultStatus Accessor::Impl::connect(
148         const sp<Accessor> &accessor, const sp<IObserver> &observer,
149         sp<Connection> *connection,
150         ConnectionId *pConnectionId,
151         uint32_t *pMsgId,
152         const StatusDescriptor** statusDescPtr,
153         const InvalidationDescriptor** invDescPtr) {
154     sp<Connection> newConnection = new Connection();
155     ResultStatus status = ResultStatus::CRITICAL_ERROR;
156     {
157         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158         if (newConnection) {
159             ConnectionId id = (int64_t)sPid << 32 | sSeqId;
160             status = mBufferPool.mObserver.open(id, statusDescPtr);
161             if (status == ResultStatus::OK) {
162                 newConnection->initialize(accessor, id);
163                 *connection = newConnection;
164                 *pConnectionId = id;
165                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166                 mBufferPool.mConnectionIds.insert(id);
167                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
168                 mBufferPool.mInvalidation.onConnect(id, observer);
169                 ++sSeqId;
170             }
171 
172         }
173         mBufferPool.processStatusMessages();
174         mBufferPool.cleanUp();
175     }
176     return status;
177 }
178 
close(ConnectionId connectionId)179 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
180     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
181     ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
182     mBufferPool.processStatusMessages();
183     mBufferPool.handleClose(connectionId);
184     mBufferPool.mObserver.close(connectionId);
185     mBufferPool.mInvalidation.onClose(connectionId);
186     // Since close# will be called after all works are finished, it is OK to
187     // evict unused buffers.
188     mBufferPool.cleanUp(true);
189     return ResultStatus::OK;
190 }
191 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)192 ResultStatus Accessor::Impl::allocate(
193         ConnectionId connectionId, const std::vector<uint8_t>& params,
194         BufferId *bufferId, const native_handle_t** handle) {
195     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
196     mBufferPool.processStatusMessages();
197     ResultStatus status = ResultStatus::OK;
198     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
199         lock.unlock();
200         std::shared_ptr<BufferPoolAllocation> alloc;
201         size_t allocSize;
202         status = mAllocator->allocate(params, &alloc, &allocSize);
203         lock.lock();
204         if (status == ResultStatus::OK) {
205             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
206         }
207         ALOGV("create a buffer %d : %u %p",
208               status == ResultStatus::OK, *bufferId, *handle);
209     }
210     if (status == ResultStatus::OK) {
211         // TODO: handle ownBuffer failure
212         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
213     }
214     mBufferPool.cleanUp();
215     return status;
216 }
217 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)218 ResultStatus Accessor::Impl::fetch(
219         ConnectionId connectionId, TransactionId transactionId,
220         BufferId bufferId, const native_handle_t** handle) {
221     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
222     mBufferPool.processStatusMessages();
223     auto found = mBufferPool.mTransactions.find(transactionId);
224     if (found != mBufferPool.mTransactions.end() &&
225             contains(&mBufferPool.mPendingTransactions,
226                      connectionId, transactionId)) {
227         if (found->second->mSenderValidated &&
228                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
229                 found->second->mBufferId == bufferId) {
230             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
231             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
232             if (bufferIt != mBufferPool.mBuffers.end()) {
233                 mBufferPool.mStats.onBufferFetched();
234                 *handle = bufferIt->second->handle();
235                 return ResultStatus::OK;
236             }
237         }
238     }
239     mBufferPool.cleanUp();
240     return ResultStatus::CRITICAL_ERROR;
241 }
242 
cleanUp(bool clearCache)243 void Accessor::Impl::cleanUp(bool clearCache) {
244     // transaction timeout, buffer cacheing TTL handling
245     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
246     mBufferPool.processStatusMessages();
247     mBufferPool.cleanUp(clearCache);
248 }
249 
flush()250 void Accessor::Impl::flush() {
251     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
252     mBufferPool.processStatusMessages();
253     mBufferPool.flush(shared_from_this());
254 }
255 
handleInvalidateAck()256 void Accessor::Impl::handleInvalidateAck() {
257     std::map<ConnectionId, const sp<IObserver>> observers;
258     uint32_t invalidationId;
259     {
260         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
261         mBufferPool.processStatusMessages();
262         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
263     }
264     // Do not hold lock for send invalidations
265     size_t deadClients = 0;
266     for (auto it = observers.begin(); it != observers.end(); ++it) {
267         const sp<IObserver> observer = it->second;
268         if (observer) {
269             Return<void> transResult = observer->onMessage(it->first, invalidationId);
270             if (!transResult.isOk()) {
271                 ++deadClients;
272             }
273         }
274     }
275     if (deadClients > 0) {
276         ALOGD("During invalidation found %zu dead clients", deadClients);
277     }
278 }
279 
isValid()280 bool Accessor::Impl::isValid() {
281     return mBufferPool.isValid();
282 }
283 
BufferPool()284 Accessor::Impl::Impl::BufferPool::BufferPool()
285     : mTimestampUs(getTimestampNow()),
286       mLastCleanUpUs(mTimestampUs),
287       mLastLogUs(mTimestampUs),
288       mSeq(0),
289       mStartSeq(0) {
290     mValid = mInvalidationChannel.isValid();
291 }
292 
293 
294 // Statistics helper
// Returns |base| as a percentage of |total|: 0.5 is added to the quotient
// and the result is truncated to int. Returns 0 when |total| is zero.
//
// The division is done in double. Casting |base| to the type of |total|
// would truncate a fractional |base| and wrap a negative |base| when
// |total| has an unsigned type.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    return static_cast<int>(
            0.5 + 100.0 * static_cast<double>(base) / static_cast<double>(total));
}
299 
300 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
301 
~BufferPool()302 Accessor::Impl::Impl::BufferPool::~BufferPool() {
303     std::lock_guard<std::mutex> lock(mMutex);
304     ALOGD("Destruction - bufferpool2 %p "
305           "cached: %zu/%zuM, %zu/%d%% in use; "
306           "allocs: %zu, %d%% recycled; "
307           "transfers: %zu, %d%% unfetced",
308           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
309           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
310           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
311           mStats.mTotalTransfers,
312           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
313 }
314 
onConnect(ConnectionId conId,const sp<IObserver> & observer)315 void Accessor::Impl::BufferPool::Invalidation::onConnect(
316         ConnectionId conId, const sp<IObserver>& observer) {
317     mAcks[conId] = mInvalidationId; // starts from current invalidationId
318     mObservers.insert(std::make_pair(conId, observer));
319 }
320 
onClose(ConnectionId conId)321 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
322     mAcks.erase(conId);
323     mObservers.erase(conId);
324 }
325 
onAck(ConnectionId conId,uint32_t msgId)326 void Accessor::Impl::BufferPool::Invalidation::onAck(
327         ConnectionId conId,
328         uint32_t msgId) {
329     auto it = mAcks.find(conId);
330     if (it == mAcks.end()) {
331         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
332         return;
333     }
334     if (isMessageLater(msgId, it->second)) {
335         mAcks[conId] = msgId;
336     }
337 }
338 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)339 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
340         BufferId bufferId,
341         BufferInvalidationChannel &channel) {
342     for (auto it = mPendings.begin(); it != mPendings.end();) {
343         if (it->isInvalidated(bufferId)) {
344             uint32_t msgId = 0;
345             if (it->mNeedsAck) {
346                 msgId = ++mInvalidationId;
347                 if (msgId == 0) {
348                     // wrap happens
349                     msgId = ++mInvalidationId;
350                 }
351             }
352             channel.postInvalidation(msgId, it->mFrom, it->mTo);
353             it = mPendings.erase(it);
354             continue;
355         }
356         ++it;
357     }
358 }
359 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)360 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
361         bool needsAck,
362         uint32_t from,
363         uint32_t to,
364         size_t left,
365         BufferInvalidationChannel &channel,
366         const std::shared_ptr<Accessor::Impl> &impl) {
367         uint32_t msgId = 0;
368     if (needsAck) {
369         msgId = ++mInvalidationId;
370         if (msgId == 0) {
371             // wrap happens
372             msgId = ++mInvalidationId;
373         }
374     }
375     ALOGV("bufferpool2 invalidation requested and queued");
376     if (left == 0) {
377         channel.postInvalidation(msgId, from, to);
378     } else {
379         // TODO: sending hint message?
380         ALOGV("bufferpoo2 invalidation requested and pending");
381         Pending pending(needsAck, from, to, left, impl);
382         mPendings.push_back(pending);
383     }
384     sInvalidator->addAccessor(mId, impl);
385 }
386 
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)387 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
388         std::map<ConnectionId, const sp<IObserver>> *observers,
389         uint32_t *invalidationId) {
390     if (mInvalidationId != 0) {
391         *invalidationId = mInvalidationId;
392         std::set<int> deads;
393         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
394             if (it->second != mInvalidationId) {
395                 const sp<IObserver> observer = mObservers[it->first];
396                 if (observer) {
397                     observers->emplace(it->first, observer);
398                     ALOGV("connection %lld will call observer (%u: %u)",
399                           (long long)it->first, it->second, mInvalidationId);
400                     // N.B: onMessage will be called later. ignore possibility of
401                     // onMessage# oneway call being lost.
402                     it->second = mInvalidationId;
403                 } else {
404                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
405                     deads.insert(it->first);
406                 }
407             }
408         }
409         if (deads.size() > 0) {
410             for (auto it = deads.begin(); it != deads.end(); ++it) {
411                 onClose(*it);
412             }
413         }
414     }
415     if (mPendings.size() == 0) {
416         // All invalidation Ids are synced and no more pending invalidations.
417         sInvalidator->delAccessor(mId);
418     }
419 }
420 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)421 bool Accessor::Impl::BufferPool::handleOwnBuffer(
422         ConnectionId connectionId, BufferId bufferId) {
423 
424     bool added = insert(&mUsingBuffers, connectionId, bufferId);
425     if (added) {
426         auto iter = mBuffers.find(bufferId);
427         iter->second->mOwnerCount++;
428     }
429     insert(&mUsingConnections, bufferId, connectionId);
430     return added;
431 }
432 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)433 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
434         ConnectionId connectionId, BufferId bufferId) {
435     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
436     if (deleted) {
437         auto iter = mBuffers.find(bufferId);
438         iter->second->mOwnerCount--;
439         if (iter->second->mOwnerCount == 0 &&
440                 iter->second->mTransactionCount == 0) {
441             if (!iter->second->mInvalidated) {
442                 mStats.onBufferUnused(iter->second->mAllocSize);
443                 mFreeBuffers.insert(bufferId);
444             } else {
445                 mStats.onBufferUnused(iter->second->mAllocSize);
446                 mStats.onBufferEvicted(iter->second->mAllocSize);
447                 mBuffers.erase(iter);
448                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
449             }
450         }
451     }
452     erase(&mUsingConnections, bufferId, connectionId);
453     ALOGV("release buffer %u : %d", bufferId, deleted);
454     return deleted;
455 }
456 
handleTransferTo(const BufferStatusMessage & message)457 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
458     auto completed = mCompletedTransactions.find(
459             message.transactionId);
460     if (completed != mCompletedTransactions.end()) {
461         // already completed
462         mCompletedTransactions.erase(completed);
463         return true;
464     }
465     // the buffer should exist and be owned.
466     auto bufferIter = mBuffers.find(message.bufferId);
467     if (bufferIter == mBuffers.end() ||
468             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
469         return false;
470     }
471     auto found = mTransactions.find(message.transactionId);
472     if (found != mTransactions.end()) {
473         // transfer_from was received earlier.
474         found->second->mSender = message.connectionId;
475         found->second->mSenderValidated = true;
476         return true;
477     }
478     if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
479         // N.B: it could be fake or receive connection already closed.
480         ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
481               this, (long long)message.targetConnectionId);
482         return false;
483     }
484     mStats.onBufferSent();
485     mTransactions.insert(std::make_pair(
486             message.transactionId,
487             std::make_unique<TransactionStatus>(message, mTimestampUs)));
488     insert(&mPendingTransactions, message.targetConnectionId,
489            message.transactionId);
490     bufferIter->second->mTransactionCount++;
491     return true;
492 }
493 
handleTransferFrom(const BufferStatusMessage & message)494 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
495     auto found = mTransactions.find(message.transactionId);
496     if (found == mTransactions.end()) {
497         // TODO: is it feasible to check ownership here?
498         mStats.onBufferSent();
499         mTransactions.insert(std::make_pair(
500                 message.transactionId,
501                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
502         insert(&mPendingTransactions, message.connectionId,
503                message.transactionId);
504         auto bufferIter = mBuffers.find(message.bufferId);
505         bufferIter->second->mTransactionCount++;
506     } else {
507         if (message.connectionId == found->second->mReceiver) {
508             found->second->mStatus = BufferStatus::TRANSFER_FROM;
509         }
510     }
511     return true;
512 }
513 
handleTransferResult(const BufferStatusMessage & message)514 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
515     auto found = mTransactions.find(message.transactionId);
516     if (found != mTransactions.end()) {
517         bool deleted = erase(&mPendingTransactions, message.connectionId,
518                              message.transactionId);
519         if (deleted) {
520             if (!found->second->mSenderValidated) {
521                 mCompletedTransactions.insert(message.transactionId);
522             }
523             auto bufferIter = mBuffers.find(message.bufferId);
524             if (message.newStatus == BufferStatus::TRANSFER_OK) {
525                 handleOwnBuffer(message.connectionId, message.bufferId);
526             }
527             bufferIter->second->mTransactionCount--;
528             if (bufferIter->second->mOwnerCount == 0
529                 && bufferIter->second->mTransactionCount == 0) {
530                 if (!bufferIter->second->mInvalidated) {
531                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
532                     mFreeBuffers.insert(message.bufferId);
533                 } else {
534                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
535                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
536                     mBuffers.erase(bufferIter);
537                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
538                 }
539             }
540             mTransactions.erase(found);
541         }
542         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
543               message.bufferId, deleted);
544         return deleted;
545     }
546     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
547           message.bufferId);
548     return false;
549 }
550 
processStatusMessages()551 void Accessor::Impl::BufferPool::processStatusMessages() {
552     std::vector<BufferStatusMessage> messages;
553     mObserver.getBufferStatusChanges(messages);
554     mTimestampUs = getTimestampNow();
555     for (BufferStatusMessage& message: messages) {
556         bool ret = false;
557         switch (message.newStatus) {
558             case BufferStatus::NOT_USED:
559                 ret = handleReleaseBuffer(
560                         message.connectionId, message.bufferId);
561                 break;
562             case BufferStatus::USED:
563                 // not happening
564                 break;
565             case BufferStatus::TRANSFER_TO:
566                 ret = handleTransferTo(message);
567                 break;
568             case BufferStatus::TRANSFER_FROM:
569                 ret = handleTransferFrom(message);
570                 break;
571             case BufferStatus::TRANSFER_TIMEOUT:
572                 // TODO
573                 break;
574             case BufferStatus::TRANSFER_LOST:
575                 // TODO
576                 break;
577             case BufferStatus::TRANSFER_FETCH:
578                 // not happening
579                 break;
580             case BufferStatus::TRANSFER_OK:
581             case BufferStatus::TRANSFER_ERROR:
582                 ret = handleTransferResult(message);
583                 break;
584             case BufferStatus::INVALIDATION_ACK:
585                 mInvalidation.onAck(message.connectionId, message.bufferId);
586                 ret = true;
587                 break;
588         }
589         if (ret == false) {
590             ALOGW("buffer status message processing failure - message : %d connection : %lld",
591                   message.newStatus, (long long)message.connectionId);
592         }
593     }
594     messages.clear();
595 }
596 
handleClose(ConnectionId connectionId)597 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
598     // Cleaning buffers
599     auto buffers = mUsingBuffers.find(connectionId);
600     if (buffers != mUsingBuffers.end()) {
601         for (const BufferId& bufferId : buffers->second) {
602             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
603             if (deleted) {
604                 auto bufferIter = mBuffers.find(bufferId);
605                 bufferIter->second->mOwnerCount--;
606                 if (bufferIter->second->mOwnerCount == 0 &&
607                         bufferIter->second->mTransactionCount == 0) {
608                     // TODO: handle freebuffer insert fail
609                     if (!bufferIter->second->mInvalidated) {
610                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
611                         mFreeBuffers.insert(bufferId);
612                     } else {
613                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
614                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
615                         mBuffers.erase(bufferIter);
616                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
617                     }
618                 }
619             }
620         }
621         mUsingBuffers.erase(buffers);
622     }
623 
624     // Cleaning transactions
625     auto pending = mPendingTransactions.find(connectionId);
626     if (pending != mPendingTransactions.end()) {
627         for (const TransactionId& transactionId : pending->second) {
628             auto iter = mTransactions.find(transactionId);
629             if (iter != mTransactions.end()) {
630                 if (!iter->second->mSenderValidated) {
631                     mCompletedTransactions.insert(transactionId);
632                 }
633                 BufferId bufferId = iter->second->mBufferId;
634                 auto bufferIter = mBuffers.find(bufferId);
635                 bufferIter->second->mTransactionCount--;
636                 if (bufferIter->second->mOwnerCount == 0 &&
637                     bufferIter->second->mTransactionCount == 0) {
638                     // TODO: handle freebuffer insert fail
639                     if (!bufferIter->second->mInvalidated) {
640                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
641                         mFreeBuffers.insert(bufferId);
642                     } else {
643                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
644                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
645                         mBuffers.erase(bufferIter);
646                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
647                     }
648                 }
649                 mTransactions.erase(iter);
650             }
651         }
652     }
653     mConnectionIds.erase(connectionId);
654     return true;
655 }
656 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)657 bool Accessor::Impl::BufferPool::getFreeBuffer(
658         const std::shared_ptr<BufferPoolAllocator> &allocator,
659         const std::vector<uint8_t> &params, BufferId *pId,
660         const native_handle_t** handle) {
661     auto bufferIt = mFreeBuffers.begin();
662     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
663         BufferId bufferId = *bufferIt;
664         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
665             break;
666         }
667     }
668     if (bufferIt != mFreeBuffers.end()) {
669         BufferId id = *bufferIt;
670         mFreeBuffers.erase(bufferIt);
671         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
672         *handle = mBuffers[id]->handle();
673         *pId = id;
674         ALOGV("recycle a buffer %u %p", id, *handle);
675         return true;
676     }
677     return false;
678 }
679 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)680 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
681         const std::shared_ptr<BufferPoolAllocation> &alloc,
682         const size_t allocSize,
683         const std::vector<uint8_t> &params,
684         BufferId *pId,
685         const native_handle_t** handle) {
686 
687     BufferId bufferId = mSeq++;
688     if (mSeq == Connection::SYNC_BUFFERID) {
689         mSeq = 0;
690     }
691     std::unique_ptr<InternalBuffer> buffer =
692             std::make_unique<InternalBuffer>(
693                     bufferId, alloc, allocSize, params);
694     if (buffer) {
695         auto res = mBuffers.insert(std::make_pair(
696                 bufferId, std::move(buffer)));
697         if (res.second) {
698             mStats.onBufferAllocated(allocSize);
699             *handle = alloc->handle();
700             *pId = bufferId;
701             return ResultStatus::OK;
702         }
703     }
704     return ResultStatus::NO_MEMORY;
705 }
706 
cleanUp(bool clearCache)707 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
708     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
709         mLastCleanUpUs = mTimestampUs;
710         if (mTimestampUs > mLastLogUs + kLogDurationUs) {
711             mLastLogUs = mTimestampUs;
712             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
713                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
714                   "%zu/%zu (fetch/transfer)",
715                   this, mStats.mBuffersCached, mStats.mSizeCached,
716                   mStats.mBuffersInUse, mStats.mSizeInUse,
717                   mStats.mTotalRecycles, mStats.mTotalAllocations,
718                   mStats.mTotalFetches, mStats.mTotalTransfers);
719         }
720         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
721             if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
722                     || mBuffers.size() < kMinBufferCountForEviction)) {
723                 break;
724             }
725             auto it = mBuffers.find(*freeIt);
726             if (it != mBuffers.end() &&
727                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
728                 mStats.onBufferEvicted(it->second->mAllocSize);
729                 mBuffers.erase(it);
730                 freeIt = mFreeBuffers.erase(freeIt);
731             } else {
732                 ++freeIt;
733                 ALOGW("bufferpool2 inconsistent!");
734             }
735         }
736     }
737 }
738 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)739 void Accessor::Impl::BufferPool::invalidate(
740         bool needsAck, BufferId from, BufferId to,
741         const std::shared_ptr<Accessor::Impl> &impl) {
742     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
743         if (isBufferInRange(from, to, *freeIt)) {
744             auto it = mBuffers.find(*freeIt);
745             if (it != mBuffers.end() &&
746                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
747                 mStats.onBufferEvicted(it->second->mAllocSize);
748                 mBuffers.erase(it);
749                 freeIt = mFreeBuffers.erase(freeIt);
750                 continue;
751             } else {
752                 ALOGW("bufferpool2 inconsistent!");
753             }
754         }
755         ++freeIt;
756     }
757 
758     size_t left = 0;
759     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
760         if (isBufferInRange(from, to, it->first)) {
761             it->second->invalidate();
762             ++left;
763         }
764     }
765     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
766 }
767 
flush(const std::shared_ptr<Accessor::Impl> & impl)768 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
769     BufferId from = mStartSeq;
770     BufferId to = mSeq;
771     mStartSeq = mSeq;
772     // TODO: needsAck params
773     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
774     if (from != to) {
775         invalidate(true, from, to, impl);
776     }
777 }
778 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)779 void Accessor::Impl::invalidatorThread(
780             std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
781             std::mutex &mutex,
782             std::condition_variable &cv,
783             bool &ready) {
784     constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
785     constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
786     constexpr useconds_t MAX_SLEEP_US = 10000;
787     uint32_t numSpin = 0;
788     useconds_t sleepUs = 1;
789 
790     while(true) {
791         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
792         {
793             std::unique_lock<std::mutex> lock(mutex);
794             if (!ready) {
795                 numSpin = 0;
796                 sleepUs = 1;
797                 cv.wait(lock);
798             }
799             copied.insert(accessors.begin(), accessors.end());
800         }
801         std::list<ConnectionId> erased;
802         for (auto it = copied.begin(); it != copied.end(); ++it) {
803             const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
804             if (!impl) {
805                 erased.push_back(it->first);
806             } else {
807                 impl->handleInvalidateAck();
808             }
809         }
810         {
811             std::unique_lock<std::mutex> lock(mutex);
812             for (auto it = erased.begin(); it != erased.end(); ++it) {
813                 accessors.erase(*it);
814             }
815             if (accessors.size() == 0) {
816                 ready = false;
817             } else {
818                 // TODO Use an efficient way to wait over FMQ.
819                 // N.B. Since there is not a efficient way to wait over FMQ,
820                 // polling over the FMQ is the current way to prevent draining
821                 // CPU.
822                 lock.unlock();
823                 ++numSpin;
824                 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
825                     sleepUs < MAX_SLEEP_US) {
826                     sleepUs *= 10;
827                 }
828                 if (numSpin % NUM_SPIN_TO_LOG == 0) {
829                     ALOGW("invalidator thread spinning");
830                 }
831                 ::usleep(sleepUs);
832             }
833         }
834     }
835 }
836 
AccessorInvalidator()837 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
838     std::thread invalidator(
839             invalidatorThread,
840             std::ref(mAccessors),
841             std::ref(mMutex),
842             std::ref(mCv),
843             std::ref(mReady));
844     invalidator.detach();
845 }
846 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)847 void Accessor::Impl::AccessorInvalidator::addAccessor(
848         uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
849     bool notify = false;
850     std::unique_lock<std::mutex> lock(mMutex);
851     if (mAccessors.find(accessorId) == mAccessors.end()) {
852         if (!mReady) {
853             mReady = true;
854             notify = true;
855         }
856         mAccessors.insert(std::make_pair(accessorId, impl));
857         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
858     }
859     lock.unlock();
860     if (notify) {
861         mCv.notify_one();
862     }
863 }
864 
delAccessor(uint32_t accessorId)865 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
866     std::lock_guard<std::mutex> lock(mMutex);
867     mAccessors.erase(accessorId);
868     ALOGV("buffer invalidation deleted bp:%u", accessorId);
869     if (mAccessors.size() == 0) {
870         mReady = false;
871     }
872 }
873 
// Definition of the static invalidator holder. It starts out empty and is
// populated by createInvalidator().
std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
875 
createInvalidator()876 void Accessor::Impl::createInvalidator() {
877     if (!sInvalidator) {
878         sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
879     }
880 }
881 
882 }  // namespace implementation
883 }  // namespace V2_0
884 }  // namespace bufferpool
885 }  // namespace media
886 }  // namespace hardware
887 }  // namespace android
888