1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LogReaderThread.h"
18 
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22 
23 #include <thread>
24 
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27 
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)28 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
29                                  std::unique_ptr<LogWriter> writer, bool non_block,
30                                  unsigned long tail, LogMask log_mask, pid_t pid,
31                                  log_time start_time, uint64_t start,
32                                  std::chrono::steady_clock::time_point deadline)
33     : log_buffer_(log_buffer),
34       reader_list_(reader_list),
35       writer_(std::move(writer)),
36       pid_(pid),
37       tail_(tail),
38       count_(0),
39       index_(0),
40       start_time_(start_time),
41       deadline_(deadline),
42       non_block_(non_block) {
43     cleanSkip_Locked();
44     flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
45     auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
46     thread.detach();
47 }
48 
ThreadFunction()49 void LogReaderThread::ThreadFunction() {
50     prctl(PR_SET_NAME, "logd.reader.per");
51 
52     auto lock = std::unique_lock{reader_list_->reader_threads_lock()};
53 
54     while (!release_) {
55         if (deadline_.time_since_epoch().count() != 0) {
56             if (thread_triggered_condition_.wait_until(lock, deadline_) ==
57                 std::cv_status::timeout) {
58                 deadline_ = {};
59             }
60             if (release_) {
61                 break;
62             }
63         }
64 
65         lock.unlock();
66 
67         if (tail_) {
68             auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
69                                                                     flush_to_state_->log_mask());
70             log_buffer_->FlushTo(
71                     writer_.get(), *first_pass_state,
72                     [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
73                         return FilterFirstPass(log_id, pid, sequence, realtime);
74                     });
75         }
76         bool flush_success = log_buffer_->FlushTo(
77                 writer_.get(), *flush_to_state_,
78                 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
79                     return FilterSecondPass(log_id, pid, sequence, realtime);
80                 });
81 
82         // We only ignore entries before the original start time for the first flushTo(), if we
83         // get entries after this first flush before the original start time, then the client
84         // wouldn't have seen them.
85         // Note: this is still racy and may skip out of order events that came in since the last
86         // time the client disconnected and then reconnected with the new start time.  The long term
87         // solution here is that clients must request events since a specific sequence number.
88         start_time_.tv_sec = 0;
89         start_time_.tv_nsec = 0;
90 
91         lock.lock();
92 
93         if (!flush_success) {
94             break;
95         }
96 
97         if (non_block_ || release_) {
98             break;
99         }
100 
101         cleanSkip_Locked();
102 
103         if (deadline_.time_since_epoch().count() == 0) {
104             thread_triggered_condition_.wait(lock);
105         }
106     }
107 
108     writer_->Release();
109 
110     auto& log_reader_threads = reader_list_->reader_threads();
111     auto it = std::find_if(log_reader_threads.begin(), log_reader_threads.end(),
112                            [this](const auto& other) { return other.get() == this; });
113 
114     if (it != log_reader_threads.end()) {
115         log_reader_threads.erase(it);
116     }
117 }
118 
119 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)120 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
121     auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
122 
123     if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
124         ++count_;
125     }
126 
127     return FilterResult::kSkip;
128 }
129 
130 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)131 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
132                                                log_time realtime) {
133     auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
134 
135     if (skip_ahead_[log_id]) {
136         skip_ahead_[log_id]--;
137         return FilterResult::kSkip;
138     }
139 
140     // Truncate to close race between first and second pass
141     if (non_block_ && tail_ && index_ >= count_) {
142         return FilterResult::kStop;
143     }
144 
145     if (pid_ && pid_ != pid) {
146         return FilterResult::kSkip;
147     }
148 
149     if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
150         return FilterResult::kSkip;
151     }
152 
153     if (release_) {
154         return FilterResult::kStop;
155     }
156 
157     if (!tail_) {
158         goto ok;
159     }
160 
161     ++index_;
162 
163     if (count_ > tail_ && index_ <= (count_ - tail_)) {
164         return FilterResult::kSkip;
165     }
166 
167     if (!non_block_) {
168         tail_ = 0;
169     }
170 
171 ok:
172     if (!skip_ahead_[log_id]) {
173         return FilterResult::kWrite;
174     }
175     return FilterResult::kSkip;
176 }
177 
cleanSkip_Locked(void)178 void LogReaderThread::cleanSkip_Locked(void) {
179     memset(skip_ahead_, 0, sizeof(skip_ahead_));
180 }
181