1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <bitset>
20 #include <list>
21 #include <queue>
22 
23 #include "LogBuffer.h"
24 #include "SerializedLogChunk.h"
25 #include "SerializedLogEntry.h"
26 
27 struct LogPosition {
28     std::list<SerializedLogChunk>::iterator buffer_it;
29     int read_offset;
30 };
31 
32 struct MinHeapElement {
MinHeapElementMinHeapElement33     MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry)
34         : log_id(log_id), entry(entry) {}
35     log_id_t log_id;
36     const SerializedLogEntry* entry;
37     // The change of comparison operators is intentional, std::priority_queue uses operator<() to
38     // compare but creates a max heap.  Since we want a min heap, we return the opposite result.
39     bool operator<(const MinHeapElement& rhs) const {
40         return entry->sequence() > rhs.entry->sequence();
41     }
42 };
43 
44 // This class tracks the specific point where a FlushTo client has read through the logs.  It
45 // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
46 // into each log chunk where it has last read.  All interactions with this class, except for its
47 // construction, must be done with SerializedLogBuffer::lock_ held.  No log chunks that it
48 // references may be pruned, which is handled by ensuring prune does not touch any log chunk with
49 // highest sequence number greater or equal to start().
50 class SerializedFlushToState : public FlushToState {
51   public:
52     // Initializes this state object.  For each log buffer set in log_mask, this sets
53     // logs_needed_from_next_position_.
54     SerializedFlushToState(uint64_t start, LogMask log_mask);
55 
56     // Decrease the reference of all referenced logs.  This happens when a reader is disconnected.
57     ~SerializedFlushToState() override;
58 
59     // We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here.
InitializeLogs(std::list<SerializedLogChunk> * logs)60     void InitializeLogs(std::list<SerializedLogChunk>* logs) {
61         if (logs_ == nullptr) logs_ = logs;
62     }
63 
HasUnreadLogs()64     bool HasUnreadLogs() {
65         CheckForNewLogs();
66         return !min_heap_.empty();
67     }
68 
69     // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to
70     // indicate that we're waiting for more logs from the associated log buffer.
71     MinHeapElement PopNextUnreadLog();
72 
73     // If the parent log buffer prunes logs, the reference that this class contains may become
74     // invalid, so this must be called first to drop the reference to buffer_it, if any.
75     void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
76 
77   private:
78     // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the
79     // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're
80     // waiting for the next log.
81     void AddMinHeapEntry(log_id_t log_id);
82 
83     // Create a LogPosition object for the given log_id by searching through the log chunks for the
84     // first chunk and then first log entry within that chunk that is greater or equal to start().
85     void CreateLogPosition(log_id_t log_id);
86 
87     // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
88     // calls AddMinHeapEntry() if so.
89     void CheckForNewLogs();
90 
91     std::list<SerializedLogChunk>* logs_ = nullptr;
92     // An optional structure that contains an iterator to the serialized log buffer and offset into
93     // it that this logger should handle next.
94     std::optional<LogPosition> log_positions_[LOG_ID_MAX];
95     // A bit for each log that is set if a given log_id has no logs or if this client has read all
96     // of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) &&
97     // next_log_position == logs_write_position_)`.  These will be re-checked in each
98     // loop in case new logs came in.
99     std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
100     // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next
101     // element that this reader should read.
102     std::priority_queue<MinHeapElement> min_heap_;
103 };
104