1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "android.hardware.tv.tuner@1.0-Filter"
18
19 #include "Filter.h"
20 #include <utils/Log.h>
21
22 namespace android {
23 namespace hardware {
24 namespace tv {
25 namespace tuner {
26 namespace V1_0 {
27 namespace implementation {
28
// Timeout handed to mFilterEventFlag->wait() in filterThreadLoop().
// NOTE(review): presumably expressed in nanoseconds (i.e. 3 seconds) — confirm
// against the EventFlag::wait() signature.
#define WAIT_TIMEOUT 3000000000
30
Filter()31 Filter::Filter() {}
32
Filter(DemuxFilterType type,uint32_t filterId,uint32_t bufferSize,const sp<IFilterCallback> & cb,sp<Demux> demux)33 Filter::Filter(DemuxFilterType type, uint32_t filterId, uint32_t bufferSize,
34 const sp<IFilterCallback>& cb, sp<Demux> demux) {
35 mType = type;
36 mFilterId = filterId;
37 mBufferSize = bufferSize;
38 mCallback = cb;
39 mDemux = demux;
40 }
41
~Filter()42 Filter::~Filter() {}
43
getId(getId_cb _hidl_cb)44 Return<void> Filter::getId(getId_cb _hidl_cb) {
45 ALOGV("%s", __FUNCTION__);
46
47 _hidl_cb(Result::SUCCESS, mFilterId);
48 return Void();
49 }
50
setDataSource(const sp<IFilter> & filter)51 Return<Result> Filter::setDataSource(const sp<IFilter>& filter) {
52 ALOGV("%s", __FUNCTION__);
53
54 mDataSource = filter;
55 mIsDataSourceDemux = false;
56
57 return Result::SUCCESS;
58 }
59
getQueueDesc(getQueueDesc_cb _hidl_cb)60 Return<void> Filter::getQueueDesc(getQueueDesc_cb _hidl_cb) {
61 ALOGV("%s", __FUNCTION__);
62
63 _hidl_cb(Result::SUCCESS, *mFilterMQ->getDesc());
64 return Void();
65 }
66
configure(const DemuxFilterSettings & settings)67 Return<Result> Filter::configure(const DemuxFilterSettings& settings) {
68 ALOGV("%s", __FUNCTION__);
69
70 mFilterSettings = settings;
71 switch (mType.mainType) {
72 case DemuxFilterMainType::TS:
73 mTpid = settings.ts().tpid;
74 break;
75 case DemuxFilterMainType::MMTP:
76 /*mmtpSettings*/
77 break;
78 case DemuxFilterMainType::IP:
79 /*ipSettings*/
80 break;
81 case DemuxFilterMainType::TLV:
82 /*tlvSettings*/
83 break;
84 case DemuxFilterMainType::ALP:
85 /*alpSettings*/
86 break;
87 default:
88 break;
89 }
90
91 return Result::SUCCESS;
92 }
93
start()94 Return<Result> Filter::start() {
95 ALOGV("%s", __FUNCTION__);
96
97 return startFilterLoop();
98 }
99
stop()100 Return<Result> Filter::stop() {
101 ALOGV("%s", __FUNCTION__);
102
103 mFilterThreadRunning = false;
104
105 std::lock_guard<std::mutex> lock(mFilterThreadLock);
106
107 return Result::SUCCESS;
108 }
109
flush()110 Return<Result> Filter::flush() {
111 ALOGV("%s", __FUNCTION__);
112
113 // temp implementation to flush the FMQ
114 int size = mFilterMQ->availableToRead();
115 char* buffer = new char[size];
116 mFilterMQ->read((unsigned char*)&buffer[0], size);
117 delete[] buffer;
118 mFilterStatus = DemuxFilterStatus::DATA_READY;
119
120 return Result::SUCCESS;
121 }
122
releaseAvHandle(const hidl_handle &,uint64_t)123 Return<Result> Filter::releaseAvHandle(const hidl_handle& /*avMemory*/, uint64_t /*avDataId*/) {
124 ALOGV("%s", __FUNCTION__);
125
126 return Result::SUCCESS;
127 }
128
close()129 Return<Result> Filter::close() {
130 ALOGV("%s", __FUNCTION__);
131
132 return mDemux->removeFilter(mFilterId);
133 }
134
createFilterMQ()135 bool Filter::createFilterMQ() {
136 ALOGV("%s", __FUNCTION__);
137
138 // Create a synchronized FMQ that supports blocking read/write
139 std::unique_ptr<FilterMQ> tmpFilterMQ =
140 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
141 if (!tmpFilterMQ->isValid()) {
142 ALOGW("Failed to create FMQ of filter with id: %d", mFilterId);
143 return false;
144 }
145
146 mFilterMQ = std::move(tmpFilterMQ);
147
148 if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventFlag) != OK) {
149 return false;
150 }
151
152 return true;
153 }
154
startFilterLoop()155 Result Filter::startFilterLoop() {
156 pthread_create(&mFilterThread, NULL, __threadLoopFilter, this);
157 pthread_setname_np(mFilterThread, "filter_waiting_loop");
158
159 return Result::SUCCESS;
160 }
161
__threadLoopFilter(void * user)162 void* Filter::__threadLoopFilter(void* user) {
163 Filter* const self = static_cast<Filter*>(user);
164 self->filterThreadLoop();
165 return 0;
166 }
167
filterThreadLoop()168 void Filter::filterThreadLoop() {
169 ALOGD("[Filter] filter %d threadLoop start.", mFilterId);
170 std::lock_guard<std::mutex> lock(mFilterThreadLock);
171 mFilterThreadRunning = true;
172
173 // For the first time of filter output, implementation needs to send the filter
174 // Event Callback without waiting for the DATA_CONSUMED to init the process.
175 while (mFilterThreadRunning) {
176 if (mFilterEvent.events.size() == 0) {
177 ALOGD("[Filter] wait for filter data output.");
178 usleep(1000 * 1000);
179 continue;
180 }
181 // After successfully write, send a callback and wait for the read to be done
182 mCallback->onFilterEvent(mFilterEvent);
183 mFilterEvent.events.resize(0);
184 mFilterStatus = DemuxFilterStatus::DATA_READY;
185 mCallback->onFilterStatus(mFilterStatus);
186 break;
187 }
188
189 while (mFilterThreadRunning) {
190 uint32_t efState = 0;
191 // We do not wait for the last round of written data to be read to finish the thread
192 // because the VTS can verify the reading itself.
193 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
194 while (mFilterThreadRunning) {
195 status_t status = mFilterEventFlag->wait(
196 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
197 WAIT_TIMEOUT, true /* retry on spurious wake */);
198 if (status != OK) {
199 ALOGD("[Filter] wait for data consumed");
200 continue;
201 }
202 break;
203 }
204
205 if (mCallback == nullptr) {
206 ALOGD("[Filter] filter %d does not hava callback. Ending thread", mFilterId);
207 break;
208 }
209
210 maySendFilterStatusCallback();
211
212 while (mFilterThreadRunning) {
213 std::lock_guard<std::mutex> lock(mFilterEventLock);
214 if (mFilterEvent.events.size() == 0) {
215 continue;
216 }
217 // After successfully write, send a callback and wait for the read to be done
218 mCallback->onFilterEvent(mFilterEvent);
219 mFilterEvent.events.resize(0);
220 break;
221 }
222 // We do not wait for the last read to be done
223 // VTS can verify the read result itself.
224 if (i == SECTION_WRITE_COUNT - 1) {
225 ALOGD("[Filter] filter %d writing done. Ending thread", mFilterId);
226 break;
227 }
228 }
229 mFilterThreadRunning = false;
230 }
231
232 ALOGD("[Filter] filter thread ended.");
233 }
234
maySendFilterStatusCallback()235 void Filter::maySendFilterStatusCallback() {
236 std::lock_guard<std::mutex> lock(mFilterStatusLock);
237 int availableToRead = mFilterMQ->availableToRead();
238 int availableToWrite = mFilterMQ->availableToWrite();
239 int fmqSize = mFilterMQ->getQuantumCount();
240
241 DemuxFilterStatus newStatus = checkFilterStatusChange(
242 availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
243 if (mFilterStatus != newStatus) {
244 mCallback->onFilterStatus(newStatus);
245 mFilterStatus = newStatus;
246 }
247 }
248
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)249 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
250 uint32_t availableToRead, uint32_t highThreshold,
251 uint32_t lowThreshold) {
252 if (availableToWrite == 0) {
253 return DemuxFilterStatus::OVERFLOW;
254 } else if (availableToRead > highThreshold) {
255 return DemuxFilterStatus::HIGH_WATER;
256 } else if (availableToRead < lowThreshold) {
257 return DemuxFilterStatus::LOW_WATER;
258 }
259 return mFilterStatus;
260 }
261
getTpid()262 uint16_t Filter::getTpid() {
263 return mTpid;
264 }
265
updateFilterOutput(vector<uint8_t> data)266 void Filter::updateFilterOutput(vector<uint8_t> data) {
267 std::lock_guard<std::mutex> lock(mFilterOutputLock);
268 ALOGD("[Filter] filter output updated");
269 mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
270 }
271
updateRecordOutput(vector<uint8_t> data)272 void Filter::updateRecordOutput(vector<uint8_t> data) {
273 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
274 ALOGD("[Filter] record filter output updated");
275 mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
276 }
277
startFilterHandler()278 Result Filter::startFilterHandler() {
279 std::lock_guard<std::mutex> lock(mFilterOutputLock);
280 switch (mType.mainType) {
281 case DemuxFilterMainType::TS:
282 switch (mType.subType.tsFilterType()) {
283 case DemuxTsFilterType::UNDEFINED:
284 break;
285 case DemuxTsFilterType::SECTION:
286 startSectionFilterHandler();
287 break;
288 case DemuxTsFilterType::PES:
289 startPesFilterHandler();
290 break;
291 case DemuxTsFilterType::TS:
292 startTsFilterHandler();
293 break;
294 case DemuxTsFilterType::AUDIO:
295 case DemuxTsFilterType::VIDEO:
296 startMediaFilterHandler();
297 break;
298 case DemuxTsFilterType::PCR:
299 startPcrFilterHandler();
300 break;
301 case DemuxTsFilterType::TEMI:
302 startTemiFilterHandler();
303 break;
304 default:
305 break;
306 }
307 break;
308 case DemuxFilterMainType::MMTP:
309 /*mmtpSettings*/
310 break;
311 case DemuxFilterMainType::IP:
312 /*ipSettings*/
313 break;
314 case DemuxFilterMainType::TLV:
315 /*tlvSettings*/
316 break;
317 case DemuxFilterMainType::ALP:
318 /*alpSettings*/
319 break;
320 default:
321 break;
322 }
323 return Result::SUCCESS;
324 }
325
startSectionFilterHandler()326 Result Filter::startSectionFilterHandler() {
327 if (mFilterOutput.empty()) {
328 return Result::SUCCESS;
329 }
330 if (!writeSectionsAndCreateEvent(mFilterOutput)) {
331 ALOGD("[Filter] filter %d fails to write into FMQ. Ending thread", mFilterId);
332 return Result::UNKNOWN_ERROR;
333 }
334
335 mFilterOutput.clear();
336
337 return Result::SUCCESS;
338 }
339
startPesFilterHandler()340 Result Filter::startPesFilterHandler() {
341 std::lock_guard<std::mutex> lock(mFilterEventLock);
342 if (mFilterOutput.empty()) {
343 return Result::SUCCESS;
344 }
345
346 for (int i = 0; i < mFilterOutput.size(); i += 188) {
347 if (mPesSizeLeft == 0) {
348 uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
349 mFilterOutput[i + 6];
350 if (DEBUG_FILTER) {
351 ALOGD("[Filter] prefix %d", prefix);
352 }
353 if (prefix == 0x000001) {
354 // TODO handle mulptiple Pes filters
355 mPesSizeLeft = (mFilterOutput[i + 8] << 8) | mFilterOutput[i + 9];
356 mPesSizeLeft += 6;
357 if (DEBUG_FILTER) {
358 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
359 }
360 } else {
361 continue;
362 }
363 }
364
365 int endPoint = min(184, mPesSizeLeft);
366 // append data and check size
367 vector<uint8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
368 vector<uint8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
369 mPesOutput.insert(mPesOutput.end(), first, last);
370 // size does not match then continue
371 mPesSizeLeft -= endPoint;
372 if (DEBUG_FILTER) {
373 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
374 }
375 if (mPesSizeLeft > 0) {
376 continue;
377 }
378 // size match then create event
379 if (!writeDataToFilterMQ(mPesOutput)) {
380 ALOGD("[Filter] pes data write failed");
381 mFilterOutput.clear();
382 return Result::INVALID_STATE;
383 }
384 maySendFilterStatusCallback();
385 DemuxFilterPesEvent pesEvent;
386 pesEvent = {
387 // temp dump meta data
388 .streamId = mPesOutput[3],
389 .dataLength = static_cast<uint16_t>(mPesOutput.size()),
390 };
391 if (DEBUG_FILTER) {
392 ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
393 }
394
395 int size = mFilterEvent.events.size();
396 mFilterEvent.events.resize(size + 1);
397 mFilterEvent.events[size].pes(pesEvent);
398 mPesOutput.clear();
399 }
400
401 mFilterOutput.clear();
402
403 return Result::SUCCESS;
404 }
405
startTsFilterHandler()406 Result Filter::startTsFilterHandler() {
407 // TODO handle starting TS filter
408 return Result::SUCCESS;
409 }
410
startMediaFilterHandler()411 Result Filter::startMediaFilterHandler() {
412 DemuxFilterMediaEvent mediaEvent;
413 mediaEvent = {
414 // temp dump meta data
415 .pts = 0,
416 .dataLength = 530,
417 .avMemory = nullptr,
418 .isSecureMemory = false,
419 };
420 mFilterEvent.events.resize(1);
421 mFilterEvent.events[0].media(mediaEvent);
422
423 mFilterOutput.clear();
424 // TODO handle write FQM for media stream
425 return Result::SUCCESS;
426 }
427
startRecordFilterHandler()428 Result Filter::startRecordFilterHandler() {
429 /*DemuxFilterTsRecordEvent tsRecordEvent;
430 tsRecordEvent.pid.tPid(0);
431 tsRecordEvent.indexMask.tsIndexMask(0x01);
432 mFilterEvent.events.resize(1);
433 mFilterEvent.events[0].tsRecord(tsRecordEvent);
434 */
435 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
436 if (mRecordFilterOutput.empty()) {
437 return Result::SUCCESS;
438 }
439
440 if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
441 ALOGD("[Filter] dvr fails to write into record FMQ.");
442 return Result::UNKNOWN_ERROR;
443 }
444
445 mRecordFilterOutput.clear();
446 return Result::SUCCESS;
447 }
448
startPcrFilterHandler()449 Result Filter::startPcrFilterHandler() {
450 // TODO handle starting PCR filter
451 return Result::SUCCESS;
452 }
453
startTemiFilterHandler()454 Result Filter::startTemiFilterHandler() {
455 // TODO handle starting TEMI filter
456 return Result::SUCCESS;
457 }
458
writeSectionsAndCreateEvent(vector<uint8_t> data)459 bool Filter::writeSectionsAndCreateEvent(vector<uint8_t> data) {
460 // TODO check how many sections has been read
461 ALOGD("[Filter] section hander");
462 std::lock_guard<std::mutex> lock(mFilterEventLock);
463 if (!writeDataToFilterMQ(data)) {
464 return false;
465 }
466 int size = mFilterEvent.events.size();
467 mFilterEvent.events.resize(size + 1);
468 DemuxFilterSectionEvent secEvent;
469 secEvent = {
470 // temp dump meta data
471 .tableId = 0,
472 .version = 1,
473 .sectionNum = 1,
474 .dataLength = static_cast<uint16_t>(data.size()),
475 };
476 mFilterEvent.events[size].section(secEvent);
477 return true;
478 }
479
writeDataToFilterMQ(const std::vector<uint8_t> & data)480 bool Filter::writeDataToFilterMQ(const std::vector<uint8_t>& data) {
481 std::lock_guard<std::mutex> lock(mWriteLock);
482 if (mFilterMQ->write(data.data(), data.size())) {
483 return true;
484 }
485 return false;
486 }
487
attachFilterToRecord(const sp<Dvr> dvr)488 void Filter::attachFilterToRecord(const sp<Dvr> dvr) {
489 mDvr = dvr;
490 }
491
detachFilterFromRecord()492 void Filter::detachFilterFromRecord() {
493 mDvr = nullptr;
494 }
495
496 } // namespace implementation
497 } // namespace V1_0
498 } // namespace tuner
499 } // namespace tv
500 } // namespace hardware
501 } // namespace android
502