Switch to libsystemd events
authorJosé Bollo <jose.bollo@iot.bzh>
Tue, 3 May 2016 08:03:58 +0000 (10:03 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Wed, 4 May 2016 09:55:38 +0000 (11:55 +0200)
This patch removes part of code that are
not specific in favour of a more shared
library.

Change-Id: I3506e7514181cfbed753559bb65460f95b2141c9
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
13 files changed:
CMakeLists.txt
include/afb-plugin.h
include/afb-pollmgr-itf.h [deleted file]
plugins/CMakeLists.txt
src/CMakeLists.txt
src/afb-api-so.c
src/afb-common.c [new file with mode: 0644]
src/afb-common.h [moved from src/utils-upoll.h with 52% similarity]
src/afb-hsrv.c
src/afb-websock.c
src/afb-ws.c
src/main.c
src/utils-upoll.c [deleted file]

index 2ca0e89..4a85628 100644 (file)
@@ -53,6 +53,7 @@ IF(CMAKE_BUILD_TYPE MATCHES Debug)
 ENDIF(CMAKE_BUILD_TYPE MATCHES Debug)
 
 INCLUDE(FindPkgConfig)
+PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd)
 PKG_CHECK_MODULES(json-c REQUIRED json-c)
 PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.48)
 PKG_CHECK_MODULES(openssl REQUIRED openssl)
