1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 #include "../guardrail/StatsdStats.h"
22 #include "../stats_log_util.h"
23 
24 #include <limits.h>
25 #include <stdlib.h>
26 
27 using android::util::FIELD_COUNT_REPEATED;
28 using android::util::FIELD_TYPE_BOOL;
29 using android::util::FIELD_TYPE_DOUBLE;
30 using android::util::FIELD_TYPE_INT32;
31 using android::util::FIELD_TYPE_INT64;
32 using android::util::FIELD_TYPE_MESSAGE;
33 using android::util::FIELD_TYPE_STRING;
34 using android::util::ProtoOutputStream;
35 using std::map;
36 using std::shared_ptr;
37 using std::unordered_map;
38 
39 namespace android {
40 namespace os {
41 namespace statsd {
42 
43 // for StatsLogReport
44 const int FIELD_ID_ID = 1;
45 const int FIELD_ID_VALUE_METRICS = 7;
46 const int FIELD_ID_TIME_BASE = 9;
47 const int FIELD_ID_BUCKET_SIZE = 10;
48 const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
49 const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
50 const int FIELD_ID_IS_ACTIVE = 14;
51 // for ValueMetricDataWrapper
52 const int FIELD_ID_DATA = 1;
53 const int FIELD_ID_SKIPPED = 2;
54 const int FIELD_ID_SKIPPED_START_MILLIS = 3;
55 const int FIELD_ID_SKIPPED_END_MILLIS = 4;
56 // for ValueMetricData
57 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
58 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
59 const int FIELD_ID_BUCKET_INFO = 3;
60 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
61 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
62 // for ValueBucketInfo
63 const int FIELD_ID_VALUE_INDEX = 1;
64 const int FIELD_ID_VALUE_LONG = 2;
65 const int FIELD_ID_VALUE_DOUBLE = 3;
66 const int FIELD_ID_VALUES = 9;
67 const int FIELD_ID_BUCKET_NUM = 4;
68 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
69 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
70 const int FIELD_ID_CONDITION_TRUE_NS = 10;
71 
72 const Value ZERO_LONG((int64_t)0);
73 const Value ZERO_DOUBLE((int64_t)0);
74 
75 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
// Builds a ValueMetricProducer from the ValueMetric config: resolves bucket
// size, value-field and dimension matchers, condition links, and registers
// with the puller manager for pulled atoms. startTimeNs may fall mid-bucket,
// in which case the first bucket is a partial bucket starting at startTimeNs.
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
        const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      // A pull tag of -1 marks a push-based metric.
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      // Use per-atom dimension guardrails when the atom has an entry in the
      // limit map; fall back to the global soft/hard defaults otherwise.
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Default: pulled metrics diff consecutive pulls, pushed metrics do not.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsInvalid(false),
      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set in prepareFirstBucketLocked.
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        // Guardrailed helper clamps the configured bucket size per uid.
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    mBucketSizeNs = bucketSizeMills * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.has_dimensions_in_condition()) {
        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    }

    // Translate each metric-to-condition link's field matchers.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
    }

    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
    mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                          HasPositionALL(metric.dimensions_in_condition());

    // Advance the bucket number to the bucket containing startTimeNs.
    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);
    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
