1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "NBLog"
18 //#define LOG_NDEBUG 0
19 
20 #include <memory>
21 #include <queue>
22 #include <stddef.h>
23 #include <stdint.h>
24 #include <vector>
25 
26 #include <audio_utils/fifo.h>
27 #include <json/json.h>
28 #include <media/nblog/Merger.h>
29 #include <media/nblog/PerformanceAnalysis.h>
30 #include <media/nblog/ReportPerformance.h>
31 #include <media/nblog/Reader.h>
32 #include <media/nblog/Timeline.h>
33 #include <utils/Condition.h>
34 #include <utils/Log.h>
35 #include <utils/Mutex.h>
36 #include <utils/RefBase.h>
37 #include <utils/String16.h>
38 #include <utils/String8.h>
39 #include <utils/Thread.h>
40 #include <utils/Timers.h>
41 #include <utils/Vector.h>
42 
43 namespace android {
44 namespace NBLog {
45 
Merger(const void * shared,size_t size)46 Merger::Merger(const void *shared, size_t size):
47       mShared((Shared *) shared),
48       mFifo(mShared != NULL ?
49         new audio_utils_fifo(size, sizeof(uint8_t),
50             mShared->mBuffer, mShared->mRear, NULL /*throttlesFront*/) : NULL),
51       mFifoWriter(mFifo != NULL ? new audio_utils_fifo_writer(*mFifo) : NULL)
52 {
53 }
54 
addReader(const sp<Reader> & reader)55 void Merger::addReader(const sp<Reader> &reader)
56 {
57     // FIXME This is called by binder thread in MediaLogService::registerWriter
58     //       but the access to shared variable mReaders is not yet protected by a lock.
59     mReaders.push_back(reader);
60 }
61 
// Element stored in the priority queue while merging.
// Pairs a timestamp with the index of the snapshot that the timestamp was read from.
struct MergeItem
{
    int64_t ts;     // timestamp of the entry
    int index;      // index of the originating snapshot

    MergeItem(int64_t timestamp, int snapshotIndex)
        : ts(timestamp),
          index(snapshotIndex)
    {
    }
};
70 
operator >(const struct MergeItem & i1,const struct MergeItem & i2)71 bool operator>(const struct MergeItem &i1, const struct MergeItem &i2)
72 {
73     return i1.ts > i2.ts || (i1.ts == i2.ts && i1.index > i2.index);
74 }
75 
76 // Merge registered readers, sorted by timestamp, and write data to a single FIFO in local memory
merge()77 void Merger::merge()
78 {
79     if (true) return; // Merging is not necessary at the moment, so this is to disable it
80                       // and bypass compiler warnings about member variables not being used.
81     const int nLogs = mReaders.size();
82     std::vector<std::unique_ptr<Snapshot>> snapshots(nLogs);
83     std::vector<EntryIterator> offsets;
84     offsets.reserve(nLogs);
85     for (int i = 0; i < nLogs; ++i) {
86         snapshots[i] = mReaders[i]->getSnapshot();
87         offsets.push_back(snapshots[i]->begin());
88     }
89     // initialize offsets
90     // TODO custom heap implementation could allow to update top, improving performance
91     // for bursty buffers
92     std::priority_queue<MergeItem, std::vector<MergeItem>, std::greater<MergeItem>> timestamps;
93     for (int i = 0; i < nLogs; ++i)
94     {
95         if (offsets[i] != snapshots[i]->end()) {
96             std::unique_ptr<AbstractEntry> abstractEntry = AbstractEntry::buildEntry(offsets[i]);
97             if (abstractEntry == nullptr) {
98                 continue;
99             }
100             timestamps.emplace(abstractEntry->timestamp(), i);
101         }
102     }
103 
104     while (!timestamps.empty()) {
105         int index = timestamps.top().index;     // find minimum timestamp
106         // copy it to the log, increasing offset
107         offsets[index] = AbstractEntry::buildEntry(offsets[index])->
108             copyWithAuthor(mFifoWriter, index);
109         // update data structures
110         timestamps.pop();
111         if (offsets[index] != snapshots[index]->end()) {
112             int64_t ts = AbstractEntry::buildEntry(offsets[index])->timestamp();
113             timestamps.emplace(ts, index);
114         }
115     }
116 }
117 
getReaders() const118 const std::vector<sp<Reader>>& Merger::getReaders() const
119 {
120     //AutoMutex _l(mLock);
121     return mReaders;
122 }
123 
124 // ---------------------------------------------------------------------------
125 
MergeReader(const void * shared,size_t size,Merger & merger)126 MergeReader::MergeReader(const void *shared, size_t size, Merger &merger)
127     : Reader(shared, size, "MergeReader"), mReaders(merger.getReaders())
128 {
129 }
130 
131 // Takes raw content of the local merger FIFO, processes log entries, and
132 // writes the data to a map of class PerformanceAnalysis, based on their thread ID.
processSnapshot(Snapshot & snapshot,int author)133 void MergeReader::processSnapshot(Snapshot &snapshot, int author)
134 {
135     ReportPerformance::PerformanceData& data = mThreadPerformanceData[author];
136     // We don't do "auto it" because it reduces readability in this case.
137     for (EntryIterator it = snapshot.begin(); it != snapshot.end(); ++it) {
138         switch (it->type) {
139         case EVENT_HISTOGRAM_ENTRY_TS: {
140             const HistTsEntry payload = it.payload<HistTsEntry>();
141             // TODO: hash for histogram ts and audio state need to match
142             // and correspond to audio production source file location
143             mThreadPerformanceAnalysis[author][0 /*hash*/].logTsEntry(payload.ts);
144         } break;
145         case EVENT_AUDIO_STATE: {
146             mThreadPerformanceAnalysis[author][0 /*hash*/].handleStateChange();
147         } break;
148         case EVENT_THREAD_INFO: {
149             const thread_info_t info = it.payload<thread_info_t>();
150             data.threadInfo = info;
151         } break;
152         case EVENT_THREAD_PARAMS: {
153             const thread_params_t params = it.payload<thread_params_t>();
154             data.threadParams = params;
155         } break;
156         case EVENT_LATENCY: {
157             const double latencyMs = it.payload<double>();
158             data.latencyHist.add(latencyMs);
159         } break;
160         case EVENT_WORK_TIME: {
161             const int64_t monotonicNs = it.payload<int64_t>();
162             const double monotonicMs = monotonicNs * 1e-6;
163             data.workHist.add(monotonicMs);
164             data.active += monotonicNs;
165         } break;
166         case EVENT_WARMUP_TIME: {
167             const double timeMs = it.payload<double>();
168             data.warmupHist.add(timeMs);
169         } break;
170         case EVENT_UNDERRUN: {
171             const int64_t ts = it.payload<int64_t>();
172             data.underruns++;
173             data.snapshots.emplace_front(EVENT_UNDERRUN, ts);
174             // TODO have a data structure to automatically handle resizing
175             if (data.snapshots.size() > ReportPerformance::PerformanceData::kMaxSnapshotsToStore) {
176                 data.snapshots.pop_back();
177             }
178         } break;
179         case EVENT_OVERRUN: {
180             const int64_t ts = it.payload<int64_t>();
181             data.overruns++;
182             data.snapshots.emplace_front(EVENT_UNDERRUN, ts);
183             // TODO have a data structure to automatically handle resizing
184             if (data.snapshots.size() > ReportPerformance::PerformanceData::kMaxSnapshotsToStore) {
185                 data.snapshots.pop_back();
186             }
187         } break;
188         case EVENT_RESERVED:
189         case EVENT_UPPER_BOUND:
190             ALOGW("warning: unexpected event %d", it->type);
191             break;
192         default:
193             break;
194         }
195     }
196 }
197 
getAndProcessSnapshot()198 void MergeReader::getAndProcessSnapshot()
199 {
200     // get a snapshot of each reader and process them
201     // TODO insert lock here
202     const size_t nLogs = mReaders.size();
203     std::vector<std::unique_ptr<Snapshot>> snapshots(nLogs);
204     for (size_t i = 0; i < nLogs; i++) {
205         snapshots[i] = mReaders[i]->getSnapshot();
206     }
207     // TODO unlock lock here
208     for (size_t i = 0; i < nLogs; i++) {
209         if (snapshots[i] != nullptr) {
210             processSnapshot(*(snapshots[i]), i);
211         }
212     }
213     checkPushToMediaMetrics();
214 }
215 
checkPushToMediaMetrics()216 void MergeReader::checkPushToMediaMetrics()
217 {
218     const nsecs_t now = systemTime();
219     for (auto& item : mThreadPerformanceData) {
220         ReportPerformance::PerformanceData& data = item.second;
221         if (now - data.start >= kPeriodicMediaMetricsPush) {
222             (void)ReportPerformance::sendToMediaMetrics(data);
223             data.reset();   // data is persistent per thread
224         }
225     }
226 }
227 
dump(int fd,const Vector<String16> & args)228 void MergeReader::dump(int fd, const Vector<String16>& args)
229 {
230     // TODO: add a mutex around media.log dump
231     // Options for dumpsys
232     bool pa = false, json = false, plots = false, retro = false;
233     for (const auto &arg : args) {
234         if (arg == String16("--pa")) {
235             pa = true;
236         } else if (arg == String16("--json")) {
237             json = true;
238         } else if (arg == String16("--plots")) {
239             plots = true;
240         } else if (arg == String16("--retro")) {
241             retro = true;
242         }
243     }
244     if (pa) {
245         ReportPerformance::dump(fd, 0 /*indent*/, mThreadPerformanceAnalysis);
246     }
247     if (json) {
248         ReportPerformance::dumpJson(fd, mThreadPerformanceData);
249     }
250     if (plots) {
251         ReportPerformance::dumpPlots(fd, mThreadPerformanceData);
252     }
253     if (retro) {
254         ReportPerformance::dumpRetro(fd, mThreadPerformanceData);
255     }
256 }
257 
handleAuthor(const AbstractEntry & entry,String8 * body)258 void MergeReader::handleAuthor(const AbstractEntry &entry, String8 *body)
259 {
260     int author = entry.author();
261     if (author == -1) {
262         return;
263     }
264     // FIXME Needs a lock
265     const char* name = mReaders[author]->name().c_str();
266     body->appendFormat("%s: ", name);
267 }
268 
269 // ---------------------------------------------------------------------------
270 
MergeThread(Merger & merger,MergeReader & mergeReader)271 MergeThread::MergeThread(Merger &merger, MergeReader &mergeReader)
272     : mMerger(merger),
273       mMergeReader(mergeReader),
274       mTimeoutUs(0)
275 {
276 }
277 
~MergeThread()278 MergeThread::~MergeThread()
279 {
280     // set exit flag, set timeout to 0 to force threadLoop to exit and wait for the thread to join
281     requestExit();
282     setTimeoutUs(0);
283     join();
284 }
285 
threadLoop()286 bool MergeThread::threadLoop()
287 {
288     bool doMerge;
289     {
290         AutoMutex _l(mMutex);
291         // If mTimeoutUs is negative, wait on the condition variable until it's positive.
292         // If it's positive, merge. The minimum period between waking the condition variable
293         // is handled in AudioFlinger::MediaLogNotifier::threadLoop().
294         mCond.wait(mMutex);
295         doMerge = mTimeoutUs > 0;
296         mTimeoutUs -= kThreadSleepPeriodUs;
297     }
298     if (doMerge) {
299         // Merge data from all the readers
300         mMerger.merge();
301         // Process the data collected by mMerger and write it to PerformanceAnalysis
302         // FIXME: decide whether to call getAndProcessSnapshot every time
303         // or whether to have a separate thread that calls it with a lower frequency
304         mMergeReader.getAndProcessSnapshot();
305     }
306     return true;
307 }
308 
wakeup()309 void MergeThread::wakeup()
310 {
311     setTimeoutUs(kThreadWakeupPeriodUs);
312 }
313 
setTimeoutUs(int time)314 void MergeThread::setTimeoutUs(int time)
315 {
316     AutoMutex _l(mMutex);
317     mTimeoutUs = time;
318     mCond.signal();
319 }
320 
321 }   // namespace NBLog
322 }   // namespace android
323