1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define LOG_TAG "BufferPoolManager"
17 //#define LOG_NDEBUG 0
18 
19 #include <bufferpool/ClientManager.h>
20 #include <hidl/HidlTransportSupport.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "BufferPoolClient.h"
26 
27 namespace android {
28 namespace hardware {
29 namespace media {
30 namespace bufferpool {
31 namespace V1_0 {
32 namespace implementation {
33 
34 static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec
35 static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune
36 static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune
37 
38 /**
39  * The holder of the cookie of remote IClientManager.
40  * The cookie is process locally unique for each IClientManager.
41  * (The cookie is used to notify death of clients to bufferpool process.)
42  */
43 class ClientManagerCookieHolder {
44 public:
45     /**
46      * Creates a cookie holder for remote IClientManager(s).
47      */
48     ClientManagerCookieHolder();
49 
50     /**
51      * Gets a cookie for a remote IClientManager.
52      *
53      * @param manager   the specified remote IClientManager.
54      * @param added     true when the specified remote IClientManager is added
55      *                  newly, false otherwise.
56      *
57      * @return the process locally unique cookie for the specified IClientManager.
58      */
59     uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
60 
61 private:
62     uint64_t mSeqId;
63     std::mutex mLock;
64     std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
65 };
66 
ClientManagerCookieHolder()67 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
68 
getCookie(const sp<IClientManager> & manager,bool * added)69 uint64_t ClientManagerCookieHolder::getCookie(
70         const sp<IClientManager> &manager,
71         bool *added) {
72     std::lock_guard<std::mutex> lock(mLock);
73     for (auto it = mManagers.begin(); it != mManagers.end();) {
74         const sp<IClientManager> key = it->first.promote();
75         if (key) {
76             if (interfacesEqual(key, manager)) {
77                 *added = false;
78                 return it->second;
79             }
80             ++it;
81         } else {
82             it = mManagers.erase(it);
83         }
84     }
85     uint64_t id = mSeqId++;
86     *added = true;
87     mManagers.push_back(std::make_pair(manager, id));
88     return id;
89 }
90 
91 class ClientManager::Impl {
92 public:
93     Impl();
94 
95     // BnRegisterSender
96     ResultStatus registerSender(const sp<IAccessor> &accessor,
97                                 ConnectionId *pConnectionId);
98 
99     // BpRegisterSender
100     ResultStatus registerSender(const sp<IClientManager> &receiver,
101                                 ConnectionId senderId,
102                                 ConnectionId *receiverId);
103 
104     ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
105                         ConnectionId *pConnectionId);
106 
107     ResultStatus close(ConnectionId connectionId);
108 
109     ResultStatus allocate(ConnectionId connectionId,
110                           const std::vector<uint8_t> &params,
111                           native_handle_t **handle,
112                           std::shared_ptr<BufferPoolData> *buffer);
113 
114     ResultStatus receive(ConnectionId connectionId,
115                          TransactionId transactionId,
116                          BufferId bufferId,
117                          int64_t timestampUs,
118                          native_handle_t **handle,
119                          std::shared_ptr<BufferPoolData> *buffer);
120 
121     ResultStatus postSend(ConnectionId receiverId,
122                           const std::shared_ptr<BufferPoolData> &buffer,
123                           TransactionId *transactionId,
124                           int64_t *timestampUs);
125 
126     ResultStatus getAccessor(ConnectionId connectionId,
127                              sp<IAccessor> *accessor);
128 
129     void cleanUp(bool clearCache = false);
130 
131 private:
132     // In order to prevent deadlock between multiple locks,
133     // always lock ClientCache.lock before locking ActiveClients.lock.
134     struct ClientCache {
135         // This lock is held for brief duration.
136         // Blocking operation is not performed while holding the lock.
137         std::mutex mMutex;
138         std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
139                 mClients;
140         std::condition_variable mConnectCv;
141         bool mConnecting;
142         int64_t mLastCleanUpUs;
143 
ClientCacheandroid::hardware::media::bufferpool::V1_0::implementation::ClientManager::Impl::ClientCache144         ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
145     } mCache;
146 
147     // Active clients which can be retrieved via ConnectionId
148     struct ActiveClients {
149         // This lock is held for brief duration.
150         // Blocking operation is not performed holding the lock.
151         std::mutex mMutex;
152         std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
153                 mClients;
154     } mActive;
155 
156     ClientManagerCookieHolder mRemoteClientCookies;
157 };
158 
Impl()159 ClientManager::Impl::Impl() {}
160 
registerSender(const sp<IAccessor> & accessor,ConnectionId * pConnectionId)161 ResultStatus ClientManager::Impl::registerSender(
162         const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
163     cleanUp();
164     int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
165     do {
166         std::unique_lock<std::mutex> lock(mCache.mMutex);
167         for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
168             sp<IAccessor> sAccessor = it->first.promote();
169             if (sAccessor && interfacesEqual(sAccessor, accessor)) {
170                 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
171                 if (client) {
172                     std::lock_guard<std::mutex> lock(mActive.mMutex);
173                     *pConnectionId = client->getConnectionId();
174                     if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
175                         ALOGV("register existing connection %lld", (long long)*pConnectionId);
176                         return ResultStatus::ALREADY_EXISTS;
177                     }
178                 }
179                 mCache.mClients.erase(it);
180                 break;
181             }
182         }
183         if (!mCache.mConnecting) {
184             mCache.mConnecting = true;
185             lock.unlock();
186             ResultStatus result = ResultStatus::OK;
187             const std::shared_ptr<BufferPoolClient> client =
188                     std::make_shared<BufferPoolClient>(accessor);
189             lock.lock();
190             if (!client) {
191                 result = ResultStatus::NO_MEMORY;
192             } else if (!client->isValid()) {
193                 result = ResultStatus::CRITICAL_ERROR;
194             }
195             if (result == ResultStatus::OK) {
196                 // TODO: handle insert fail. (malloc fail)
197                 const std::weak_ptr<BufferPoolClient> wclient = client;
198                 mCache.mClients.push_back(std::make_pair(accessor, wclient));
199                 ConnectionId conId = client->getConnectionId();
200                 {
201                     std::lock_guard<std::mutex> lock(mActive.mMutex);
202                     mActive.mClients.insert(std::make_pair(conId, client));
203                 }
204                 *pConnectionId = conId;
205                 ALOGV("register new connection %lld", (long long)*pConnectionId);
206             }
207             mCache.mConnecting = false;
208             lock.unlock();
209             mCache.mConnectCv.notify_all();
210             return result;
211         }
212         mCache.mConnectCv.wait_for(
213                 lock, std::chrono::microseconds(kRegisterTimeoutUs));
214     } while (getTimestampNow() < timeoutUs);
215     // TODO: return timeout error
216     return ResultStatus::CRITICAL_ERROR;
217 }
218 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)219 ResultStatus ClientManager::Impl::registerSender(
220         const sp<IClientManager> &receiver,
221         ConnectionId senderId,
222         ConnectionId *receiverId) {
223     sp<IAccessor> accessor;
224     bool local = false;
225     {
226         std::lock_guard<std::mutex> lock(mActive.mMutex);
227         auto it = mActive.mClients.find(senderId);
228         if (it == mActive.mClients.end()) {
229             return ResultStatus::NOT_FOUND;
230         }
231         it->second->getAccessor(&accessor);
232         local = it->second->isLocal();
233     }
234     ResultStatus rs = ResultStatus::CRITICAL_ERROR;
235     if (accessor) {
236        Return<void> transResult = receiver->registerSender(
237                 accessor,
238                 [&rs, receiverId](
239                         ResultStatus status,
240                         int64_t connectionId) {
241                     rs = status;
242                     *receiverId = connectionId;
243                 });
244         if (!transResult.isOk()) {
245             return ResultStatus::CRITICAL_ERROR;
246         } else if (local && rs == ResultStatus::OK) {
247             sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
248             if (recipient)  {
249                 ALOGV("client death recipient registered %lld", (long long)*receiverId);
250                 bool added;
251                 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
252                 recipient->addCookieToConnection(cookie, *receiverId);
253                 if (added) {
254                     Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
255                 }
256             }
257         }
258     }
259     return rs;
260 }
261 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)262 ResultStatus ClientManager::Impl::create(
263         const std::shared_ptr<BufferPoolAllocator> &allocator,
264         ConnectionId *pConnectionId) {
265     const sp<Accessor> accessor = new Accessor(allocator);
266     if (!accessor || !accessor->isValid()) {
267         return ResultStatus::CRITICAL_ERROR;
268     }
269     std::shared_ptr<BufferPoolClient> client =
270             std::make_shared<BufferPoolClient>(accessor);
271     if (!client || !client->isValid()) {
272         return ResultStatus::CRITICAL_ERROR;
273     }
274     // Since a new bufferpool is created, evict memories which are used by
275     // existing bufferpools and clients.
276     cleanUp(true);
277     {
278         // TODO: handle insert fail. (malloc fail)
279         std::lock_guard<std::mutex> lock(mCache.mMutex);
280         const std::weak_ptr<BufferPoolClient> wclient = client;
281         mCache.mClients.push_back(std::make_pair(accessor, wclient));
282         ConnectionId conId = client->getConnectionId();
283         {
284             std::lock_guard<std::mutex> lock(mActive.mMutex);
285             mActive.mClients.insert(std::make_pair(conId, client));
286         }
287         *pConnectionId = conId;
288         ALOGV("create new connection %lld", (long long)*pConnectionId);
289     }
290     return ResultStatus::OK;
291 }
292 
close(ConnectionId connectionId)293 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
294     std::lock_guard<std::mutex> lock1(mCache.mMutex);
295     std::lock_guard<std::mutex> lock2(mActive.mMutex);
296     auto it = mActive.mClients.find(connectionId);
297     if (it != mActive.mClients.end()) {
298         sp<IAccessor> accessor;
299         it->second->getAccessor(&accessor);
300         mActive.mClients.erase(connectionId);
301         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
302             // clean up dead client caches
303             sp<IAccessor> cAccessor = cit->first.promote();
304             if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
305                 cit = mCache.mClients.erase(cit);
306             } else {
307                 cit++;
308             }
309         }
310         return ResultStatus::OK;
311     }
312     return ResultStatus::NOT_FOUND;
313 }
314 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)315 ResultStatus ClientManager::Impl::allocate(
316         ConnectionId connectionId, const std::vector<uint8_t> &params,
317         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
318     std::shared_ptr<BufferPoolClient> client;
319     {
320         std::lock_guard<std::mutex> lock(mActive.mMutex);
321         auto it = mActive.mClients.find(connectionId);
322         if (it == mActive.mClients.end()) {
323             return ResultStatus::NOT_FOUND;
324         }
325         client = it->second;
326     }
327     return client->allocate(params, handle, buffer);
328 }
329 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)330 ResultStatus ClientManager::Impl::receive(
331         ConnectionId connectionId, TransactionId transactionId,
332         BufferId bufferId, int64_t timestampUs,
333         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
334     std::shared_ptr<BufferPoolClient> client;
335     {
336         std::lock_guard<std::mutex> lock(mActive.mMutex);
337         auto it = mActive.mClients.find(connectionId);
338         if (it == mActive.mClients.end()) {
339             return ResultStatus::NOT_FOUND;
340         }
341         client = it->second;
342     }
343     return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
344 }
345 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)346 ResultStatus ClientManager::Impl::postSend(
347         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
348         TransactionId *transactionId, int64_t *timestampUs) {
349     ConnectionId connectionId = buffer->mConnectionId;
350     std::shared_ptr<BufferPoolClient> client;
351     {
352         std::lock_guard<std::mutex> lock(mActive.mMutex);
353         auto it = mActive.mClients.find(connectionId);
354         if (it == mActive.mClients.end()) {
355             return ResultStatus::NOT_FOUND;
356         }
357         client = it->second;
358     }
359     return client->postSend(receiverId, buffer, transactionId, timestampUs);
360 }
361 
getAccessor(ConnectionId connectionId,sp<IAccessor> * accessor)362 ResultStatus ClientManager::Impl::getAccessor(
363         ConnectionId connectionId, sp<IAccessor> *accessor) {
364     std::shared_ptr<BufferPoolClient> client;
365     {
366         std::lock_guard<std::mutex> lock(mActive.mMutex);
367         auto it = mActive.mClients.find(connectionId);
368         if (it == mActive.mClients.end()) {
369             return ResultStatus::NOT_FOUND;
370         }
371         client = it->second;
372     }
373     return client->getAccessor(accessor);
374 }
375 
cleanUp(bool clearCache)376 void ClientManager::Impl::cleanUp(bool clearCache) {
377     int64_t now = getTimestampNow();
378     int64_t lastTransactionUs;
379     std::lock_guard<std::mutex> lock1(mCache.mMutex);
380     if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
381         std::lock_guard<std::mutex> lock2(mActive.mMutex);
382         int cleaned = 0;
383         for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
384             if (!it->second->isActive(&lastTransactionUs, clearCache)) {
385                 if (lastTransactionUs + kClientTimeoutUs < now) {
386                     sp<IAccessor> accessor;
387                     it->second->getAccessor(&accessor);
388                     it = mActive.mClients.erase(it);
389                     ++cleaned;
390                     continue;
391                 }
392             }
393             ++it;
394         }
395         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
396             // clean up dead client caches
397             sp<IAccessor> cAccessor = cit->first.promote();
398             if (!cAccessor) {
399                 cit = mCache.mClients.erase(cit);
400             } else {
401                 ++cit;
402             }
403         }
404         ALOGV("# of cleaned connections: %d", cleaned);
405         mCache.mLastCleanUpUs = now;
406     }
407 }
408 
409 // Methods from ::android::hardware::media::bufferpool::V1_0::IClientManager follow.
registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor> & bufferPool,registerSender_cb _hidl_cb)410 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
411     if (mImpl) {
412         ConnectionId connectionId = -1;
413         ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
414         _hidl_cb(status, connectionId);
415     } else {
416         _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
417     }
418     return Void();
419 }
420 
421 // Methods for local use.
422 sp<ClientManager> ClientManager::sInstance;
423 std::mutex ClientManager::sInstanceLock;
424 
getInstance()425 sp<ClientManager> ClientManager::getInstance() {
426     std::lock_guard<std::mutex> lock(sInstanceLock);
427     if (!sInstance) {
428         sInstance = new ClientManager();
429     }
430     return sInstance;
431 }
432 
ClientManager()433 ClientManager::ClientManager() : mImpl(new Impl()) {}
434 
~ClientManager()435 ClientManager::~ClientManager() {
436 }
437 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)438 ResultStatus ClientManager::create(
439         const std::shared_ptr<BufferPoolAllocator> &allocator,
440         ConnectionId *pConnectionId) {
441     if (mImpl) {
442         return mImpl->create(allocator, pConnectionId);
443     }
444     return ResultStatus::CRITICAL_ERROR;
445 }
446 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)447 ResultStatus ClientManager::registerSender(
448         const sp<IClientManager> &receiver,
449         ConnectionId senderId,
450         ConnectionId *receiverId) {
451     if (mImpl) {
452         return mImpl->registerSender(receiver, senderId, receiverId);
453     }
454     return ResultStatus::CRITICAL_ERROR;
455 }
456 
close(ConnectionId connectionId)457 ResultStatus ClientManager::close(ConnectionId connectionId) {
458     if (mImpl) {
459         return mImpl->close(connectionId);
460     }
461     return ResultStatus::CRITICAL_ERROR;
462 }
463 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)464 ResultStatus ClientManager::allocate(
465         ConnectionId connectionId, const std::vector<uint8_t> &params,
466         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
467     if (mImpl) {
468         return mImpl->allocate(connectionId, params, handle, buffer);
469     }
470     return ResultStatus::CRITICAL_ERROR;
471 }
472 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)473 ResultStatus ClientManager::receive(
474         ConnectionId connectionId, TransactionId transactionId,
475         BufferId bufferId, int64_t timestampUs,
476         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
477     if (mImpl) {
478         return mImpl->receive(connectionId, transactionId, bufferId,
479                               timestampUs, handle, buffer);
480     }
481     return ResultStatus::CRITICAL_ERROR;
482 }
483 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)484 ResultStatus ClientManager::postSend(
485         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
486         TransactionId *transactionId, int64_t* timestampUs) {
487     if (mImpl && buffer) {
488         return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
489     }
490     return ResultStatus::CRITICAL_ERROR;
491 }
492 
cleanUp()493 void ClientManager::cleanUp() {
494     if (mImpl) {
495         mImpl->cleanUp(true);
496     }
497 }
498 
499 }  // namespace implementation
500 }  // namespace V1_0
501 }  // namespace bufferpool
502 }  // namespace media
503 }  // namespace hardware
504 }  // namespace android
505