1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
18 #define android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
19 
20 #include <queue>
21 #include <atomic>
22 #include <thread>
23 #include <condition_variable>
24 #include <iostream>
25 
26 namespace android {
27 
28 template<typename T>
29 class ConcurrentQueue {
30 public:
waitForItems()31     void waitForItems() {
32         std::unique_lock<std::mutex> g(mLock);
33         while (mQueue.empty() && mIsActive) {
34             mCond.wait(g);
35         }
36     }
37 
flush()38     std::vector<T> flush() {
39         std::vector<T> items;
40 
41         MuxGuard g(mLock);
42         if (mQueue.empty() || !mIsActive) {
43             return items;
44         }
45         while (!mQueue.empty()) {
46             items.push_back(std::move(mQueue.front()));
47             mQueue.pop();
48         }
49         return items;
50     }
51 
push(T && item)52     void push(T&& item) {
53         {
54             MuxGuard g(mLock);
55             if (!mIsActive) {
56                 return;
57             }
58             mQueue.push(std::move(item));
59         }
60         mCond.notify_one();
61     }
62 
63     /* Deactivates the queue, thus no one can push items to it, also
64      * notifies all waiting thread.
65      */
deactivate()66     void deactivate() {
67         {
68             MuxGuard g(mLock);
69             mIsActive = false;
70         }
71         mCond.notify_all();  // To unblock all waiting consumers.
72     }
73 
74     ConcurrentQueue() = default;
75 
76     ConcurrentQueue(const ConcurrentQueue &) = delete;
77     ConcurrentQueue &operator=(const ConcurrentQueue &) = delete;
78 private:
79     using MuxGuard = std::lock_guard<std::mutex>;
80 
81     bool mIsActive = true;
82     mutable std::mutex mLock;
83     std::condition_variable mCond;
84     std::queue<T> mQueue;
85 };
86 
87 template<typename T>
88 class BatchingConsumer {
89 private:
90     enum class State {
91         INIT = 0,
92         RUNNING = 1,
93         STOP_REQUESTED = 2,
94         STOPPED = 3,
95     };
96 
97 public:
BatchingConsumer()98     BatchingConsumer() : mState(State::INIT) {}
99 
100     BatchingConsumer(const BatchingConsumer &) = delete;
101     BatchingConsumer &operator=(const BatchingConsumer &) = delete;
102 
103     using OnBatchReceivedFunc = std::function<void(const std::vector<T>& vec)>;
104 
run(ConcurrentQueue<T> * queue,std::chrono::nanoseconds batchInterval,const OnBatchReceivedFunc & func)105     void run(ConcurrentQueue<T>* queue,
106              std::chrono::nanoseconds batchInterval,
107              const OnBatchReceivedFunc& func) {
108         mQueue = queue;
109         mBatchInterval = batchInterval;
110 
111         mWorkerThread = std::thread(
112             &BatchingConsumer<T>::runInternal, this, func);
113     }
114 
requestStop()115     void requestStop() {
116         mState = State::STOP_REQUESTED;
117     }
118 
waitStopped()119     void waitStopped() {
120         if (mWorkerThread.joinable()) {
121             mWorkerThread.join();
122         }
123     }
124 
125 private:
runInternal(const OnBatchReceivedFunc & onBatchReceived)126     void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
127         if (mState.exchange(State::RUNNING) == State::INIT) {
128             while (State::RUNNING == mState) {
129                 mQueue->waitForItems();
130                 if (State::STOP_REQUESTED == mState) break;
131 
132                 std::this_thread::sleep_for(mBatchInterval);
133                 if (State::STOP_REQUESTED == mState) break;
134 
135                 std::vector<T> items = mQueue->flush();
136 
137                 if (items.size() > 0) {
138                     onBatchReceived(items);
139                 }
140             }
141         }
142 
143         mState = State::STOPPED;
144     }
145 
146 private:
147     std::thread mWorkerThread;
148 
149     std::atomic<State> mState;
150     std::chrono::nanoseconds mBatchInterval;
151     ConcurrentQueue<T>* mQueue;
152 };
153 
154 }  // namespace android
155 
156 #endif //android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
157