161 
~ValueMetricProducer()162 ValueMetricProducer::~ValueMetricProducer() {
163     VLOG("~ValueMetricProducer() called");
164     if (mIsPulled) {
165         mPullerManager->UnRegisterReceiver(mPullTagId, this);
166     }
167 }
168 
prepareFirstBucketLocked()169 void ValueMetricProducer::prepareFirstBucketLocked() {
170     // Kicks off the puller immediately if condition is true and diff based.
171     if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
172         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition);
173     }
174     // Now that activations are processed, start the condition timer if needed.
175     mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
176                                        mCurrentBucketStartTimeNs);
177 }
178 
// No-op beyond logging: sliced-condition change notifications are received
// but trigger no state change in this producer.
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}
183 
dropDataLocked(const int64_t dropTimeNs)184 void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
185     StatsdStats::getInstance().noteBucketDropped(mMetricId);
186     // We are going to flush the data without doing a pull first so we need to invalidte the data.
187     bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
188     if (pullNeeded) {
189         invalidateCurrentBucket();
190     }
191     flushIfNeededLocked(dropTimeNs);
192     clearPastBucketsLocked(dropTimeNs);
193 }
194 
// Discards all finished buckets and the skipped-bucket records.
// dumpTimeNs is unused in this implementation.
void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}
199 
// Serializes this metric's report into protoOutput.
// - include_current_partial_bucket: also flush the in-progress bucket; for
//   pulled metrics this requires a pull (NO_TIME_CONSTRAINTS) or invalidation
//   (FAST) so the bucket is not reported with incomplete data.
// - erase_data: clear past/skipped buckets after writing them.
// - str_set: passed through to the dimension writers (may be null).
void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    // No time to pull: drop the bucket rather than report bad data.
                    invalidateCurrentBucket();
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs, mCondition);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    // Metric id and active state are always written, even with no data.
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
        if (!mDimensionsInCondition.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
            writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    // Report dropped (skipped) bucket time ranges.
    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(pair.first)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(pair.second)));
        protoOutput->end(wrapperToken);
    }

    // One ValueMetricData entry per dimension key.
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG("  dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                uint64_t dimensionInConditionToken =
                        protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
                writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
                                      protoOutput);
                protoOutput->end(dimensionInConditionToken);
            }
        } else {
            // Non-ALL slicing only writes the leaf nodes (path was written above).
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
                                               FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
                                               protoOutput);
            }
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            // Partial buckets are written with explicit start/end millis;
            // full-sized buckets are written compactly as a bucket number.
            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // only write the condition timer value if the metric has a condition.
            if (mConditionTrackerIndex >= 0) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            // Emit each aggregated value with its value-field index.
            for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
                                   index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}
