X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Futils-upoll.c;h=03f9a08b1e9514ffeb58b45867959c7929e82cfe;hb=f262b0f726ac0577f40525038b779185f144873f;hp=6db2246d19e085325be284ccb3a6795768d55c89;hpb=fb230eee946673ed5ebe9659d623c2a06d0a80ce;p=src%2Fapp-framework-binder.git diff --git a/src/utils-upoll.c b/src/utils-upoll.c index 6db2246d..03f9a08b 100644 --- a/src/utils-upoll.c +++ b/src/utils-upoll.c @@ -24,36 +24,129 @@ #include "utils-upoll.h" +struct upollfd; +/* + * Structure describing one opened client + */ struct upoll { - int fd; - void (*read)(void *); - void (*write)(void *); - void (*hangup)(void *); - void *closure; - struct upoll *next; + struct upollfd *fd; /* structure handling the file descriptor */ + void (*read)(void *); /* callback for handling on_readable */ + void (*write)(void *); /* callback for handling on_writable */ + void (*hangup)(void *); /* callback for handling on_hangup */ + void *closure; /* closure for callbacks */ + struct upoll *next; /* next client of the same file descriptor */ +}; + +/* + * Structure describing a watched file descriptor + */ +struct upollfd +{ + int fd; /* watch file descriptor */ + uint32_t events; /* watched events */ + struct upollfd *next; /* next watched file descriptor */ + struct upoll *head; /* first client watching the file descriptor */ }; + +/* + * Structure describing a upoll group +struct upollgrp +{ + int pollfd; + struct upollfd *head; + struct upoll *current; + pthread_mutex_t mutex; +}; + + +static struct upollgrp global = { + .pollfd = 0, + .head = NULL, + .current = NULL, + .mutex = PTHREAD_MUTEX_INITIALIZER +}; + */ + static int pollfd = 0; -static struct upoll *head = NULL; +static struct upollfd *head = NULL; +static struct upoll *current = NULL; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -int upoll_is_valid(struct upoll *upoll) +/* + * Compute the events for the set of clients + */ +static int update_flags_locked(struct upollfd *ufd) { - struct upoll *it = head; - while (it != NULL) { - if (it == upoll) - return 1; - it = it->next; + int rc; + struct upoll *u; + struct epoll_event e; + uint32_t events; + + /* compute expected events */ + events = 0; + u = ufd->head; + while (u != NULL) { + if (u->read != NULL) + events |= EPOLLIN; + if (u->write != NULL) + events |= EPOLLOUT; + u = u->next; } + if (ufd->events == events) + rc = 0; + else { + e.events = events; + e.data.ptr = ufd; + rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e); + if (rc == 0) + ufd->events = events; + } + pthread_mutex_unlock(&mutex); + return rc; +} + +/* + * Compute the events for the set of clients + */ +static int update_flags(struct upollfd *ufd) +{ + pthread_mutex_lock(&mutex); + return update_flags_locked(ufd); +} + +/* + * + */ +static int update(struct upollfd *ufd) +{ + struct upollfd **prv; + + pthread_mutex_lock(&mutex); + if (ufd->head != NULL) + return update_flags_locked(ufd); + + /* no more watchers */ + prv = &head; + while(*prv) { + if (*prv == ufd) { + *prv = ufd->next; + break; + } + prv = &(*prv)->next; + } + pthread_mutex_unlock(&mutex); + epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL); + free(ufd); return 0; } -struct upoll *upoll_open(int fd, void *closure) +static struct upollfd *get_fd(int fd) { struct epoll_event e; - struct upoll *result; + struct upollfd *result; int rc; /* opens the epoll stream */ @@ -69,6 +162,14 @@ struct upoll *upoll_open(int fd, void *closure) } } + /* search */ + result = head; + while (result != NULL) { + if (result->fd == fd) + return result; + result = result->next; + } + /* allocates */ result = calloc(1, sizeof *result); if (result == NULL) @@ -76,7 +177,6 @@ struct upoll *upoll_open(int fd, void *closure) /* init */ result->fd = fd; - result->closure = closure; pthread_mutex_lock(&mutex); result->next = head; head = result; @@ -91,18 +191,52 @@ struct upoll *upoll_open(int fd, void *closure) /* revert on error */ rc = errno; - upoll_close(result); + update(result); errno = rc; return NULL; } -static int update(struct upoll *upoll) +int upoll_is_valid(struct upoll *upoll) { - struct epoll_event e; - e.events = (uint32_t)((upoll->read != NULL ? EPOLLIN : 0 ) - | (upoll->write != NULL ? EPOLLOUT : 0)); - e.data.ptr = upoll; - return epoll_ctl(pollfd, EPOLL_CTL_MOD, upoll->fd, &e); + struct upollfd *itfd = head; + struct upoll *it; + while (itfd != NULL) { + it = itfd->head; + while (it != NULL) { + if (it == upoll) + return 1; + it = it->next; + } + itfd = itfd->next; + } + return 0; +} + +struct upoll *upoll_open(int fd, void *closure) +{ + struct upollfd *ufd; + struct upoll *result; + + /* allocates */ + result = calloc(1, sizeof *result); + if (result == NULL) + return NULL; + + /* get for fd */ + ufd = get_fd(fd); + if (ufd == NULL) { + free(result); + return NULL; + } + + /* init */ + result->fd = ufd; + result->closure = closure; + pthread_mutex_lock(&mutex); + result->next = ufd->head; + ufd->head = result; + pthread_mutex_unlock(&mutex); + return result; } int upoll_on_readable(struct upoll *upoll, void (*process)(void *)) @@ -111,7 +245,7 @@ int upoll_on_readable(struct upoll *upoll, void (*process)(void *)) assert(upoll_is_valid(upoll)); upoll->read = process; - return update(upoll); + return update_flags(upoll->fd); } int upoll_on_writable(struct upoll *upoll, void (*process)(void *)) @@ -120,7 +254,7 @@ int upoll_on_writable(struct upoll *upoll, void (*process)(void *)) assert(upoll_is_valid(upoll)); upoll->write = process; - return update(upoll); + return update_flags(upoll->fd); } void upoll_on_hangup(struct upoll *upoll, void (*process)(void *)) @@ -134,38 +268,61 @@ void upoll_on_hangup(struct upoll *upoll, void (*process)(void *)) void upoll_close(struct upoll *upoll) { struct upoll **it; + struct upollfd *ufd; assert(pollfd != 0); assert(upoll_is_valid(upoll)); - epoll_ctl(pollfd, EPOLL_CTL_DEL, upoll->fd, NULL); + ufd = upoll->fd; pthread_mutex_lock(&mutex); - it = &head; + if (current == upoll) + current = NULL; + it = &ufd->head; while (*it != upoll) it = &(*it)->next; *it = upoll->next; pthread_mutex_unlock(&mutex); free(upoll); + update(ufd); } -void upoll_wait(int timeout) +int upoll_wait(int timeout) { int rc; struct epoll_event e; - struct upoll *upoll; + struct upollfd *ufd; - if (pollfd == 0) - return; + if (pollfd == 0) { + errno = ECANCELED; + return -1; + } - rc = epoll_wait(pollfd, &e, 1, timeout); + do { + rc = epoll_wait(pollfd, &e, 1, timeout); + } while (rc < 0 && errno == EINTR); if (rc == 1) { - upoll = e.data.ptr; - if ((e.events & EPOLLIN) && upoll->read) - upoll->read(upoll->closure); - if ((e.events & EPOLLOUT) && upoll->write) - upoll->write(upoll->closure); - if ((e.events & EPOLLHUP) && upoll->hangup) - upoll->hangup(upoll->closure); + ufd = e.data.ptr; + current = ufd->head; + e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP; + while (current != NULL && e.events != 0) { + if ((e.events & EPOLLIN) && current->read) { + current->read(current->closure); + e.events &= (uint32_t)~EPOLLIN; + continue; + } + if ((e.events & EPOLLOUT) && current->write) { + current->write(current->closure); + e.events &= (uint32_t)~EPOLLOUT; + continue; + } + if ((e.events & EPOLLHUP) && current->hangup) { + current->hangup(current->closure); + if (current == NULL) + break; + } + current = current->next; + } } + return rc < 0 ? rc : 0; }