1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriber.h"
20 
21 #include "matchers/matcher_util.h"
22 #include "stats_log_util.h"
23 
24 using android::util::ProtoOutputStream;
25 
26 namespace android {
27 namespace os {
28 namespace statsd {
29 
// Field number under which each atom is emitted as a repeated message in the
// proto stream written to the shell subscriber's output.
static const int FIELD_ID_ATOM = 1;
31 
startNewSubscription(int in,int out,sp<IResultReceiver> resultReceiver,int timeoutSec)32 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
33                                            int timeoutSec) {
34     VLOG("start new shell subscription");
35     {
36         std::lock_guard<std::mutex> lock(mMutex);
37         if (mResultReceiver != nullptr) {
38             VLOG("Only one shell subscriber is allowed.");
39             return;
40         }
41         mInput = in;
42         mOutput = out;
43         mResultReceiver = resultReceiver;
44         IInterface::asBinder(mResultReceiver)->linkToDeath(this);
45     }
46 
47     // Note that the following is blocking, and it's intended as we cannot return until the shell
48     // cmd exits, otherwise all resources & FDs will be automatically closed.
49 
50     // Read config forever until EOF is reached. Clients may send multiple configs -- each new
51     // config replace the previous one.
52     readConfig(in);
53     VLOG("timeout : %d", timeoutSec);
54 
55     // Now we have read an EOF we now wait for the semaphore until the client exits.
56     VLOG("Now wait for client to exit");
57     std::unique_lock<std::mutex> lk(mMutex);
58 
59     if (timeoutSec > 0) {
60         mShellDied.wait_for(lk, timeoutSec * 1s,
61                             [this, resultReceiver] { return mResultReceiver != resultReceiver; });
62     } else {
63         mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
64     }
65 }
66 
updateConfig(const ShellSubscription & config)67 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
68     std::lock_guard<std::mutex> lock(mMutex);
69     mPushedMatchers.clear();
70     mPulledInfo.clear();
71 
72     for (const auto& pushed : config.pushed()) {
73         mPushedMatchers.push_back(pushed);
74         VLOG("adding matcher for atom %d", pushed.atom_id());
75     }
76 
77     int64_t token = getElapsedRealtimeNs();
78     mPullToken = token;
79 
80     int64_t minInterval = -1;
81     for (const auto& pulled : config.pulled()) {
82         // All intervals need to be multiples of the min interval.
83         if (minInterval < 0 || pulled.freq_millis() < minInterval) {
84             minInterval = pulled.freq_millis();
85         }
86 
87         mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
88         VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
89     }
90 
91     if (mPulledInfo.size() > 0 && minInterval > 0) {
92         // This thread is guaranteed to terminate after it detects the token is different or
93         // cleaned up.
94         std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
95         puller.detach();
96     }
97 }
98 
writeToOutputLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)99 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
100                                           const SimpleAtomMatcher& matcher) {
101     if (mOutput == 0) return;
102     int count = 0;
103     mProto.clear();
104     for (const auto& event : data) {
105         VLOG("%s", event->ToString().c_str());
106         if (matchesSimple(*mUidMap, matcher, *event)) {
107             VLOG("matched");
108             count++;
109             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
110                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
111             event->ToProto(mProto);
112             mProto.end(atomToken);
113         }
114     }
115 
116     if (count > 0) {
117         // First write the payload size.
118         size_t bufferSize = mProto.size();
119         write(mOutput, &bufferSize, sizeof(bufferSize));
120         VLOG("%d atoms, proto size: %zu", count, bufferSize);
121         // Then write the payload.
122         mProto.flush(mOutput);
123     }
124     mProto.clear();
125 }
126 
startPull(int64_t token,int64_t intervalMillis)127 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
128     while (1) {
129         int64_t nowMillis = getElapsedRealtimeMillis();
130         {
131             std::lock_guard<std::mutex> lock(mMutex);
132             if (mPulledInfo.size() == 0 || mPullToken != token) {
133                 VLOG("Pulling thread %lld done!", (long long)token);
134                 return;
135             }
136             for (auto& pullInfo : mPulledInfo) {
137                 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
138                     VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
139 
140                     vector<std::shared_ptr<LogEvent>> data;
141                     mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
142                     VLOG("pulled %zu atoms", data.size());
143                     if (data.size() > 0) {
144                         writeToOutputLocked(data, pullInfo.mPullerMatcher);
145                     }
146                     pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
147                 }
148             }
149         }
150         VLOG("Pulling thread %lld sleep....", (long long)token);
151         std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
152     }
153 }
154 
readConfig(int in)155 void ShellSubscriber::readConfig(int in) {
156     if (in <= 0) {
157         return;
158     }
159 
160     while (1) {
161         size_t bufferSize = 0;
162         int result = 0;
163         if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
164             VLOG("Done reading");
165             break;
166         } else if (result < 0 || result != sizeof(bufferSize)) {
167             ALOGE("Error reading config size");
168             break;
169         }
170 
171         vector<uint8_t> buffer(bufferSize);
172         if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
173             ShellSubscription config;
174             if (config.ParseFromArray(buffer.data(), bufferSize)) {
175                 updateConfig(config);
176             } else {
177                 ALOGE("error parsing the config");
178                 break;
179             }
180         } else {
181             VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
182             break;
183         }
184     }
185 }
186 
cleanUpLocked()187 void ShellSubscriber::cleanUpLocked() {
188     // The file descriptors will be closed by binder.
189     mInput = 0;
190     mOutput = 0;
191     mResultReceiver = nullptr;
192     mPushedMatchers.clear();
193     mPulledInfo.clear();
194     mPullToken = 0;
195     VLOG("done clean up");
196 }
197 
onLogEvent(const LogEvent & event)198 void ShellSubscriber::onLogEvent(const LogEvent& event) {
199     std::lock_guard<std::mutex> lock(mMutex);
200 
201     if (mOutput <= 0) {
202         return;
203     }
204     for (const auto& matcher : mPushedMatchers) {
205         if (matchesSimple(*mUidMap, matcher, event)) {
206             VLOG("%s", event.ToString().c_str());
207             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
208                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
209             event.ToProto(mProto);
210             mProto.end(atomToken);
211             // First write the payload size.
212             size_t bufferSize = mProto.size();
213             write(mOutput, &bufferSize, sizeof(bufferSize));
214 
215             // Then write the payload.
216             mProto.flush(mOutput);
217             mProto.clear();
218             break;
219         }
220     }
221 }
222 
binderDied(const wp<IBinder> & who)223 void ShellSubscriber::binderDied(const wp<IBinder>& who) {
224     {
225         VLOG("Shell exits");
226         std::lock_guard<std::mutex> lock(mMutex);
227         cleanUpLocked();
228     }
229     mShellDied.notify_all();
230 }
231 
232 }  // namespace statsd
233 }  // namespace os
234 }  // namespace android
235