#include <pdx/channel_handle.h>
#include <private/dvr/consumer_queue_channel.h>
#include <private/dvr/producer_channel.h>

using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;

namespace android {
namespace dvr {

// Creates a consumer queue channel attached to |producer|.
//
// |service|, |buffer_id| and |channel_id| are forwarded to BufferHubChannel
// together with kConsumerQueueType. |producer| is held weakly. |silent| is
// stored and later makes the queue skip buffer registration and reject
// imports.
//
// The channel registers itself with the producer queue only when the producer
// can actually be resolved. If |producer| is empty the registration is skipped
// and an error is logged instead of dereferencing a null pointer; subsequent
// messages are then answered with EPIPE by HandleMessage().
ConsumerQueueChannel::ConsumerQueueChannel(
    BufferHubService* service, int buffer_id, int channel_id,
    const std::shared_ptr<Channel>& producer, bool silent)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
      producer_(producer),
      capacity_(0),
      silent_(silent) {
  // Same guard the destructor and HandleMessage() use: the weak reference may
  // not resolve to a live producer.
  if (auto producer_queue = GetProducer()) {
    producer_queue->AddConsumer(this);
  } else {
    ALOGE(
        "ConsumerQueueChannel::ConsumerQueueChannel: producer is not "
        "available: channel_id=%d",
        channel_id);
  }
}

// Unregisters this consumer from the producer queue if the producer is still
// alive.
ConsumerQueueChannel::~ConsumerQueueChannel() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
           channel_id());

  if (auto producer = GetProducer()) {
    producer->RemoveConsumer(this);
  }
}

// Dispatches an incoming message by opcode.
//
// Returns true when the message was handled here (including the case where it
// was answered with EPIPE because the producer is gone) and false for opcodes
// this channel does not recognize.
bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
  auto producer = GetProducer();
  if (!producer) {
    // Without a producer no request can be served.
    RemoteMethodError(message, EPIPE);
    return true;
  }

  switch (message.GetOp()) {
    // Queue creation and queue info requests are served by the producer queue.
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
      return true;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
      return true;

    // Buffer import is served by this consumer queue itself.
    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
      return true;

    default:
      return false;
  }
}

// Resolves the weak producer reference and casts it to ProducerQueueChannel.
// Returns an empty pointer when the producer can no longer be locked.
std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
    const {
  return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
}

// Impulses carry no work for a consumer queue; only a trace marker is emitted.
void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
  ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}
BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { BufferHubChannel::BufferInfo info; if (auto producer = GetProducer()) { // If producer has not hung up, copy most buffer info from the producer. info = producer->GetBufferInfo(); } info.id = buffer_id(); info.capacity = capacity_; return info; } void ConsumerQueueChannel::RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t producer_slot) { ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d", __FUNCTION__, buffer_id(), producer_channel->buffer_id(), producer_slot, silent_); // Only register buffers if the queue is not silent. if (silent_) { return; } auto status = producer_channel->CreateConsumerStateMask(); if (!status.ok()) { ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return; } uint64_t consumer_state_mask = status.get(); pending_buffer_slots_.emplace(producer_channel, producer_slot, consumer_state_mask); // Signal the client that there is new buffer available. SignalAvailable(); } Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; ATRACE_NAME(__FUNCTION__); ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__, pending_buffer_slots_.size()); // Indicate this is a silent queue that will not import buffers. if (silent_) return ErrorStatus(EBADR); while (!pending_buffer_slots_.empty()) { auto producer_channel = pending_buffer_slots_.front().producer_channel.lock(); size_t producer_slot = pending_buffer_slots_.front().producer_slot; uint64_t consumer_state_mask = pending_buffer_slots_.front().consumer_state_mask; pending_buffer_slots_.pop(); // It's possible that the producer channel has expired. When this occurs, // ignore the producer channel. 
if (producer_channel == nullptr) { ALOGW("%s: producer channel has already been expired.", __FUNCTION__); continue; } auto status = producer_channel->CreateConsumer(message, consumer_state_mask); // If no buffers are imported successfully, clear available and return an // error. Otherwise, return all consumer handles already imported // successfully, but keep available bits on, so that the client can retry // importing remaining consumer buffers. if (!status) { ALOGE("%s: Failed create consumer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); if (buffer_handles.empty()) { ClearAvailable(); return status.error_status(); } else { return {std::move(buffer_handles)}; } } buffer_handles.emplace_back(status.take(), producer_slot); } ClearAvailable(); return {std::move(buffer_handles)}; } void ConsumerQueueChannel::OnProducerClosed() { ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d", buffer_id()); producer_.reset(); Hangup(); } } // namespace dvr } // namespace android