1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <bitset> 20 #include <list> 21 #include <queue> 22 23 #include "LogBuffer.h" 24 #include "SerializedLogChunk.h" 25 #include "SerializedLogEntry.h" 26 27 struct LogPosition { 28 std::list<SerializedLogChunk>::iterator buffer_it; 29 int read_offset; 30 }; 31 32 struct MinHeapElement { MinHeapElementMinHeapElement33 MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry) 34 : log_id(log_id), entry(entry) {} 35 log_id_t log_id; 36 const SerializedLogEntry* entry; 37 // The change of comparison operators is intentional, std::priority_queue uses operator<() to 38 // compare but creates a max heap. Since we want a min heap, we return the opposite result. 39 bool operator<(const MinHeapElement& rhs) const { 40 return entry->sequence() > rhs.entry->sequence(); 41 } 42 }; 43 44 // This class tracks the specific point where a FlushTo client has read through the logs. It 45 // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset 46 // into each log chunk where it has last read. All interactions with this class, except for its 47 // construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it 48 // references may be pruned, which is handled by ensuring prune does not touch any log chunk with 49 // highest sequence number greater or equal to start(). 50 class SerializedFlushToState : public FlushToState { 51 public: 52 // Initializes this state object. For each log buffer set in log_mask, this sets 53 // logs_needed_from_next_position_. 54 SerializedFlushToState(uint64_t start, LogMask log_mask); 55 56 // Decrease the reference of all referenced logs. This happens when a reader is disconnected. 57 ~SerializedFlushToState() override; 58 59 // We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here. InitializeLogs(std::list<SerializedLogChunk> * logs)60 void InitializeLogs(std::list<SerializedLogChunk>* logs) { 61 if (logs_ == nullptr) logs_ = logs; 62 } 63 HasUnreadLogs()64 bool HasUnreadLogs() { 65 CheckForNewLogs(); 66 return !min_heap_.empty(); 67 } 68 69 // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to 70 // indicate that we're waiting for more logs from the associated log buffer. 71 MinHeapElement PopNextUnreadLog(); 72 73 // If the parent log buffer prunes logs, the reference that this class contains may become 74 // invalid, so this must be called first to drop the reference to buffer_it, if any. 75 void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it); 76 77 private: 78 // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the 79 // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're 80 // waiting for the next log. 81 void AddMinHeapEntry(log_id_t log_id); 82 83 // Create a LogPosition object for the given log_id by searching through the log chunks for the 84 // first chunk and then first log entry within that chunk that is greater or equal to start(). 85 void CreateLogPosition(log_id_t log_id); 86 87 // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and 88 // calls AddMinHeapEntry() if so. 89 void CheckForNewLogs(); 90 91 std::list<SerializedLogChunk>* logs_ = nullptr; 92 // An optional structure that contains an iterator to the serialized log buffer and offset into 93 // it that this logger should handle next. 94 std::optional<LogPosition> log_positions_[LOG_ID_MAX]; 95 // A bit for each log that is set if a given log_id has no logs or if this client has read all 96 // of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) && 97 // next_log_position == logs_write_position_)`. These will be re-checked in each 98 // loop in case new logs came in. 99 std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {}; 100 // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next 101 // element that this reader should read. 102 std::priority_queue<MinHeapElement> min_heap_; 103 }; 104