first add of asynchonous handling
[src/app-framework-binder.git] / src / utils-upoll.c
index d72a0e8..03f9a08 100644 (file)
 
 #include "utils-upoll.h"
 
-
 struct upollfd;
 
+/*
+ * Structure describing one opened client
+ */
 struct upoll
 {
-       struct upollfd *fd;
-       void (*read)(void *);
-       void (*write)(void *);
-       void (*hangup)(void *);
-       void *closure;
-       struct upoll *next;
+       struct upollfd *fd;     /* structure handling the file descriptor */
+       void (*read)(void *);   /* callback for handling on_readable */
+       void (*write)(void *);  /* callback for handling on_writable */
+       void (*hangup)(void *); /* callback for handling on_hangup */
+       void *closure;          /* closure for callbacks */
+       struct upoll *next;     /* next client of the same file descriptor */
 };
 
+/*
+ * Structure describing a watched file descriptor
+ */
 struct upollfd
 {
-       int fd;
-       uint32_t events;
-       struct upollfd *next;
-       struct upoll *head;
+       int fd;                 /* watch file descriptor */
+       uint32_t events;        /* watched events */
+       struct upollfd *next;   /* next watched file descriptor */
+       struct upoll *head;     /* first client watching the file descriptor */
+};
+
+
+/*
+ * Structure describing a upoll group
+struct upollgrp
+{
+       int pollfd;
+       struct upollfd *head;
+       struct upoll *current;
+       pthread_mutex_t mutex;
+};
+
+
+static struct upollgrp global = {
+       .pollfd = 0,
+       .head = NULL,
+       .current = NULL,
+       .mutex = PTHREAD_MUTEX_INITIALIZER
 };
+ */
 
 static int pollfd = 0;
 static struct upollfd *head = NULL;
+static struct upoll *current = NULL;
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
-static int update(struct upollfd *ufd)
+/*
+ * Compute the events for the set of clients
+ */
+static int update_flags_locked(struct upollfd *ufd)
 {
        int rc;
        struct upoll *u;
        struct epoll_event e;
        uint32_t events;
-       struct upollfd **prv;
 
+       /* compute expected events */
        events = 0;
-       pthread_mutex_lock(&mutex);
        u = ufd->head;
-       if (u == NULL) {
-               /* no more watchers */
-               prv = &head;
-               while(*prv) {
-                       if (*prv == ufd) {
-                               *prv = ufd->next;
-                               break;
-                       }
-                       prv = &(*prv)->next;
-               }
-               pthread_mutex_unlock(&mutex);
-               epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
-               free(ufd);
-               return 0;
-       }
-       /* compute the events for the watchers */
        while (u != NULL) {
                if (u->read != NULL)
                        events |= EPOLLIN;
@@ -83,17 +95,54 @@ static int update(struct upollfd *ufd)
                        events |= EPOLLOUT;
                u = u->next;
        }
-       pthread_mutex_unlock(&mutex);
        if (ufd->events == events)
-               return 0;
-       e.events = events;
-       e.data.ptr = ufd;
-       rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
-       if (rc == 0)
-               ufd->events = events;
+               rc = 0;
+       else {
+               e.events = events;
+               e.data.ptr = ufd;
+               rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
+               if (rc == 0)
+                       ufd->events = events;
+       }
+       pthread_mutex_unlock(&mutex);
        return rc;
 }
 
+/*
+ * Compute the events for the set of clients
+ */
+static int update_flags(struct upollfd *ufd)
+{
+       pthread_mutex_lock(&mutex);
+       return update_flags_locked(ufd);
+}
+
+/*
+ *
+ */
+static int update(struct upollfd *ufd)
+{
+       struct upollfd **prv;
+
+       pthread_mutex_lock(&mutex);
+       if (ufd->head != NULL)
+               return update_flags_locked(ufd);
+
+       /* no more watchers */
+       prv = &head;
+       while(*prv) {
+               if (*prv == ufd) {
+                       *prv = ufd->next;
+                       break;
+               }
+               prv = &(*prv)->next;
+       }
+       pthread_mutex_unlock(&mutex);
+       epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
+       free(ufd);
+       return 0;
+}
+
 static struct upollfd *get_fd(int fd)
 {
        struct epoll_event e;
@@ -196,7 +245,7 @@ int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
        assert(upoll_is_valid(upoll));
 
        upoll->read = process;
-       return update(upoll->fd);
+       return update_flags(upoll->fd);
 }
 
 int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
@@ -205,7 +254,7 @@ int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
        assert(upoll_is_valid(upoll));
 
        upoll->write = process;
-       return update(upoll->fd);
+       return update_flags(upoll->fd);
 }
 
 void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
@@ -226,6 +275,8 @@ void upoll_close(struct upoll *upoll)
 
        ufd = upoll->fd;
        pthread_mutex_lock(&mutex);
+       if (current == upoll)
+               current = NULL;
        it = &ufd->head;
        while (*it != upoll)
                it = &(*it)->next;
@@ -240,7 +291,6 @@ int upoll_wait(int timeout)
        int rc;
        struct epoll_event e;
        struct upollfd *ufd;
-       struct upoll *u;
 
        if (pollfd == 0) {
                errno = ECANCELED;
@@ -252,21 +302,25 @@ int upoll_wait(int timeout)
        } while (rc < 0 && errno == EINTR);
        if (rc == 1) {
                ufd = e.data.ptr;
-               u = ufd->head;
-               while (u != NULL) {
-                       if ((e.events & EPOLLIN) && u->read) {
-                               u->read(u->closure);
-                               break;
+               current = ufd->head;
+               e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
+               while (current != NULL && e.events != 0) {
+                       if ((e.events & EPOLLIN) && current->read) {
+                               current->read(current->closure);
+                               e.events &= (uint32_t)~EPOLLIN;
+                               continue;
                        }
-                       if ((e.events & EPOLLOUT) && u->write) {
-                               u->write(u->closure);
-                               break;
+                       if ((e.events & EPOLLOUT) && current->write) {
+                               current->write(current->closure);
+                               e.events &= (uint32_t)~EPOLLOUT;
+                               continue;
                        }
-                       if ((e.events & EPOLLHUP) && u->hangup) {
-                               u->hangup(u->closure);
-                               break;
+                       if ((e.events & EPOLLHUP) && current->hangup) {
+                               current->hangup(current->closure);
+                               if (current == NULL)
+                                       break;
                        }
-                       u = u->next;
+                       current = current->next;
                }
        }
        return rc < 0 ? rc : 0;