343 
invalidateCurrentBucketWithoutResetBase()344 void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
345     if (!mCurrentBucketIsInvalid) {
346         // Only report once per invalid bucket.
347         StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
348     }
349     mCurrentBucketIsInvalid = true;
350 }
351 
// Marks the current bucket invalid AND clears all diff bases, since the
// stored bases can no longer be trusted for the next diff computation.
void ValueMetricProducer::invalidateCurrentBucket() {
    invalidateCurrentBucketWithoutResetBase();
    resetBase();
}
356 
resetBase()357 void ValueMetricProducer::resetBase() {
358     for (auto& slice : mCurrentSlicedBucket) {
359         for (auto& interval : slice.second) {
360             interval.hasBase = false;
361         }
362     }
363     mHasGlobalBase = false;
364 }
365 
366 // Handle active state change. Active state change is treated like a condition change:
367 // - drop bucket if active state change event arrives too late
368 // - if condition is true, pull data on active state changes
369 // - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
    if (ConditionState::kTrue == mCondition && isEventTooLate) {
        // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
        invalidateCurrentBucket();
    }

    // Call parent method once we've verified the validity of current bucket.
    // Note: this updates mIsActive, which the logic below depends on.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs, mCondition);
        }
        // When active state changes from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
}
401 
// Handles a condition flip at eventTimeNs. Late events (before the current
// bucket start) invalidate the bucket; on-time flips may trigger a pull (to
// establish or flush diff bases) and reset the base when the condition turns
// false. The condition timer is updated last so it sees the final state.
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;

    if (mIsActive) {
        if (isEventTooLate) {
            VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
                 (long long)mCurrentBucketStartTimeNs);
            StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
            invalidateCurrentBucket();
        } else {
            if (mCondition == ConditionState::kUnknown) {
                // If the condition was unknown, we mark the bucket as invalid since the bucket will
                // contain partial data. For instance, the condition change might happen close to
                // the end of the bucket and we might miss lots of data.
                //
                // We still want to pull to set the base.
                invalidateCurrentBucket();
            }

            // Pull on condition changes.
            bool conditionChanged =
                    (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
                    || (mCondition == ConditionState::kFalse &&
                            newCondition == ConditionState::kTrue);
            // We do not need to pull when we go from unknown to false.
            //
            // We also pull if the condition was already true in order to be able to flush the
            // bucket at the end if needed.
            //
            // onConditionChangedLocked might happen on bucket boundaries if this is called before
            // #onDataPulled.
            if (mIsPulled && (conditionChanged || condition)) {
                pullAndMatchEventsLocked(eventTimeNs, newCondition);
            }

            // When condition change from true to false, clear diff base but don't
            // reset other counters as we may accumulate more value in the bucket.
            if (mUseDiff && mCondition == ConditionState::kTrue
                    && newCondition == ConditionState::kFalse) {
                resetBase();
            }
        }
    }

    // A late event cannot safely update the condition: fall back to the
    // tracker's initial condition instead of the new one.
    mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition;

    if (mIsActive) {
        flushIfNeededLocked(eventTimeNs);
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
    }
}
455 
pullAndMatchEventsLocked(const int64_t timestampNs,ConditionState condition)456 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs,
457         ConditionState condition) {
458     vector<std::shared_ptr<LogEvent>> allData;
459     if (!mPullerManager->Pull(mPullTagId, &allData)) {
460         ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
461         invalidateCurrentBucket();
462         return;
463     }
464 
465     accumulateEvents(allData, timestampNs, timestampNs, condition);
466 }
467 
calcPreviousBucketEndTime(const int64_t currentTimeNs)468 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
469     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
470 }
471 
472 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
473 // to be delayed. Other events like condition changes or app upgrade which are not based on
474 // AlarmManager might have arrived earlier and close the bucket.
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & allData,bool pullSuccess,int64_t originalPullTimeNs)475 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
476                                        bool pullSuccess, int64_t originalPullTimeNs) {
477     std::lock_guard<std::mutex> lock(mMutex);
478         if (mCondition == ConditionState::kTrue) {
479             // If the pull failed, we won't be able to compute a diff.
480             if (!pullSuccess) {
481                 invalidateCurrentBucket();
482             } else {
483                 bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
484                 if (isEventLate) {
485                     // If the event is late, we are in the middle of a bucket. Just
486                     // process the data without trying to snap the data to the nearest bucket.
487                     accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
488                 } else {
489                     // For scheduled pulled data, the effective event time is snap to the nearest
490                     // bucket end. In the case of waking up from a deep sleep state, we will
491                     // attribute to the previous bucket end. If the sleep was long but not very
492                     // long, we will be in the immediate next bucket. Previous bucket may get a
493                     // larger number as we pull at a later time than real bucket end.
494                     //
495                     // If the sleep was very long, we skip more than one bucket before sleep. In
496                     // this case, if the diff base will be cleared and this new data will serve as
497                     // new diff base.
498                     int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
499                     StatsdStats::getInstance().noteBucketBoundaryDelayNs(
500                             mMetricId, originalPullTimeNs - bucketEndTime);
501                     accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
502                 }
503             }
504         }
505 
506     // We can probably flush the bucket. Since we used bucketEndTime when calling
507     // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
508     flushIfNeededLocked(originalPullTimeNs);
509 }
510 
// Matches pulled events against the 'what' matcher and aggregates matches
// into the current bucket.
// - originalPullTimeNs: when the pull actually happened; used for pull-delay
//   accounting against mMaxPullDelayNs.
// - eventElapsedTimeNs: effective timestamp stamped onto each matched event
//   (the caller may have snapped it to a bucket boundary).
// Invalidates the current bucket on late data, excessive pull delay, or a
// guardrail overflow; otherwise establishes a new global diff base.
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
                                           ConditionState condition) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket();
        return;
    }

    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket();
        return;
    }

    if (allData.size() == 0) {
        VLOG("Data pulled is empty");
        StatsdStats::getInstance().noteEmptyData(mPullTagId);
    }

    // Track which dimension keys appear in this pull (filled in by the
    // matched-event path) so missing keys can have their bases reset below.
    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        // Work on a copy so the shared event's timestamp is not mutated.
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If the new pulled data does not contains some keys we track in our intervals, we need to
    // reset the base.
    for (auto& slice : mCurrentSlicedBucket) {
        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
                != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket();
        mCurrentSlicedBucket.clear();
    }
}
574 
dumpStatesLocked(FILE * out,bool verbose) const575 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
576     if (mCurrentSlicedBucket.size() == 0) {
577         return;
578     }
579 
580     fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
581             (unsigned long)mCurrentSlicedBucket.size());
582     if (verbose) {
583         for (const auto& it : mCurrentSlicedBucket) {
584           for (const auto& interval : it.second) {
585             fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
586                     it.first.getDimensionKeyInWhat().toString().c_str(),
587                     it.first.getDimensionKeyInCondition().toString().c_str(),
588                     interval.value.toString().c_str());
589           }
590         }
591     }
592 }
593 
// True once the number of tracked dimension keys has reached the hard limit.
bool ValueMetricProducer::hasReachedGuardRailLimit() const {
    return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
}
597 
hitGuardRailLocked(const MetricDimensionKey & newKey)598 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
599     // ===========GuardRail==============
600     // 1. Report the tuple count if the tuple count > soft limit
601     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
602         return false;
603     }
604     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
605         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
606         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
607         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
608         if (hasReachedGuardRailLimit()) {
609             ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
610                   newKey.toString().c_str());
611             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
612             return true;
613         }
614     }
615 
616     return false;
617 }
618 
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)619 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
620     // ===========GuardRail==============
621     // 1. Report the tuple count if the tuple count > soft limit
622     if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
623         return false;
624     }
625     if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
626         size_t newTupleCount = mCurrentFullBucket.size() + 1;
627         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
628         if (newTupleCount > mDimensionHardLimit) {
629             ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
630                   (long long)mMetricId,
631                   newKey.toString().c_str());
632             return true;
633         }
634     }
635 
636     return false;
637 }
638 
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)639 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
640     for (const FieldValue& value : event.getValues()) {
641         if (value.mField.matches(matcher)) {
642             switch (value.mValue.type) {
643                 case INT:
644                     ret.setLong(value.mValue.int_value);
645                     break;
646                 case LONG:
647                     ret.setLong(value.mValue.long_value);
648                     break;
649                 case FLOAT:
650                     ret.setDouble(value.mValue.float_value);
651                     break;
652                 case DOUBLE:
653                     ret.setDouble(value.mValue.double_value);
654                     break;
655                 default:
656                     break;
657             }
658             return true;
659         }
660     }
661     return false;
662 }
663 
// Core per-event aggregation path. Validates event timing and condition
// state, enforces the dimension guardrail, then folds the event's value(s)
// into the per-key intervals — applying diffing (for pulled metrics) and the
// configured aggregation (SUM/AVG/MIN/MAX) — and finally feeds anomaly
// trackers when the accumulated data is trustworthy.
void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
                                                          const MetricDimensionKey& eventKey,
                                                          const ConditionKey& conditionKey,
                                                          bool condition, const LogEvent& event) {
    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    // Events older than the current bucket cannot be attributed correctly; drop them.
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    mMatchedMetricDimensionKeys.insert(eventKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a
    // state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
            && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false");
        return;
    }

    // Drop data for brand-new keys once the dimension hard limit is reached.
    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    // operator[] creates the entry on first sight of this key.
    vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
    if (multiIntervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        multiIntervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    // One interval per configured value field; all are updated from this event.
    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = multiIntervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        // Mark the key as live so initCurrentSlicedBucket() won't garbage-collect it.
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!interval.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    interval.hasBase = true;
                } else {
                    // no base. just update base and return.
                    interval.base = value;
                    interval.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set interval.base and
                    // interval.hasBase for other intervals
                    continue;
                }
            }
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= interval.base) {
                        diff = value - interval.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new raw value as the delta.
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        interval.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (interval.base >= value) {
                        diff = interval.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new raw value as the delta.
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        interval.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - interval.base;
                    break;
                default:
                    break;
            }
            // The current raw value becomes the base for the next diff.
            interval.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            // First data point of this bucket for this interval.
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct
    if (useAnomalyDetection) {
        // TODO: propgate proper values down stream when anomaly support doubles
        // Anomaly detection only looks at the first value field, combined with
        // any partial-bucket accumulation for the same key.
        long wholeBucketVal = multiIntervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}
820 
821 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
822 // if mCondition is true!
flushIfNeededLocked(const int64_t & eventTimeNs)823 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
824     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
825     if (eventTimeNs < currentBucketEndTimeNs) {
826         VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
827              (long long)(currentBucketEndTimeNs));
828         return;
829     }
830     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
831     int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
832     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
833 }
834 
calcBucketsForwardCount(const int64_t & eventTimeNs) const835 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
836     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
837     if (eventTimeNs < currentBucketEndTimeNs) {
838         return 0;
839     }
840     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
841 }
842 
// Closes the bucket ending at (or truncated to) |eventTimeNs|, records it in
// mPastBuckets when valid and large enough, forwards data to the anomaly
// full-bucket accumulator, and re-initializes state for the bucket starting
// at |nextBucketStartTimeNs|.
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
    }

    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful though.
        invalidateCurrentBucketWithoutResetBase();
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    // A partial bucket ends at the event time; a full one at the nominal end.
    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
            bucket.mConditionTrueNs = conditionTrueDuration;
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
            }
        }
    } else {
        // Record the dropped interval so the report can explain the gap.
        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }

    appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);
    mCurrentBucketNum += numBucketsForward;
}
886 
buildPartialBucket(int64_t bucketEndTime,const std::vector<Interval> & intervals)887 ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
888                                                     const std::vector<Interval>& intervals) {
889     ValueBucket bucket;
890     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
891     bucket.mBucketEndNs = bucketEndTime;
892     for (const auto& interval : intervals) {
893         if (interval.hasValue) {
894             // skip the output if the diff is zero
895             if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
896                 continue;
897             }
898             bucket.valueIndex.push_back(interval.valueIndex);
899             if (mAggregationType != ValueMetric::AVG) {
900                 bucket.values.push_back(interval.value);
901             } else {
902                 double sum = interval.value.type == LONG ? (double)interval.value.long_value
903                                                          : interval.value.double_value;
904                 bucket.values.push_back(Value((double)sum / interval.sampleSize));
905             }
906         }
907     }
908     return bucket;
909 }
910 
initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)911 void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
912     StatsdStats::getInstance().noteBucketCount(mMetricId);
913     // Cleanup data structure to aggregate values.
914     for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
915         bool obsolete = true;
916         for (auto& interval : it->second) {
917             interval.hasValue = false;
918             interval.sampleSize = 0;
919             if (interval.seenNewData) {
920                 obsolete = false;
921             }
922             interval.seenNewData = false;
923         }
924 
925         if (obsolete) {
926             it = mCurrentSlicedBucket.erase(it);
927         } else {
928             it++;
929         }
930     }
931 
932     mCurrentBucketIsInvalid = false;
933     // If we do not have a global base when the condition is true,
934     // we will have incomplete bucket for the next bucket.
935     if (mUseDiff && !mHasGlobalBase && mCondition) {
936         mCurrentBucketIsInvalid = false;
937     }
938     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
939     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
940          (long long)mCurrentBucketStartTimeNs);
941 }
942 
// Feeds the just-closed (partial or full) bucket into the anomaly-detection
// pipeline. Partial buckets accumulate into mCurrentFullBucket; once the
// nominal full-bucket boundary is crossed, the accumulated totals are handed
// to the anomaly trackers and the accumulator is cleared.
void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
    bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
    if (mCurrentBucketIsInvalid) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                // Respect the full-bucket dimension guardrail for new keys.
                if (hitFullBucketGuardRailLocked(slice.first)) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            // TODO: fix this when anomaly can accept double values
            auto& interval = slice.second[0];
            if (interval.hasValue) {
                mCurrentFullBucket[slice.first] += interval.value.long_value;
            }
        }
    }
}
1001 
byteSizeLocked() const1002 size_t ValueMetricProducer::byteSizeLocked() const {
1003     size_t totalSize = 0;
1004     for (const auto& pair : mPastBuckets) {
1005         totalSize += pair.second.size() * kBucketSize;
1006     }
1007     return totalSize;
1008 }
1009 
1010 }  // namespace statsd
1011 }  // namespace os
1012 }  // namespace android
1013