1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolStatus"
18 //#define LOG_NDEBUG 0
19
20 #include <thread>
21 #include <time.h>
22 #include "BufferStatus.h"
23
24 namespace android {
25 namespace hardware {
26 namespace media {
27 namespace bufferpool {
28 namespace V2_0 {
29 namespace implementation {
30
/**
 * Reads CLOCK_MONOTONIC and converts the reading to microseconds.
 *
 * @return seconds * 1,000,000 plus nanoseconds / 1,000 of the clock reading.
 */
int64_t getTimestampNow() {
    constexpr int64_t kMicrosPerSecond = 1000000LL;
    constexpr int64_t kNanosPerMicro = 1000;

    struct timespec now;
    // TODO: CLOCK_MONOTONIC_COARSE?
    clock_gettime(CLOCK_MONOTONIC, &now);
    return now.tv_sec * kMicrosPerSecond + now.tv_nsec / kNanosPerMicro;
}
40
/**
 * Wrap-around aware ordering of two 32-bit message ids.
 *
 * @param curMsgId  id being tested.
 * @param prevMsgId id to compare against.
 * @return true when the ids differ and the unsigned distance from prevMsgId
 *         forward to curMsgId is strictly smaller than the distance going
 *         the other way; false otherwise (including equal ids and ids that
 *         are exactly half the id space apart).
 */
bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) {
    if (curMsgId == prevMsgId) {
        return false;
    }
    // Both subtractions rely on unsigned modular arithmetic.
    const uint32_t forwardDistance = curMsgId - prevMsgId;
    const uint32_t backwardDistance = prevMsgId - curMsgId;
    return forwardDistance < backwardDistance;
}
44
isBufferInRange(BufferId from,BufferId to,BufferId bufferId)45 bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) {
46 if (from < to) {
47 return from <= bufferId && bufferId < to;
48 } else { // wrap happens
49 return from <= bufferId || bufferId < to;
50 }
51 }
52
// Element count passed to the queues constructed in this file; also the upper
// bound on a single bulk read and the reference size used by needsSync().
static constexpr int kNumElementsInQueue = 16 * 1024;
// Margin used by BufferStatusChannel::needsSync() when comparing the writable
// space against kNumElementsInQueue.
static constexpr int kMinElementsToSyncInQueue = 128;
55
open(ConnectionId id,const StatusDescriptor ** fmqDescPtr)56 ResultStatus BufferStatusObserver::open(
57 ConnectionId id, const StatusDescriptor** fmqDescPtr) {
58 if (mBufferStatusQueues.find(id) != mBufferStatusQueues.end()) {
59 // TODO: id collision log?
60 return ResultStatus::CRITICAL_ERROR;
61 }
62 std::unique_ptr<BufferStatusQueue> queue =
63 std::make_unique<BufferStatusQueue>(kNumElementsInQueue);
64 if (!queue || queue->isValid() == false) {
65 *fmqDescPtr = nullptr;
66 return ResultStatus::NO_MEMORY;
67 } else {
68 *fmqDescPtr = queue->getDesc();
69 }
70 auto result = mBufferStatusQueues.insert(
71 std::make_pair(id, std::move(queue)));
72 if (!result.second) {
73 *fmqDescPtr = nullptr;
74 return ResultStatus::NO_MEMORY;
75 }
76 return ResultStatus::OK;
77 }
78
close(ConnectionId id)79 ResultStatus BufferStatusObserver::close(ConnectionId id) {
80 if (mBufferStatusQueues.find(id) == mBufferStatusQueues.end()) {
81 return ResultStatus::CRITICAL_ERROR;
82 }
83 mBufferStatusQueues.erase(id);
84 return ResultStatus::OK;
85 }
86
getBufferStatusChanges(std::vector<BufferStatusMessage> & messages)87 void BufferStatusObserver::getBufferStatusChanges(std::vector<BufferStatusMessage> &messages) {
88 for (auto it = mBufferStatusQueues.begin(); it != mBufferStatusQueues.end(); ++it) {
89 BufferStatusMessage message;
90 size_t avail = it->second->availableToRead();
91 while (avail > 0) {
92 if (!it->second->read(&message, 1)) {
93 // Since avaliable # of reads are already confirmed,
94 // this should not happen.
95 // TODO: error handling (spurious client?)
96 ALOGW("FMQ message cannot be read from %lld", (long long)it->first);
97 return;
98 }
99 message.connectionId = it->first;
100 messages.push_back(message);
101 --avail;
102 }
103 }
104 }
105
BufferStatusChannel(const StatusDescriptor & fmqDesc)106 BufferStatusChannel::BufferStatusChannel(
107 const StatusDescriptor &fmqDesc) {
108 std::unique_ptr<BufferStatusQueue> queue =
109 std::make_unique<BufferStatusQueue>(fmqDesc);
110 if (!queue || queue->isValid() == false) {
111 mValid = false;
112 return;
113 }
114 mValid = true;
115 mBufferStatusQueue = std::move(queue);
116 }
117
isValid()118 bool BufferStatusChannel::isValid() {
119 return mValid;
120 }
121
needsSync()122 bool BufferStatusChannel::needsSync() {
123 if (mValid) {
124 size_t avail = mBufferStatusQueue->availableToWrite();
125 return avail + kMinElementsToSyncInQueue < kNumElementsInQueue;
126 }
127 return false;
128 }
129
postBufferRelease(ConnectionId connectionId,std::list<BufferId> & pending,std::list<BufferId> & posted)130 void BufferStatusChannel::postBufferRelease(
131 ConnectionId connectionId,
132 std::list<BufferId> &pending, std::list<BufferId> &posted) {
133 if (mValid && pending.size() > 0) {
134 size_t avail = mBufferStatusQueue->availableToWrite();
135 avail = std::min(avail, pending.size());
136 BufferStatusMessage message;
137 for (size_t i = 0 ; i < avail; ++i) {
138 BufferId id = pending.front();
139 message.newStatus = BufferStatus::NOT_USED;
140 message.bufferId = id;
141 message.connectionId = connectionId;
142 if (!mBufferStatusQueue->write(&message, 1)) {
143 // Since avaliable # of writes are already confirmed,
144 // this should not happen.
145 // TODO: error handing?
146 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
147 return;
148 }
149 pending.pop_front();
150 posted.push_back(id);
151 }
152 }
153 }
154
postBufferInvalidateAck(ConnectionId connectionId,uint32_t invalidateId,bool * invalidated)155 void BufferStatusChannel::postBufferInvalidateAck(
156 ConnectionId connectionId,
157 uint32_t invalidateId,
158 bool *invalidated) {
159 if (mValid && !*invalidated) {
160 size_t avail = mBufferStatusQueue->availableToWrite();
161 if (avail > 0) {
162 BufferStatusMessage message;
163 message.newStatus = BufferStatus::INVALIDATION_ACK;
164 message.bufferId = invalidateId;
165 message.connectionId = connectionId;
166 if (!mBufferStatusQueue->write(&message, 1)) {
167 // Since avaliable # of writes are already confirmed,
168 // this should not happen.
169 // TODO: error handing?
170 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
171 return;
172 }
173 *invalidated = true;
174 }
175 }
176 }
177
postBufferStatusMessage(TransactionId transactionId,BufferId bufferId,BufferStatus status,ConnectionId connectionId,ConnectionId targetId,std::list<BufferId> & pending,std::list<BufferId> & posted)178 bool BufferStatusChannel::postBufferStatusMessage(
179 TransactionId transactionId, BufferId bufferId,
180 BufferStatus status, ConnectionId connectionId, ConnectionId targetId,
181 std::list<BufferId> &pending, std::list<BufferId> &posted) {
182 if (mValid) {
183 size_t avail = mBufferStatusQueue->availableToWrite();
184 size_t numPending = pending.size();
185 if (avail >= numPending + 1) {
186 BufferStatusMessage release, message;
187 for (size_t i = 0; i < numPending; ++i) {
188 BufferId id = pending.front();
189 release.newStatus = BufferStatus::NOT_USED;
190 release.bufferId = id;
191 release.connectionId = connectionId;
192 if (!mBufferStatusQueue->write(&release, 1)) {
193 // Since avaliable # of writes are already confirmed,
194 // this should not happen.
195 // TODO: error handling?
196 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
197 return false;
198 }
199 pending.pop_front();
200 posted.push_back(id);
201 }
202 message.transactionId = transactionId;
203 message.bufferId = bufferId;
204 message.newStatus = status;
205 message.connectionId = connectionId;
206 message.targetConnectionId = targetId;
207 // TODO : timesatamp
208 message.timestampUs = 0;
209 if (!mBufferStatusQueue->write(&message, 1)) {
210 // Since avaliable # of writes are already confirmed,
211 // this should not happen.
212 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
213 return false;
214 }
215 return true;
216 }
217 }
218 return false;
219 }
220
BufferInvalidationListener(const InvalidationDescriptor & fmqDesc)221 BufferInvalidationListener::BufferInvalidationListener(
222 const InvalidationDescriptor &fmqDesc) {
223 std::unique_ptr<BufferInvalidationQueue> queue =
224 std::make_unique<BufferInvalidationQueue>(fmqDesc);
225 if (!queue || queue->isValid() == false) {
226 mValid = false;
227 return;
228 }
229 mValid = true;
230 mBufferInvalidationQueue = std::move(queue);
231 // drain previous messages
232 size_t avail = std::min(
233 mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
234 std::vector<BufferInvalidationMessage> temp(avail);
235 if (avail > 0) {
236 mBufferInvalidationQueue->read(temp.data(), avail);
237 }
238 }
239
getInvalidations(std::vector<BufferInvalidationMessage> & messages)240 void BufferInvalidationListener::getInvalidations(
241 std::vector<BufferInvalidationMessage> &messages) {
242 // Try twice in case of overflow.
243 // TODO: handling overflow though it may not happen.
244 for (int i = 0; i < 2; ++i) {
245 size_t avail = std::min(
246 mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
247 if (avail > 0) {
248 std::vector<BufferInvalidationMessage> temp(avail);
249 if (mBufferInvalidationQueue->read(temp.data(), avail)) {
250 messages.reserve(messages.size() + avail);
251 for (auto it = temp.begin(); it != temp.end(); ++it) {
252 messages.push_back(*it);
253 }
254 break;
255 }
256 } else {
257 return;
258 }
259 }
260 }
261
isValid()262 bool BufferInvalidationListener::isValid() {
263 return mValid;
264 }
265
BufferInvalidationChannel()266 BufferInvalidationChannel::BufferInvalidationChannel()
267 : mValid(true),
268 mBufferInvalidationQueue(
269 std::make_unique<BufferInvalidationQueue>(kNumElementsInQueue, true)) {
270 if (!mBufferInvalidationQueue || mBufferInvalidationQueue->isValid() == false) {
271 mValid = false;
272 }
273 }
274
isValid()275 bool BufferInvalidationChannel::isValid() {
276 return mValid;
277 }
278
getDesc(const InvalidationDescriptor ** fmqDescPtr)279 void BufferInvalidationChannel::getDesc(const InvalidationDescriptor **fmqDescPtr) {
280 if (mValid) {
281 *fmqDescPtr = mBufferInvalidationQueue->getDesc();
282 } else {
283 *fmqDescPtr = nullptr;
284 }
285 }
286
postInvalidation(uint32_t msgId,BufferId fromId,BufferId toId)287 void BufferInvalidationChannel::postInvalidation(
288 uint32_t msgId, BufferId fromId, BufferId toId) {
289 BufferInvalidationMessage message;
290
291 message.messageId = msgId;
292 message.fromBufferId = fromId;
293 message.toBufferId = toId;
294 // TODO: handle failure (it does not happen normally.)
295 mBufferInvalidationQueue->write(&message);
296 }
297
298 } // namespace implementation
299 } // namespace V2_0
300 } // namespace bufferpool
301 } // namespace media
302 } // namespace hardware
303 } // namespace android
304
305