1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SINGLE_STATE_QUEUE_H
18 #define SINGLE_STATE_QUEUE_H
19 
20 // Non-blocking single element state queue, or
21 // Non-blocking single-reader / single-writer multi-word atomic load / store
22 
23 #include <stdint.h>
24 #include <cutils/atomic.h>
25 
26 namespace android {
27 
28 template<typename T> class SingleStateQueue {
29 
30 public:
31 
32     class Mutator;
33     class Observer;
34 
35     enum SSQ_STATUS {
36         SSQ_PENDING, /* = 0 */
37         SSQ_READ,
38         SSQ_DONE,
39     };
40 
41     struct Shared {
42         // needs to be part of a union so don't define constructor or destructor
43 
44         friend class Mutator;
45         friend class Observer;
46 
47 private:
initShared48         void                init() { mAck = 0; mSequence = 0; }
49 
50         volatile int32_t    mAck;
51         volatile int32_t    mSequence;
52         T                   mValue;
53     };
54 
55     class Mutator {
56     public:
Mutator(Shared * shared)57         Mutator(Shared *shared)
58             : mSequence(0), mShared(shared)
59         {
60             // exactly one of Mutator and Observer must initialize, currently it is Observer
61             // shared->init();
62         }
63 
64         // push new value onto state queue, overwriting previous value;
65         // returns a sequence number which can be used with ack()
push(const T & value)66         int32_t push(const T& value)
67         {
68             Shared *shared = mShared;
69             int32_t sequence = mSequence;
70             sequence++;
71             android_atomic_acquire_store(sequence, &shared->mSequence);
72             shared->mValue = value;
73             sequence++;
74             android_atomic_release_store(sequence, &shared->mSequence);
75             mSequence = sequence;
76             // consider signalling a futex here, if we know that observer is waiting
77             return sequence;
78         }
79 
80         // returns the status of the last state push.  This may be a stale value.
81         //
82         // SSQ_PENDING, or 0, means it has not been observed
83         // SSQ_READ means it has been read
84         // SSQ_DONE means it has been acted upon, after Observer::done() is called
ack()85         enum SSQ_STATUS ack() const
86         {
87             // in the case of SSQ_DONE, prevent any subtle data-races of subsequent reads
88             // being performed (out-of-order) before the ack read, should the caller be
89             // depending on sequentiality of reads.
90             const int32_t ack = android_atomic_acquire_load(&mShared->mAck);
91             return ack - mSequence & ~1 ? SSQ_PENDING /* seq differ */ :
92                     ack & 1 ? SSQ_DONE : SSQ_READ;
93         }
94 
95         // return true if a push with specified sequence number or later has been observed
ack(int32_t sequence)96         bool ack(int32_t sequence) const
97         {
98             // this relies on 2's complement rollover to detect an ancient sequence number
99             return mShared->mAck - sequence >= 0;
100         }
101 
102         // returns the last value written (or the contents of the shared buffer after initialization
103         // if no value was written).
last()104         T last() const
105         {   // assume no sequence check required - we are the writer.
106             return mShared->mValue;
107         }
108 
109     private:
110         int32_t     mSequence;
111         Shared * const mShared;
112     };
113 
114     class Observer {
115     public:
Observer(Shared * shared)116         Observer(Shared *shared)
117             : mSequence(0), mSeed(1), mShared(shared)
118         {
119             // exactly one of Mutator and Observer must initialize, currently it is Observer
120             shared->init();
121         }
122 
123         // return true if value has changed
poll(T & value)124         bool poll(T& value)
125         {
126             Shared *shared = mShared;
127             int32_t before = shared->mSequence;
128             if (before == mSequence) {
129                 return false;
130             }
131             for (int tries = 0; ; ) {
132                 const int MAX_TRIES = 5;
133                 if (before & 1) {
134                     if (++tries >= MAX_TRIES) {
135                         return false;
136                     }
137                     before = shared->mSequence;
138                 } else {
139                     android_memory_barrier();
140                     T temp = shared->mValue;
141                     int32_t after = android_atomic_release_load(&shared->mSequence);
142                     if (after == before) {
143                         value = temp;
144                         shared->mAck = before;
145                         mSequence = before; // mSequence is even after poll success
146                         return true;
147                     }
148                     if (++tries >= MAX_TRIES) {
149                         return false;
150                     }
151                     before = after;
152                 }
153             }
154         }
155 
156         // (optional) used to indicate to the Mutator that the state that has been polled
157         // has also been acted upon.
done()158         void done()
159         {
160             const int32_t ack = mShared->mAck + 1;
161             // ensure all previous writes have been performed.
162             android_atomic_release_store(ack, &mShared->mAck); // mSequence is odd after "done"
163         }
164 
165     private:
166         int32_t     mSequence;
167         int         mSeed;  // for PRNG
168         Shared * const mShared;
169     };
170 
171 #if 0
172     SingleStateQueue(void /*Shared*/ *shared);
173     /*virtual*/ ~SingleStateQueue() { }
174 
175     static size_t size() { return sizeof(Shared); }
176 #endif
177 
178 };
179 
180 }   // namespace android
181 
182 #endif  // SINGLE_STATE_QUEUE_H
183