1 #include "include/private/dvr/buffer_hub_queue_client.h"
2
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7
8 #include <array>
9
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14
// Evaluates |fnc_call| again and again for as long as it yields -1 with errno
// equal to EINTR, and produces the first result that does not match that
// condition. The immediately-invoked lambda lets the macro be used as an
// expression.
#define RETRY_EINTR(fnc_call)                       \
  ([&]() -> decltype(fnc_call) {                    \
    for (;;) {                                      \
      const decltype(fnc_call) rc = (fnc_call);     \
      if (rc != -1 || errno != EINTR) return rc;    \
    }                                               \
  })()
23
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28
29 namespace android {
30 namespace dvr {
31
32 namespace {
33
// Splits a packed 64-bit value into two signed 32-bit integers: the first
// element comes from the upper 32 bits, the second from the lower 32 bits.
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  const uint64_t upper_bits = value >> 32;
  const uint64_t lower_bits = value & 0xffffffffull;
  return std::make_pair(static_cast<int32_t>(upper_bits),
                        static_cast<int32_t>(lower_bits));
}
38
// Packs two signed 32-bit integers into one 64-bit value: |a| occupies the
// upper 32 bits and |b| the lower 32 bits. Both are reinterpreted as unsigned
// first so that negative values do not sign-extend into the other half.
uint64_t Stuff(int32_t a, int32_t b) {
  uint64_t packed = static_cast<uint32_t>(a);
  packed <<= 32;
  packed |= static_cast<uint32_t>(b);
  return packed;
}
44
45 } // anonymous namespace
46
BufferHubQueue(LocalChannelHandle channel_handle)47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
48 : Client{pdx::default_transport::ClientChannel::Create(
49 std::move(channel_handle))} {
50 Initialize();
51 }
52
BufferHubQueue(const std::string & endpoint_path)53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
54 : Client{
55 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
56 Initialize();
57 }
58
Initialize()59 void BufferHubQueue::Initialize() {
60 int ret = epoll_fd_.Create();
61 if (ret < 0) {
62 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
63 strerror(-ret));
64 return;
65 }
66
67 epoll_event event = {
68 .events = EPOLLIN | EPOLLET,
69 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
70 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
71 if (ret < 0) {
72 ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
73 strerror(-ret));
74 }
75 }
76
ImportQueue()77 Status<void> BufferHubQueue::ImportQueue() {
78 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
79 if (!status) {
80 ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
81 status.GetErrorMessage().c_str());
82 return ErrorStatus(status.error());
83 } else {
84 SetupQueue(status.get());
85 return {};
86 }
87 }
88
SetupQueue(const QueueInfo & queue_info)89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
90 is_async_ = queue_info.producer_config.is_async;
91 default_width_ = queue_info.producer_config.default_width;
92 default_height_ = queue_info.producer_config.default_height;
93 default_format_ = queue_info.producer_config.default_format;
94 user_metadata_size_ = queue_info.producer_config.user_metadata_size;
95 id_ = queue_info.id;
96 }
97
CreateConsumerQueue()98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
99 if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
100 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
101 else
102 return nullptr;
103 }
104
CreateSilentConsumerQueue()105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
106 if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
107 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
108 else
109 return nullptr;
110 }
111
CreateConsumerQueueHandle(bool silent)112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
113 bool silent) {
114 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
115 if (!status) {
116 ALOGE(
117 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
118 "%s",
119 status.GetErrorMessage().c_str());
120 return ErrorStatus(status.error());
121 }
122
123 return status;
124 }
125
126 pdx::Status<ConsumerQueueParcelable>
CreateConsumerQueueParcelable(bool silent)127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
128 auto status = CreateConsumerQueueHandle(silent);
129 if (!status)
130 return status.error_status();
131
132 // A temporary consumer queue client to pull its channel parcelable.
133 auto consumer_queue =
134 std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
135 ConsumerQueueParcelable queue_parcelable(
136 consumer_queue->GetChannel()->TakeChannelParcelable());
137
138 if (!queue_parcelable.IsValid()) {
139 ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
140 return ErrorStatus(EINVAL);
141 }
142
143 return {std::move(queue_parcelable)};
144 }
145
WaitForBuffers(int timeout)146 bool BufferHubQueue::WaitForBuffers(int timeout) {
147 ATRACE_NAME("BufferHubQueue::WaitForBuffers");
148 std::array<epoll_event, kMaxEvents> events;
149
150 // Loop at least once to check for hangups.
151 do {
152 ALOGD_IF(
153 TRACE,
154 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
155 id(), count(), capacity());
156
157 // If there is already a buffer then just check for hangup without waiting.
158 const int ret = epoll_fd_.Wait(events.data(), events.size(),
159 count() == 0 ? timeout : 0);
160
161 if (ret == 0) {
162 ALOGI_IF(TRACE,
163 "BufferHubQueue::WaitForBuffers: No events before timeout: "
164 "queue_id=%d",
165 id());
166 return count() != 0;
167 }
168
169 if (ret < 0 && ret != -EINTR) {
170 ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
171 return false;
172 }
173
174 const int num_events = ret;
175
176 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
177 // one for each buffer in the queue, and one extra event for the queue
178 // client itself.
179 for (int i = 0; i < num_events; i++) {
180 int32_t event_fd;
181 int32_t index;
182 std::tie(event_fd, index) = Unstuff(events[i].data.u64);
183
184 PDX_TRACE_FORMAT(
185 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
186 "slot=%d|",
187 id(), num_events, i, event_fd, index);
188
189 ALOGD_IF(TRACE,
190 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
191 i, event_fd, index);
192
193 if (is_buffer_event_index(index)) {
194 HandleBufferEvent(static_cast<size_t>(index), event_fd,
195 events[i].events);
196 } else if (is_queue_event_index(index)) {
197 HandleQueueEvent(events[i].events);
198 } else {
199 ALOGW(
200 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
201 "index=%d",
202 event_fd, index);
203 }
204 }
205 } while (count() == 0 && capacity() > 0 && !hung_up());
206
207 return count() != 0;
208 }
209
HandleBufferEvent(size_t slot,int event_fd,int poll_events)210 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
211 int poll_events) {
212 ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
213 if (!buffers_[slot]) {
214 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
215 return ErrorStatus(ENOENT);
216 }
217
218 auto status = buffers_[slot]->GetEventMask(poll_events);
219 if (!status) {
220 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
221 status.GetErrorMessage().c_str());
222 return status.error_status();
223 }
224
225 const int events = status.get();
226 PDX_TRACE_FORMAT(
227 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
228 "events=%d|",
229 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
230
231 if (events & EPOLLIN) {
232 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
233 } else if (events & EPOLLHUP) {
234 ALOGW(
235 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
236 "event_fd=%d buffer_id=%d",
237 slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
238 return RemoveBuffer(slot);
239 } else {
240 ALOGW(
241 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
242 "events=%d",
243 slot, events);
244 }
245
246 return {};
247 }
248
HandleQueueEvent(int poll_event)249 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
250 ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
251 auto status = GetEventMask(poll_event);
252 if (!status) {
253 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
254 status.GetErrorMessage().c_str());
255 return status.error_status();
256 }
257
258 const int events = status.get();
259 if (events & EPOLLIN) {
260 // Note that after buffer imports, if |count()| still returns 0, epoll
261 // wait will be tried again to acquire the newly imported buffer.
262 auto buffer_status = OnBufferAllocated();
263 if (!buffer_status) {
264 ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
265 buffer_status.GetErrorMessage().c_str());
266 }
267 } else if (events & EPOLLHUP) {
268 ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
269 hung_up_ = true;
270 } else {
271 ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
272 }
273
274 return {};
275 }
276
AddBuffer(const std::shared_ptr<BufferHubBase> & buffer,size_t slot)277 Status<void> BufferHubQueue::AddBuffer(
278 const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
279 ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
280 slot);
281
282 if (is_full()) {
283 ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
284 return ErrorStatus(E2BIG);
285 }
286
287 if (buffers_[slot]) {
288 // Replace the buffer if the slot is occupied. This could happen when the
289 // producer side replaced the slot with a newly allocated buffer. Remove the
290 // buffer before setting up with the new one.
291 auto remove_status = RemoveBuffer(slot);
292 if (!remove_status)
293 return remove_status.error_status();
294 }
295
296 for (const auto& event_source : buffer->GetEventSources()) {
297 epoll_event event = {.events = event_source.event_mask | EPOLLET,
298 .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
299 const int ret =
300 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
301 if (ret < 0) {
302 ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
303 strerror(-ret));
304 return ErrorStatus(-ret);
305 }
306 }
307
308 buffers_[slot] = buffer;
309 capacity_++;
310 return {};
311 }
312
RemoveBuffer(size_t slot)313 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
314 ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
315
316 if (buffers_[slot]) {
317 for (const auto& event_source : buffers_[slot]->GetEventSources()) {
318 const int ret =
319 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
320 if (ret < 0) {
321 ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
322 strerror(-ret));
323 return ErrorStatus(-ret);
324 }
325 }
326
327 // Trigger OnBufferRemoved callback if registered.
328 if (on_buffer_removed_)
329 on_buffer_removed_(buffers_[slot]);
330
331 buffers_[slot] = nullptr;
332 capacity_--;
333 }
334
335 return {};
336 }
337
Enqueue(Entry entry)338 Status<void> BufferHubQueue::Enqueue(Entry entry) {
339 if (!is_full()) {
340 // Find and remove the enqueued buffer from unavailable_buffers_slot if
341 // exist.
342 auto enqueued_buffer_iter = std::find_if(
343 unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
344 [&entry](size_t slot) -> bool { return slot == entry.slot; });
345 if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
346 unavailable_buffers_slot_.erase(enqueued_buffer_iter);
347 }
348
349 available_buffers_.push(std::move(entry));
350
351 // Trigger OnBufferAvailable callback if registered.
352 if (on_buffer_available_)
353 on_buffer_available_();
354
355 return {};
356 } else {
357 ALOGE("%s: Buffer queue is full!", __FUNCTION__);
358 return ErrorStatus(E2BIG);
359 }
360 }
361
Dequeue(int timeout,size_t * slot)362 Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
363 size_t* slot) {
364 ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
365
366 PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
367
368 if (count() == 0) {
369 if (!WaitForBuffers(timeout))
370 return ErrorStatus(ETIMEDOUT);
371 }
372
373 auto& entry = available_buffers_.top();
374 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
375 entry.slot);
376
377 std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer);
378 *slot = entry.slot;
379
380 available_buffers_.pop();
381 unavailable_buffers_slot_.push_back(*slot);
382
383 return {std::move(buffer)};
384 }
385
SetBufferAvailableCallback(BufferAvailableCallback callback)386 void BufferHubQueue::SetBufferAvailableCallback(
387 BufferAvailableCallback callback) {
388 on_buffer_available_ = callback;
389 }
390
SetBufferRemovedCallback(BufferRemovedCallback callback)391 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
392 on_buffer_removed_ = callback;
393 }
394
FreeAllBuffers()395 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
396 // Clear all available buffers.
397 while (!available_buffers_.empty())
398 available_buffers_.pop();
399
400 pdx::Status<void> last_error; // No error.
401 // Clear all buffers this producer queue is tracking.
402 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
403 if (buffers_[slot] != nullptr) {
404 auto status = RemoveBuffer(slot);
405 if (!status) {
406 ALOGE(
407 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
408 "slot=%zu.",
409 slot);
410 last_error = status.error_status();
411 }
412 }
413 }
414
415 return last_error;
416 }
417
ProducerQueue(LocalChannelHandle handle)418 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
419 : BASE(std::move(handle)) {
420 auto status = ImportQueue();
421 if (!status) {
422 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
423 status.GetErrorMessage().c_str());
424 Close(-status.error());
425 }
426 }
427
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)428 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
429 const UsagePolicy& usage)
430 : BASE(BufferHubRPC::kClientPath) {
431 auto status =
432 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
433 if (!status) {
434 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
435 status.GetErrorMessage().c_str());
436 Close(-status.error());
437 return;
438 }
439
440 SetupQueue(status.get());
441 }
442
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)443 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
444 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
445 uint64_t usage, size_t buffer_count) {
446 if (buffer_count == 0) {
447 return {std::vector<size_t>()};
448 }
449
450 if (capacity() + buffer_count > kMaxQueueCapacity) {
451 ALOGE(
452 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
453 "allocate %zu more buffer(s).",
454 capacity(), buffer_count);
455 return ErrorStatus(E2BIG);
456 }
457
458 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
459 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
460 width, height, layer_count, format, usage, buffer_count);
461 if (!status) {
462 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
463 status.GetErrorMessage().c_str());
464 return status.error_status();
465 }
466
467 auto buffer_handle_slots = status.take();
468 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
469 "BufferHubRPC::ProducerQueueAllocateBuffers should "
470 "return %zu buffer handle(s), but returned %zu instead.",
471 buffer_count, buffer_handle_slots.size());
472
473 std::vector<size_t> buffer_slots;
474 buffer_slots.reserve(buffer_count);
475
476 // Bookkeeping for each buffer.
477 for (auto& hs : buffer_handle_slots) {
478 auto& buffer_handle = hs.first;
479 size_t buffer_slot = hs.second;
480
481 // Note that import might (though very unlikely) fail. If so, buffer_handle
482 // will be closed and included in returned buffer_slots.
483 if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)),
484 buffer_slot)) {
485 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
486 buffer_slot);
487 buffer_slots.push_back(buffer_slot);
488 }
489 }
490
491 if (buffer_slots.size() != buffer_count) {
492 // Error out if the count of imported buffer(s) is not correct.
493 ALOGE(
494 "ProducerQueue::AllocateBuffers: requested to import %zu "
495 "buffers, but actually imported %zu buffers.",
496 buffer_count, buffer_slots.size());
497 return ErrorStatus(ENOMEM);
498 }
499
500 return {std::move(buffer_slots)};
501 }
502
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)503 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
504 uint32_t layer_count,
505 uint32_t format, uint64_t usage) {
506 // We only allocate one buffer at a time.
507 constexpr size_t buffer_count = 1;
508 auto status =
509 AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
510 if (!status) {
511 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
512 status.GetErrorMessage().c_str());
513 return status.error_status();
514 }
515
516 return {status.get()[0]};
517 }
518
AddBuffer(const std::shared_ptr<ProducerBuffer> & buffer,size_t slot)519 Status<void> ProducerQueue::AddBuffer(
520 const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) {
521 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
522 id(), buffer->id(), slot);
523 // For producer buffer, we need to enqueue the newly added buffer
524 // immediately. Producer queue starts with all buffers in available state.
525 auto status = BufferHubQueue::AddBuffer(buffer, slot);
526 if (!status)
527 return status;
528
529 return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
530 }
531
InsertBuffer(const std::shared_ptr<ProducerBuffer> & buffer)532 Status<size_t> ProducerQueue::InsertBuffer(
533 const std::shared_ptr<ProducerBuffer>& buffer) {
534 if (buffer == nullptr ||
535 !BufferHubDefs::isClientGained(buffer->buffer_state(),
536 buffer->client_state_mask())) {
537 ALOGE(
538 "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in "
539 "gained state.");
540 return ErrorStatus(EINVAL);
541 }
542
543 auto status_or_slot =
544 InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
545 buffer->cid());
546 if (!status_or_slot) {
547 ALOGE(
548 "ProducerQueue::InsertBuffer: Failed to insert producer buffer: "
549 "buffer_cid=%d, error: %s.",
550 buffer->cid(), status_or_slot.GetErrorMessage().c_str());
551 return status_or_slot.error_status();
552 }
553
554 size_t slot = status_or_slot.get();
555
556 // Note that we are calling AddBuffer() from the base class to explicitly
557 // avoid Enqueue() the ProducerBuffer.
558 auto status = BufferHubQueue::AddBuffer(buffer, slot);
559 if (!status) {
560 ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.",
561 status.GetErrorMessage().c_str());
562 return status.error_status();
563 }
564 return {slot};
565 }
566
RemoveBuffer(size_t slot)567 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
568 auto status =
569 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
570 if (!status) {
571 ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
572 status.GetErrorMessage().c_str());
573 return status.error_status();
574 }
575
576 return BufferHubQueue::RemoveBuffer(slot);
577 }
578
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)579 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
580 int timeout, size_t* slot, LocalHandle* release_fence) {
581 DvrNativeBufferMetadata canonical_meta;
582 return Dequeue(timeout, slot, &canonical_meta, release_fence);
583 }
584
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence,bool gain_posted_buffer)585 pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
586 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
587 pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
588 ATRACE_NAME("ProducerQueue::Dequeue");
589 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
590 ALOGE("%s: Invalid parameter.", __FUNCTION__);
591 return ErrorStatus(EINVAL);
592 }
593
594 std::shared_ptr<ProducerBuffer> buffer;
595 Status<std::shared_ptr<BufferHubBase>> dequeue_status =
596 BufferHubQueue::Dequeue(timeout, slot);
597 if (dequeue_status.ok()) {
598 buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take());
599 } else {
600 if (gain_posted_buffer) {
601 Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status =
602 ProducerQueue::DequeueUnacquiredBuffer(slot);
603 if (!dequeue_unacquired_status.ok()) {
604 ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
605 dequeue_unacquired_status.error());
606 return dequeue_unacquired_status.error_status();
607 }
608 buffer = dequeue_unacquired_status.take();
609 } else {
610 return dequeue_status.error_status();
611 }
612 }
613 const int ret =
614 buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
615 if (ret < 0 && ret != -EALREADY)
616 return ErrorStatus(-ret);
617
618 return {std::move(buffer)};
619 }
620
DequeueUnacquiredBuffer(size_t * slot)621 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer(
622 size_t* slot) {
623 if (unavailable_buffers_slot_.size() < 1) {
624 ALOGE(
625 "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
626 "acquired state if exist.",
627 __FUNCTION__);
628 return ErrorStatus(ENOMEM);
629 }
630
631 // Find the first buffer that is not in acquired state from
632 // unavailable_buffers_slot_.
633 for (auto iter = unavailable_buffers_slot_.begin();
634 iter != unavailable_buffers_slot_.end(); iter++) {
635 std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter);
636 if (buffer == nullptr) {
637 ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__,
638 static_cast<int>(*slot));
639 return ErrorStatus(EIO);
640 }
641 if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) {
642 *slot = *iter;
643 unavailable_buffers_slot_.erase(iter);
644 unavailable_buffers_slot_.push_back(*slot);
645 ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
646 __FUNCTION__, static_cast<int>(*slot));
647 return {std::move(buffer)};
648 }
649 }
650 ALOGE(
651 "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
652 __FUNCTION__);
653 return ErrorStatus(EBUSY);
654 }
655
TakeAsParcelable()656 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
657 if (capacity() != 0) {
658 ALOGE(
659 "%s: producer queue can only be taken out as a parcelable when empty. "
660 "Current queue capacity: %zu",
661 __FUNCTION__, capacity());
662 return ErrorStatus(EINVAL);
663 }
664
665 std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
666 ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
667
668 // Here the queue parcelable is returned and holds the underlying system
669 // resources backing the queue; while the original client channel of this
670 // producer queue is destroyed in place so that this client can no longer
671 // provide producer operations.
672 return {std::move(queue_parcelable)};
673 }
674
675 /*static */
Import(LocalChannelHandle handle)676 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
677 LocalChannelHandle handle) {
678 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
679 }
680
ConsumerQueue(LocalChannelHandle handle)681 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
682 : BufferHubQueue(std::move(handle)) {
683 auto status = ImportQueue();
684 if (!status) {
685 ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
686 status.GetErrorMessage().c_str());
687 Close(-status.error());
688 }
689
690 auto import_status = ImportBuffers();
691 if (import_status) {
692 ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
693 } else {
694 ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
695 import_status.GetErrorMessage().c_str());
696 }
697 }
698
ImportBuffers()699 Status<size_t> ConsumerQueue::ImportBuffers() {
700 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
701 if (!status) {
702 if (status.error() == EBADR) {
703 ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
704 return {0};
705 } else {
706 ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
707 status.GetErrorMessage().c_str());
708 return status.error_status();
709 }
710 }
711
712 int ret;
713 Status<void> last_error;
714 size_t imported_buffers_count = 0;
715
716 auto buffer_handle_slots = status.take();
717 for (auto& buffer_handle_slot : buffer_handle_slots) {
718 ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
719 buffer_handle_slot.first.value());
720
721 std::unique_ptr<ConsumerBuffer> consumer_buffer =
722 ConsumerBuffer::Import(std::move(buffer_handle_slot.first));
723 if (!consumer_buffer) {
724 ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
725 buffer_handle_slot.second);
726 last_error = ErrorStatus(EPIPE);
727 continue;
728 }
729
730 auto add_status =
731 AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second);
732 if (!add_status) {
733 ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
734 add_status.GetErrorMessage().c_str());
735 last_error = add_status;
736 } else {
737 imported_buffers_count++;
738 }
739 }
740
741 if (imported_buffers_count > 0)
742 return {imported_buffers_count};
743 else
744 return last_error.error_status();
745 }
746
AddBuffer(const std::shared_ptr<ConsumerBuffer> & buffer,size_t slot)747 Status<void> ConsumerQueue::AddBuffer(
748 const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) {
749 ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
750 buffer->id(), slot);
751 return BufferHubQueue::AddBuffer(buffer, slot);
752 }
753
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)754 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
755 int timeout, size_t* slot, void* meta, size_t user_metadata_size,
756 LocalHandle* acquire_fence) {
757 if (user_metadata_size != user_metadata_size_) {
758 ALOGE(
759 "%s: Metadata size (%zu) for the dequeuing buffer does not match "
760 "metadata size (%zu) for the queue.",
761 __FUNCTION__, user_metadata_size, user_metadata_size_);
762 return ErrorStatus(EINVAL);
763 }
764
765 DvrNativeBufferMetadata canonical_meta;
766 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
767 if (!status)
768 return status.error_status();
769
770 if (meta && user_metadata_size) {
771 void* metadata_src =
772 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
773 if (metadata_src) {
774 memcpy(meta, metadata_src, user_metadata_size);
775 } else {
776 ALOGW("%s: no user-defined metadata.", __FUNCTION__);
777 }
778 }
779
780 return status;
781 }
782
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)783 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
784 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
785 pdx::LocalHandle* acquire_fence) {
786 ATRACE_NAME("ConsumerQueue::Dequeue");
787 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
788 ALOGE("%s: Invalid parameter.", __FUNCTION__);
789 return ErrorStatus(EINVAL);
790 }
791
792 auto status = BufferHubQueue::Dequeue(timeout, slot);
793 if (!status)
794 return status.error_status();
795
796 auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take());
797 const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
798 if (ret < 0)
799 return ErrorStatus(-ret);
800
801 return {std::move(buffer)};
802 }
803
OnBufferAllocated()804 Status<void> ConsumerQueue::OnBufferAllocated() {
805 ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
806
807 auto status = ImportBuffers();
808 if (!status) {
809 ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
810 status.GetErrorMessage().c_str());
811 return ErrorStatus(status.error());
812 } else if (status.get() == 0) {
813 ALOGW("%s: No new buffers allocated!", __FUNCTION__);
814 return ErrorStatus(ENOBUFS);
815 } else {
816 ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
817 status.get());
818 return {};
819 }
820 }
821
822 } // namespace dvr
823 } // namespace android
824