/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
#define android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

namespace android {

/*
 * FIFO queue guarded by a mutex, with a condition variable that lets
 * consumers block until an item is pushed or the queue is deactivated.
 *
 * The queue starts active. After deactivate() it silently drops pushed
 * items and flush() returns an empty vector.
 */
template<typename T>
class ConcurrentQueue {
public:
    /*
     * Blocks the caller until the queue holds at least one item or the
     * queue has been deactivated. Returns immediately if either condition
     * already holds.
     */
    void waitForItems() {
        std::unique_lock<std::mutex> g(mLock);
        // The predicate overload re-checks the condition after every wakeup,
        // so spurious wakeups are handled.
        mCond.wait(g, [this] { return !mQueue.empty() || !mIsActive; });
    }

    /*
     * Moves every queued item, in FIFO order, into a vector and returns it,
     * leaving the queue empty.
     *
     * Returns an empty vector when the queue is empty or deactivated; in the
     * deactivated case any items still queued are left untouched.
     */
    std::vector<T> flush() {
        std::vector<T> items;

        MuxGuard g(mLock);
        if (mQueue.empty() || !mIsActive) {
            return items;
        }

        // One allocation up front instead of repeated growth under the lock.
        items.reserve(mQueue.size());
        while (!mQueue.empty()) {
            items.push_back(std::move(mQueue.front()));
            mQueue.pop();
        }
        return items;
    }

    /*
     * Appends an item to the queue and wakes one waiting consumer.
     * The item is dropped (and nobody is notified) if the queue has been
     * deactivated.
     */
    void push(T&& item) {
        {
            MuxGuard g(mLock);
            if (!mIsActive) {
                return;
            }
            mQueue.push(std::move(item));
        }
        // Notify after releasing the lock so the woken thread can take it.
        mCond.notify_one();
    }

    /* Deactivates the queue, thus no one can push items to it, also
     * notifies all waiting threads.
     */
    void deactivate() {
        {
            MuxGuard g(mLock);
            mIsActive = false;
        }
        mCond.notify_all();  // To unblock all waiting consumers.
    }

    ConcurrentQueue() = default;

    ConcurrentQueue(const ConcurrentQueue &) = delete;
    ConcurrentQueue &operator=(const ConcurrentQueue &) = delete;
private:
    using MuxGuard = std::lock_guard<std::mutex>;

    bool mIsActive = true;  // Guarded by mLock.
    mutable std::mutex mLock;
    std::condition_variable mCond;
    std::queue<T> mQueue;  // Guarded by mLock.
};

/*
 * Runs a worker thread that drains a ConcurrentQueue in batches: it waits
 * until the queue has items, sleeps for the batch interval so more items can
 * accumulate, then flushes the queue and hands the batch to a callback.
 */
template<typename T>
class BatchingConsumer {
private:
    enum class State {
        INIT = 0,
        RUNNING = 1,
        STOP_REQUESTED = 2,
        STOPPED = 3,
    };

public:
    BatchingConsumer() : mState(State::INIT) {}

    BatchingConsumer(const BatchingConsumer &) = delete;
    BatchingConsumer &operator=(const BatchingConsumer &) = delete;

    using OnBatchReceivedFunc = std::function<void(const std::vector<T>& vec)>;

    /*
     * Starts the worker thread.
     *
     * queue         - queue to drain; only the pointer is stored and it is
     *                 dereferenced on the worker thread, so the queue must
     *                 stay alive until the worker has finished.
     * batchInterval - how long the worker sleeps after items become
     *                 available before flushing them.
     * func          - invoked on the worker thread with each non-empty batch;
     *                 the thread receives its own copy of the callable.
     *
     * NOTE(review): calling run() again while the previous worker thread is
     * still joinable would assign over a joinable std::thread, which
     * terminates the program - callers are presumably expected to call it
     * once.
     */
    void run(ConcurrentQueue<T>* queue,
             std::chrono::nanoseconds batchInterval,
             const OnBatchReceivedFunc& func) {
        mQueue = queue;
        mBatchInterval = batchInterval;

        mWorkerThread = std::thread(
            &BatchingConsumer<T>::runInternal, this, func);
    }

    /*
     * Asks the worker loop to exit by setting the state flag.
     *
     * This does not wake a worker that is blocked in
     * ConcurrentQueue::waitForItems(); that call returns only when an item
     * is pushed or the queue is deactivated.
     */
    void requestStop() {
        mState = State::STOP_REQUESTED;
    }

    /* Joins the worker thread if it is joinable; otherwise does nothing. */
    void waitStopped() {
        if (mWorkerThread.joinable()) {
            mWorkerThread.join();
        }
    }

private:
    /*
     * Worker thread body. Enters the batching loop only if the state was
     * still INIT when the thread started (so a stop requested before that
     * point skips the loop), and always ends in the STOPPED state.
     */
    void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
        if (mState.exchange(State::RUNNING) == State::INIT) {
            while (State::RUNNING == mState) {
                mQueue->waitForItems();
                if (State::STOP_REQUESTED == mState) break;

                // Let more items accumulate so they are delivered together.
                std::this_thread::sleep_for(mBatchInterval);
                if (State::STOP_REQUESTED == mState) break;

                std::vector<T> items = mQueue->flush();

                if (!items.empty()) {
                    onBatchReceived(items);
                }
            }
        }

        mState = State::STOPPED;
    }

private:
    std::thread mWorkerThread;

    std::atomic<State> mState;
    std::chrono::nanoseconds mBatchInterval{0};  // Set by run().
    ConcurrentQueue<T>* mQueue = nullptr;        // Not owned; set by run().
};

}  // namespace android

#endif  // android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_