1/*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17template <typename T>
18Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
19
20template <typename T>
21Queue<T>::~Queue() {
22  ASSERT_LOG(enqueue_.handler_ == nullptr, "Enqueue is not unregistered");
23  ASSERT_LOG(dequeue_.handler_ == nullptr, "Dequeue is not unregistered");
24};
25
26template <typename T>
27void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
28  std::lock_guard<std::mutex> lock(mutex_);
29  ASSERT(enqueue_.handler_ == nullptr);
30  ASSERT(enqueue_.reactable_ == nullptr);
31  enqueue_.handler_ = handler;
32  enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
33      enqueue_.reactive_semaphore_.GetFd(),
34      base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)),
35      base::Closure());
36}
37
38template <typename T>
39void Queue<T>::UnregisterEnqueue() {
40  Reactor* reactor = nullptr;
41  Reactor::Reactable* to_unregister = nullptr;
42  bool wait_for_unregister = false;
43  {
44    std::lock_guard<std::mutex> lock(mutex_);
45    ASSERT(enqueue_.reactable_ != nullptr);
46    reactor = enqueue_.handler_->thread_->GetReactor();
47    wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
48    to_unregister = enqueue_.reactable_;
49    enqueue_.reactable_ = nullptr;
50    enqueue_.handler_ = nullptr;
51  }
52  reactor->Unregister(to_unregister);
53  if (wait_for_unregister) {
54    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
55  }
56}
57
58template <typename T>
59void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
60  std::lock_guard<std::mutex> lock(mutex_);
61  ASSERT(dequeue_.handler_ == nullptr);
62  ASSERT(dequeue_.reactable_ == nullptr);
63  dequeue_.handler_ = handler;
64  dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(
65      dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure());
66}
67
68template <typename T>
69void Queue<T>::UnregisterDequeue() {
70  Reactor* reactor = nullptr;
71  Reactor::Reactable* to_unregister = nullptr;
72  bool wait_for_unregister = false;
73  {
74    std::lock_guard<std::mutex> lock(mutex_);
75    ASSERT(dequeue_.reactable_ != nullptr);
76    reactor = dequeue_.handler_->thread_->GetReactor();
77    wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
78    to_unregister = dequeue_.reactable_;
79    dequeue_.reactable_ = nullptr;
80    dequeue_.handler_ = nullptr;
81  }
82  reactor->Unregister(to_unregister);
83  if (wait_for_unregister) {
84    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
85  }
86}
87
88template <typename T>
89std::unique_ptr<T> Queue<T>::TryDequeue() {
90  std::lock_guard<std::mutex> lock(mutex_);
91
92  if (queue_.empty()) {
93    return nullptr;
94  }
95
96  dequeue_.reactive_semaphore_.Decrease();
97
98  std::unique_ptr<T> data = std::move(queue_.front());
99  queue_.pop();
100
101  enqueue_.reactive_semaphore_.Increase();
102
103  return data;
104}
105
106template <typename T>
107void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
108  std::unique_ptr<T> data = callback.Run();
109  ASSERT(data != nullptr);
110  std::lock_guard<std::mutex> lock(mutex_);
111  enqueue_.reactive_semaphore_.Decrease();
112  queue_.push(std::move(data));
113  dequeue_.reactive_semaphore_.Increase();
114}
115