@@ -90,6 +91,7 @@ FIND_PACKAGE(Threads)
 SET(include_dirs
        ${INCLUDE_DIRS}
        ${CMAKE_SOURCE_DIR}/include
+       ${libsystemd_INCLUDE_DIRS}
        ${json-c_INCLUDE_DIRS}
        ${libmicrohttpd_INCLUDE_DIRS}
        ${uuid_INCLUDE_DIRS}
@@ -102,6 +104,7 @@ SET(include_dirs
 )
 
 SET(link_libraries
+       ${libsystemd_LIBRARIES}
        ${json-c_LIBRARIES}
        ${libmicrohttpd_LIBRARIES}
        ${uuid_LIBRARIES}
index dfb6fba..ce78e84 100644 (file)
@@ -18,7 +18,6 @@
 #pragma once
 
 #include "afb-req-itf.h"
-#include "afb-pollmgr-itf.h"
 #include "afb-evmgr-itf.h"
 
 /* Plugin Type */
@@ -64,9 +63,14 @@ enum AFB_Mode {
        AFB_MODE_GLOBAL
 };
 
+struct sd_event;
+struct sd_bus;
+
 struct afb_daemon_itf {
        struct afb_evmgr (*get_evmgr)(void *closure);
-       struct afb_pollmgr (*get_pollmgr)(void *closure);
+       struct sd_event *(*get_event_loop)(void *closure);
+       struct sd_bus *(*get_user_bus)(void *closure);
+       struct sd_bus *(*get_system_bus)(void *closure);
 };
 
 struct afb_daemon {
@@ -88,9 +92,19 @@ static inline struct afb_evmgr afb_daemon_get_evmgr(struct afb_daemon daemon)
        return daemon.itf->get_evmgr(daemon.closure);
 }
 
-static inline struct afb_pollmgr afb_daemon_get_pollmgr(struct afb_daemon daemon)
+static inline struct sd_event *afb_daemon_get_event_loop(struct afb_daemon daemon)
+{
+       return daemon.itf->get_event_loop(daemon.closure);
+}
+
+static inline struct sd_bus *afb_daemon_get_user_bus(struct afb_daemon daemon)
+{
+       return daemon.itf->get_user_bus(daemon.closure);
+}
+
+static inline struct sd_bus *afb_daemon_get_system_bus(struct afb_daemon daemon)
 {
-       return daemon.itf->get_pollmgr(daemon.closure);
+       return daemon.itf->get_system_bus(daemon.closure);
 }
 
 
diff --git a/include/afb-pollmgr-itf.h b/include/afb-pollmgr-itf.h
deleted file mode 100644 (file)
index c3a62c1..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-struct afb_pollmgr_itf
-{
-       int (*wait)(int timeout, void *pollclosure);
-       void *(*open)(int fd, void *closure, void *pollclosure);
-       int (*on_readable)(void *hndl, void (*cb)(void *closure));
-       int (*on_writable)(void *hndl, void (*cb)(void *closure));
-       void (*on_hangup)(void *hndl, void (*cb)(void *closure));
-       void (*close)(void *hndl);
-};
-
-struct afb_pollmgr
-{
-       const struct afb_pollmgr_itf *itf;
-       void *closure;
-};
-
-static inline int afb_pollmgr_wait(struct afb_pollmgr pollmgr, int timeout)
-{
-       return pollmgr.itf->wait(timeout, pollmgr.closure);
-}
-
-static inline void *afb_pollmgr_open(struct afb_pollmgr pollmgr, int fd, void *closure)
-{
-       return pollmgr.itf->open(fd, closure, pollmgr.closure);
-}
-
-static inline int afb_pollmgr_on_readable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
-       return pollmgr.itf->on_readable(hndl, cb);
-}
-
-static inline int afb_pollmgr_on_writable(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
-       return pollmgr.itf->on_writable(hndl, cb);
-}
-
-static inline void afb_pollmgr_on_hangup(struct afb_pollmgr pollmgr, void *hndl, void (*cb)(void *closure))
-{
-       pollmgr.itf->on_hangup(hndl, cb);
-}
-
-
index c60e352..5d8b26a 100644 (file)
@@ -1,4 +1,4 @@
-ADD_SUBDIRECTORY(afm-main-plugin)
+#ADD_SUBDIRECTORY(afm-main-plugin)
 ADD_SUBDIRECTORY(session)
 ADD_SUBDIRECTORY(samples)
 #ADD_SUBDIRECTORY(audio)
index 5153c5c..bc5fd4f 100644 (file)
@@ -12,9 +12,9 @@ ADD_LIBRARY(src OBJECT
        afb-ws.c
        afb-ws-json.c
        afb-msg-json.c
+       afb-common.c
        websock.c
        verbose.c
-       utils-upoll.c
 )
 
 INCLUDE_DIRECTORIES(${include_dirs})
index 88246b5..89418fb 100644 (file)
 
 #include "afb-plugin.h"
 #include "afb-req-itf.h"
-#include "afb-pollmgr-itf.h"
 #include "afb-evmgr-itf.h"
 
 #include "session.h"
+#include "afb-common.h"
 #include "afb-apis.h"
 #include "afb-api-so.h"
 #include "verbose.h"
-#include "utils-upoll.h"
 
 extern __thread sigjmp_buf *error_handler;
 
@@ -55,15 +54,6 @@ static int api_timeout = 15;
 
 static const char plugin_register_function[] = "pluginRegister";
 
-static const struct afb_pollmgr_itf pollmgr_itf = {
-       .wait = (void*)upoll_wait,
-       .open = (void*)upoll_open,
-       .on_readable = (void*)upoll_on_readable,
-       .on_writable = (void*)upoll_on_writable,
-       .on_hangup = (void*)upoll_on_hangup,
-       .close = (void*)upoll_close
-};
-
 static void afb_api_so_evmgr_push(struct api_so_desc *desc, const char *name, struct json_object *object)
 {
        size_t length;
@@ -87,16 +77,14 @@ static struct afb_evmgr afb_api_so_get_evmgr(struct api_so_desc *desc)
        return (struct afb_evmgr){ .itf = &evmgr_itf, .closure = desc };
 }
 
-static struct afb_pollmgr afb_api_so_get_pollmgr(struct api_so_desc *desc)
-{
-       return (struct afb_pollmgr){ .itf = &pollmgr_itf, .closure = NULL };
-}
-
 static const struct afb_daemon_itf daemon_itf = {
        .get_evmgr = (void*)afb_api_so_get_evmgr,
-       .get_pollmgr = (void*)afb_api_so_get_pollmgr
+       .get_event_loop = (void*)afb_common_get_event_loop,
+       .get_user_bus = (void*)afb_common_get_user_bus,
+       .get_system_bus = (void*)afb_common_get_system_bus
 };
 
+
 static void trapping_call(struct afb_req req, void(*cb)(struct afb_req))
 {
        volatile int signum, timerset;
diff --git a/src/afb-common.c b/src/afb-common.c
new file mode 100644 (file)
index 0000000..e8fe4b7
--- /dev/null
@@ -0,0 +1,89 @@
+/* 
+ * Copyright (C) 2015 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <errno.h>
+#include <systemd/sd-event.h>
+#include <systemd/sd-bus.h>
+
+#include "afb-common.h"
+
+/*
+struct sd_event *afb_common_get_thread_event_loop()
+{
+       sd_event *result;
+       int rc = sd_event_default(&result);
+       if (rc != 0) {
+               errno = -rc;
+               result = NULL;
+       }
+       return result;
+}
+*/
+
+static void *sdopen(void **p, int (*f)(void **))
+{
+       if (*p == NULL) {
+               int rc = f(p);
+               if (rc < 0) {
+                       errno = -rc;
+                       *p = NULL;
+               }
+       }
+       return *p;
+}
+
+static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
+{
+       if (*p == NULL) {
+               int rc = f(p);
+               if (rc < 0) {
+                       errno = -rc;
+                       *p = NULL;
+               } else {
+                       rc = sd_bus_attach_event(*p, afb_common_get_event_loop(), 0);
+                       if (rc < 0) {
+                               sd_bus_unref(*p);
+                               errno = -rc;
+                               *p = NULL;
+                       }
+               }
+       }
+       return *p;
+}
+
+struct sd_event *afb_common_get_event_loop()
+{
+       static sd_event *result = NULL;
+       return sdopen((void*)&result, (void*)sd_event_new);
+}
+
+struct sd_bus *afb_common_get_user_bus()
+{
+       static struct sd_bus *result = NULL;
+       return sdbusopen((void*)&result, (void*)sd_bus_open_user);
+}
+
+struct sd_bus *afb_common_get_system_bus()
+{
+       static struct sd_bus *result = NULL;
+       return sdbusopen((void*)&result, (void*)sd_bus_open_system);
+}
+
+
+
similarity index 52%
rename from src/utils-upoll.h
rename to src/afb-common.h
index 56692d3..fe17bc9 100644 (file)
@@ -1,6 +1,6 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
+/* 
+ * Copyright (C) 2015 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #pragma once
 
-struct upoll;
-
-extern int upoll_is_valid(struct upoll *upoll);
-
-extern struct upoll *upoll_open(int fd, void *closure);
-
-extern int upoll_on_readable(struct upoll *upoll, void (*process)(void *closure));
-extern int upoll_on_writable(struct upoll *upoll, void (*process)(void *closure));
-
-extern void upoll_on_hangup(struct upoll *upoll, void (*process)(void *closure));
-
-extern void upoll_close(struct upoll *upoll);
-
-extern int upoll_wait(int timeout);
-
+struct sd_event;
+struct sd_bus;
 
+extern struct sd_event *afb_common_get_event_loop();
+extern struct sd_bus *afb_common_get_user_bus();
+extern struct sd_bus *afb_common_get_system_bus();
 
index a833825..f514c97 100644 (file)
 
 #define _GNU_SOURCE
 
+#include <stdint.h>
 #include <stdio.h>
 #include <string.h>
 #include <assert.h>
 #include <poll.h>
 #include <fcntl.h>
+#include <errno.h>
 #include <sys/stat.h>
 
 #include <microhttpd.h>
+#include <systemd/sd-event.h>
 
 #include "afb-method.h"
 #include "afb-hreq.h"
 #include "afb-hsrv.h"
 #include "afb-req-itf.h"
 #include "verbose.h"
-#include "utils-upoll.h"
+
+#include "afb-common.h"
+
 
 
 #define JSON_CONTENT  "application/json"
@@ -58,7 +63,7 @@ struct afb_hsrv {
        unsigned refcount;
        struct hsrv_handler *handlers;
        struct MHD_Daemon *httpd;
-       struct upoll *upoll;
+       sd_event_source *evsrc;
        int in_run;
        char *cache_to;
 };
@@ -231,15 +236,21 @@ void run_micro_httpd(struct afb_hsrv *hsrv)
        if (hsrv->in_run != 0)
                hsrv->in_run = 2;
        else {
-               upoll_on_readable(hsrv->upoll, NULL);
+               sd_event_source_set_io_events(hsrv->evsrc, 0);
                do {
                        hsrv->in_run = 1;
                        MHD_run(hsrv->httpd);
                } while(hsrv->in_run == 2);
                hsrv->in_run = 0;
-               upoll_on_readable(hsrv->upoll, (void*)run_micro_httpd);
+               sd_event_source_set_io_events(hsrv->evsrc, EPOLLIN);
        }
-};
+}
+
+static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *hsrv)
+{
+       run_micro_httpd(hsrv);
+       return 0;
+}
 
 static int new_client_handler(void *cls, const struct sockaddr *addr, socklen_t addrlen)
 {
@@ -360,7 +371,8 @@ int afb_hsrv_set_cache_timeout(struct afb_hsrv *hsrv, int duration)
 
 int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection_timeout)
 {
-       struct upoll *upoll;
+       sd_event_source *evsrc;
+       int rc;
        struct MHD_Daemon *httpd;
        const union MHD_DaemonInfo *info;
 
@@ -385,24 +397,25 @@ int afb_hsrv_start(struct afb_hsrv *hsrv, uint16_t port, unsigned int connection
                return 0;
        }
 
-       upoll = upoll_open(info->listen_fd, hsrv);
-       if (upoll == NULL) {
+       rc = sd_event_add_io(afb_common_get_event_loop(), &evsrc, info->listen_fd, EPOLLIN, io_event_callback, hsrv);
+       if (rc < 0) {
                MHD_stop_daemon(httpd);
-               fprintf(stderr, "Error: connection to upoll of httpd failed");
+               errno = -rc;
+               fprintf(stderr, "Error: connection to events for httpd failed");
                return 0;
        }
-       upoll_on_readable(upoll, (void*)run_micro_httpd);
 
        hsrv->httpd = httpd;
-       hsrv->upoll = upoll;
+       hsrv->evsrc = evsrc;
        return 1;
 }
 
 void afb_hsrv_stop(struct afb_hsrv *hsrv)
 {
-       if (hsrv->upoll)
-               upoll_close(hsrv->upoll);
-       hsrv->upoll = NULL;
+       if (hsrv->evsrc != NULL) {
+               sd_event_source_unref(hsrv->evsrc);
+               hsrv->evsrc = NULL;
+       }
        if (hsrv->httpd != NULL)
                MHD_stop_daemon(hsrv->httpd);
        hsrv->httpd = NULL;
index b4ad57a..9f5619f 100644 (file)
@@ -204,8 +204,11 @@ int afb_websock_check_upgrade(struct afb_hreq *hreq)
 
        ws = NULL;
        rc = check_websocket_upgrade(hreq->connection, protodefs, afb_hreq_context(hreq), &ws);
-       if (rc && ws != NULL)
-               hreq->upgrade = 1;
+       if (rc == 1) {
+               hreq->replied = 1;
+               if (ws != NULL)
+                       hreq->upgrade = 1;
+       }
        return rc;
 }
 
index da248c8..d0cfc8a 100644 (file)
 #include <sys/uio.h>
 #include <string.h>
 
+#include <systemd/sd-event.h>
+
 #include "websock.h"
 #include "afb-ws.h"
 
-#include "utils-upoll.h"
+#include "afb-common.h"
 
 /*
  * declaration of the websock interface for afb-ws
@@ -85,7 +87,7 @@ struct afb_ws
        const struct afb_ws_itf *itf; /* the callback interface */
        void *closure;          /* closure when calling the callbacks */
        struct websock *ws;     /* the websock handler */
-       struct upoll *up;       /* the upoll handler for the socket */
+       sd_event_source *evsrc; /* the event source for the socket */
        struct buf buffer;      /* the last read fragment */
 };
 
@@ -109,8 +111,8 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
        struct websock *wsi = ws->ws;
        if (wsi != NULL) {
                ws->ws = NULL;
-               upoll_close(ws->up);
-               ws->up = NULL;
+               sd_event_source_unref(ws->evsrc);
+               ws->evsrc = NULL;
                websock_destroy(wsi);
                free(aws_pick_buffer(ws).buffer);
                ws->state = waiting;
@@ -119,6 +121,15 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
        }
 }
 
