1 #include <thread>
2 
3 #include <log/log.h>
4 #include <private/dvr/bufferhub_rpc.h>
5 #include <private/dvr/consumer_channel.h>
6 #include <private/dvr/producer_channel.h>
7 #include <utils/Trace.h>
8 
9 using android::pdx::BorrowedHandle;
10 using android::pdx::Channel;
11 using android::pdx::ErrorStatus;
12 using android::pdx::Message;
13 using android::pdx::Status;
14 using android::pdx::rpc::DispatchRemoteMethod;
15 
16 namespace android {
17 namespace dvr {
18 
ConsumerChannel(BufferHubService * service,int buffer_id,int channel_id,uint32_t client_state_mask,const std::shared_ptr<Channel> producer)19 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
20                                  int channel_id, uint32_t client_state_mask,
21                                  const std::shared_ptr<Channel> producer)
22     : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
23       client_state_mask_(client_state_mask),
24       producer_(producer) {
25   GetProducer()->AddConsumer(this);
26 }
27 
~ConsumerChannel()28 ConsumerChannel::~ConsumerChannel() {
29   ALOGD_IF(TRACE,
30            "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
31            channel_id(), buffer_id());
32 
33   if (auto producer = GetProducer()) {
34     producer->RemoveConsumer(this);
35   }
36 }
37 
GetBufferInfo() const38 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
39   BufferHubChannel::BufferInfo info;
40   if (auto producer = GetProducer()) {
41     // If producer has not hung up, copy most buffer info from the producer.
42     info = producer->GetBufferInfo();
43   } else {
44     info.signaled_mask = client_state_mask();
45   }
46   info.id = buffer_id();
47   return info;
48 }
49 
GetProducer() const50 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
51   return std::static_pointer_cast<ProducerChannel>(producer_.lock());
52 }
53 
HandleImpulse(Message & message)54 void ConsumerChannel::HandleImpulse(Message& message) {
55   ATRACE_NAME("ConsumerChannel::HandleImpulse");
56   switch (message.GetOp()) {
57     case BufferHubRPC::ConsumerAcquire::Opcode:
58       OnConsumerAcquire(message);
59       break;
60     case BufferHubRPC::ConsumerRelease::Opcode:
61       OnConsumerRelease(message, {});
62       break;
63   }
64 }
65 
HandleMessage(Message & message)66 bool ConsumerChannel::HandleMessage(Message& message) {
67   ATRACE_NAME("ConsumerChannel::HandleMessage");
68   auto producer = GetProducer();
69   if (!producer)
70     REPLY_ERROR_RETURN(message, EPIPE, true);
71 
72   switch (message.GetOp()) {
73     case BufferHubRPC::GetBuffer::Opcode:
74       DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
75           *this, &ConsumerChannel::OnGetBuffer, message);
76       return true;
77 
78     case BufferHubRPC::NewConsumer::Opcode:
79       DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
80           *producer, &ProducerChannel::OnNewConsumer, message);
81       return true;
82 
83     case BufferHubRPC::ConsumerAcquire::Opcode:
84       DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
85           *this, &ConsumerChannel::OnConsumerAcquire, message);
86       return true;
87 
88     case BufferHubRPC::ConsumerRelease::Opcode:
89       DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
90           *this, &ConsumerChannel::OnConsumerRelease, message);
91       return true;
92 
93     default:
94       return false;
95   }
96 }
97 
OnGetBuffer(Message &)98 Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
99     Message& /*message*/) {
100   ATRACE_NAME("ConsumerChannel::OnGetBuffer");
101   ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
102   if (auto producer = GetProducer()) {
103     return {producer->GetBuffer(client_state_mask_)};
104   } else {
105     return ErrorStatus(EPIPE);
106   }
107 }
108 
OnConsumerAcquire(Message & message)109 Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
110   ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
111   auto producer = GetProducer();
112   if (!producer)
113     return ErrorStatus(EPIPE);
114 
115   if (acquired_ || released_) {
116     ALOGE(
117         "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
118         "acquired=%d released=%d channel_id=%d buffer_id=%d",
119         acquired_, released_, message.GetChannelId(), producer->buffer_id());
120     return ErrorStatus(EBUSY);
121   } else {
122     auto status = producer->OnConsumerAcquire(message);
123     if (status) {
124       ClearAvailable();
125       acquired_ = true;
126     }
127     return status;
128   }
129 }
130 
OnConsumerRelease(Message & message,LocalFence release_fence)131 Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
132                                                 LocalFence release_fence) {
133   ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
134   auto producer = GetProducer();
135   if (!producer)
136     return ErrorStatus(EPIPE);
137 
138   if (!acquired_ || released_) {
139     ALOGE(
140         "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
141         "acquired=%d released=%d channel_id=%d buffer_id=%d",
142         acquired_, released_, message.GetChannelId(), producer->buffer_id());
143     return ErrorStatus(EBUSY);
144   } else {
145     auto status =
146         producer->OnConsumerRelease(message, std::move(release_fence));
147     if (status) {
148       ClearAvailable();
149       acquired_ = false;
150       released_ = true;
151     }
152     return status;
153   }
154 }
155 
OnProducerGained()156 void ConsumerChannel::OnProducerGained() {
157   // Clear the signal if exist. There is a possiblity that the signal still
158   // exist in consumer client when producer gains the buffer, e.g. newly added
159   // consumer fail to acquire the previous posted buffer in time. Then, when
160   // producer gains back the buffer, posts the buffer again and signal the
161   // consumer later, there won't be an signal change in eventfd, and thus,
162   // consumer will miss the posted buffer later. Thus, we need to clear the
163   // signal in consumer clients if the signal exist.
164   ClearAvailable();
165 }
166 
OnProducerPosted()167 void ConsumerChannel::OnProducerPosted() {
168   acquired_ = false;
169   released_ = false;
170   SignalAvailable();
171 }
172 
OnProducerClosed()173 void ConsumerChannel::OnProducerClosed() {
174   producer_.reset();
175   Hangup();
176 }
177 
178 }  // namespace dvr
179 }  // namespace android
180