1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define LOG_TAG "BufferPoolManager"
17 //#define LOG_NDEBUG 0
18
19 #include <bufferpool/ClientManager.h>
20 #include <hidl/HidlTransportSupport.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "BufferPoolClient.h"
26
27 namespace android {
28 namespace hardware {
29 namespace media {
30 namespace bufferpool {
31 namespace V1_0 {
32 namespace implementation {
33
34 static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec
35 static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune
36 static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune
37
38 /**
39 * The holder of the cookie of remote IClientManager.
40 * The cookie is process locally unique for each IClientManager.
41 * (The cookie is used to notify death of clients to bufferpool process.)
42 */
43 class ClientManagerCookieHolder {
44 public:
45 /**
46 * Creates a cookie holder for remote IClientManager(s).
47 */
48 ClientManagerCookieHolder();
49
50 /**
51 * Gets a cookie for a remote IClientManager.
52 *
53 * @param manager the specified remote IClientManager.
54 * @param added true when the specified remote IClientManager is added
55 * newly, false otherwise.
56 *
57 * @return the process locally unique cookie for the specified IClientManager.
58 */
59 uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
60
61 private:
62 uint64_t mSeqId;
63 std::mutex mLock;
64 std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
65 };
66
ClientManagerCookieHolder()67 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
68
getCookie(const sp<IClientManager> & manager,bool * added)69 uint64_t ClientManagerCookieHolder::getCookie(
70 const sp<IClientManager> &manager,
71 bool *added) {
72 std::lock_guard<std::mutex> lock(mLock);
73 for (auto it = mManagers.begin(); it != mManagers.end();) {
74 const sp<IClientManager> key = it->first.promote();
75 if (key) {
76 if (interfacesEqual(key, manager)) {
77 *added = false;
78 return it->second;
79 }
80 ++it;
81 } else {
82 it = mManagers.erase(it);
83 }
84 }
85 uint64_t id = mSeqId++;
86 *added = true;
87 mManagers.push_back(std::make_pair(manager, id));
88 return id;
89 }
90
// Implementation of ClientManager. Owns the cache of per-accessor clients
// and the map of active connections; all public methods are forwarded here.
class ClientManager::Impl {
public:
    Impl();

    // BnRegisterSender
    ResultStatus registerSender(const sp<IAccessor> &accessor,
                                ConnectionId *pConnectionId);

    // BpRegisterSender
    ResultStatus registerSender(const sp<IClientManager> &receiver,
                                ConnectionId senderId,
                                ConnectionId *receiverId);

    ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
                        ConnectionId *pConnectionId);

    ResultStatus close(ConnectionId connectionId);

    ResultStatus allocate(ConnectionId connectionId,
                          const std::vector<uint8_t> &params,
                          native_handle_t **handle,
                          std::shared_ptr<BufferPoolData> *buffer);

    ResultStatus receive(ConnectionId connectionId,
                         TransactionId transactionId,
                         BufferId bufferId,
                         int64_t timestampUs,
                         native_handle_t **handle,
                         std::shared_ptr<BufferPoolData> *buffer);

    ResultStatus postSend(ConnectionId receiverId,
                          const std::shared_ptr<BufferPoolData> &buffer,
                          TransactionId *transactionId,
                          int64_t *timestampUs);

    ResultStatus getAccessor(ConnectionId connectionId,
                             sp<IAccessor> *accessor);

    void cleanUp(bool clearCache = false);

private:
    // In order to prevent deadlock between multiple locks,
    // always lock ClientCache.lock before locking ActiveClients.lock.
    struct ClientCache {
        // This lock is held for brief duration.
        // Blocking operation is not performed while holding the lock.
        std::mutex mMutex;
        // Weak references to clients, keyed by the accessor they connect to.
        std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
                mClients;
        std::condition_variable mConnectCv; // signaled when a connect attempt ends
        bool mConnecting;                   // true while a connect is in flight
        int64_t mLastCleanUpUs;             // timestamp of the last cleanUp() pass

        ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
    } mCache;

    // Active clients which can be retrieved via ConnectionId
    struct ActiveClients {
        // This lock is held for brief duration.
        // Blocking operation is not performed holding the lock.
        std::mutex mMutex;
        std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
                mClients;
    } mActive;

    // Cookies for remote IClientManagers that have been linked to death.
    ClientManagerCookieHolder mRemoteClientCookies;
};
158
Impl()159 ClientManager::Impl::Impl() {}
160
// Registers a remote IAccessor as a sender and returns the connection id
// for it in *pConnectionId. Reuses a cached BufferPoolClient when one is
// still alive and still present in mActive; otherwise creates a new one.
//
// Only one connection attempt runs at a time (mCache.mConnecting); other
// callers wait on mConnectCv and retry until kRegisterTimeoutUs elapses.
ResultStatus ClientManager::Impl::registerSender(
        const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
    cleanUp();
    int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
    do {
        std::unique_lock<std::mutex> lock(mCache.mMutex);
        for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
            sp<IAccessor> sAccessor = it->first.promote();
            if (sAccessor && interfacesEqual(sAccessor, accessor)) {
                const std::shared_ptr<BufferPoolClient> client = it->second.lock();
                if (client) {
                    // ActiveClients lock taken after ClientCache lock
                    // (see lock-order comment on Impl).
                    std::lock_guard<std::mutex> lock(mActive.mMutex);
                    *pConnectionId = client->getConnectionId();
                    if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
                        ALOGV("register existing connection %lld", (long long)*pConnectionId);
                        return ResultStatus::ALREADY_EXISTS;
                    }
                }
                // Cached entry is dead or no longer active; drop it and
                // fall through to create a fresh client below.
                mCache.mClients.erase(it);
                break;
            }
        }
        if (!mCache.mConnecting) {
            mCache.mConnecting = true;
            // Client construction connects to the remote accessor, which may
            // block, so perform it outside the cache lock.
            lock.unlock();
            ResultStatus result = ResultStatus::OK;
            const std::shared_ptr<BufferPoolClient> client =
                    std::make_shared<BufferPoolClient>(accessor);
            lock.lock();
            if (!client) {
                result = ResultStatus::NO_MEMORY;
            } else if (!client->isValid()) {
                result = ResultStatus::CRITICAL_ERROR;
            }
            if (result == ResultStatus::OK) {
                // TODO: handle insert fail. (malloc fail)
                const std::weak_ptr<BufferPoolClient> wclient = client;
                mCache.mClients.push_back(std::make_pair(accessor, wclient));
                ConnectionId conId = client->getConnectionId();
                {
                    std::lock_guard<std::mutex> lock(mActive.mMutex);
                    mActive.mClients.insert(std::make_pair(conId, client));
                }
                *pConnectionId = conId;
                ALOGV("register new connection %lld", (long long)*pConnectionId);
            }
            mCache.mConnecting = false;
            lock.unlock();
            // Wake any callers waiting for this connect attempt to finish.
            mCache.mConnectCv.notify_all();
            return result;
        }
        // Another thread is connecting; wait for it to finish and retry.
        mCache.mConnectCv.wait_for(
                lock, std::chrono::microseconds(kRegisterTimeoutUs));
    } while (getTimestampNow() < timeoutUs);
    // TODO: return timeout error
    return ResultStatus::CRITICAL_ERROR;
}
218
// Registers an existing local connection (senderId) with a remote
// IClientManager so the receiving process can accept buffers from it.
// On success *receiverId holds the receiver-side connection id.
ResultStatus ClientManager::Impl::registerSender(
        const sp<IClientManager> &receiver,
        ConnectionId senderId,
        ConnectionId *receiverId) {
    sp<IAccessor> accessor;
    bool local = false;
    {
        std::lock_guard<std::mutex> lock(mActive.mMutex);
        auto it = mActive.mClients.find(senderId);
        if (it == mActive.mClients.end()) {
            return ResultStatus::NOT_FOUND;
        }
        it->second->getAccessor(&accessor);
        local = it->second->isLocal();
    }
    ResultStatus rs = ResultStatus::CRITICAL_ERROR;
    if (accessor) {
        Return<void> transResult = receiver->registerSender(
                accessor,
                [&rs, receiverId](
                        ResultStatus status,
                        int64_t connectionId) {
                    rs = status;
                    *receiverId = connectionId;
                });
        if (!transResult.isOk()) {
            return ResultStatus::CRITICAL_ERROR;
        } else if (local && rs == ResultStatus::OK) {
            // The bufferpool lives in this process: watch the remote
            // receiver so its connection can be torn down if it dies.
            sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
            if (recipient) {
                ALOGV("client death recipient registered %lld", (long long)*receiverId);
                bool added;
                uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
                recipient->addCookieToConnection(cookie, *receiverId);
                if (added) {
                    // NOTE(review): linkToDeath result is ignored; if the
                    // link fails the death notification is silently lost —
                    // consider logging transResult. TODO confirm intent.
                    Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
                }
            }
        }
    }
    return rs;
}
261
// Creates a new local bufferpool (Accessor) backed by |allocator| and a
// client connection to it. On success *pConnectionId identifies the new
// connection.
ResultStatus ClientManager::Impl::create(
        const std::shared_ptr<BufferPoolAllocator> &allocator,
        ConnectionId *pConnectionId) {
    const sp<Accessor> accessor = new Accessor(allocator);
    if (!accessor || !accessor->isValid()) {
        return ResultStatus::CRITICAL_ERROR;
    }
    std::shared_ptr<BufferPoolClient> client =
            std::make_shared<BufferPoolClient>(accessor);
    if (!client || !client->isValid()) {
        return ResultStatus::CRITICAL_ERROR;
    }
    // Since a new bufferpool is created, evict memories which are used by
    // existing bufferpools and clients.
    cleanUp(true);
    {
        // TODO: handle insert fail. (malloc fail)
        std::lock_guard<std::mutex> lock(mCache.mMutex);
        const std::weak_ptr<BufferPoolClient> wclient = client;
        mCache.mClients.push_back(std::make_pair(accessor, wclient));
        ConnectionId conId = client->getConnectionId();
        {
            // ActiveClients lock taken after ClientCache lock (lock order).
            std::lock_guard<std::mutex> lock(mActive.mMutex);
            mActive.mClients.insert(std::make_pair(conId, client));
        }
        *pConnectionId = conId;
        ALOGV("create new connection %lld", (long long)*pConnectionId);
    }
    return ResultStatus::OK;
}
292
close(ConnectionId connectionId)293 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
294 std::lock_guard<std::mutex> lock1(mCache.mMutex);
295 std::lock_guard<std::mutex> lock2(mActive.mMutex);
296 auto it = mActive.mClients.find(connectionId);
297 if (it != mActive.mClients.end()) {
298 sp<IAccessor> accessor;
299 it->second->getAccessor(&accessor);
300 mActive.mClients.erase(connectionId);
301 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
302 // clean up dead client caches
303 sp<IAccessor> cAccessor = cit->first.promote();
304 if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
305 cit = mCache.mClients.erase(cit);
306 } else {
307 cit++;
308 }
309 }
310 return ResultStatus::OK;
311 }
312 return ResultStatus::NOT_FOUND;
313 }
314
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)315 ResultStatus ClientManager::Impl::allocate(
316 ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
317 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
318 std::shared_ptr<BufferPoolClient> client;
319 {
320 std::lock_guard<std::mutex> lock(mActive.mMutex);
321 auto it = mActive.mClients.find(connectionId);
322 if (it == mActive.mClients.end()) {
323 return ResultStatus::NOT_FOUND;
324 }
325 client = it->second;
326 }
327 return client->allocate(params, handle, buffer);
328 }
329
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)330 ResultStatus ClientManager::Impl::receive(
331 ConnectionId connectionId, TransactionId transactionId,
332 BufferId bufferId, int64_t timestampUs,
333 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
334 std::shared_ptr<BufferPoolClient> client;
335 {
336 std::lock_guard<std::mutex> lock(mActive.mMutex);
337 auto it = mActive.mClients.find(connectionId);
338 if (it == mActive.mClients.end()) {
339 return ResultStatus::NOT_FOUND;
340 }
341 client = it->second;
342 }
343 return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
344 }
345
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)346 ResultStatus ClientManager::Impl::postSend(
347 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
348 TransactionId *transactionId, int64_t *timestampUs) {
349 ConnectionId connectionId = buffer->mConnectionId;
350 std::shared_ptr<BufferPoolClient> client;
351 {
352 std::lock_guard<std::mutex> lock(mActive.mMutex);
353 auto it = mActive.mClients.find(connectionId);
354 if (it == mActive.mClients.end()) {
355 return ResultStatus::NOT_FOUND;
356 }
357 client = it->second;
358 }
359 return client->postSend(receiverId, buffer, transactionId, timestampUs);
360 }
361
getAccessor(ConnectionId connectionId,sp<IAccessor> * accessor)362 ResultStatus ClientManager::Impl::getAccessor(
363 ConnectionId connectionId, sp<IAccessor> *accessor) {
364 std::shared_ptr<BufferPoolClient> client;
365 {
366 std::lock_guard<std::mutex> lock(mActive.mMutex);
367 auto it = mActive.mClients.find(connectionId);
368 if (it == mActive.mClients.end()) {
369 return ResultStatus::NOT_FOUND;
370 }
371 client = it->second;
372 }
373 return client->getAccessor(accessor);
374 }
375
// Garbage-collects connections and cached clients.
//
// Runs at most once per kCleanUpDurationUs unless |clearCache| forces it.
// Evicts active clients that have been idle longer than kClientTimeoutUs
// and prunes cache entries whose accessor has died. |clearCache| is also
// forwarded to isActive() (presumably to release cached buffers — TODO
// confirm against BufferPoolClient::isActive).
void ClientManager::Impl::cleanUp(bool clearCache) {
    int64_t now = getTimestampNow();
    int64_t lastTransactionUs;
    // Lock order: ClientCache first, then ActiveClients (see Impl comment).
    std::lock_guard<std::mutex> lock1(mCache.mMutex);
    if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
        std::lock_guard<std::mutex> lock2(mActive.mMutex);
        int cleaned = 0;
        for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
            // lastTransactionUs is only read when isActive() returns false.
            if (!it->second->isActive(&lastTransactionUs, clearCache)) {
                if (lastTransactionUs + kClientTimeoutUs < now) {
                    sp<IAccessor> accessor;
                    it->second->getAccessor(&accessor);
                    it = mActive.mClients.erase(it);
                    ++cleaned;
                    continue;
                }
            }
            ++it;
        }
        for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
            // clean up dead client caches
            sp<IAccessor> cAccessor = cit->first.promote();
            if (!cAccessor) {
                cit = mCache.mClients.erase(cit);
            } else {
                ++cit;
            }
        }
        ALOGV("# of cleaned connections: %d", cleaned);
        mCache.mLastCleanUpUs = now;
    }
}
408
409 // Methods from ::android::hardware::media::bufferpool::V1_0::IClientManager follow.
registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor> & bufferPool,registerSender_cb _hidl_cb)410 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
411 if (mImpl) {
412 ConnectionId connectionId = -1;
413 ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
414 _hidl_cb(status, connectionId);
415 } else {
416 _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
417 }
418 return Void();
419 }
420
// Methods for local use.
// Process-wide singleton instance and the lock guarding its lazy creation
// in getInstance().
sp<ClientManager> ClientManager::sInstance;
std::mutex ClientManager::sInstanceLock;
424
getInstance()425 sp<ClientManager> ClientManager::getInstance() {
426 std::lock_guard<std::mutex> lock(sInstanceLock);
427 if (!sInstance) {
428 sInstance = new ClientManager();
429 }
430 return sInstance;
431 }
432
ClientManager()433 ClientManager::ClientManager() : mImpl(new Impl()) {}
434
~ClientManager()435 ClientManager::~ClientManager() {
436 }
437
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)438 ResultStatus ClientManager::create(
439 const std::shared_ptr<BufferPoolAllocator> &allocator,
440 ConnectionId *pConnectionId) {
441 if (mImpl) {
442 return mImpl->create(allocator, pConnectionId);
443 }
444 return ResultStatus::CRITICAL_ERROR;
445 }
446
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)447 ResultStatus ClientManager::registerSender(
448 const sp<IClientManager> &receiver,
449 ConnectionId senderId,
450 ConnectionId *receiverId) {
451 if (mImpl) {
452 return mImpl->registerSender(receiver, senderId, receiverId);
453 }
454 return ResultStatus::CRITICAL_ERROR;
455 }
456
close(ConnectionId connectionId)457 ResultStatus ClientManager::close(ConnectionId connectionId) {
458 if (mImpl) {
459 return mImpl->close(connectionId);
460 }
461 return ResultStatus::CRITICAL_ERROR;
462 }
463
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)464 ResultStatus ClientManager::allocate(
465 ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
466 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
467 if (mImpl) {
468 return mImpl->allocate(connectionId, params, handle, buffer);
469 }
470 return ResultStatus::CRITICAL_ERROR;
471 }
472
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)473 ResultStatus ClientManager::receive(
474 ConnectionId connectionId, TransactionId transactionId,
475 BufferId bufferId, int64_t timestampUs,
476 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
477 if (mImpl) {
478 return mImpl->receive(connectionId, transactionId, bufferId,
479 timestampUs, handle, buffer);
480 }
481 return ResultStatus::CRITICAL_ERROR;
482 }
483
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)484 ResultStatus ClientManager::postSend(
485 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
486 TransactionId *transactionId, int64_t* timestampUs) {
487 if (mImpl && buffer) {
488 return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
489 }
490 return ResultStatus::CRITICAL_ERROR;
491 }
492
cleanUp()493 void ClientManager::cleanUp() {
494 if (mImpl) {
495 mImpl->cleanUp(true);
496 }
497 }
498
499 } // namespace implementation
500 } // namespace V1_0
501 } // namespace bufferpool
502 } // namespace media
503 } // namespace hardware
504 } // namespace android
505