#include "utils-upoll.h"
+struct upollfd;
+
struct upoll
{
- int fd;
- void (*process)(void *closure, int fd, uint32_t events);
+ struct upollfd *fd;
+ void (*read)(void *);
+ void (*write)(void *);
+ void (*hangup)(void *);
void *closure;
struct upoll *next;
};
+struct upollfd
+{
+ int fd;
+ uint32_t events;
+ struct upollfd *next;
+ struct upoll *head;
+};
+
static int pollfd = 0;
-static struct upoll *head = NULL;
+static struct upollfd *head = NULL;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-int upoll_is_valid(struct upoll *upoll)
+static int update(struct upollfd *ufd)
{
- struct upoll *it = head;
- while (it != NULL) {
- if (it == upoll)
- return 1;
- it = it->next;
+ int rc;
+ struct upoll *u;
+ struct epoll_event e;
+ uint32_t events;
+ struct upollfd **prv;
+
+ events = 0;
+ pthread_mutex_lock(&mutex);
+ u = ufd->head;
+ if (u == NULL) {
+ /* no more watchers */
+ prv = &head;
+ while(*prv) {
+ if (*prv == ufd) {
+ *prv = ufd->next;
+ break;
+ }
+ prv = &(*prv)->next;
+ }
+ pthread_mutex_unlock(&mutex);
+ epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
+ free(ufd);
+ return 0;
}
- return 0;
+ /* compute the events for the watchers */
+ while (u != NULL) {
+ if (u->read != NULL)
+ events |= EPOLLIN;
+ if (u->write != NULL)
+ events |= EPOLLOUT;
+ u = u->next;
+ }
+ pthread_mutex_unlock(&mutex);
+ if (ufd->events == events)
+ return 0;
+ e.events = events;
+ e.data.ptr = ufd;
+ rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
+ if (rc == 0)
+ ufd->events = events;
+ return rc;
}
-struct upoll *upoll_open(int fd, uint32_t events, void (*process)(void *closure, int fd, uint32_t events), void *closure)
+static struct upollfd *get_fd(int fd)
{
struct epoll_event e;
- struct upoll *result;
+ struct upollfd *result;
int rc;
/* opens the epoll stream */
}
}
+ /* search */
+ result = head;
+ while (result != NULL) {
+ if (result->fd == fd)
+ return result;
+ result = result->next;
+ }
+
/* allocates */
- result = malloc(sizeof *result);
+ result = calloc(1, sizeof *result);
if (result == NULL)
return NULL;
/* init */
result->fd = fd;
- result->process = process;
- result->closure = closure;
pthread_mutex_lock(&mutex);
result->next = head;
head = result;
pthread_mutex_unlock(&mutex);
/* records */
- e.events = events;
+ e.events = 0;
e.data.ptr = result;
rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
if (rc == 0)
/* revert on error */
rc = errno;
- upoll_close(result);
+ update(result);
errno = rc;
return NULL;
}
-int upoll_update(struct upoll *upoll, uint32_t events)
+int upoll_is_valid(struct upoll *upoll)
{
- struct epoll_event e;
+ struct upollfd *itfd = head;
+ struct upoll *it;
+ while (itfd != NULL) {
+ it = itfd->head;
+ while (it != NULL) {
+ if (it == upoll)
+ return 1;
+ it = it->next;
+ }
+ itfd = itfd->next;
+ }
+ return 0;
+}
+
+struct upoll *upoll_open(int fd, void *closure)
+{
+ struct upollfd *ufd;
+ struct upoll *result;
+ /* allocates */
+ result = calloc(1, sizeof *result);
+ if (result == NULL)
+ return NULL;
+
+ /* get for fd */
+ ufd = get_fd(fd);
+ if (ufd == NULL) {
+ free(result);
+ return NULL;
+ }
+
+ /* init */
+ result->fd = ufd;
+ result->closure = closure;
+ pthread_mutex_lock(&mutex);
+ result->next = ufd->head;
+ ufd->head = result;
+ pthread_mutex_unlock(&mutex);
+ return result;
+}
+
+int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
+{
assert(pollfd != 0);
assert(upoll_is_valid(upoll));
- e.events = events;
- e.data.ptr = upoll;
- return epoll_ctl(pollfd, EPOLL_CTL_MOD, upoll->fd, &e);
+ upoll->read = process;
+ return update(upoll->fd);
+}
+
+int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
+{
+ assert(pollfd != 0);
+ assert(upoll_is_valid(upoll));
+
+ upoll->write = process;
+ return update(upoll->fd);
+}
+
+void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
+{
+ assert(pollfd != 0);
+ assert(upoll_is_valid(upoll));
+
+ upoll->hangup = process;
}
void upoll_close(struct upoll *upoll)
{
struct upoll **it;
+ struct upollfd *ufd;
assert(pollfd != 0);
assert(upoll_is_valid(upoll));
- epoll_ctl(pollfd, EPOLL_CTL_DEL, upoll->fd, NULL);
+ ufd = upoll->fd;
pthread_mutex_lock(&mutex);
- it = &head;
+ it = &ufd->head;
while (*it != upoll)
it = &(*it)->next;
*it = upoll->next;
pthread_mutex_unlock(&mutex);
free(upoll);
+ update(ufd);
}
-void upoll_wait(int timeout)
+int upoll_wait(int timeout)
{
int rc;
struct epoll_event e;
- struct upoll *upoll;
+ struct upollfd *ufd;
+ struct upoll *u;
- if (pollfd == 0)
- return;
+ if (pollfd == 0) {
+ errno = ECANCELED;
+ return -1;
+ }
- rc = epoll_wait(pollfd, &e, 1, timeout);
+ do {
+ rc = epoll_wait(pollfd, &e, 1, timeout);
+ } while (rc < 0 && errno == EINTR);
if (rc == 1) {
- upoll = e.data.ptr;
- upoll->process(upoll->closure, upoll->fd, e.events);
+ ufd = e.data.ptr;
+ u = ufd->head;
+ while (u != NULL) {
+ if ((e.events & EPOLLIN) && u->read) {
+ u->read(u->closure);
+ }
+ if ((e.events & EPOLLOUT) && u->write) {
+ u->write(u->closure);
+ }
+ if ((e.events & EPOLLHUP) && u->hangup) {
+ u->hangup(u->closure);
+ }
+ u = u->next;
+ }
}
+ return rc < 0 ? rc : 0;
}