use upoll for event loop
[src/app-framework-binder.git] / src / utils-upoll.c
index 6db2246..d72a0e8 100644 (file)
 #include "utils-upoll.h"
 
 
+struct upollfd;
+
 struct upoll
 {
-       int fd;
+       struct upollfd *fd;
        void (*read)(void *);
        void (*write)(void *);
        void (*hangup)(void *);
@@ -35,25 +37,67 @@ struct upoll
        struct upoll *next;
 };
 
+struct upollfd
+{
+       int fd;
+       uint32_t events;
+       struct upollfd *next;
+       struct upoll *head;
+};
+
 static int pollfd = 0;
-static struct upoll *head = NULL;
+static struct upollfd *head = NULL;
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
-int upoll_is_valid(struct upoll *upoll)
+static int update(struct upollfd *ufd)
 {
-       struct upoll *it = head;
-       while (it != NULL) {
-               if (it == upoll)
-                       return 1;
-               it = it->next;
+       int rc;
+       struct upoll *u;
+       struct epoll_event e;
+       uint32_t events;
+       struct upollfd **prv;
+
+       events = 0;
+       pthread_mutex_lock(&mutex);
+       u = ufd->head;
+       if (u == NULL) {
+               /* no more watchers */
+               prv = &head;
+               while(*prv) {
+                       if (*prv == ufd) {
+                               *prv = ufd->next;
+                               break;
+                       }
+                       prv = &(*prv)->next;
+               }
+               pthread_mutex_unlock(&mutex);
+               epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
+               free(ufd);
+               return 0;
        }
-       return 0;
+       /* compute the events for the watchers */
+       while (u != NULL) {
+               if (u->read != NULL)
+                       events |= EPOLLIN;
+               if (u->write != NULL)
+                       events |= EPOLLOUT;
+               u = u->next;
+       }
+       pthread_mutex_unlock(&mutex);
+       if (ufd->events == events)
+               return 0;
+       e.events = events;
+       e.data.ptr = ufd;
+       rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
+       if (rc == 0)
+               ufd->events = events;
+       return rc;
 }
 
-struct upoll *upoll_open(int fd, void *closure)
+static struct upollfd *get_fd(int fd)
 {
        struct epoll_event e;
-       struct upoll *result;
+       struct upollfd *result;
        int rc;
 
        /* opens the epoll stream */
@@ -69,6 +113,14 @@ struct upoll *upoll_open(int fd, void *closure)
                }
        }
 
+       /* search */
+       result = head;
+       while (result != NULL) {
+               if (result->fd == fd)
+                       return result;
+               result = result->next;
+       }
+
        /* allocates */
        result = calloc(1, sizeof *result);
        if (result == NULL)
@@ -76,7 +128,6 @@ struct upoll *upoll_open(int fd, void *closure)
 
        /* init */
        result->fd = fd;
-       result->closure = closure;
        pthread_mutex_lock(&mutex);
        result->next = head;
        head = result;
@@ -91,18 +142,52 @@ struct upoll *upoll_open(int fd, void *closure)
 
        /* revert on error */
        rc = errno;
-       upoll_close(result);
+       update(result);
        errno = rc;
        return NULL;
 }
 
-static int update(struct upoll *upoll)
+int upoll_is_valid(struct upoll *upoll)
 {
-       struct epoll_event e;
-       e.events = (uint32_t)((upoll->read != NULL ? EPOLLIN : 0 )
-                | (upoll->write != NULL ? EPOLLOUT : 0));
-       e.data.ptr = upoll;
-       return epoll_ctl(pollfd, EPOLL_CTL_MOD, upoll->fd, &e);
+       struct upollfd *itfd = head;
+       struct upoll *it;
+       while (itfd != NULL) {
+               it = itfd->head;
+               while (it != NULL) {
+                       if (it == upoll)
+                               return 1;
+                       it = it->next;
+               }
+               itfd = itfd->next;
+       }
+       return 0;
+}
+
+struct upoll *upoll_open(int fd, void *closure)
+{
+       struct upollfd *ufd;
+       struct upoll *result;
+
+       /* allocates */
+       result = calloc(1, sizeof *result);
+       if (result == NULL)
+               return NULL;
+
+       /* get for fd */
+       ufd = get_fd(fd);
+       if (ufd == NULL) {
+               free(result);
+               return NULL;
+       }
+
+       /* init */
+       result->fd = ufd;
+       result->closure = closure;
+       pthread_mutex_lock(&mutex);
+       result->next = ufd->head;
+       ufd->head = result;
+       pthread_mutex_unlock(&mutex);
+       return result;
 }
 
 int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
@@ -111,7 +196,7 @@ int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
        assert(upoll_is_valid(upoll));
 
        upoll->read = process;
-       return update(upoll);
+       return update(upoll->fd);
 }
 
 int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
@@ -120,7 +205,7 @@ int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
        assert(upoll_is_valid(upoll));
 
        upoll->write = process;
-       return update(upoll);
+       return update(upoll->fd);
 }
 
 void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
@@ -134,38 +219,56 @@ void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
 void upoll_close(struct upoll *upoll)
 {
        struct upoll **it;
+       struct upollfd *ufd;
 
        assert(pollfd != 0);
        assert(upoll_is_valid(upoll));
 
-       epoll_ctl(pollfd, EPOLL_CTL_DEL, upoll->fd, NULL);
+       ufd = upoll->fd;
        pthread_mutex_lock(&mutex);
-       it = &head;
+       it = &ufd->head;
        while (*it != upoll)
                it = &(*it)->next;
        *it = upoll->next;
        pthread_mutex_unlock(&mutex);
        free(upoll);
+       update(ufd);
 }
 
-void upoll_wait(int timeout)
+int upoll_wait(int timeout)
 {
        int rc;
        struct epoll_event e;
-       struct upoll *upoll;
+       struct upollfd *ufd;
+       struct upoll *u;
 
-       if (pollfd == 0)
-               return;
+       if (pollfd == 0) {
+               errno = ECANCELED;
+               return -1;
+       }
 
-       rc = epoll_wait(pollfd, &e, 1, timeout);
+       do {
+               rc = epoll_wait(pollfd, &e, 1, timeout);
+       } while (rc < 0 && errno == EINTR);
        if (rc == 1) {
-               upoll = e.data.ptr;
-               if ((e.events & EPOLLIN) && upoll->read)
-                       upoll->read(upoll->closure);
-               if ((e.events & EPOLLOUT) && upoll->write)
-                       upoll->write(upoll->closure);
-               if ((e.events & EPOLLHUP) && upoll->hangup)
-                       upoll->hangup(upoll->closure);
+               ufd = e.data.ptr;
+               u = ufd->head;
+               while (u != NULL) {
+                       if ((e.events & EPOLLIN) && u->read) {
+                               u->read(u->closure);
+                               break;
+                       }
+                       if ((e.events & EPOLLOUT) && u->write) {
+                               u->write(u->closure);
+                               break;
+                       }
+                       if ((e.events & EPOLLHUP) && u->hangup) {
+                               u->hangup(u->closure);
+                               break;
+                       }
+                       u = u->next;
+               }
        }
+       return rc < 0 ? rc : 0;
 }