1 // Copyright 2014 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
#include "android/base/Optional.h"
#include "android/base/synchronization/AndroidConditionVariable.h"
#include "android/base/synchronization/AndroidLock.h"

#include <stddef.h>
#include <stdint.h>
#include <utility>
23 
24 namespace android {
25 namespace base {
26 namespace guest {
27 
28 // Base non-templated class used to reduce the amount of template
29 // specialization.
30 class MessageChannelBase {
31 public:
32     // Get the current channel size
33     size_t size() const;
34 
35     // Abort the currently pending operations and don't allow any other ones
36     void stop();
37 
38     // Check if the channel is stopped.
39     bool isStopped() const;
40 
41     // Block until the channel has no pending messages.
42     void waitForEmpty();
43 
44 protected:
45     // Constructor. |capacity| is the buffer capacity in messages.
46     MessageChannelBase(size_t capacity);
47 
48     // Destructor.
49     ~MessageChannelBase() = default;
50 
51     // Call this method in the sender thread before writing a new message.
52     // This returns the position of the available slot in the message array
53     // where to copy the new fixed-size message. After the copy, call
54     // afterWrite().
55     // If the channel is stopped, return value is undefined.
56     size_t beforeWrite();
57 
58     // Same as beforeWrite(), but returns an empty optional if there was
59     // no room to write to instead of waiting for it.
60     // One still needs to call afterWrite() anyway.
61     Optional<size_t> beforeTryWrite();
62 
63     // To be called after trying to write a new fixed-size message (which should
64     // happen after beforeWrite() or beforeTryWrite()).
65     // |success| must be true to indicate that a new item was added to the
66     // channel, or false otherwise (i.e. if the channel is stopped, or if
67     // beforeTryWrite() returned an empty optional).
68     void afterWrite(bool success);
69 
70     // Call this method in the receiver thread before reading a new message.
71     // This returns the position in the message array where the new message
72     // can be read. Caller must process the message, then call afterRead().
73     // If the channel is stopped, return value is undefined.
74     size_t beforeRead();
75 
76     // Same as beforeRead(), but returns an empty optional if there was
77     // no data to read instead of waiting for it.
78     // One still needs to call afterWrite() anyway.
79     Optional<size_t> beforeTryRead();
80 
81     // Same as beforeRead(), but returns an empty optional if no data arrived
82     // by the |wallTimeUs| absolute time. One still needs to call
83     // afterWrite() anyway.
84     Optional<size_t> beforeTimedRead(uint64_t wallTimeUs);
85 
86     // To be called after reading a fixed-size message from the channel (which
87     // must happen after beforeRead() or beforeTryRead()).
88     // |success| must be true to indicate that a message was read, or false
89     // otherwise (i.e. if the channel is stopped or if beforeTryRead() returned
90     // an empty optional).
91     void afterRead(bool success);
92 
93     // A version of isStopped() that doesn't lock the channel but expects it
94     // to be locked by the caller.
isStoppedLocked()95     bool isStoppedLocked() const { return mStopped; }
96 
97 private:
98     size_t mPos = 0;
99     size_t mCapacity;
100     size_t mCount = 0;
101     bool mStopped = false;
102     mutable Lock mLock;     // Mutable to allow const members to lock it.
103     ConditionVariable mCanRead;
104     ConditionVariable mCanWrite;
105 };
106 
107 // Helper class used to implement an uni-directional IPC channel between
108 // two threads. The channel can be used to send fixed-size messages of type
109 // |T|, with an internal buffer size of |CAPACITY| items. All calls are
110 // blocking.
111 //
112 // Usage is pretty straightforward:
113 //
114 //   - From the sender thread, call send(msg);
115 //   - From the receiver thread, call receive(&msg);
116 //   - If you want to stop the IPC, call stop();
117 template <typename T, size_t CAPACITY>
118 class MessageChannel : public MessageChannelBase {
119 public:
MessageChannel()120     MessageChannel() : MessageChannelBase(CAPACITY) {}
121 
send(const T & msg)122     bool send(const T& msg) {
123         const size_t pos = beforeWrite();
124         const bool res = !isStoppedLocked();
125         if (res) {
126             mItems[pos] = msg;
127         }
128         afterWrite(res);
129         return res;
130     }
131 
send(T && msg)132     bool send(T&& msg) {
133         const size_t pos = beforeWrite();
134         const bool res = !isStoppedLocked();
135         if (res) {
136             mItems[pos] = std::move(msg);
137         }
138         afterWrite(res);
139         return res;
140     }
141 
trySend(const T & msg)142     bool trySend(const T& msg) {
143         const auto pos = beforeTryWrite();
144         if (pos) {
145             mItems[*pos] = msg;
146         }
147         afterWrite(pos);
148         return pos;
149     }
150 
trySend(T && msg)151     bool trySend(T&& msg) {
152         const auto pos = beforeTryWrite();
153         if (pos) {
154             mItems[*pos] = std::move(msg);
155         }
156         afterWrite(pos);
157         return pos;
158     }
159 
receive(T * msg)160     bool receive(T* msg) {
161         const size_t pos = beforeRead();
162         const bool res = !isStoppedLocked();
163         if (res) {
164             *msg = std::move(mItems[pos]);
165         }
166         afterRead(res);
167         return res;
168     }
169 
receive()170     Optional<T> receive() {
171         const size_t pos = beforeRead();
172         if (!isStoppedLocked()) {
173             Optional<T> msg(std::move(mItems[pos]));
174             afterRead(true);
175             return msg;
176         } else {
177             afterRead(false);
178             return {};
179         }
180     }
181 
tryReceive(T * msg)182     bool tryReceive(T* msg) {
183         const auto pos = beforeTryRead();
184         if (pos) {
185             *msg = std::move(mItems[*pos]);
186         }
187         afterRead(pos);
188         return pos;
189     }
190 
timedReceive(uint64_t wallTimeUs)191     Optional<T> timedReceive(uint64_t wallTimeUs) {
192         const auto pos = beforeTimedRead(wallTimeUs);
193         if (pos && !isStoppedLocked()) {
194             Optional<T> res(std::move(mItems[*pos]));
195             afterRead(true);
196             return res;
197         }
198         afterRead(false);
199         return {};
200     }
201 
capacity()202     constexpr size_t capacity() const { return CAPACITY; }
203 
204 private:
205     T mItems[CAPACITY];
206 };
207 
208 }  // namespace guest
209 }  // namespace base
210 }  // namespace android
211