1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <fcntl.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <cerrno>
26 #include <cinttypes>
27 #include <cstring>
28
29 #include "os/log.h"
30
31 namespace {
32
33 // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory
34 constexpr int kEpollMaxEvents = 64;
35 constexpr uint64_t kStopReactor = 1 << 0;
36 constexpr uint64_t kWaitForIdle = 1 << 1;
37
38 } // namespace
39
40 namespace bluetooth {
41 namespace os {
42 using common::Closure;
43
44 class Reactor::Reactable {
45 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)46 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
47 : fd_(fd),
48 on_read_ready_(std::move(on_read_ready)),
49 on_write_ready_(std::move(on_write_ready)),
50 is_executing_(false),
51 removed_(false) {}
52 const int fd_;
53 Closure on_read_ready_;
54 Closure on_write_ready_;
55 bool is_executing_;
56 bool removed_;
57 std::mutex mutex_;
58 std::unique_ptr<std::promise<void>> finished_promise_;
59 };
60
Reactor()61 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
62 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
63 ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));
64
65 control_fd_ = eventfd(0, EFD_NONBLOCK);
66 ASSERT(control_fd_ != -1);
67
68 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
69 int result;
70 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
71 ASSERT(result != -1);
72 }
73
~Reactor()74 Reactor::~Reactor() {
75 int result;
76 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
77 ASSERT(result != -1);
78
79 RUN_NO_INTR(result = close(control_fd_));
80 ASSERT(result != -1);
81
82 RUN_NO_INTR(result = close(epoll_fd_));
83 ASSERT(result != -1);
84 }
85
Run()86 void Reactor::Run() {
87 bool already_running = is_running_.exchange(true);
88 ASSERT(!already_running);
89
90 int timeout_ms = -1;
91 bool waiting_for_idle = false;
92 for (;;) {
93 {
94 std::unique_lock<std::mutex> lock(mutex_);
95 invalidation_list_.clear();
96 }
97 epoll_event events[kEpollMaxEvents];
98 int count;
99 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
100 ASSERT(count != -1);
101 if (waiting_for_idle && count == 0) {
102 timeout_ms = -1;
103 waiting_for_idle = false;
104 idle_promise_->set_value();
105 idle_promise_ = nullptr;
106 }
107
108 for (int i = 0; i < count; ++i) {
109 auto event = events[i];
110 ASSERT(event.events != 0u);
111
112 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
113 if (event.data.ptr == nullptr) {
114 uint64_t value;
115 eventfd_read(control_fd_, &value);
116 if ((value & kStopReactor) != 0) {
117 is_running_ = false;
118 return;
119 } else if ((value & kWaitForIdle) != 0) {
120 timeout_ms = 30;
121 waiting_for_idle = true;
122 continue;
123 } else {
124 LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
125 continue;
126 }
127 }
128 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
129 std::unique_lock<std::mutex> lock(mutex_);
130 executing_reactable_finished_ = nullptr;
131 // See if this reactable has been removed in the meantime.
132 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
133 continue;
134 }
135
136 {
137 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
138 lock.unlock();
139 reactable->is_executing_ = true;
140 }
141 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
142 reactable->on_read_ready_.Run();
143 }
144 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
145 reactable->on_write_ready_.Run();
146 }
147 {
148 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
149 reactable->is_executing_ = false;
150 if (reactable->removed_) {
151 reactable->finished_promise_->set_value();
152 delete reactable;
153 }
154 }
155 }
156 }
157 }
158
Stop()159 void Reactor::Stop() {
160 if (!is_running_) {
161 LOG_WARN("not running, will stop once it's started");
162 }
163 auto control = eventfd_write(control_fd_, kStopReactor);
164 ASSERT(control != -1);
165 }
166
Register(int fd,Closure on_read_ready,Closure on_write_ready)167 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
168 uint32_t poll_event_type = 0;
169 if (!on_read_ready.is_null()) {
170 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
171 }
172 if (!on_write_ready.is_null()) {
173 poll_event_type |= EPOLLOUT;
174 }
175 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
176 epoll_event event = {
177 .events = poll_event_type,
178 .data = {.ptr = reactable},
179 };
180 int register_fd;
181 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
182 ASSERT(register_fd != -1);
183 return reactable;
184 }
185
Unregister(Reactor::Reactable * reactable)186 void Reactor::Unregister(Reactor::Reactable* reactable) {
187 ASSERT(reactable != nullptr);
188 {
189 std::lock_guard<std::mutex> lock(mutex_);
190 invalidation_list_.push_back(reactable);
191 }
192 bool delaying_delete_until_callback_finished = false;
193 {
194 int result;
195 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
196 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
197 if (result == -1 && errno == ENOENT) {
198 LOG_INFO("reactable is invalid or unregistered");
199 } else {
200 ASSERT(result != -1);
201 }
202
203 // If we are unregistering during the callback event from this reactable, we delete it after the callback is
204 // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
205 if (reactable->is_executing_) {
206 reactable->removed_ = true;
207 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
208 executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
209 delaying_delete_until_callback_finished = true;
210 }
211 }
212 // If we are unregistering outside of the callback event from this reactable, we delete it now
213 if (!delaying_delete_until_callback_finished) {
214 delete reactable;
215 }
216 }
217
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)218 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
219 std::lock_guard<std::mutex> lock(mutex_);
220 if (executing_reactable_finished_ == nullptr) {
221 return true;
222 }
223 auto stop_status = executing_reactable_finished_->wait_for(timeout);
224 if (stop_status != std::future_status::ready) {
225 LOG_ERROR("Unregister reactable timed out");
226 }
227 return stop_status == std::future_status::ready;
228 }
229
WaitForIdle(std::chrono::milliseconds timeout)230 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
231 auto promise = std::make_shared<std::promise<void>>();
232 auto future = std::make_unique<std::future<void>>(promise->get_future());
233 {
234 std::lock_guard<std::mutex> lock(mutex_);
235 idle_promise_ = promise;
236 }
237
238 auto control = eventfd_write(control_fd_, kWaitForIdle);
239 ASSERT(control != -1);
240
241 auto idle_status = future->wait_for(timeout);
242 return idle_status == std::future_status::ready;
243 }
244
ModifyRegistration(Reactor::Reactable * reactable,Closure on_read_ready,Closure on_write_ready)245 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
246 ASSERT(reactable != nullptr);
247
248 uint32_t poll_event_type = 0;
249 if (!on_read_ready.is_null()) {
250 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
251 }
252 if (!on_write_ready.is_null()) {
253 poll_event_type |= EPOLLOUT;
254 }
255 {
256 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
257 reactable->on_read_ready_ = std::move(on_read_ready);
258 reactable->on_write_ready_ = std::move(on_write_ready);
259 }
260 epoll_event event = {
261 .events = poll_event_type,
262 .data = {.ptr = reactable},
263 };
264 int modify_fd;
265 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
266 ASSERT(modify_fd != -1);
267 }
268
269 } // namespace os
270 } // namespace bluetooth
271