1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "android.hardware.tv.tuner@1.0-Filter"
18 
19 #include "Filter.h"
20 #include <utils/Log.h>
21 
22 namespace android {
23 namespace hardware {
24 namespace tv {
25 namespace tuner {
26 namespace V1_0 {
27 namespace implementation {
28 
29 #define WAIT_TIMEOUT 3000000000
30 
Filter()31 Filter::Filter() {}
32 
Filter(DemuxFilterType type,uint32_t filterId,uint32_t bufferSize,const sp<IFilterCallback> & cb,sp<Demux> demux)33 Filter::Filter(DemuxFilterType type, uint32_t filterId, uint32_t bufferSize,
34                const sp<IFilterCallback>& cb, sp<Demux> demux) {
35     mType = type;
36     mFilterId = filterId;
37     mBufferSize = bufferSize;
38     mCallback = cb;
39     mDemux = demux;
40 }
41 
~Filter()42 Filter::~Filter() {}
43 
getId(getId_cb _hidl_cb)44 Return<void> Filter::getId(getId_cb _hidl_cb) {
45     ALOGV("%s", __FUNCTION__);
46 
47     _hidl_cb(Result::SUCCESS, mFilterId);
48     return Void();
49 }
50 
setDataSource(const sp<IFilter> & filter)51 Return<Result> Filter::setDataSource(const sp<IFilter>& filter) {
52     ALOGV("%s", __FUNCTION__);
53 
54     mDataSource = filter;
55     mIsDataSourceDemux = false;
56 
57     return Result::SUCCESS;
58 }
59 
getQueueDesc(getQueueDesc_cb _hidl_cb)60 Return<void> Filter::getQueueDesc(getQueueDesc_cb _hidl_cb) {
61     ALOGV("%s", __FUNCTION__);
62 
63     _hidl_cb(Result::SUCCESS, *mFilterMQ->getDesc());
64     return Void();
65 }
66 
configure(const DemuxFilterSettings & settings)67 Return<Result> Filter::configure(const DemuxFilterSettings& settings) {
68     ALOGV("%s", __FUNCTION__);
69 
70     mFilterSettings = settings;
71     switch (mType.mainType) {
72         case DemuxFilterMainType::TS:
73             mTpid = settings.ts().tpid;
74             break;
75         case DemuxFilterMainType::MMTP:
76             /*mmtpSettings*/
77             break;
78         case DemuxFilterMainType::IP:
79             /*ipSettings*/
80             break;
81         case DemuxFilterMainType::TLV:
82             /*tlvSettings*/
83             break;
84         case DemuxFilterMainType::ALP:
85             /*alpSettings*/
86             break;
87         default:
88             break;
89     }
90 
91     return Result::SUCCESS;
92 }
93 
start()94 Return<Result> Filter::start() {
95     ALOGV("%s", __FUNCTION__);
96 
97     return startFilterLoop();
98 }
99 
stop()100 Return<Result> Filter::stop() {
101     ALOGV("%s", __FUNCTION__);
102 
103     mFilterThreadRunning = false;
104 
105     std::lock_guard<std::mutex> lock(mFilterThreadLock);
106 
107     return Result::SUCCESS;
108 }
109 
flush()110 Return<Result> Filter::flush() {
111     ALOGV("%s", __FUNCTION__);
112 
113     // temp implementation to flush the FMQ
114     int size = mFilterMQ->availableToRead();
115     char* buffer = new char[size];
116     mFilterMQ->read((unsigned char*)&buffer[0], size);
117     delete[] buffer;
118     mFilterStatus = DemuxFilterStatus::DATA_READY;
119 
120     return Result::SUCCESS;
121 }
122 
releaseAvHandle(const hidl_handle &,uint64_t)123 Return<Result> Filter::releaseAvHandle(const hidl_handle& /*avMemory*/, uint64_t /*avDataId*/) {
124     ALOGV("%s", __FUNCTION__);
125 
126     return Result::SUCCESS;
127 }
128 
close()129 Return<Result> Filter::close() {
130     ALOGV("%s", __FUNCTION__);
131 
132     return mDemux->removeFilter(mFilterId);
133 }
134 
createFilterMQ()135 bool Filter::createFilterMQ() {
136     ALOGV("%s", __FUNCTION__);
137 
138     // Create a synchronized FMQ that supports blocking read/write
139     std::unique_ptr<FilterMQ> tmpFilterMQ =
140             std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
141     if (!tmpFilterMQ->isValid()) {
142         ALOGW("Failed to create FMQ of filter with id: %d", mFilterId);
143         return false;
144     }
145 
146     mFilterMQ = std::move(tmpFilterMQ);
147 
148     if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventFlag) != OK) {
149         return false;
150     }
151 
152     return true;
153 }
154 
startFilterLoop()155 Result Filter::startFilterLoop() {
156     pthread_create(&mFilterThread, NULL, __threadLoopFilter, this);
157     pthread_setname_np(mFilterThread, "filter_waiting_loop");
158 
159     return Result::SUCCESS;
160 }
161 
__threadLoopFilter(void * user)162 void* Filter::__threadLoopFilter(void* user) {
163     Filter* const self = static_cast<Filter*>(user);
164     self->filterThreadLoop();
165     return 0;
166 }
167 
filterThreadLoop()168 void Filter::filterThreadLoop() {
169     ALOGD("[Filter] filter %d threadLoop start.", mFilterId);
170     std::lock_guard<std::mutex> lock(mFilterThreadLock);
171     mFilterThreadRunning = true;
172 
173     // For the first time of filter output, implementation needs to send the filter
174     // Event Callback without waiting for the DATA_CONSUMED to init the process.
175     while (mFilterThreadRunning) {
176         if (mFilterEvent.events.size() == 0) {
177             ALOGD("[Filter] wait for filter data output.");
178             usleep(1000 * 1000);
179             continue;
180         }
181         // After successfully write, send a callback and wait for the read to be done
182         mCallback->onFilterEvent(mFilterEvent);
183         mFilterEvent.events.resize(0);
184         mFilterStatus = DemuxFilterStatus::DATA_READY;
185         mCallback->onFilterStatus(mFilterStatus);
186         break;
187     }
188 
189     while (mFilterThreadRunning) {
190         uint32_t efState = 0;
191         // We do not wait for the last round of written data to be read to finish the thread
192         // because the VTS can verify the reading itself.
193         for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
194             while (mFilterThreadRunning) {
195                 status_t status = mFilterEventFlag->wait(
196                         static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
197                         WAIT_TIMEOUT, true /* retry on spurious wake */);
198                 if (status != OK) {
199                     ALOGD("[Filter] wait for data consumed");
200                     continue;
201                 }
202                 break;
203             }
204 
205             if (mCallback == nullptr) {
206                 ALOGD("[Filter] filter %d does not hava callback. Ending thread", mFilterId);
207                 break;
208             }
209 
210             maySendFilterStatusCallback();
211 
212             while (mFilterThreadRunning) {
213                 std::lock_guard<std::mutex> lock(mFilterEventLock);
214                 if (mFilterEvent.events.size() == 0) {
215                     continue;
216                 }
217                 // After successfully write, send a callback and wait for the read to be done
218                 mCallback->onFilterEvent(mFilterEvent);
219                 mFilterEvent.events.resize(0);
220                 break;
221             }
222             // We do not wait for the last read to be done
223             // VTS can verify the read result itself.
224             if (i == SECTION_WRITE_COUNT - 1) {
225                 ALOGD("[Filter] filter %d writing done. Ending thread", mFilterId);
226                 break;
227             }
228         }
229         mFilterThreadRunning = false;
230     }
231 
232     ALOGD("[Filter] filter thread ended.");
233 }
234 
maySendFilterStatusCallback()235 void Filter::maySendFilterStatusCallback() {
236     std::lock_guard<std::mutex> lock(mFilterStatusLock);
237     int availableToRead = mFilterMQ->availableToRead();
238     int availableToWrite = mFilterMQ->availableToWrite();
239     int fmqSize = mFilterMQ->getQuantumCount();
240 
241     DemuxFilterStatus newStatus = checkFilterStatusChange(
242             availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
243     if (mFilterStatus != newStatus) {
244         mCallback->onFilterStatus(newStatus);
245         mFilterStatus = newStatus;
246     }
247 }
248 
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)249 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
250                                                   uint32_t availableToRead, uint32_t highThreshold,
251                                                   uint32_t lowThreshold) {
252     if (availableToWrite == 0) {
253         return DemuxFilterStatus::OVERFLOW;
254     } else if (availableToRead > highThreshold) {
255         return DemuxFilterStatus::HIGH_WATER;
256     } else if (availableToRead < lowThreshold) {
257         return DemuxFilterStatus::LOW_WATER;
258     }
259     return mFilterStatus;
260 }
261 
getTpid()262 uint16_t Filter::getTpid() {
263     return mTpid;
264 }
265 
updateFilterOutput(vector<uint8_t> data)266 void Filter::updateFilterOutput(vector<uint8_t> data) {
267     std::lock_guard<std::mutex> lock(mFilterOutputLock);
268     ALOGD("[Filter] filter output updated");
269     mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
270 }
271 
updateRecordOutput(vector<uint8_t> data)272 void Filter::updateRecordOutput(vector<uint8_t> data) {
273     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
274     ALOGD("[Filter] record filter output updated");
275     mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
276 }
277 
startFilterHandler()278 Result Filter::startFilterHandler() {
279     std::lock_guard<std::mutex> lock(mFilterOutputLock);
280     switch (mType.mainType) {
281         case DemuxFilterMainType::TS:
282             switch (mType.subType.tsFilterType()) {
283                 case DemuxTsFilterType::UNDEFINED:
284                     break;
285                 case DemuxTsFilterType::SECTION:
286                     startSectionFilterHandler();
287                     break;
288                 case DemuxTsFilterType::PES:
289                     startPesFilterHandler();
290                     break;
291                 case DemuxTsFilterType::TS:
292                     startTsFilterHandler();
293                     break;
294                 case DemuxTsFilterType::AUDIO:
295                 case DemuxTsFilterType::VIDEO:
296                     startMediaFilterHandler();
297                     break;
298                 case DemuxTsFilterType::PCR:
299                     startPcrFilterHandler();
300                     break;
301                 case DemuxTsFilterType::TEMI:
302                     startTemiFilterHandler();
303                     break;
304                 default:
305                     break;
306             }
307             break;
308         case DemuxFilterMainType::MMTP:
309             /*mmtpSettings*/
310             break;
311         case DemuxFilterMainType::IP:
312             /*ipSettings*/
313             break;
314         case DemuxFilterMainType::TLV:
315             /*tlvSettings*/
316             break;
317         case DemuxFilterMainType::ALP:
318             /*alpSettings*/
319             break;
320         default:
321             break;
322     }
323     return Result::SUCCESS;
324 }
325 
startSectionFilterHandler()326 Result Filter::startSectionFilterHandler() {
327     if (mFilterOutput.empty()) {
328         return Result::SUCCESS;
329     }
330     if (!writeSectionsAndCreateEvent(mFilterOutput)) {
331         ALOGD("[Filter] filter %d fails to write into FMQ. Ending thread", mFilterId);
332         return Result::UNKNOWN_ERROR;
333     }
334 
335     mFilterOutput.clear();
336 
337     return Result::SUCCESS;
338 }
339 
startPesFilterHandler()340 Result Filter::startPesFilterHandler() {
341     std::lock_guard<std::mutex> lock(mFilterEventLock);
342     if (mFilterOutput.empty()) {
343         return Result::SUCCESS;
344     }
345 
346     for (int i = 0; i < mFilterOutput.size(); i += 188) {
347         if (mPesSizeLeft == 0) {
348             uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
349                               mFilterOutput[i + 6];
350             if (DEBUG_FILTER) {
351                 ALOGD("[Filter] prefix %d", prefix);
352             }
353             if (prefix == 0x000001) {
354                 // TODO handle mulptiple Pes filters
355                 mPesSizeLeft = (mFilterOutput[i + 8] << 8) | mFilterOutput[i + 9];
356                 mPesSizeLeft += 6;
357                 if (DEBUG_FILTER) {
358                     ALOGD("[Filter] pes data length %d", mPesSizeLeft);
359                 }
360             } else {
361                 continue;
362             }
363         }
364 
365         int endPoint = min(184, mPesSizeLeft);
366         // append data and check size
367         vector<uint8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
368         vector<uint8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
369         mPesOutput.insert(mPesOutput.end(), first, last);
370         // size does not match then continue
371         mPesSizeLeft -= endPoint;
372         if (DEBUG_FILTER) {
373             ALOGD("[Filter] pes data left %d", mPesSizeLeft);
374         }
375         if (mPesSizeLeft > 0) {
376             continue;
377         }
378         // size match then create event
379         if (!writeDataToFilterMQ(mPesOutput)) {
380             ALOGD("[Filter] pes data write failed");
381             mFilterOutput.clear();
382             return Result::INVALID_STATE;
383         }
384         maySendFilterStatusCallback();
385         DemuxFilterPesEvent pesEvent;
386         pesEvent = {
387                 // temp dump meta data
388                 .streamId = mPesOutput[3],
389                 .dataLength = static_cast<uint16_t>(mPesOutput.size()),
390         };
391         if (DEBUG_FILTER) {
392             ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
393         }
394 
395         int size = mFilterEvent.events.size();
396         mFilterEvent.events.resize(size + 1);
397         mFilterEvent.events[size].pes(pesEvent);
398         mPesOutput.clear();
399     }
400 
401     mFilterOutput.clear();
402 
403     return Result::SUCCESS;
404 }
405 
startTsFilterHandler()406 Result Filter::startTsFilterHandler() {
407     // TODO handle starting TS filter
408     return Result::SUCCESS;
409 }
410 
startMediaFilterHandler()411 Result Filter::startMediaFilterHandler() {
412     DemuxFilterMediaEvent mediaEvent;
413     mediaEvent = {
414             // temp dump meta data
415             .pts = 0,
416             .dataLength = 530,
417             .avMemory = nullptr,
418             .isSecureMemory = false,
419     };
420     mFilterEvent.events.resize(1);
421     mFilterEvent.events[0].media(mediaEvent);
422 
423     mFilterOutput.clear();
424     // TODO handle write FQM for media stream
425     return Result::SUCCESS;
426 }
427 
startRecordFilterHandler()428 Result Filter::startRecordFilterHandler() {
429     /*DemuxFilterTsRecordEvent tsRecordEvent;
430     tsRecordEvent.pid.tPid(0);
431     tsRecordEvent.indexMask.tsIndexMask(0x01);
432     mFilterEvent.events.resize(1);
433     mFilterEvent.events[0].tsRecord(tsRecordEvent);
434 */
435     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
436     if (mRecordFilterOutput.empty()) {
437         return Result::SUCCESS;
438     }
439 
440     if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
441         ALOGD("[Filter] dvr fails to write into record FMQ.");
442         return Result::UNKNOWN_ERROR;
443     }
444 
445     mRecordFilterOutput.clear();
446     return Result::SUCCESS;
447 }
448 
startPcrFilterHandler()449 Result Filter::startPcrFilterHandler() {
450     // TODO handle starting PCR filter
451     return Result::SUCCESS;
452 }
453 
startTemiFilterHandler()454 Result Filter::startTemiFilterHandler() {
455     // TODO handle starting TEMI filter
456     return Result::SUCCESS;
457 }
458 
writeSectionsAndCreateEvent(vector<uint8_t> data)459 bool Filter::writeSectionsAndCreateEvent(vector<uint8_t> data) {
460     // TODO check how many sections has been read
461     ALOGD("[Filter] section hander");
462     std::lock_guard<std::mutex> lock(mFilterEventLock);
463     if (!writeDataToFilterMQ(data)) {
464         return false;
465     }
466     int size = mFilterEvent.events.size();
467     mFilterEvent.events.resize(size + 1);
468     DemuxFilterSectionEvent secEvent;
469     secEvent = {
470             // temp dump meta data
471             .tableId = 0,
472             .version = 1,
473             .sectionNum = 1,
474             .dataLength = static_cast<uint16_t>(data.size()),
475     };
476     mFilterEvent.events[size].section(secEvent);
477     return true;
478 }
479 
writeDataToFilterMQ(const std::vector<uint8_t> & data)480 bool Filter::writeDataToFilterMQ(const std::vector<uint8_t>& data) {
481     std::lock_guard<std::mutex> lock(mWriteLock);
482     if (mFilterMQ->write(data.data(), data.size())) {
483         return true;
484     }
485     return false;
486 }
487 
attachFilterToRecord(const sp<Dvr> dvr)488 void Filter::attachFilterToRecord(const sp<Dvr> dvr) {
489     mDvr = dvr;
490 }
491 
detachFilterFromRecord()492 void Filter::detachFilterFromRecord() {
493     mDvr = nullptr;
494 }
495 
496 }  // namespace implementation
497 }  // namespace V1_0
498 }  // namespace tuner
499 }  // namespace tv
500 }  // namespace hardware
501 }  // namespace android
502