#include "utils-upoll.h"
-
struct upollfd;
+/*
+ * Structure describing one opened client
+ */
struct upoll
{
- struct upollfd *fd;
- void (*read)(void *);
- void (*write)(void *);
- void (*hangup)(void *);
- void *closure;
- struct upoll *next;
+ struct upollfd *fd; /* structure handling the file descriptor */
+ void (*read)(void *); /* callback for handling on_readable */
+ void (*write)(void *); /* callback for handling on_writable */
+ void (*hangup)(void *); /* callback for handling on_hangup */
+ void *closure; /* closure for callbacks */
+ struct upoll *next; /* next client of the same file descriptor */
};
+/*
+ * Structure describing a watched file descriptor
+ */
struct upollfd
{
- int fd;
- uint32_t events;
- struct upollfd *next;
- struct upoll *head;
+ int fd; /* watch file descriptor */
+ uint32_t events; /* watched events */
+ struct upollfd *next; /* next watched file descriptor */
+ struct upoll *head; /* first client watching the file descriptor */
+};
+
+
+/*
+ * Structure describing a upoll group
+struct upollgrp
+{
+ int pollfd;
+ struct upollfd *head;
+ struct upoll *current;
+ pthread_mutex_t mutex;
+};
+
+
+static struct upollgrp global = {
+ .pollfd = 0,
+ .head = NULL,
+ .current = NULL,
+ .mutex = PTHREAD_MUTEX_INITIALIZER
};
+ */
static int pollfd = 0;
static struct upollfd *head = NULL;
+static struct upoll *current = NULL;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static int update(struct upollfd *ufd)
+/*
+ * Compute the events for the set of clients
+ */
+static int update_flags_locked(struct upollfd *ufd)
{
int rc;
struct upoll *u;
struct epoll_event e;
uint32_t events;
- struct upollfd **prv;
+ /* compute expected events */
events = 0;
- pthread_mutex_lock(&mutex);
u = ufd->head;
- if (u == NULL) {
- /* no more watchers */
- prv = &head;
- while(*prv) {
- if (*prv == ufd) {
- *prv = ufd->next;
- break;
- }
- prv = &(*prv)->next;
- }
- pthread_mutex_unlock(&mutex);
- epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
- free(ufd);
- return 0;
- }
- /* compute the events for the watchers */
while (u != NULL) {
if (u->read != NULL)
events |= EPOLLIN;
events |= EPOLLOUT;
u = u->next;
}
- pthread_mutex_unlock(&mutex);
if (ufd->events == events)
- return 0;
- e.events = events;
- e.data.ptr = ufd;
- rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
- if (rc == 0)
- ufd->events = events;
+ rc = 0;
+ else {
+ e.events = events;
+ e.data.ptr = ufd;
+ rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
+ if (rc == 0)
+ ufd->events = events;
+ }
+ pthread_mutex_unlock(&mutex);
return rc;
}
+/*
+ * Compute the events for the set of clients
+ */
+static int update_flags(struct upollfd *ufd)
+{
+ pthread_mutex_lock(&mutex);
+ return update_flags_locked(ufd);
+}
+
+/*
+ *
+ */
+static int update(struct upollfd *ufd)
+{
+ struct upollfd **prv;
+
+ pthread_mutex_lock(&mutex);
+ if (ufd->head != NULL)
+ return update_flags_locked(ufd);
+
+ /* no more watchers */
+ prv = &head;
+ while(*prv) {
+ if (*prv == ufd) {
+ *prv = ufd->next;
+ break;
+ }
+ prv = &(*prv)->next;
+ }
+ pthread_mutex_unlock(&mutex);
+ epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
+ free(ufd);
+ return 0;
+}
+
static struct upollfd *get_fd(int fd)
{
struct epoll_event e;
assert(upoll_is_valid(upoll));
upoll->read = process;
- return update(upoll->fd);
+ return update_flags(upoll->fd);
}
int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
assert(upoll_is_valid(upoll));
upoll->write = process;
- return update(upoll->fd);
+ return update_flags(upoll->fd);
}
void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
ufd = upoll->fd;
pthread_mutex_lock(&mutex);
+ if (current == upoll)
+ current = NULL;
it = &ufd->head;
while (*it != upoll)
it = &(*it)->next;
int rc;
struct epoll_event e;
struct upollfd *ufd;
- struct upoll *u;
if (pollfd == 0) {
errno = ECANCELED;
} while (rc < 0 && errno == EINTR);
if (rc == 1) {
ufd = e.data.ptr;
- u = ufd->head;
- while (u != NULL) {
- if ((e.events & EPOLLIN) && u->read) {
- u->read(u->closure);
+ current = ufd->head;
+ e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
+ while (current != NULL && e.events != 0) {
+ if ((e.events & EPOLLIN) && current->read) {
+ current->read(current->closure);
+ e.events &= (uint32_t)~EPOLLIN;
+ continue;
}
- if ((e.events & EPOLLOUT) && u->write) {
- u->write(u->closure);
+ if ((e.events & EPOLLOUT) && current->write) {
+ current->write(current->closure);
+ e.events &= (uint32_t)~EPOLLOUT;
+ continue;
}
- if ((e.events & EPOLLHUP) && u->hangup) {
- u->hangup(u->closure);
+ if ((e.events & EPOLLHUP) && current->hangup) {
+ current->hangup(current->closure);
+ if (current == NULL)
+ break;
}
- u = u->next;
+ current = current->next;
}
}
return rc < 0 ? rc : 0;