1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SINGLE_STATE_QUEUE_H 18 #define SINGLE_STATE_QUEUE_H 19 20 // Non-blocking single element state queue, or 21 // Non-blocking single-reader / single-writer multi-word atomic load / store 22 23 #include <stdint.h> 24 #include <cutils/atomic.h> 25 26 namespace android { 27 28 template<typename T> class SingleStateQueue { 29 30 public: 31 32 class Mutator; 33 class Observer; 34 35 enum SSQ_STATUS { 36 SSQ_PENDING, /* = 0 */ 37 SSQ_READ, 38 SSQ_DONE, 39 }; 40 41 struct Shared { 42 // needs to be part of a union so don't define constructor or destructor 43 44 friend class Mutator; 45 friend class Observer; 46 47 private: initShared48 void init() { mAck = 0; mSequence = 0; } 49 50 volatile int32_t mAck; 51 volatile int32_t mSequence; 52 T mValue; 53 }; 54 55 class Mutator { 56 public: Mutator(Shared * shared)57 Mutator(Shared *shared) 58 : mSequence(0), mShared(shared) 59 { 60 // exactly one of Mutator and Observer must initialize, currently it is Observer 61 // shared->init(); 62 } 63 64 // push new value onto state queue, overwriting previous value; 65 // returns a sequence number which can be used with ack() push(const T & value)66 int32_t push(const T& value) 67 { 68 Shared *shared = mShared; 69 int32_t sequence = mSequence; 70 sequence++; 71 android_atomic_acquire_store(sequence, &shared->mSequence); 72 shared->mValue = value; 73 sequence++; 74 android_atomic_release_store(sequence, &shared->mSequence); 75 mSequence = sequence; 76 // consider signalling a futex here, if we know that observer is waiting 77 return sequence; 78 } 79 80 // returns the status of the last state push. This may be a stale value. 81 // 82 // SSQ_PENDING, or 0, means it has not been observed 83 // SSQ_READ means it has been read 84 // SSQ_DONE means it has been acted upon, after Observer::done() is called ack()85 enum SSQ_STATUS ack() const 86 { 87 // in the case of SSQ_DONE, prevent any subtle data-races of subsequent reads 88 // being performed (out-of-order) before the ack read, should the caller be 89 // depending on sequentiality of reads. 90 const int32_t ack = android_atomic_acquire_load(&mShared->mAck); 91 return ack - mSequence & ~1 ? SSQ_PENDING /* seq differ */ : 92 ack & 1 ? SSQ_DONE : SSQ_READ; 93 } 94 95 // return true if a push with specified sequence number or later has been observed ack(int32_t sequence)96 bool ack(int32_t sequence) const 97 { 98 // this relies on 2's complement rollover to detect an ancient sequence number 99 return mShared->mAck - sequence >= 0; 100 } 101 102 // returns the last value written (or the contents of the shared buffer after initialization 103 // if no value was written). last()104 T last() const 105 { // assume no sequence check required - we are the writer. 106 return mShared->mValue; 107 } 108 109 private: 110 int32_t mSequence; 111 Shared * const mShared; 112 }; 113 114 class Observer { 115 public: Observer(Shared * shared)116 Observer(Shared *shared) 117 : mSequence(0), mSeed(1), mShared(shared) 118 { 119 // exactly one of Mutator and Observer must initialize, currently it is Observer 120 shared->init(); 121 } 122 123 // return true if value has changed poll(T & value)124 bool poll(T& value) 125 { 126 Shared *shared = mShared; 127 int32_t before = shared->mSequence; 128 if (before == mSequence) { 129 return false; 130 } 131 for (int tries = 0; ; ) { 132 const int MAX_TRIES = 5; 133 if (before & 1) { 134 if (++tries >= MAX_TRIES) { 135 return false; 136 } 137 before = shared->mSequence; 138 } else { 139 android_memory_barrier(); 140 T temp = shared->mValue; 141 int32_t after = android_atomic_release_load(&shared->mSequence); 142 if (after == before) { 143 value = temp; 144 shared->mAck = before; 145 mSequence = before; // mSequence is even after poll success 146 return true; 147 } 148 if (++tries >= MAX_TRIES) { 149 return false; 150 } 151 before = after; 152 } 153 } 154 } 155 156 // (optional) used to indicate to the Mutator that the state that has been polled 157 // has also been acted upon. done()158 void done() 159 { 160 const int32_t ack = mShared->mAck + 1; 161 // ensure all previous writes have been performed. 162 android_atomic_release_store(ack, &mShared->mAck); // mSequence is odd after "done" 163 } 164 165 private: 166 int32_t mSequence; 167 int mSeed; // for PRNG 168 Shared * const mShared; 169 }; 170 171 #if 0 172 SingleStateQueue(void /*Shared*/ *shared); 173 /*virtual*/ ~SingleStateQueue() { } 174 175 static size_t size() { return sizeof(Shared); } 176 #endif 177 178 }; 179 180 } // namespace android 181 182 #endif // SINGLE_STATE_QUEUE_H 183