1 #include <sys/epoll.h>
2 #include <sys/eventfd.h>
3 #include <sys/poll.h>
4 
5 #include <algorithm>
6 #include <atomic>
7 #include <thread>
8 
9 #include <log/log.h>
10 #include <private/dvr/bufferhub_rpc.h>
11 #include <private/dvr/consumer_channel.h>
12 #include <private/dvr/producer_channel.h>
13 #include <sync/sync.h>
14 #include <utils/Trace.h>
15 
16 using android::pdx::BorrowedHandle;
17 using android::pdx::ErrorStatus;
18 using android::pdx::Message;
19 using android::pdx::RemoteChannelHandle;
20 using android::pdx::Status;
21 using android::pdx::rpc::BufferWrapper;
22 using android::pdx::rpc::DispatchRemoteMethod;
23 using android::pdx::rpc::WrapBuffer;
24 
25 namespace android {
26 namespace dvr {
27 
ProducerChannel(BufferHubService * service,int buffer_id,int channel_id,IonBuffer buffer,IonBuffer metadata_buffer,size_t user_metadata_size,int * error)28 ProducerChannel::ProducerChannel(BufferHubService* service, int buffer_id,
29                                  int channel_id, IonBuffer buffer,
30                                  IonBuffer metadata_buffer,
31                                  size_t user_metadata_size, int* error)
32     : BufferHubChannel(service, buffer_id, channel_id, kProducerType),
33       buffer_(std::move(buffer)),
34       metadata_buffer_(std::move(metadata_buffer)),
35       user_metadata_size_(user_metadata_size),
36       metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
37                          user_metadata_size) {
38   if (!buffer_.IsValid()) {
39     ALOGE("ProducerChannel::ProducerChannel: Invalid buffer.");
40     *error = -EINVAL;
41     return;
42   }
43   if (!metadata_buffer_.IsValid()) {
44     ALOGE("ProducerChannel::ProducerChannel: Invalid metadata buffer.");
45     *error = -EINVAL;
46     return;
47   }
48 
49   *error = InitializeBuffer();
50 }
51 
ProducerChannel(BufferHubService * service,int channel_id,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size,int * error)52 ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id,
53                                  uint32_t width, uint32_t height,
54                                  uint32_t layer_count, uint32_t format,
55                                  uint64_t usage, size_t user_metadata_size,
56                                  int* error)
57     : BufferHubChannel(service, channel_id, channel_id, kProducerType),
58       user_metadata_size_(user_metadata_size),
59       metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
60                          user_metadata_size) {
61   if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) {
62     ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s",
63           strerror(-ret));
64     *error = ret;
65     return;
66   }
67 
68   if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1,
69                                        /*layer_count=*/1,
70                                        BufferHubDefs::kMetadataFormat,
71                                        BufferHubDefs::kMetadataUsage)) {
72     ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s",
73           strerror(-ret));
74     *error = ret;
75     return;
76   }
77 
78   *error = InitializeBuffer();
79 }
80 
InitializeBuffer()81 int ProducerChannel::InitializeBuffer() {
82   void* metadata_ptr = nullptr;
83   if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0,
84                                       /*y=*/0, metadata_buf_size_,
85                                       /*height=*/1, &metadata_ptr)) {
86     ALOGE("ProducerChannel::ProducerChannel: Failed to lock metadata.");
87     return ret;
88   }
89   metadata_header_ =
90       reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr);
91 
92   // Using placement new here to reuse shared memory instead of new allocation
93   // and also initialize the value to zero.
94   buffer_state_ = new (&metadata_header_->bufferState) std::atomic<uint32_t>(0);
95   fence_state_ = new (&metadata_header_->fenceState) std::atomic<uint32_t>(0);
96   active_clients_bit_mask_ =
97       new (&metadata_header_->activeClientsBitMask) std::atomic<uint32_t>(0);
98 
99   // Producer channel is never created after consumer channel, and one buffer
100   // only have one fixed producer for now. Thus, it is correct to assume
101   // producer state bit is kFirstClientBitMask for now.
102   active_clients_bit_mask_->store(BufferHubDefs::kFirstClientBitMask,
103                                   std::memory_order_release);
104 
105   acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
106   release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
107   if (!acquire_fence_fd_ || !release_fence_fd_) {
108     ALOGE("ProducerChannel::ProducerChannel: Failed to create shared fences.");
109     return -EIO;
110   }
111 
112   dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
113   if (!dummy_fence_fd_) {
114     ALOGE("ProducerChannel::ProducerChannel: Failed to create dummy fences.");
115     return EIO;
116   }
117 
118   epoll_event event;
119   event.events = 0;
120   event.data.u32 = 0U;
121   if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(),
122                 &event) < 0) {
123     ALOGE(
124         "ProducerChannel::ProducerChannel: Failed to modify the shared "
125         "release fence to include the dummy fence: %s",
126         strerror(errno));
127     return -EIO;
128   }
129 
130   // Success.
131   return 0;
132 }
133 
Create(BufferHubService * service,int buffer_id,int channel_id,IonBuffer buffer,IonBuffer metadata_buffer,size_t user_metadata_size)134 std::unique_ptr<ProducerChannel> ProducerChannel::Create(
135     BufferHubService* service, int buffer_id, int channel_id, IonBuffer buffer,
136     IonBuffer metadata_buffer, size_t user_metadata_size) {
137   int error = 0;
138   std::unique_ptr<ProducerChannel> producer(new ProducerChannel(
139       service, buffer_id, channel_id, std::move(buffer),
140       std::move(metadata_buffer), user_metadata_size, &error));
141 
142   if (error < 0)
143     return nullptr;
144   else
145     return producer;
146 }
147 
Create(BufferHubService * service,int channel_id,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size)148 Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create(
149     BufferHubService* service, int channel_id, uint32_t width, uint32_t height,
150     uint32_t layer_count, uint32_t format, uint64_t usage,
151     size_t user_metadata_size) {
152   int error;
153   std::shared_ptr<ProducerChannel> producer(
154       new ProducerChannel(service, channel_id, width, height, layer_count,
155                           format, usage, user_metadata_size, &error));
156   if (error < 0)
157     return ErrorStatus(-error);
158   else
159     return {std::move(producer)};
160 }
161 
~ProducerChannel()162 ProducerChannel::~ProducerChannel() {
163   ALOGD_IF(TRACE,
164            "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d "
165            "state=%" PRIx32 ".",
166            channel_id(), buffer_id(),
167            buffer_state_->load(std::memory_order_acquire));
168   for (auto consumer : consumer_channels_) {
169     consumer->OnProducerClosed();
170   }
171   Hangup();
172 }
173 
GetBufferInfo() const174 BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const {
175   // Derive the mask of signaled buffers in this producer / consumer set.
176   uint32_t signaled_mask = signaled() ? BufferHubDefs::kFirstClientBitMask : 0;
177   for (const ConsumerChannel* consumer : consumer_channels_) {
178     signaled_mask |= consumer->signaled() ? consumer->client_state_mask() : 0;
179   }
180 
181   return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(),
182                     buffer_.height(), buffer_.layer_count(), buffer_.format(),
183                     buffer_.usage(),
184                     buffer_state_->load(std::memory_order_acquire),
185                     signaled_mask, metadata_header_->queueIndex);
186 }
187 
HandleImpulse(Message & message)188 void ProducerChannel::HandleImpulse(Message& message) {
189   ATRACE_NAME("ProducerChannel::HandleImpulse");
190   switch (message.GetOp()) {
191     case BufferHubRPC::ProducerGain::Opcode:
192       OnProducerGain(message);
193       break;
194     case BufferHubRPC::ProducerPost::Opcode:
195       OnProducerPost(message, {});
196       break;
197   }
198 }
199 
HandleMessage(Message & message)200 bool ProducerChannel::HandleMessage(Message& message) {
201   ATRACE_NAME("ProducerChannel::HandleMessage");
202   switch (message.GetOp()) {
203     case BufferHubRPC::GetBuffer::Opcode:
204       DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
205           *this, &ProducerChannel::OnGetBuffer, message);
206       return true;
207 
208     case BufferHubRPC::NewConsumer::Opcode:
209       DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
210           *this, &ProducerChannel::OnNewConsumer, message);
211       return true;
212 
213     case BufferHubRPC::ProducerPost::Opcode:
214       DispatchRemoteMethod<BufferHubRPC::ProducerPost>(
215           *this, &ProducerChannel::OnProducerPost, message);
216       return true;
217 
218     case BufferHubRPC::ProducerGain::Opcode:
219       DispatchRemoteMethod<BufferHubRPC::ProducerGain>(
220           *this, &ProducerChannel::OnProducerGain, message);
221       return true;
222 
223     default:
224       return false;
225   }
226 }
227 
GetBuffer(uint32_t client_state_mask)228 BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer(
229     uint32_t client_state_mask) {
230   return {buffer_,
231           metadata_buffer_,
232           buffer_id(),
233           channel_id(),
234           client_state_mask,
235           acquire_fence_fd_.Borrow(),
236           release_fence_fd_.Borrow()};
237 }
238 
OnGetBuffer(Message &)239 Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer(
240     Message& /*message*/) {
241   ATRACE_NAME("ProducerChannel::OnGetBuffer");
242   ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx32 ".",
243            buffer_id(), buffer_state_->load(std::memory_order_acquire));
244   return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
245 }
246 
CreateConsumerStateMask()247 Status<uint32_t> ProducerChannel::CreateConsumerStateMask() {
248   // Try find the next consumer state bit which has not been claimed by any
249   // consumer yet.
250   // memory_order_acquire is chosen here because all writes in other threads
251   // that release active_clients_bit_mask_ need to be visible here.
252   uint32_t current_active_clients_bit_mask =
253       active_clients_bit_mask_->load(std::memory_order_acquire);
254   uint32_t consumer_state_mask =
255       BufferHubDefs::findNextAvailableClientStateMask(
256           current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
257   if (consumer_state_mask == 0U) {
258     ALOGE("%s: reached the maximum mumber of consumers per producer: 63.",
259           __FUNCTION__);
260     return ErrorStatus(E2BIG);
261   }
262   uint32_t updated_active_clients_bit_mask =
263       current_active_clients_bit_mask | consumer_state_mask;
264   // Set the updated value only if the current value stays the same as what was
265   // read before. If the comparison succeeds, update the value without
266   // reordering anything before or after this read-modify-write in the current
267   // thread, and the modification will be visible in other threads that acquire
268   // active_clients_bit_mask_. If the comparison fails, load the result of
269   // all writes from all threads to updated_active_clients_bit_mask.
270   // Keep on finding the next available slient state mask until succeed or out
271   // of memory.
272   while (!active_clients_bit_mask_->compare_exchange_weak(
273       current_active_clients_bit_mask, updated_active_clients_bit_mask,
274       std::memory_order_acq_rel, std::memory_order_acquire)) {
275     ALOGE("%s: Current active clients bit mask is changed to %" PRIx32
276           ", which was expected to be %" PRIx32
277           ". Trying to generate a new client state mask to resolve race "
278           "condition.",
279           __FUNCTION__, updated_active_clients_bit_mask,
280           current_active_clients_bit_mask);
281     consumer_state_mask = BufferHubDefs::findNextAvailableClientStateMask(
282         current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
283     if (consumer_state_mask == 0U) {
284       ALOGE("%s: reached the maximum mumber of consumers per producer: %d.",
285             __FUNCTION__, (BufferHubDefs::kMaxNumberOfClients - 1));
286       return ErrorStatus(E2BIG);
287     }
288     updated_active_clients_bit_mask =
289         current_active_clients_bit_mask | consumer_state_mask;
290   }
291 
292   return {consumer_state_mask};
293 }
294 
RemoveConsumerClientMask(uint32_t consumer_state_mask)295 void ProducerChannel::RemoveConsumerClientMask(uint32_t consumer_state_mask) {
296   // Clear up the buffer state and fence state in case there is already
297   // something there due to possible race condition between producer post and
298   // consumer failed to create channel.
299   buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
300   fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
301 
302   // Restore the consumer state bit and make it visible in other threads that
303   // acquire the active_clients_bit_mask_.
304   active_clients_bit_mask_->fetch_and(~consumer_state_mask,
305                                       std::memory_order_release);
306 }
307 
CreateConsumer(Message & message,uint32_t consumer_state_mask)308 Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
309     Message& message, uint32_t consumer_state_mask) {
310   ATRACE_NAME(__FUNCTION__);
311   ALOGD("%s: buffer_id=%d", __FUNCTION__, buffer_id());
312 
313   int channel_id;
314   auto status = message.PushChannel(0, nullptr, &channel_id);
315   if (!status) {
316     ALOGE("%s: Failed to push consumer channel: %s", __FUNCTION__,
317           status.GetErrorMessage().c_str());
318     RemoveConsumerClientMask(consumer_state_mask);
319     return ErrorStatus(ENOMEM);
320   }
321 
322   auto consumer = std::make_shared<ConsumerChannel>(
323       service(), buffer_id(), channel_id, consumer_state_mask,
324       shared_from_this());
325   const auto channel_status = service()->SetChannel(channel_id, consumer);
326   if (!channel_status) {
327     ALOGE("%s: failed to set new consumer channel: %s.", __FUNCTION__,
328           channel_status.GetErrorMessage().c_str());
329     RemoveConsumerClientMask(consumer_state_mask);
330     return ErrorStatus(ENOMEM);
331   }
332 
333   uint32_t current_buffer_state =
334       buffer_state_->load(std::memory_order_acquire);
335   // Return the consumer channel handle without signal when adding the new
336   // consumer to a buffer that is available to producer (a.k.a a fully-released
337   // buffer) or a gained buffer.
338   if (current_buffer_state == 0U ||
339       BufferHubDefs::isAnyClientGained(current_buffer_state)) {
340     return {status.take()};
341   }
342 
343   // Signal the new consumer when adding it to a posted producer.
344   bool update_buffer_state = true;
345   if (!BufferHubDefs::isClientPosted(current_buffer_state,
346                                      consumer_state_mask)) {
347     uint32_t updated_buffer_state =
348         current_buffer_state ^
349         (consumer_state_mask & BufferHubDefs::kHighBitsMask);
350     while (!buffer_state_->compare_exchange_weak(
351         current_buffer_state, updated_buffer_state, std::memory_order_acq_rel,
352         std::memory_order_acquire)) {
353       ALOGI(
354           "%s: Failed to post to the new consumer. "
355           "Current buffer state was changed to %" PRIx32
356           " when trying to acquire the buffer and modify the buffer state to "
357           "%" PRIx32
358           ". About to try again if the buffer is still not gained nor fully "
359           "released.",
360           __FUNCTION__, current_buffer_state, updated_buffer_state);
361       if (current_buffer_state == 0U ||
362           BufferHubDefs::isAnyClientGained(current_buffer_state)) {
363         ALOGI("%s: buffer is gained or fully released, state=%" PRIx32 ".",
364               __FUNCTION__, current_buffer_state);
365         update_buffer_state = false;
366         break;
367       }
368       updated_buffer_state =
369           current_buffer_state ^
370           (consumer_state_mask & BufferHubDefs::kHighBitsMask);
371     }
372   }
373   if (update_buffer_state || BufferHubDefs::isClientPosted(
374                                  buffer_state_->load(std::memory_order_acquire),
375                                  consumer_state_mask)) {
376     consumer->OnProducerPosted();
377   }
378 
379   return {status.take()};
380 }
381 
OnNewConsumer(Message & message)382 Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
383   ATRACE_NAME("ProducerChannel::OnNewConsumer");
384   ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
385   auto status = CreateConsumerStateMask();
386   if (!status.ok()) {
387     return status.error_status();
388   }
389   return CreateConsumer(message, /*consumer_state_mask=*/status.get());
390 }
391 
OnProducerPost(Message &,LocalFence acquire_fence)392 Status<void> ProducerChannel::OnProducerPost(Message&,
393                                              LocalFence acquire_fence) {
394   ATRACE_NAME("ProducerChannel::OnProducerPost");
395   ALOGD_IF(TRACE, "%s: buffer_id=%d, state=0x%x", __FUNCTION__, buffer_id(),
396            buffer_state_->load(std::memory_order_acquire));
397 
398   epoll_event event;
399   event.events = 0;
400   event.data.u32 = 0U;
401   int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
402                       dummy_fence_fd_.Get(), &event);
403   ALOGE_IF(ret < 0,
404            "ProducerChannel::OnProducerPost: Failed to modify the shared "
405            "release fence to include the dummy fence: %s",
406            strerror(errno));
407 
408   eventfd_t dummy_fence_count = 0U;
409   if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) {
410     const int error = errno;
411     if (error != EAGAIN) {
412       ALOGE(
413           "ProducerChannel::ProducerChannel: Failed to read dummy fence, "
414           "error: %s",
415           strerror(error));
416       return ErrorStatus(error);
417     }
418   }
419 
420   ALOGW_IF(dummy_fence_count > 0,
421            "ProducerChannel::ProducerChannel: %" PRIu64
422            " dummy fence(s) was signaled during last release/gain cycle "
423            "buffer_id=%d.",
424            dummy_fence_count, buffer_id());
425 
426   post_fence_ = std::move(acquire_fence);
427 
428   // Signal any interested consumers. If there are none, the buffer will stay
429   // in posted state until a consumer comes online. This behavior guarantees
430   // that no frame is silently dropped.
431   for (auto& consumer : consumer_channels_) {
432     consumer->OnProducerPosted();
433   }
434 
435   return {};
436 }
437 
OnProducerGain(Message &)438 Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) {
439   ATRACE_NAME("ProducerChannel::OnGain");
440   ALOGD_IF(TRACE, "%s: buffer_id=%d", __FUNCTION__, buffer_id());
441 
442   ClearAvailable();
443   post_fence_.close();
444   for (auto& consumer : consumer_channels_) {
445     consumer->OnProducerGained();
446   }
447   return {std::move(returned_fence_)};
448 }
449 
450 // TODO(b/112338294) Keep here for reference. Remove it after new logic is
451 // written.
452 /* Status<RemoteChannelHandle> ProducerChannel::OnProducerDetach(
453     Message& message) {
454   ATRACE_NAME("ProducerChannel::OnProducerDetach");
455   ALOGD_IF(TRACE, "ProducerChannel::OnProducerDetach: buffer_id=%d",
456            buffer_id());
457 
458   uint32_t buffer_state = buffer_state_->load(std::memory_order_acquire);
459   if (!BufferHubDefs::isClientGained(
460       buffer_state, BufferHubDefs::kFirstClientStateMask)) {
461     // Can only detach a ProducerBuffer when it's in gained state.
462     ALOGW(
463         "ProducerChannel::OnProducerDetach: The buffer (id=%d, state=%"
464         PRIx32
465         ") is not in gained state.",
466         buffer_id(), buffer_state);
467     return {};
468   }
469 
470   int channel_id;
471   auto status = message.PushChannel(0, nullptr, &channel_id);
472   if (!status) {
473     ALOGE(
474         "ProducerChannel::OnProducerDetach: Failed to push detached buffer "
475         "channel: %s",
476         status.GetErrorMessage().c_str());
477     return ErrorStatus(ENOMEM);
478   }
479 
480   // Make sure we unlock the buffer.
481   if (int ret = metadata_buffer_.Unlock()) {
482     ALOGE("ProducerChannel::OnProducerDetach: Failed to unlock metadata.");
483     return ErrorStatus(-ret);
484   };
485 
486   std::unique_ptr<BufferChannel> channel =
487       BufferChannel::Create(service(), buffer_id(), channel_id,
488                             std::move(buffer_), user_metadata_size_);
489   if (!channel) {
490     ALOGE("ProducerChannel::OnProducerDetach: Invalid buffer.");
491     return ErrorStatus(EINVAL);
492   }
493 
494   const auto channel_status =
495       service()->SetChannel(channel_id, std::move(channel));
496   if (!channel_status) {
497     // Technically, this should never fail, as we just pushed the channel.
498     // Note that LOG_FATAL will be stripped out in non-debug build.
499     LOG_FATAL(
500         "ProducerChannel::OnProducerDetach: Failed to set new detached "
501         "buffer channel: %s.", channel_status.GetErrorMessage().c_str());
502   }
503 
504   return status;
505 } */
506 
OnConsumerAcquire(Message &)507 Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) {
508   ATRACE_NAME("ProducerChannel::OnConsumerAcquire");
509   ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d",
510            buffer_id());
511 
512   // Return a borrowed fd to avoid unnecessary duplication of the underlying
513   // fd. Serialization just needs to read the handle.
514   return {std::move(post_fence_)};
515 }
516 
OnConsumerRelease(Message &,LocalFence release_fence)517 Status<void> ProducerChannel::OnConsumerRelease(Message&,
518                                                 LocalFence release_fence) {
519   ATRACE_NAME("ProducerChannel::OnConsumerRelease");
520   ALOGD_IF(TRACE, "ProducerChannel::OnConsumerRelease: buffer_id=%d",
521            buffer_id());
522 
523   // Attempt to merge the fences if necessary.
524   if (release_fence) {
525     if (returned_fence_) {
526       LocalFence merged_fence(sync_merge("bufferhub_merged",
527                                          returned_fence_.get_fd(),
528                                          release_fence.get_fd()));
529       const int error = errno;
530       if (!merged_fence) {
531         ALOGE("ProducerChannel::OnConsumerRelease: Failed to merge fences: %s",
532               strerror(error));
533         return ErrorStatus(error);
534       }
535       returned_fence_ = std::move(merged_fence);
536     } else {
537       returned_fence_ = std::move(release_fence);
538     }
539   }
540 
541   if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
542     buffer_state_->store(0U);
543     SignalAvailable();
544     if (orphaned_consumer_bit_mask_) {
545       ALOGW(
546           "%s: orphaned buffer detected during the this acquire/release cycle: "
547           "id=%d orphaned=0x%" PRIx32 " queue_index=%" PRId64 ".",
548           __FUNCTION__, buffer_id(), orphaned_consumer_bit_mask_,
549           metadata_header_->queueIndex);
550       orphaned_consumer_bit_mask_ = 0;
551     }
552   }
553 
554   return {};
555 }
556 
OnConsumerOrphaned(const uint32_t & consumer_state_mask)557 void ProducerChannel::OnConsumerOrphaned(const uint32_t& consumer_state_mask) {
558   // Remember the ignored consumer so that newly added consumer won't be
559   // taking the same state mask as this orphaned consumer.
560   ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_mask,
561            "%s: Consumer (consumer_state_mask=%" PRIx32
562            ") is already orphaned.",
563            __FUNCTION__, consumer_state_mask);
564   orphaned_consumer_bit_mask_ |= consumer_state_mask;
565 
566   if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
567     buffer_state_->store(0U);
568     SignalAvailable();
569   }
570 
571   // Atomically clear the fence state bit as an orphaned consumer will never
572   // signal a release fence.
573   fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
574 
575   // Atomically set the buffer state of this consumer to released state.
576   buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
577 
578   ALOGW(
579       "%s: detected new orphaned consumer buffer_id=%d "
580       "consumer_state_mask=%" PRIx32 " queue_index=%" PRId64
581       " buffer_state=%" PRIx32 " fence_state=%" PRIx32 ".",
582       __FUNCTION__, buffer_id(), consumer_state_mask,
583       metadata_header_->queueIndex,
584       buffer_state_->load(std::memory_order_acquire),
585       fence_state_->load(std::memory_order_acquire));
586 }
587 
AddConsumer(ConsumerChannel * channel)588 void ProducerChannel::AddConsumer(ConsumerChannel* channel) {
589   consumer_channels_.push_back(channel);
590 }
591 
RemoveConsumer(ConsumerChannel * channel)592 void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) {
593   consumer_channels_.erase(
594       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
595   // Restore the consumer state bit and make it visible in other threads that
596   // acquire the active_clients_bit_mask_.
597   uint32_t consumer_state_mask = channel->client_state_mask();
598   uint32_t current_active_clients_bit_mask =
599       active_clients_bit_mask_->load(std::memory_order_acquire);
600   uint32_t updated_active_clients_bit_mask =
601       current_active_clients_bit_mask & (~consumer_state_mask);
602   while (!active_clients_bit_mask_->compare_exchange_weak(
603       current_active_clients_bit_mask, updated_active_clients_bit_mask,
604       std::memory_order_acq_rel, std::memory_order_acquire)) {
605     ALOGI(
606         "%s: Failed to remove consumer state mask. Current active clients bit "
607         "mask is changed to %" PRIx32
608         " when trying to acquire and modify it to %" PRIx32
609         ". About to try again.",
610         __FUNCTION__, current_active_clients_bit_mask,
611         updated_active_clients_bit_mask);
612     updated_active_clients_bit_mask =
613         current_active_clients_bit_mask & (~consumer_state_mask);
614   }
615 
616   const uint32_t current_buffer_state =
617       buffer_state_->load(std::memory_order_acquire);
618   if (BufferHubDefs::isClientPosted(current_buffer_state,
619                                     consumer_state_mask) ||
620       BufferHubDefs::isClientAcquired(current_buffer_state,
621                                       consumer_state_mask)) {
622     // The consumer client is being destoryed without releasing. This could
623     // happen in corner cases when the consumer crashes. Here we mark it
624     // orphaned before remove it from producer.
625     OnConsumerOrphaned(consumer_state_mask);
626     return;
627   }
628 
629   if (BufferHubDefs::isClientReleased(current_buffer_state,
630                                       consumer_state_mask) ||
631       BufferHubDefs::isAnyClientGained(current_buffer_state)) {
632     // The consumer is being close while it is suppose to signal a release
633     // fence. Signal the dummy fence here.
634     if (fence_state_->load(std::memory_order_acquire) & consumer_state_mask) {
635       epoll_event event;
636       event.events = EPOLLIN;
637       event.data.u32 = consumer_state_mask;
638       if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
639                     dummy_fence_fd_.Get(), &event) < 0) {
640         ALOGE(
641             "%s: Failed to modify the shared release fence to include the "
642             "dummy fence: %s",
643             __FUNCTION__, strerror(errno));
644         return;
645       }
646       ALOGW("%s: signal dummy release fence buffer_id=%d", __FUNCTION__,
647             buffer_id());
648       eventfd_write(dummy_fence_fd_.Get(), 1);
649     }
650   }
651 }
652 
653 // Returns true if the given parameters match the underlying buffer
654 // parameters.
CheckParameters(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size) const655 bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height,
656                                       uint32_t layer_count, uint32_t format,
657                                       uint64_t usage,
658                                       size_t user_metadata_size) const {
659   return user_metadata_size == user_metadata_size_ &&
660          buffer_.width() == width && buffer_.height() == height &&
661          buffer_.layer_count() == layer_count && buffer_.format() == format &&
662          buffer_.usage() == usage;
663 }
664 
IsBufferReleasedByAllActiveClientsExceptForOrphans() const665 bool ProducerChannel::IsBufferReleasedByAllActiveClientsExceptForOrphans()
666     const {
667   return (buffer_state_->load(std::memory_order_acquire) &
668           ~orphaned_consumer_bit_mask_ &
669           active_clients_bit_mask_->load(std::memory_order_acquire)) == 0U;
670 }
671 
672 }  // namespace dvr
673 }  // namespace android
674