+static int io_event_callback(sd_event_source *src, int fd, uint32_t revents, void *ws)
+{
+       if ((revents & EPOLLIN) != 0)
+               aws_on_readable(ws);
+       if ((revents & EPOLLHUP) != 0)
+               afb_ws_hangup(ws);
+       return 0;
+}
+
 /*
  * Creates the afb_ws structure for the file descritor
  * 'fd' and the callbacks described by the interface 'itf'
@@ -128,6 +139,7 @@ static void aws_disconnect(struct afb_ws *ws, int call_on_hangup)
  */
 struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure)
 {
+       int rc;
        struct afb_ws *result;
 
        assert(fd >= 0);
@@ -150,15 +162,12 @@ struct afb_ws *afb_ws_create(int fd, const struct afb_ws_itf *itf, void *closure
        if (result->ws == NULL)
                goto error2;
 
-       /* creates the upoll */
-       result->up = upoll_open(result->fd, result);
-       if (result->up == NULL)
+       /* creates the evsrc */
+       rc = sd_event_add_io(afb_common_get_event_loop(), &result->evsrc, result->fd, EPOLLIN, io_event_callback, result);
+       if (rc < 0) {
+               errno = -rc;
                goto error3;
-
-       /* init the upoll */
-       upoll_on_readable(result->up, (void*)aws_on_readable);
-       upoll_on_hangup(result->up, (void*)afb_ws_hangup);
-
+       }
        return result;
 
 error3:
index 8ec684f..fcedd2a 100644 (file)
@@ -32,6 +32,8 @@
 #include <signal.h>
 #include <syslog.h>
 
+#include <systemd/sd-event.h>
+
 #include "afb-config.h"
 #include "afb-hswitch.h"
 #include "afb-apis.h"
@@ -40,7 +42,7 @@
 #include "afb-hreq.h"
 #include "session.h"
 #include "verbose.h"
-#include "utils-upoll.h"
+#include "afb-common.h"
 
 #include "afb-plugin.h"
 
@@ -559,6 +561,7 @@ static struct afb_hsrv *start_http_server(struct afb_config * config)
 int main(int argc, char *argv[])  {
   struct afb_hsrv *hsrv;
   struct afb_config *config;
+  struct sd_event *eventloop;
 
   // open syslog if ever needed
   openlog("afb-daemon", 0, LOG_DAEMON);
@@ -623,8 +626,9 @@ int main(int argc, char *argv[])  {
   }
 
    // infinite loop
+  eventloop = afb_common_get_event_loop();
   for(;;)
-    upoll_wait(30000); 
+    sd_event_run(eventloop, 30000000);
 
    if (verbosity)
        fprintf (stderr, "hoops returned from infinite loop [report bug]\n");
diff --git a/src/utils-upoll.c b/src/utils-upoll.c
deleted file mode 100644 (file)
index 03f9a08..0000000
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2016 IoT.bzh
- * Author: José Bollo <jose.bollo@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/epoll.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <errno.h>
-#include <unistd.h>
-#include <assert.h>
-
-#include "utils-upoll.h"
-
-struct upollfd;
-
-/*
- * Structure describing one opened client
- */
-struct upoll
-{
-       struct upollfd *fd;     /* structure handling the file descriptor */
-       void (*read)(void *);   /* callback for handling on_readable */
-       void (*write)(void *);  /* callback for handling on_writable */
-       void (*hangup)(void *); /* callback for handling on_hangup */
-       void *closure;          /* closure for callbacks */
-       struct upoll *next;     /* next client of the same file descriptor */
-};
-
-/*
- * Structure describing a watched file descriptor
- */
-struct upollfd
-{
-       int fd;                 /* watch file descriptor */
-       uint32_t events;        /* watched events */
-       struct upollfd *next;   /* next watched file descriptor */
-       struct upoll *head;     /* first client watching the file descriptor */
-};
-
-
-/*
- * Structure describing a upoll group
-struct upollgrp
-{
-       int pollfd;
-       struct upollfd *head;
-       struct upoll *current;
-       pthread_mutex_t mutex;
-};
-
-
-static struct upollgrp global = {
-       .pollfd = 0,
-       .head = NULL,
-       .current = NULL,
-       .mutex = PTHREAD_MUTEX_INITIALIZER
-};
- */
-
-static int pollfd = 0;
-static struct upollfd *head = NULL;
-static struct upoll *current = NULL;
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-/*
- * Compute the events for the set of clients
- */
-static int update_flags_locked(struct upollfd *ufd)
-{
-       int rc;
-       struct upoll *u;
-       struct epoll_event e;
-       uint32_t events;
-
-       /* compute expected events */
-       events = 0;
-       u = ufd->head;
-       while (u != NULL) {
-               if (u->read != NULL)
-                       events |= EPOLLIN;
-               if (u->write != NULL)
-                       events |= EPOLLOUT;
-               u = u->next;
-       }
-       if (ufd->events == events)
-               rc = 0;
-       else {
-               e.events = events;
-               e.data.ptr = ufd;
-               rc = epoll_ctl(pollfd, EPOLL_CTL_MOD, ufd->fd, &e);
-               if (rc == 0)
-                       ufd->events = events;
-       }
-       pthread_mutex_unlock(&mutex);
-       return rc;
-}
-
-/*
- * Compute the events for the set of clients
- */
-static int update_flags(struct upollfd *ufd)
-{
-       pthread_mutex_lock(&mutex);
-       return update_flags_locked(ufd);
-}
-
-/*
- *
- */
-static int update(struct upollfd *ufd)
-{
-       struct upollfd **prv;
-
-       pthread_mutex_lock(&mutex);
-       if (ufd->head != NULL)
-               return update_flags_locked(ufd);
-
-       /* no more watchers */
-       prv = &head;
-       while(*prv) {
-               if (*prv == ufd) {
-                       *prv = ufd->next;
-                       break;
-               }
-               prv = &(*prv)->next;
-       }
-       pthread_mutex_unlock(&mutex);
-       epoll_ctl(pollfd, EPOLL_CTL_DEL, ufd->fd, NULL);
-       free(ufd);
-       return 0;
-}
-
-static struct upollfd *get_fd(int fd)
-{
-       struct epoll_event e;
-       struct upollfd *result;
-       int rc;
-
-       /* opens the epoll stream */
-       if (pollfd == 0) {
-               pollfd = epoll_create1(EPOLL_CLOEXEC);
-               if (pollfd == 0) {
-                       pollfd = dup(0);
-                       close(0);
-               }
-               if (pollfd < 0) {
-                       pollfd = 0;
-                       return NULL;
-               }
-       }
-
-       /* search */
-       result = head;
-       while (result != NULL) {
-               if (result->fd == fd)
-                       return result;
-               result = result->next;
-       }
-
-       /* allocates */
-       result = calloc(1, sizeof *result);
-       if (result == NULL)
-               return NULL;
-
-       /* init */
-       result->fd = fd;
-       pthread_mutex_lock(&mutex);
-       result->next = head;
-       head = result;
-       pthread_mutex_unlock(&mutex);
-
-       /* records */
-       e.events = 0;
-       e.data.ptr = result;
-       rc = epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &e);
-       if (rc == 0)
-               return result;
-
-       /* revert on error */
-       rc = errno;
-       update(result);
-       errno = rc;
-       return NULL;
-}
-
-int upoll_is_valid(struct upoll *upoll)
-{
-       struct upollfd *itfd = head;
-       struct upoll *it;
-       while (itfd != NULL) {
-               it = itfd->head;
-               while (it != NULL) {
-                       if (it == upoll)
-                               return 1;
-                       it = it->next;
-               }
-               itfd = itfd->next;
-       }
-       return 0;
-}
-
-struct upoll *upoll_open(int fd, void *closure)
-{
-       struct upollfd *ufd;
-       struct upoll *result;
-
-       /* allocates */
-       result = calloc(1, sizeof *result);
-       if (result == NULL)
-               return NULL;
-
-       /* get for fd */
-       ufd = get_fd(fd);
-       if (ufd == NULL) {
-               free(result);
-               return NULL;
-       }
-
-       /* init */
-       result->fd = ufd;
-       result->closure = closure;
-       pthread_mutex_lock(&mutex);
-       result->next = ufd->head;
-       ufd->head = result;
-       pthread_mutex_unlock(&mutex);
-       return result;
-}
-
-int upoll_on_readable(struct upoll *upoll, void (*process)(void *))
-{
-       assert(pollfd != 0);
-       assert(upoll_is_valid(upoll));
-
-       upoll->read = process;
-       return update_flags(upoll->fd);
-}
-
-int upoll_on_writable(struct upoll *upoll, void (*process)(void *))
-{
-       assert(pollfd != 0);
-       assert(upoll_is_valid(upoll));
-
-       upoll->write = process;
-       return update_flags(upoll->fd);
-}
-
-void upoll_on_hangup(struct upoll *upoll, void (*process)(void *))
-{
-       assert(pollfd != 0);
-       assert(upoll_is_valid(upoll));
-
-       upoll->hangup = process;
-}
-
-void upoll_close(struct upoll *upoll)
-{
-       struct upoll **it;
-       struct upollfd *ufd;
-
-       assert(pollfd != 0);
-       assert(upoll_is_valid(upoll));
-
-       ufd = upoll->fd;
-       pthread_mutex_lock(&mutex);
-       if (current == upoll)
-               current = NULL;
-       it = &ufd->head;
-       while (*it != upoll)
-               it = &(*it)->next;
-       *it = upoll->next;
-       pthread_mutex_unlock(&mutex);
-       free(upoll);
-       update(ufd);
-}
-
-int upoll_wait(int timeout)
-{
-       int rc;
-       struct epoll_event e;
-       struct upollfd *ufd;
-
-       if (pollfd == 0) {
-               errno = ECANCELED;
-               return -1;
-       }
-
-       do {
-               rc = epoll_wait(pollfd, &e, 1, timeout);
-       } while (rc < 0 && errno == EINTR);
-       if (rc == 1) {
-               ufd = e.data.ptr;
-               current = ufd->head;
-               e.events &= EPOLLIN | EPOLLOUT | EPOLLHUP;
-               while (current != NULL && e.events != 0) {
-                       if ((e.events & EPOLLIN) && current->read) {
-                               current->read(current->closure);
-                               e.events &= (uint32_t)~EPOLLIN;
-                               continue;
-                       }
-                       if ((e.events & EPOLLOUT) && current->write) {
-                               current->write(current->closure);
-                               e.events &= (uint32_t)~EPOLLOUT;
-                               continue;
-                       }
-                       if ((e.events & EPOLLHUP) && current->hangup) {
-                               current->hangup(current->closure);
-                               if (current == NULL)
-                                       break;
-                       }
-                       current = current->next;
-               }
-       }
-       return rc < 0 ? rc : 0;
-}
-