1 /*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "LogReaderThread.h"
18
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22
23 #include <thread>
24
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)28 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
29 std::unique_ptr<LogWriter> writer, bool non_block,
30 unsigned long tail, LogMask log_mask, pid_t pid,
31 log_time start_time, uint64_t start,
32 std::chrono::steady_clock::time_point deadline)
33 : log_buffer_(log_buffer),
34 reader_list_(reader_list),
35 writer_(std::move(writer)),
36 pid_(pid),
37 tail_(tail),
38 count_(0),
39 index_(0),
40 start_time_(start_time),
41 deadline_(deadline),
42 non_block_(non_block) {
43 cleanSkip_Locked();
44 flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
45 auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
46 thread.detach();
47 }
48
ThreadFunction()49 void LogReaderThread::ThreadFunction() {
50 prctl(PR_SET_NAME, "logd.reader.per");
51
52 auto lock = std::unique_lock{reader_list_->reader_threads_lock()};
53
54 while (!release_) {
55 if (deadline_.time_since_epoch().count() != 0) {
56 if (thread_triggered_condition_.wait_until(lock, deadline_) ==
57 std::cv_status::timeout) {
58 deadline_ = {};
59 }
60 if (release_) {
61 break;
62 }
63 }
64
65 lock.unlock();
66
67 if (tail_) {
68 auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
69 flush_to_state_->log_mask());
70 log_buffer_->FlushTo(
71 writer_.get(), *first_pass_state,
72 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
73 return FilterFirstPass(log_id, pid, sequence, realtime);
74 });
75 }
76 bool flush_success = log_buffer_->FlushTo(
77 writer_.get(), *flush_to_state_,
78 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
79 return FilterSecondPass(log_id, pid, sequence, realtime);
80 });
81
82 // We only ignore entries before the original start time for the first flushTo(), if we
83 // get entries after this first flush before the original start time, then the client
84 // wouldn't have seen them.
85 // Note: this is still racy and may skip out of order events that came in since the last
86 // time the client disconnected and then reconnected with the new start time. The long term
87 // solution here is that clients must request events since a specific sequence number.
88 start_time_.tv_sec = 0;
89 start_time_.tv_nsec = 0;
90
91 lock.lock();
92
93 if (!flush_success) {
94 break;
95 }
96
97 if (non_block_ || release_) {
98 break;
99 }
100
101 cleanSkip_Locked();
102
103 if (deadline_.time_since_epoch().count() == 0) {
104 thread_triggered_condition_.wait(lock);
105 }
106 }
107
108 writer_->Release();
109
110 auto& log_reader_threads = reader_list_->reader_threads();
111 auto it = std::find_if(log_reader_threads.begin(), log_reader_threads.end(),
112 [this](const auto& other) { return other.get() == this; });
113
114 if (it != log_reader_threads.end()) {
115 log_reader_threads.erase(it);
116 }
117 }
118
119 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)120 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
121 auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
122
123 if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
124 ++count_;
125 }
126
127 return FilterResult::kSkip;
128 }
129
130 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)131 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
132 log_time realtime) {
133 auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
134
135 if (skip_ahead_[log_id]) {
136 skip_ahead_[log_id]--;
137 return FilterResult::kSkip;
138 }
139
140 // Truncate to close race between first and second pass
141 if (non_block_ && tail_ && index_ >= count_) {
142 return FilterResult::kStop;
143 }
144
145 if (pid_ && pid_ != pid) {
146 return FilterResult::kSkip;
147 }
148
149 if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
150 return FilterResult::kSkip;
151 }
152
153 if (release_) {
154 return FilterResult::kStop;
155 }
156
157 if (!tail_) {
158 goto ok;
159 }
160
161 ++index_;
162
163 if (count_ > tail_ && index_ <= (count_ - tail_)) {
164 return FilterResult::kSkip;
165 }
166
167 if (!non_block_) {
168 tail_ = 0;
169 }
170
171 ok:
172 if (!skip_ahead_[log_id]) {
173 return FilterResult::kWrite;
174 }
175 return FilterResult::kSkip;
176 }
177
cleanSkip_Locked(void)178 void LogReaderThread::cleanSkip_Locked(void) {
179 memset(skip_ahead_, 0, sizeof(skip_ahead_));
180 }
181