1 #include "include/private/dvr/buffer_hub_queue_client.h"
2 
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7 
8 #include <array>
9 
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14 
// Evaluates |fnc_call| repeatedly for as long as it yields -1 with errno set
// to EINTR, and produces the result of the final evaluation.
#define RETRY_EINTR(fnc_call)                              \
  ([&]() -> decltype(fnc_call) {                           \
    for (;;) {                                             \
      const decltype(fnc_call) outcome = (fnc_call);       \
      if (outcome != -1 || errno != EINTR) return outcome; \
    }                                                      \
  })()
23 
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28 
29 namespace android {
30 namespace dvr {
31 
32 namespace {
33 
// Splits a packed 64-bit value into two 32-bit halves: the upper 32 bits
// become the first element of the pair and the lower 32 bits the second.
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  const uint64_t upper_bits = value >> 32;
  const uint64_t lower_bits = value & 0xFFFFFFFFull;
  return {static_cast<int32_t>(upper_bits), static_cast<int32_t>(lower_bits)};
}
38 
// Packs two 32-bit values into one 64-bit value: |a| occupies the upper 32
// bits and |b| the lower 32 bits. Both are reinterpreted as unsigned first so
// that a negative value does not sign-extend into the other half.
uint64_t Stuff(int32_t a, int32_t b) {
  uint64_t packed = static_cast<uint32_t>(a);
  packed <<= 32;
  packed |= static_cast<uint32_t>(b);
  return packed;
}
44 
45 }  // anonymous namespace
46 
BufferHubQueue(LocalChannelHandle channel_handle)47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
48     : Client{pdx::default_transport::ClientChannel::Create(
49           std::move(channel_handle))} {
50   Initialize();
51 }
52 
BufferHubQueue(const std::string & endpoint_path)53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
54     : Client{
55           pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
56   Initialize();
57 }
58 
Initialize()59 void BufferHubQueue::Initialize() {
60   int ret = epoll_fd_.Create();
61   if (ret < 0) {
62     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
63           strerror(-ret));
64     return;
65   }
66 
67   epoll_event event = {
68       .events = EPOLLIN | EPOLLET,
69       .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
70   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
71   if (ret < 0) {
72     ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
73           strerror(-ret));
74   }
75 }
76 
ImportQueue()77 Status<void> BufferHubQueue::ImportQueue() {
78   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
79   if (!status) {
80     ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
81           status.GetErrorMessage().c_str());
82     return ErrorStatus(status.error());
83   } else {
84     SetupQueue(status.get());
85     return {};
86   }
87 }
88 
SetupQueue(const QueueInfo & queue_info)89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
90   is_async_ = queue_info.producer_config.is_async;
91   default_width_ = queue_info.producer_config.default_width;
92   default_height_ = queue_info.producer_config.default_height;
93   default_format_ = queue_info.producer_config.default_format;
94   user_metadata_size_ = queue_info.producer_config.user_metadata_size;
95   id_ = queue_info.id;
96 }
97 
CreateConsumerQueue()98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
99   if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
100     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
101   else
102     return nullptr;
103 }
104 
CreateSilentConsumerQueue()105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
106   if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
107     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
108   else
109     return nullptr;
110 }
111 
CreateConsumerQueueHandle(bool silent)112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
113     bool silent) {
114   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
115   if (!status) {
116     ALOGE(
117         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
118         "%s",
119         status.GetErrorMessage().c_str());
120     return ErrorStatus(status.error());
121   }
122 
123   return status;
124 }
125 
126 pdx::Status<ConsumerQueueParcelable>
CreateConsumerQueueParcelable(bool silent)127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
128   auto status = CreateConsumerQueueHandle(silent);
129   if (!status)
130     return status.error_status();
131 
132   // A temporary consumer queue client to pull its channel parcelable.
133   auto consumer_queue =
134       std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
135   ConsumerQueueParcelable queue_parcelable(
136       consumer_queue->GetChannel()->TakeChannelParcelable());
137 
138   if (!queue_parcelable.IsValid()) {
139     ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
140     return ErrorStatus(EINVAL);
141   }
142 
143   return {std::move(queue_parcelable)};
144 }
145 
WaitForBuffers(int timeout)146 bool BufferHubQueue::WaitForBuffers(int timeout) {
147   ATRACE_NAME("BufferHubQueue::WaitForBuffers");
148   std::array<epoll_event, kMaxEvents> events;
149 
150   // Loop at least once to check for hangups.
151   do {
152     ALOGD_IF(
153         TRACE,
154         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
155         id(), count(), capacity());
156 
157     // If there is already a buffer then just check for hangup without waiting.
158     const int ret = epoll_fd_.Wait(events.data(), events.size(),
159                                    count() == 0 ? timeout : 0);
160 
161     if (ret == 0) {
162       ALOGI_IF(TRACE,
163                "BufferHubQueue::WaitForBuffers: No events before timeout: "
164                "queue_id=%d",
165                id());
166       return count() != 0;
167     }
168 
169     if (ret < 0 && ret != -EINTR) {
170       ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
171       return false;
172     }
173 
174     const int num_events = ret;
175 
176     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
177     // one for each buffer in the queue, and one extra event for the queue
178     // client itself.
179     for (int i = 0; i < num_events; i++) {
180       int32_t event_fd;
181       int32_t index;
182       std::tie(event_fd, index) = Unstuff(events[i].data.u64);
183 
184       PDX_TRACE_FORMAT(
185           "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
186           "slot=%d|",
187           id(), num_events, i, event_fd, index);
188 
189       ALOGD_IF(TRACE,
190                "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
191                i, event_fd, index);
192 
193       if (is_buffer_event_index(index)) {
194         HandleBufferEvent(static_cast<size_t>(index), event_fd,
195                           events[i].events);
196       } else if (is_queue_event_index(index)) {
197         HandleQueueEvent(events[i].events);
198       } else {
199         ALOGW(
200             "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
201             "index=%d",
202             event_fd, index);
203       }
204     }
205   } while (count() == 0 && capacity() > 0 && !hung_up());
206 
207   return count() != 0;
208 }
209 
HandleBufferEvent(size_t slot,int event_fd,int poll_events)210 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
211                                                int poll_events) {
212   ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
213   if (!buffers_[slot]) {
214     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
215     return ErrorStatus(ENOENT);
216   }
217 
218   auto status = buffers_[slot]->GetEventMask(poll_events);
219   if (!status) {
220     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
221           status.GetErrorMessage().c_str());
222     return status.error_status();
223   }
224 
225   const int events = status.get();
226   PDX_TRACE_FORMAT(
227       "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
228       "events=%d|",
229       id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
230 
231   if (events & EPOLLIN) {
232     return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
233   } else if (events & EPOLLHUP) {
234     ALOGW(
235         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
236         "event_fd=%d buffer_id=%d",
237         slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
238     return RemoveBuffer(slot);
239   } else {
240     ALOGW(
241         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
242         "events=%d",
243         slot, events);
244   }
245 
246   return {};
247 }
248 
HandleQueueEvent(int poll_event)249 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
250   ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
251   auto status = GetEventMask(poll_event);
252   if (!status) {
253     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
254           status.GetErrorMessage().c_str());
255     return status.error_status();
256   }
257 
258   const int events = status.get();
259   if (events & EPOLLIN) {
260     // Note that after buffer imports, if |count()| still returns 0, epoll
261     // wait will be tried again to acquire the newly imported buffer.
262     auto buffer_status = OnBufferAllocated();
263     if (!buffer_status) {
264       ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
265             buffer_status.GetErrorMessage().c_str());
266     }
267   } else if (events & EPOLLHUP) {
268     ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
269     hung_up_ = true;
270   } else {
271     ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
272   }
273 
274   return {};
275 }
276 
AddBuffer(const std::shared_ptr<BufferHubBase> & buffer,size_t slot)277 Status<void> BufferHubQueue::AddBuffer(
278     const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
279   ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
280            slot);
281 
282   if (is_full()) {
283     ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
284     return ErrorStatus(E2BIG);
285   }
286 
287   if (buffers_[slot]) {
288     // Replace the buffer if the slot is occupied. This could happen when the
289     // producer side replaced the slot with a newly allocated buffer. Remove the
290     // buffer before setting up with the new one.
291     auto remove_status = RemoveBuffer(slot);
292     if (!remove_status)
293       return remove_status.error_status();
294   }
295 
296   for (const auto& event_source : buffer->GetEventSources()) {
297     epoll_event event = {.events = event_source.event_mask | EPOLLET,
298                          .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
299     const int ret =
300         epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
301     if (ret < 0) {
302       ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
303             strerror(-ret));
304       return ErrorStatus(-ret);
305     }
306   }
307 
308   buffers_[slot] = buffer;
309   capacity_++;
310   return {};
311 }
312 
RemoveBuffer(size_t slot)313 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
314   ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
315 
316   if (buffers_[slot]) {
317     for (const auto& event_source : buffers_[slot]->GetEventSources()) {
318       const int ret =
319           epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
320       if (ret < 0) {
321         ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
322               strerror(-ret));
323         return ErrorStatus(-ret);
324       }
325     }
326 
327     // Trigger OnBufferRemoved callback if registered.
328     if (on_buffer_removed_)
329       on_buffer_removed_(buffers_[slot]);
330 
331     buffers_[slot] = nullptr;
332     capacity_--;
333   }
334 
335   return {};
336 }
337 
Enqueue(Entry entry)338 Status<void> BufferHubQueue::Enqueue(Entry entry) {
339   if (!is_full()) {
340     // Find and remove the enqueued buffer from unavailable_buffers_slot if
341     // exist.
342     auto enqueued_buffer_iter = std::find_if(
343         unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
344         [&entry](size_t slot) -> bool { return slot == entry.slot; });
345     if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
346       unavailable_buffers_slot_.erase(enqueued_buffer_iter);
347     }
348 
349     available_buffers_.push(std::move(entry));
350 
351     // Trigger OnBufferAvailable callback if registered.
352     if (on_buffer_available_)
353       on_buffer_available_();
354 
355     return {};
356   } else {
357     ALOGE("%s: Buffer queue is full!", __FUNCTION__);
358     return ErrorStatus(E2BIG);
359   }
360 }
361 
Dequeue(int timeout,size_t * slot)362 Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
363                                                                size_t* slot) {
364   ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
365 
366   PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
367 
368   if (count() == 0) {
369     if (!WaitForBuffers(timeout))
370       return ErrorStatus(ETIMEDOUT);
371   }
372 
373   auto& entry = available_buffers_.top();
374   PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
375                    entry.slot);
376 
377   std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer);
378   *slot = entry.slot;
379 
380   available_buffers_.pop();
381   unavailable_buffers_slot_.push_back(*slot);
382 
383   return {std::move(buffer)};
384 }
385 
SetBufferAvailableCallback(BufferAvailableCallback callback)386 void BufferHubQueue::SetBufferAvailableCallback(
387     BufferAvailableCallback callback) {
388   on_buffer_available_ = callback;
389 }
390 
SetBufferRemovedCallback(BufferRemovedCallback callback)391 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
392   on_buffer_removed_ = callback;
393 }
394 
FreeAllBuffers()395 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
396   // Clear all available buffers.
397   while (!available_buffers_.empty())
398     available_buffers_.pop();
399 
400   pdx::Status<void> last_error;  // No error.
401   // Clear all buffers this producer queue is tracking.
402   for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
403     if (buffers_[slot] != nullptr) {
404       auto status = RemoveBuffer(slot);
405       if (!status) {
406         ALOGE(
407             "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
408             "slot=%zu.",
409             slot);
410         last_error = status.error_status();
411       }
412     }
413   }
414 
415   return last_error;
416 }
417 
ProducerQueue(LocalChannelHandle handle)418 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
419     : BASE(std::move(handle)) {
420   auto status = ImportQueue();
421   if (!status) {
422     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
423           status.GetErrorMessage().c_str());
424     Close(-status.error());
425   }
426 }
427 
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)428 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
429                              const UsagePolicy& usage)
430     : BASE(BufferHubRPC::kClientPath) {
431   auto status =
432       InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
433   if (!status) {
434     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
435           status.GetErrorMessage().c_str());
436     Close(-status.error());
437     return;
438   }
439 
440   SetupQueue(status.get());
441 }
442 
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)443 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
444     uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
445     uint64_t usage, size_t buffer_count) {
446   if (buffer_count == 0) {
447     return {std::vector<size_t>()};
448   }
449 
450   if (capacity() + buffer_count > kMaxQueueCapacity) {
451     ALOGE(
452         "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
453         "allocate %zu more buffer(s).",
454         capacity(), buffer_count);
455     return ErrorStatus(E2BIG);
456   }
457 
458   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
459       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
460           width, height, layer_count, format, usage, buffer_count);
461   if (!status) {
462     ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
463           status.GetErrorMessage().c_str());
464     return status.error_status();
465   }
466 
467   auto buffer_handle_slots = status.take();
468   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
469                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
470                       "return %zu buffer handle(s), but returned %zu instead.",
471                       buffer_count, buffer_handle_slots.size());
472 
473   std::vector<size_t> buffer_slots;
474   buffer_slots.reserve(buffer_count);
475 
476   // Bookkeeping for each buffer.
477   for (auto& hs : buffer_handle_slots) {
478     auto& buffer_handle = hs.first;
479     size_t buffer_slot = hs.second;
480 
481     // Note that import might (though very unlikely) fail. If so, buffer_handle
482     // will be closed and included in returned buffer_slots.
483     if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)),
484                   buffer_slot)) {
485       ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
486                buffer_slot);
487       buffer_slots.push_back(buffer_slot);
488     }
489   }
490 
491   if (buffer_slots.size() != buffer_count) {
492     // Error out if the count of imported buffer(s) is not correct.
493     ALOGE(
494         "ProducerQueue::AllocateBuffers: requested to import %zu "
495         "buffers, but actually imported %zu buffers.",
496         buffer_count, buffer_slots.size());
497     return ErrorStatus(ENOMEM);
498   }
499 
500   return {std::move(buffer_slots)};
501 }
502 
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)503 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
504                                              uint32_t layer_count,
505                                              uint32_t format, uint64_t usage) {
506   // We only allocate one buffer at a time.
507   constexpr size_t buffer_count = 1;
508   auto status =
509       AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
510   if (!status) {
511     ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
512           status.GetErrorMessage().c_str());
513     return status.error_status();
514   }
515 
516   return {status.get()[0]};
517 }
518 
AddBuffer(const std::shared_ptr<ProducerBuffer> & buffer,size_t slot)519 Status<void> ProducerQueue::AddBuffer(
520     const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) {
521   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
522            id(), buffer->id(), slot);
523   // For producer buffer, we need to enqueue the newly added buffer
524   // immediately. Producer queue starts with all buffers in available state.
525   auto status = BufferHubQueue::AddBuffer(buffer, slot);
526   if (!status)
527     return status;
528 
529   return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
530 }
531 
InsertBuffer(const std::shared_ptr<ProducerBuffer> & buffer)532 Status<size_t> ProducerQueue::InsertBuffer(
533     const std::shared_ptr<ProducerBuffer>& buffer) {
534   if (buffer == nullptr ||
535       !BufferHubDefs::isClientGained(buffer->buffer_state(),
536                                      buffer->client_state_mask())) {
537     ALOGE(
538         "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in "
539         "gained state.");
540     return ErrorStatus(EINVAL);
541   }
542 
543   auto status_or_slot =
544       InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
545           buffer->cid());
546   if (!status_or_slot) {
547     ALOGE(
548         "ProducerQueue::InsertBuffer: Failed to insert producer buffer: "
549         "buffer_cid=%d, error: %s.",
550         buffer->cid(), status_or_slot.GetErrorMessage().c_str());
551     return status_or_slot.error_status();
552   }
553 
554   size_t slot = status_or_slot.get();
555 
556   // Note that we are calling AddBuffer() from the base class to explicitly
557   // avoid Enqueue() the ProducerBuffer.
558   auto status = BufferHubQueue::AddBuffer(buffer, slot);
559   if (!status) {
560     ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.",
561           status.GetErrorMessage().c_str());
562     return status.error_status();
563   }
564   return {slot};
565 }
566 
RemoveBuffer(size_t slot)567 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
568   auto status =
569       InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
570   if (!status) {
571     ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
572           status.GetErrorMessage().c_str());
573     return status.error_status();
574   }
575 
576   return BufferHubQueue::RemoveBuffer(slot);
577 }
578 
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)579 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
580     int timeout, size_t* slot, LocalHandle* release_fence) {
581   DvrNativeBufferMetadata canonical_meta;
582   return Dequeue(timeout, slot, &canonical_meta, release_fence);
583 }
584 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence,bool gain_posted_buffer)585 pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
586     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
587     pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
588   ATRACE_NAME("ProducerQueue::Dequeue");
589   if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
590     ALOGE("%s: Invalid parameter.", __FUNCTION__);
591     return ErrorStatus(EINVAL);
592   }
593 
594   std::shared_ptr<ProducerBuffer> buffer;
595   Status<std::shared_ptr<BufferHubBase>> dequeue_status =
596       BufferHubQueue::Dequeue(timeout, slot);
597   if (dequeue_status.ok()) {
598     buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take());
599   } else {
600     if (gain_posted_buffer) {
601       Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status =
602           ProducerQueue::DequeueUnacquiredBuffer(slot);
603       if (!dequeue_unacquired_status.ok()) {
604         ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
605               dequeue_unacquired_status.error());
606         return dequeue_unacquired_status.error_status();
607       }
608       buffer = dequeue_unacquired_status.take();
609     } else {
610       return dequeue_status.error_status();
611     }
612   }
613   const int ret =
614       buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
615   if (ret < 0 && ret != -EALREADY)
616     return ErrorStatus(-ret);
617 
618   return {std::move(buffer)};
619 }
620 
DequeueUnacquiredBuffer(size_t * slot)621 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer(
622     size_t* slot) {
623   if (unavailable_buffers_slot_.size() < 1) {
624     ALOGE(
625         "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
626         "acquired state if exist.",
627         __FUNCTION__);
628     return ErrorStatus(ENOMEM);
629   }
630 
631   // Find the first buffer that is not in acquired state from
632   // unavailable_buffers_slot_.
633   for (auto iter = unavailable_buffers_slot_.begin();
634        iter != unavailable_buffers_slot_.end(); iter++) {
635     std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter);
636     if (buffer == nullptr) {
637       ALOGE("%s failed. Buffer slot %d is  null.", __FUNCTION__,
638             static_cast<int>(*slot));
639       return ErrorStatus(EIO);
640     }
641     if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) {
642       *slot = *iter;
643       unavailable_buffers_slot_.erase(iter);
644       unavailable_buffers_slot_.push_back(*slot);
645       ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
646             __FUNCTION__, static_cast<int>(*slot));
647       return {std::move(buffer)};
648     }
649   }
650   ALOGE(
651       "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
652       __FUNCTION__);
653   return ErrorStatus(EBUSY);
654 }
655 
TakeAsParcelable()656 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
657   if (capacity() != 0) {
658     ALOGE(
659         "%s: producer queue can only be taken out as a parcelable when empty. "
660         "Current queue capacity: %zu",
661         __FUNCTION__, capacity());
662     return ErrorStatus(EINVAL);
663   }
664 
665   std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
666   ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
667 
668   // Here the queue parcelable is returned and holds the underlying system
669   // resources backing the queue; while the original client channel of this
670   // producer queue is destroyed in place so that this client can no longer
671   // provide producer operations.
672   return {std::move(queue_parcelable)};
673 }
674 
675 /*static */
Import(LocalChannelHandle handle)676 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
677     LocalChannelHandle handle) {
678   return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
679 }
680 
ConsumerQueue(LocalChannelHandle handle)681 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
682     : BufferHubQueue(std::move(handle)) {
683   auto status = ImportQueue();
684   if (!status) {
685     ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
686           status.GetErrorMessage().c_str());
687     Close(-status.error());
688   }
689 
690   auto import_status = ImportBuffers();
691   if (import_status) {
692     ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
693   } else {
694     ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
695           import_status.GetErrorMessage().c_str());
696   }
697 }
698 
ImportBuffers()699 Status<size_t> ConsumerQueue::ImportBuffers() {
700   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
701   if (!status) {
702     if (status.error() == EBADR) {
703       ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
704       return {0};
705     } else {
706       ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
707             status.GetErrorMessage().c_str());
708       return status.error_status();
709     }
710   }
711 
712   int ret;
713   Status<void> last_error;
714   size_t imported_buffers_count = 0;
715 
716   auto buffer_handle_slots = status.take();
717   for (auto& buffer_handle_slot : buffer_handle_slots) {
718     ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
719              buffer_handle_slot.first.value());
720 
721     std::unique_ptr<ConsumerBuffer> consumer_buffer =
722         ConsumerBuffer::Import(std::move(buffer_handle_slot.first));
723     if (!consumer_buffer) {
724       ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
725             buffer_handle_slot.second);
726       last_error = ErrorStatus(EPIPE);
727       continue;
728     }
729 
730     auto add_status =
731         AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second);
732     if (!add_status) {
733       ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
734             add_status.GetErrorMessage().c_str());
735       last_error = add_status;
736     } else {
737       imported_buffers_count++;
738     }
739   }
740 
741   if (imported_buffers_count > 0)
742     return {imported_buffers_count};
743   else
744     return last_error.error_status();
745 }
746 
AddBuffer(const std::shared_ptr<ConsumerBuffer> & buffer,size_t slot)747 Status<void> ConsumerQueue::AddBuffer(
748     const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) {
749   ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
750            buffer->id(), slot);
751   return BufferHubQueue::AddBuffer(buffer, slot);
752 }
753 
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)754 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
755     int timeout, size_t* slot, void* meta, size_t user_metadata_size,
756     LocalHandle* acquire_fence) {
757   if (user_metadata_size != user_metadata_size_) {
758     ALOGE(
759         "%s: Metadata size (%zu) for the dequeuing buffer does not match "
760         "metadata size (%zu) for the queue.",
761         __FUNCTION__, user_metadata_size, user_metadata_size_);
762     return ErrorStatus(EINVAL);
763   }
764 
765   DvrNativeBufferMetadata canonical_meta;
766   auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
767   if (!status)
768     return status.error_status();
769 
770   if (meta && user_metadata_size) {
771     void* metadata_src =
772         reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
773     if (metadata_src) {
774       memcpy(meta, metadata_src, user_metadata_size);
775     } else {
776       ALOGW("%s: no user-defined metadata.", __FUNCTION__);
777     }
778   }
779 
780   return status;
781 }
782 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)783 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
784     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
785     pdx::LocalHandle* acquire_fence) {
786   ATRACE_NAME("ConsumerQueue::Dequeue");
787   if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
788     ALOGE("%s: Invalid parameter.", __FUNCTION__);
789     return ErrorStatus(EINVAL);
790   }
791 
792   auto status = BufferHubQueue::Dequeue(timeout, slot);
793   if (!status)
794     return status.error_status();
795 
796   auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take());
797   const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
798   if (ret < 0)
799     return ErrorStatus(-ret);
800 
801   return {std::move(buffer)};
802 }
803 
OnBufferAllocated()804 Status<void> ConsumerQueue::OnBufferAllocated() {
805   ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
806 
807   auto status = ImportBuffers();
808   if (!status) {
809     ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
810           status.GetErrorMessage().c_str());
811     return ErrorStatus(status.error());
812   } else if (status.get() == 0) {
813     ALOGW("%s: No new buffers allocated!", __FUNCTION__);
814     return ErrorStatus(ENOBUFS);
815   } else {
816     ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
817              status.get());
818     return {};
819   }
820 }
821 
822 }  // namespace dvr
823 }  // namespace android
824