1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriber.h"
20
21 #include "matchers/matcher_util.h"
22 #include "stats_log_util.h"
23
24 using android::util::ProtoOutputStream;
25
26 namespace android {
27 namespace os {
28 namespace statsd {
29
// Proto field number combined into the field descriptor passed to mProto.start() for every atom
// written out by this file.
static const int FIELD_ID_ATOM = 1;
31
startNewSubscription(int in,int out,sp<IResultReceiver> resultReceiver,int timeoutSec)32 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
33 int timeoutSec) {
34 VLOG("start new shell subscription");
35 {
36 std::lock_guard<std::mutex> lock(mMutex);
37 if (mResultReceiver != nullptr) {
38 VLOG("Only one shell subscriber is allowed.");
39 return;
40 }
41 mInput = in;
42 mOutput = out;
43 mResultReceiver = resultReceiver;
44 IInterface::asBinder(mResultReceiver)->linkToDeath(this);
45 }
46
47 // Note that the following is blocking, and it's intended as we cannot return until the shell
48 // cmd exits, otherwise all resources & FDs will be automatically closed.
49
50 // Read config forever until EOF is reached. Clients may send multiple configs -- each new
51 // config replace the previous one.
52 readConfig(in);
53 VLOG("timeout : %d", timeoutSec);
54
55 // Now we have read an EOF we now wait for the semaphore until the client exits.
56 VLOG("Now wait for client to exit");
57 std::unique_lock<std::mutex> lk(mMutex);
58
59 if (timeoutSec > 0) {
60 mShellDied.wait_for(lk, timeoutSec * 1s,
61 [this, resultReceiver] { return mResultReceiver != resultReceiver; });
62 } else {
63 mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
64 }
65 }
66
updateConfig(const ShellSubscription & config)67 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
68 std::lock_guard<std::mutex> lock(mMutex);
69 mPushedMatchers.clear();
70 mPulledInfo.clear();
71
72 for (const auto& pushed : config.pushed()) {
73 mPushedMatchers.push_back(pushed);
74 VLOG("adding matcher for atom %d", pushed.atom_id());
75 }
76
77 int64_t token = getElapsedRealtimeNs();
78 mPullToken = token;
79
80 int64_t minInterval = -1;
81 for (const auto& pulled : config.pulled()) {
82 // All intervals need to be multiples of the min interval.
83 if (minInterval < 0 || pulled.freq_millis() < minInterval) {
84 minInterval = pulled.freq_millis();
85 }
86
87 mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
88 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
89 }
90
91 if (mPulledInfo.size() > 0 && minInterval > 0) {
92 // This thread is guaranteed to terminate after it detects the token is different or
93 // cleaned up.
94 std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
95 puller.detach();
96 }
97 }
98
writeToOutputLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)99 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
100 const SimpleAtomMatcher& matcher) {
101 if (mOutput == 0) return;
102 int count = 0;
103 mProto.clear();
104 for (const auto& event : data) {
105 VLOG("%s", event->ToString().c_str());
106 if (matchesSimple(*mUidMap, matcher, *event)) {
107 VLOG("matched");
108 count++;
109 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
110 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
111 event->ToProto(mProto);
112 mProto.end(atomToken);
113 }
114 }
115
116 if (count > 0) {
117 // First write the payload size.
118 size_t bufferSize = mProto.size();
119 write(mOutput, &bufferSize, sizeof(bufferSize));
120 VLOG("%d atoms, proto size: %zu", count, bufferSize);
121 // Then write the payload.
122 mProto.flush(mOutput);
123 }
124 mProto.clear();
125 }
126
startPull(int64_t token,int64_t intervalMillis)127 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
128 while (1) {
129 int64_t nowMillis = getElapsedRealtimeMillis();
130 {
131 std::lock_guard<std::mutex> lock(mMutex);
132 if (mPulledInfo.size() == 0 || mPullToken != token) {
133 VLOG("Pulling thread %lld done!", (long long)token);
134 return;
135 }
136 for (auto& pullInfo : mPulledInfo) {
137 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
138 VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
139
140 vector<std::shared_ptr<LogEvent>> data;
141 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
142 VLOG("pulled %zu atoms", data.size());
143 if (data.size() > 0) {
144 writeToOutputLocked(data, pullInfo.mPullerMatcher);
145 }
146 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
147 }
148 }
149 }
150 VLOG("Pulling thread %lld sleep....", (long long)token);
151 std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
152 }
153 }
154
readConfig(int in)155 void ShellSubscriber::readConfig(int in) {
156 if (in <= 0) {
157 return;
158 }
159
160 while (1) {
161 size_t bufferSize = 0;
162 int result = 0;
163 if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
164 VLOG("Done reading");
165 break;
166 } else if (result < 0 || result != sizeof(bufferSize)) {
167 ALOGE("Error reading config size");
168 break;
169 }
170
171 vector<uint8_t> buffer(bufferSize);
172 if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
173 ShellSubscription config;
174 if (config.ParseFromArray(buffer.data(), bufferSize)) {
175 updateConfig(config);
176 } else {
177 ALOGE("error parsing the config");
178 break;
179 }
180 } else {
181 VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
182 break;
183 }
184 }
185 }
186
cleanUpLocked()187 void ShellSubscriber::cleanUpLocked() {
188 // The file descriptors will be closed by binder.
189 mInput = 0;
190 mOutput = 0;
191 mResultReceiver = nullptr;
192 mPushedMatchers.clear();
193 mPulledInfo.clear();
194 mPullToken = 0;
195 VLOG("done clean up");
196 }
197
onLogEvent(const LogEvent & event)198 void ShellSubscriber::onLogEvent(const LogEvent& event) {
199 std::lock_guard<std::mutex> lock(mMutex);
200
201 if (mOutput <= 0) {
202 return;
203 }
204 for (const auto& matcher : mPushedMatchers) {
205 if (matchesSimple(*mUidMap, matcher, event)) {
206 VLOG("%s", event.ToString().c_str());
207 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
208 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
209 event.ToProto(mProto);
210 mProto.end(atomToken);
211 // First write the payload size.
212 size_t bufferSize = mProto.size();
213 write(mOutput, &bufferSize, sizeof(bufferSize));
214
215 // Then write the payload.
216 mProto.flush(mOutput);
217 mProto.clear();
218 break;
219 }
220 }
221 }
222
binderDied(const wp<IBinder> & who)223 void ShellSubscriber::binderDied(const wp<IBinder>& who) {
224 {
225 VLOG("Shell exits");
226 std::lock_guard<std::mutex> lock(mMutex);
227 cleanUpLocked();
228 }
229 mShellDied.notify_all();
230 }
231
232 } // namespace statsd
233 } // namespace os
234 } // namespace android
235