1 /******************************************************************************
2 *
3 * Copyright 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <base/logging.h>
20 #include <string.h>
21
22 #include <mutex>
23
24 #include "osi/include/allocator.h"
25 #include "osi/include/fixed_queue.h"
26 #include "osi/include/list.h"
27 #include "osi/include/osi.h"
28 #include "osi/include/reactor.h"
29 #include "osi/include/semaphore.h"
30
// Internal state behind a fixed_queue_t handle.
typedef struct fixed_queue_t {
  // Backing list of queued items; items are appended at the back and taken
  // from the front.
  list_t* list;
  // Created with a count of |capacity|; waited on before an item is added and
  // posted after an item is removed.
  semaphore_t* enqueue_sem;
  // Created with a count of 0; posted after an item is added and waited on
  // before an item is removed.
  semaphore_t* dequeue_sem;
  // Held around every access to |list| made by the functions in this file.
  std::mutex* mutex;
  // Capacity requested at creation time; reported by fixed_queue_capacity().
  size_t capacity;

  // Reactor registration created by fixed_queue_register_dequeue(); NULL when
  // no registration is active.
  reactor_object_t* dequeue_object;
  // User callback invoked by internal_dequeue_ready().
  fixed_queue_cb dequeue_ready;
  // Opaque pointer handed back to |dequeue_ready|.
  void* dequeue_context;
} fixed_queue_t;
42
// Trampoline passed to reactor_register(); |context| is the fixed_queue_t the
// registration belongs to. Defined at the bottom of this file.
static void internal_dequeue_ready(void* context);
44
fixed_queue_new(size_t capacity)45 fixed_queue_t* fixed_queue_new(size_t capacity) {
46 fixed_queue_t* ret =
47 static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
48
49 ret->mutex = new std::mutex;
50 ret->capacity = capacity;
51
52 ret->list = list_new(NULL);
53 if (!ret->list) goto error;
54
55 ret->enqueue_sem = semaphore_new(capacity);
56 if (!ret->enqueue_sem) goto error;
57
58 ret->dequeue_sem = semaphore_new(0);
59 if (!ret->dequeue_sem) goto error;
60
61 return ret;
62
63 error:
64 fixed_queue_free(ret, NULL);
65 return NULL;
66 }
67
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)68 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
69 if (!queue) return;
70
71 fixed_queue_unregister_dequeue(queue);
72
73 if (free_cb)
74 for (const list_node_t* node = list_begin(queue->list);
75 node != list_end(queue->list); node = list_next(node))
76 free_cb(list_node(node));
77
78 list_free(queue->list);
79 semaphore_free(queue->enqueue_sem);
80 semaphore_free(queue->dequeue_sem);
81 delete queue->mutex;
82 osi_free(queue);
83 }
84
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)85 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
86 if (!queue) return;
87
88 while (!fixed_queue_is_empty(queue)) {
89 void* data = fixed_queue_try_dequeue(queue);
90 if (free_cb != NULL) {
91 free_cb(data);
92 }
93 }
94 }
95
fixed_queue_is_empty(fixed_queue_t * queue)96 bool fixed_queue_is_empty(fixed_queue_t* queue) {
97 if (queue == NULL) return true;
98
99 std::lock_guard<std::mutex> lock(*queue->mutex);
100 return list_is_empty(queue->list);
101 }
102
fixed_queue_length(fixed_queue_t * queue)103 size_t fixed_queue_length(fixed_queue_t* queue) {
104 if (queue == NULL) return 0;
105
106 std::lock_guard<std::mutex> lock(*queue->mutex);
107 return list_length(queue->list);
108 }
109
fixed_queue_capacity(fixed_queue_t * queue)110 size_t fixed_queue_capacity(fixed_queue_t* queue) {
111 CHECK(queue != NULL);
112
113 return queue->capacity;
114 }
115
fixed_queue_enqueue(fixed_queue_t * queue,void * data)116 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
117 CHECK(queue != NULL);
118 CHECK(data != NULL);
119
120 semaphore_wait(queue->enqueue_sem);
121
122 {
123 std::lock_guard<std::mutex> lock(*queue->mutex);
124 list_append(queue->list, data);
125 }
126
127 semaphore_post(queue->dequeue_sem);
128 }
129
fixed_queue_dequeue(fixed_queue_t * queue)130 void* fixed_queue_dequeue(fixed_queue_t* queue) {
131 CHECK(queue != NULL);
132
133 semaphore_wait(queue->dequeue_sem);
134
135 void* ret = NULL;
136 {
137 std::lock_guard<std::mutex> lock(*queue->mutex);
138 ret = list_front(queue->list);
139 list_remove(queue->list, ret);
140 }
141
142 semaphore_post(queue->enqueue_sem);
143
144 return ret;
145 }
146
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)147 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
148 CHECK(queue != NULL);
149 CHECK(data != NULL);
150
151 if (!semaphore_try_wait(queue->enqueue_sem)) return false;
152
153 {
154 std::lock_guard<std::mutex> lock(*queue->mutex);
155 list_append(queue->list, data);
156 }
157
158 semaphore_post(queue->dequeue_sem);
159 return true;
160 }
161
fixed_queue_try_dequeue(fixed_queue_t * queue)162 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
163 if (queue == NULL) return NULL;
164
165 if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
166
167 void* ret = NULL;
168 {
169 std::lock_guard<std::mutex> lock(*queue->mutex);
170 ret = list_front(queue->list);
171 list_remove(queue->list, ret);
172 }
173
174 semaphore_post(queue->enqueue_sem);
175
176 return ret;
177 }
178
fixed_queue_try_peek_first(fixed_queue_t * queue)179 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
180 if (queue == NULL) return NULL;
181
182 std::lock_guard<std::mutex> lock(*queue->mutex);
183 return list_is_empty(queue->list) ? NULL : list_front(queue->list);
184 }
185
fixed_queue_try_peek_last(fixed_queue_t * queue)186 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
187 if (queue == NULL) return NULL;
188
189 std::lock_guard<std::mutex> lock(*queue->mutex);
190 return list_is_empty(queue->list) ? NULL : list_back(queue->list);
191 }
192
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)193 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
194 if (queue == NULL) return NULL;
195
196 bool removed = false;
197 {
198 std::lock_guard<std::mutex> lock(*queue->mutex);
199 if (list_contains(queue->list, data) &&
200 semaphore_try_wait(queue->dequeue_sem)) {
201 removed = list_remove(queue->list, data);
202 CHECK(removed);
203 }
204 }
205
206 if (removed) {
207 semaphore_post(queue->enqueue_sem);
208 return data;
209 }
210 return NULL;
211 }
212
fixed_queue_get_list(fixed_queue_t * queue)213 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
214 CHECK(queue != NULL);
215
216 // NOTE: Using the list in this way is not thread-safe.
217 // Using this list in any context where threads can call other functions
218 // to the queue can break our assumptions and the queue in general.
219 return queue->list;
220 }
221
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)222 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
223 CHECK(queue != NULL);
224 return semaphore_get_fd(queue->dequeue_sem);
225 }
226
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)227 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
228 CHECK(queue != NULL);
229 return semaphore_get_fd(queue->enqueue_sem);
230 }
231
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)232 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
233 fixed_queue_cb ready_cb, void* context) {
234 CHECK(queue != NULL);
235 CHECK(reactor != NULL);
236 CHECK(ready_cb != NULL);
237
238 // Make sure we're not already registered
239 fixed_queue_unregister_dequeue(queue);
240
241 queue->dequeue_ready = ready_cb;
242 queue->dequeue_context = context;
243 queue->dequeue_object =
244 reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
245 internal_dequeue_ready, NULL);
246 }
247
fixed_queue_unregister_dequeue(fixed_queue_t * queue)248 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
249 CHECK(queue != NULL);
250
251 if (queue->dequeue_object) {
252 reactor_unregister(queue->dequeue_object);
253 queue->dequeue_object = NULL;
254 }
255 }
256
internal_dequeue_ready(void * context)257 static void internal_dequeue_ready(void* context) {
258 CHECK(context != NULL);
259
260 fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
261 queue->dequeue_ready(queue, queue->dequeue_context);
262 }
263