/****************************************************************************** * * Copyright 2014 Google, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ******************************************************************************/ #define LOG_TAG "bt_osi_reactor" #include "osi/include/reactor.h" #include #include #include #include #include #include #include #include #include #include "osi/include/allocator.h" #include "osi/include/list.h" #include "osi/include/log.h" #if !defined(EFD_SEMAPHORE) #define EFD_SEMAPHORE (1 << 0) #endif struct reactor_t { int epoll_fd; int event_fd; std::mutex* list_mutex; list_t* invalidation_list; // reactor objects that have been unregistered. pthread_t run_thread; // the pthread on which reactor_run is executing. bool is_running; // indicates whether |run_thread| is valid. bool object_removed; }; struct reactor_object_t { int fd; // the file descriptor to monitor for events. void* context; // a context that's passed back to the *_ready functions. reactor_t* reactor; // the reactor instance this object is registered with. std::mutex* mutex; // protects the lifetime of this object and all variables. void (*read_ready)(void* context); // function to call when the file // descriptor becomes readable. void (*write_ready)(void* context); // function to call when the file // descriptor becomes writeable. }; static reactor_status_t run_reactor(reactor_t* reactor, int iterations); static const size_t MAX_EVENTS = 64; static const eventfd_t EVENT_REACTOR_STOP = 1; reactor_t* reactor_new(void) { reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t)); ret->epoll_fd = INVALID_FD; ret->event_fd = INVALID_FD; ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (ret->epoll_fd == INVALID_FD) { LOG_ERROR("%s unable to create epoll instance: %s", __func__, strerror(errno)); goto error; } ret->event_fd = eventfd(0, 0); if (ret->event_fd == INVALID_FD) { LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno)); goto error; } ret->list_mutex = new std::mutex; ret->invalidation_list = list_new(NULL); if (!ret->invalidation_list) { LOG_ERROR("%s unable to allocate object invalidation list.", __func__); goto error; } struct epoll_event event; memset(&event, 0, sizeof(event)); event.events = EPOLLIN; event.data.ptr = NULL; if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) { LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno)); goto error; } return ret; error:; reactor_free(ret); return NULL; } void reactor_free(reactor_t* reactor) { if (!reactor) return; list_free(reactor->invalidation_list); close(reactor->event_fd); close(reactor->epoll_fd); osi_free(reactor); } reactor_status_t reactor_start(reactor_t* reactor) { CHECK(reactor != NULL); return run_reactor(reactor, 0); } reactor_status_t reactor_run_once(reactor_t* reactor) { CHECK(reactor != NULL); return run_reactor(reactor, 1); } void reactor_stop(reactor_t* reactor) { CHECK(reactor != NULL); eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); } reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context, void (*read_ready)(void* context), void (*write_ready)(void* context)) { CHECK(reactor != NULL); CHECK(fd != INVALID_FD); reactor_object_t* object = (reactor_object_t*)osi_calloc(sizeof(reactor_object_t)); object->reactor = reactor; object->fd = fd; object->context = context; object->read_ready = read_ready; object->write_ready = write_ready; object->mutex = new std::mutex; struct epoll_event event; memset(&event, 0, sizeof(event)); if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP); if (write_ready) event.events |= EPOLLOUT; event.data.ptr = object; if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno)); delete object->mutex; osi_free(object); return NULL; } return object; } bool reactor_change_registration(reactor_object_t* object, void (*read_ready)(void* context), void (*write_ready)(void* context)) { CHECK(object != NULL); struct epoll_event event; memset(&event, 0, sizeof(event)); if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP); if (write_ready) event.events |= EPOLLOUT; event.data.ptr = object; if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) { LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno)); return false; } std::lock_guard lock(*object->mutex); object->read_ready = read_ready; object->write_ready = write_ready; return true; } void reactor_unregister(reactor_object_t* obj) { CHECK(obj != NULL); reactor_t* reactor = obj->reactor; if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) LOG_ERROR("%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno)); if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) { reactor->object_removed = true; return; } { std::unique_lock lock(*reactor->list_mutex); list_append(reactor->invalidation_list, obj); } // Taking the object lock here makes sure a callback for |obj| isn't // currently executing. The reactor thread must then either be before // the callbacks or after. If after, we know that the object won't be // referenced because it has been taken out of the epoll set. If before, // it won't be referenced because the reactor thread will check the // invalidation_list and find it in there. So by taking this lock, we // are waiting until the reactor thread drops all references to |obj|. // One the wait completes, we can unlock and destroy |obj| safely. obj->mutex->lock(); obj->mutex->unlock(); delete obj->mutex; osi_free(obj); } // Runs the reactor loop for a maximum of |iterations|. // 0 |iterations| means loop forever. // |reactor| may not be NULL. static reactor_status_t run_reactor(reactor_t* reactor, int iterations) { CHECK(reactor != NULL); reactor->run_thread = pthread_self(); reactor->is_running = true; struct epoll_event events[MAX_EVENTS]; for (int i = 0; iterations == 0 || i < iterations; ++i) { { std::lock_guard lock(*reactor->list_mutex); list_clear(reactor->invalidation_list); } int ret; OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1)); if (ret == -1) { LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno)); reactor->is_running = false; return REACTOR_STATUS_ERROR; } for (int j = 0; j < ret; ++j) { // The event file descriptor is the only one that registers with // a NULL data pointer. We use the NULL to identify it and break // out of the reactor loop. if (events[j].data.ptr == NULL) { eventfd_t value; eventfd_read(reactor->event_fd, &value); reactor->is_running = false; return REACTOR_STATUS_STOP; } reactor_object_t* object = (reactor_object_t*)events[j].data.ptr; std::unique_lock lock(*reactor->list_mutex); if (list_contains(reactor->invalidation_list, object)) { continue; } // Downgrade the list lock to an object lock. { std::lock_guard obj_lock(*object->mutex); lock.unlock(); reactor->object_removed = false; if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready) object->read_ready(object->context); if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready) object->write_ready(object->context); } if (reactor->object_removed) { delete object->mutex; osi_free(object); } } } reactor->is_running = false; return REACTOR_STATUS_DONE; }