Merge "Fix build for native package"
authorJosé Bollo <jobol@nonadev.net>
Mon, 20 Nov 2017 14:00:08 +0000 (14:00 +0000)
committerGerrit Code Review <gerrit@automotivelinux.org>
Mon, 20 Nov 2017 14:00:08 +0000 (14:00 +0000)
28 files changed:
CMakeLists.txt
bindings/CMakeLists.txt
bindings/samples/ave.c
include/afb/afb-daemon-itf.h
include/afb/afb-daemon-v1.h
include/afb/afb-daemon-v2.h
include/afb/afb-dynapi-itf.h
include/afb/afb-dynapi.h
src/CMakeLists.txt
src/afb-api-dyn.c
src/afb-api-dyn.h
src/afb-context.c
src/afb-evt.h
src/afb-export.c
src/afb-proto-ws.c
src/afb-session.c
src/afb-session.h
src/afb-stub-ws.c
src/afb-trace.c
src/jobs-fake.c [new file with mode: 0644]
src/jobs.c
src/main.c
src/verbose.c
test/AFB.js
test/monitoring/AFB.js
test/monitoring/monitor-pastel.css
test/monitoring/monitor.html
test/monitoring/monitor.js

index 163a74b..568eb42 100644 (file)
@@ -70,19 +70,14 @@ CHECK_LIBRARY_EXISTS(magic magic_load "" HAVE_LIBMAGIC_SO)
 IF(HAVE_MAGIC_H)
   IF(HAVE_LIBMAGIC_SO)
     SET(HAVE_LIBMAGIC "1")
+    SET(LIBMAGIC_LDFLAGS -lmagic)
   ENDIF(HAVE_LIBMAGIC_SO)
 ENDIF(HAVE_MAGIC_H)
 
-IF(NOT HAVE_LIBMAGIC)
-  MESSAGE(FATAL_ERROR "\"magic.h\" or \"libmagic.so\" missing.
-    Please install the \"file-devel\" or \"libmagic-dev\" package !")
-ENDIF(NOT HAVE_LIBMAGIC)
-ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE)
-
-PKG_CHECK_MODULES(libsystemd REQUIRED libsystemd>=222)
-PKG_CHECK_MODULES(libmicrohttpd REQUIRED libmicrohttpd>=0.9.55)
-PKG_CHECK_MODULES(openssl REQUIRED openssl)
-PKG_CHECK_MODULES(uuid REQUIRED uuid)
+PKG_CHECK_MODULES(libsystemd libsystemd>=222)
+PKG_CHECK_MODULES(libmicrohttpd libmicrohttpd>=0.9.55)
+PKG_CHECK_MODULES(openssl openssl)
+PKG_CHECK_MODULES(uuid uuid)
 PKG_CHECK_MODULES(cynara cynara-client)
 
 IF(AGL_DEVEL)
@@ -93,6 +88,32 @@ IF(cynara_FOUND)
        ADD_DEFINITIONS(-DBACKEND_PERMISSION_IS_CYNARA)
 ENDIF(cynara_FOUND)
 
+IF(HAVE_LIBMAGIC AND libsystemd_FOUND AND libmicrohttpd_FOUND AND openssl_FOUND AND uuid_FOUND)
+  SET(WITH_BINDER TRUE)
+  ADD_DEFINITIONS(-DUSE_MAGIC_MIME_TYPE)
+ELSE()
+  IF(NOT HAVE_LIBMAGIC)
+    MESSAGE(WARNING "\"magic.h\" or \"libmagic.so\" missing.
+    Please install the \"file-devel\" or \"libmagic-dev\" package !")
+  ENDIF(NOT HAVE_LIBMAGIC)
+  IF(NOT libsystemd_FOUND)
+    MESSAGE(WARNING "Dependency to 'libsystemd' is missing")
+  ENDIF()
+  IF(NOT libmicrohttpd_FOUND)
+    MESSAGE(WARNING "Dependency to 'libmicrohttpd' is missing")
+  ENDIF()
+  IF(NOT openssl_FOUND)
+    MESSAGE(WARNING "Dependency to 'openssl' is missing")
+  ENDIF()
+  IF(NOT uuid_FOUND)
+    MESSAGE(WARNING "Dependency to 'uuid' is missing")
+  ENDIF()
+  IF(NOT ALLOW_NO_BINDER)
+    MESSAGE(FATAL_ERROR "Can't compile the binder, either define ALLOW_NO_BINDER or install dependencies")
+  ENDIF()
+  SET(WITH_BINDER FALSE)
+ENDIF()
+
 ADD_DEFINITIONS(-DAFB_VERSION="${PROJECT_VERSION}")
 
 INCLUDE_DIRECTORIES(
@@ -114,7 +135,7 @@ SET(link_libraries
        ${uuid_LDFLAGS}
        ${openssl_LDFLAGS}
        ${cynara_LDFLAGS}
-       -lmagic
+       ${LIBMAGIC_LDFLAGS}
        -ldl
        -lrt
        )
@@ -123,7 +144,7 @@ SET(binding_install_dir ${CMAKE_INSTALL_FULL_LIBDIR}/afb)
 
 ###########################################################################
 # activates the monitoring by default
-if(INCLUDE_MONITORING)
+if(INCLUDE_MONITORING AND WITH_BINDER)
        add_definitions(-DWITH_MONITORING_OPTION)
        INSTALL(DIRECTORY
                ${CMAKE_CURRENT_SOURCE_DIR}/test/monitoring
index 1a1e901..509c6cb 100644 (file)
@@ -16,6 +16,9 @@
 # limitations under the License.
 ###########################################################################
 
+IF(WITH_BINDER)
 ADD_SUBDIRECTORY(intrinsics)
 ADD_SUBDIRECTORY(samples)
 ADD_SUBDIRECTORY(tutorial)
+ENDIF(WITH_BINDER)
+
index a4b4144..569245e 100644 (file)
@@ -483,7 +483,7 @@ int afbBindingVdyn(afb_dynapi *dynapi)
        int i, rc;
 
        for (i = 0; apis[i] ; i++) {
-               rc = afb_dynapi_new_api(dynapi, apis[i], NULL, build_api, (void*)apis[i]);
+               rc = afb_dynapi_new_api(dynapi, apis[i], NULL, 0, build_api, (void*)apis[i]);
                if (rc < 0)
                        AFB_DYNAPI_ERROR(dynapi, "can't create API %s", apis[i]);
        }
index b78f9af..492032e 100644 (file)
@@ -44,7 +44,7 @@ struct afb_daemon_itf
        struct afb_req (*unstore_req)(void*closure, struct afb_stored_req *sreq);
        int (*require_api)(void*closure, const char *name, int initialized);
        int (*rename_api)(void*closure, const char *name);
-       int (*new_api)(void *closure, const char *api, const char *info, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure);
+       int (*new_api)(void *closure, const char *api, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi *), void *preinit_closure);
 };
 
 /*
index d1a0cc2..d199a48 100644 (file)
@@ -195,8 +195,9 @@ static inline int afb_daemon_new_api_v1(
        struct afb_daemon daemon,
        const char *api,
        const char *info,
+       int noconcurrency,
        int (*preinit)(void*, struct afb_dynapi *),
        void *closure)
 {
-       return daemon.itf->new_api(daemon.closure, api, info, preinit, closure);
+       return daemon.itf->new_api(daemon.closure, api, info, noconcurrency, preinit, closure);
 }
index 1ea40e9..6eb48c6 100644 (file)
@@ -171,9 +171,10 @@ static inline int afb_daemon_rename_api_v2(const char *name)
 static inline int afb_daemon_new_api_v2(
        const char *api,
        const char *info,
+       int noconcurrency,
        int (*preinit)(void*, struct afb_dynapi *),
        void *closure)
 {
-       return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, preinit, closure);
+       return afb_get_daemon_v2().itf->new_api(afb_get_daemon_v2().closure, api, info, noconcurrency, preinit, closure);
 }
 
index 682558e..fc90dbd 100644 (file)
@@ -131,6 +131,7 @@ struct afb_dynapi_itf
                void *dynapi,
                const char *api,
                const char *info,
+               int noconcurrency,
                int (*preinit)(void*, struct afb_dynapi *),
                void *closure);
 
index dfdcdb2..e245895 100644 (file)
@@ -242,10 +242,11 @@ static inline int afb_dynapi_new_api(
        struct afb_dynapi *dynapi,
        const char *api,
        const char *info,
+       int noconcurrency,
        int (*preinit)(void*, struct afb_dynapi *),
        void *closure)
 {
-       return dynapi->itf->api_new_api(dynapi, api, info, preinit, closure);
+       return dynapi->itf->api_new_api(dynapi, api, info, noconcurrency, preinit, closure);
 }
 
 static inline int afb_dynapi_set_verbs_v2(
index a7d1ff9..b8accc7 100644 (file)
@@ -25,6 +25,10 @@ endif(ALLOW_NO_BINDER)
 INCLUDE(FindPkgConfig)
 
 ADD_SUBDIRECTORY(genskel)
+
+IF(WITH_BINDER)
+###########################################
+
 ADD_SUBDIRECTORY(tests)
 
 ADD_DEFINITIONS(-DBINDING_INSTALL_DIR="${binding_install_dir}")
@@ -91,13 +95,14 @@ INSTALL(TARGETS afb-daemon
 ###########################################
 # build and install libafbwsc
 ###########################################
-ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c)
+ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c)
 SET_TARGET_PROPERTIES(afbwsc PROPERTIES
        VERSION ${LIBAFBWSC_VERSION}
        SOVERSION ${LIBAFBWSC_SOVERSION})
 TARGET_LINK_LIBRARIES(afbwsc
        ${libsystemd_LDFLAGS}
        ${json-c_LDFLAGS}
+       -lpthread
        -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/export-afbwsc.map
        -Wl,--as-needed
        -Wl,--gc-sections
@@ -117,4 +122,7 @@ TARGET_LINK_LIBRARIES(afb-client-demo
 INSTALL(TARGETS afb-client-demo
         RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
 
-
+###########################################
+ELSE(WITH_BINDER)
+  MESSAGE(WARNING "NOT compiling the binder! but tools are compiled")
+ENDIF(WITH_BINDER)
index 0667f51..2ef1b1a 100644 (file)
@@ -235,7 +235,7 @@ static struct afb_api_itf dyn_api_itf = {
        .describe = describe_cb
 };
 
-int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int (*preinit)(void*, struct afb_dynapi*), void *closure)
+int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *info, int noconcurrency, int (*preinit)(void*, struct afb_dynapi*), void *closure)
 {
        int rc;
        struct afb_api_dyn *dynapi;
@@ -266,7 +266,7 @@ int afb_api_dyn_add(struct afb_apiset *apiset, const char *name, const char *inf
        /* records the binding */
        afb_api.closure = dynapi;
        afb_api.itf = &dyn_api_itf;
-       afb_api.group = NULL;
+       afb_api.group = noconcurrency ? dynapi : NULL;
        if (afb_apiset_add(apiset, afb_export_apiname(dynapi->export), afb_api) < 0) {
                ERROR("dynamic api %s can't be registered to set %s, ABORTING it!",
                                afb_export_apiname(dynapi->export),
index 35464c6..21626ee 100644 (file)
@@ -40,6 +40,7 @@ extern int afb_api_dyn_add(
                struct afb_apiset *apiset,
                const char *name,
                const char *info,
+               int noconcurrency,
                int (*preinit)(void*, struct afb_dynapi*),
                void *closure);
 
index 759ee90..c2649a4 100644 (file)
@@ -63,7 +63,7 @@ int afb_context_connect(struct afb_context *context, const char *uuid, const cha
        int created;
        struct afb_session *session;
 
-       session = afb_session_get (uuid, &created);
+       session = afb_session_get (uuid, AFB_SESSION_TIMEOUT_DEFAULT, &created);
        if (session == NULL)
                return -1;
        init_context(context, session, token);
index 2d888fa..901bfbb 100644 (file)
@@ -18,6 +18,7 @@
 #pragma once
 
 struct afb_event;
+struct afb_eventid;
 struct afb_evtid;
 struct afb_session;
 struct json_object;
index 19aab0c..304395a 100644 (file)
@@ -179,7 +179,7 @@ static struct afb_eventid *eventid_make_cb(void *closure, const char *name)
 static struct afb_event event_make_cb(void *closure, const char *name)
 {
        struct afb_eventid *eventid = eventid_make_cb(closure, name);
-       return (struct afb_event){ .itf = eventid ? eventid->itf : NULL, .closure = eventid };
+       return afb_evt_event_from_evtid(afb_evt_eventid_to_evtid(eventid));
 }
 
 static int event_broadcast_cb(void *closure, const char *name, struct json_object *object)
@@ -255,11 +255,12 @@ static int api_new_api_cb(
                void *closure,
                const char *api,
                const char *info,
+               int noconcurrency,
                int (*preinit)(void*, struct afb_dynapi *),
                void *preinit_closure)
 {
        struct afb_export *export = closure;
-       return afb_api_dyn_add(export->apiset, api, info, preinit, preinit_closure);
+       return afb_api_dyn_add(export->apiset, api, info, noconcurrency, preinit, preinit_closure);
 }
 
 /**********************************************
@@ -376,11 +377,12 @@ static int hooked_api_new_api_cb(
                void *closure,
                const char *api,
                const char *info,
+               int noconcurrency,
                int (*preinit)(void*, struct afb_dynapi *),
                void *preinit_closure)
 {
        /* TODO */
-       return api_new_api_cb(closure, api, info, preinit, preinit_closure);
+       return api_new_api_cb(closure, api, info, noconcurrency, preinit, preinit_closure);
 }
 /**********************************************
 * vectors
@@ -1070,7 +1072,7 @@ static struct afb_export *create(struct afb_apiset *apiset, const char *apiname,
 
        /* session shared with other exports */
        if (common_session == NULL) {
-               common_session = afb_session_create (NULL, 0);
+               common_session = afb_session_create (0);
                if (common_session == NULL)
                        return NULL;
        }
@@ -1179,7 +1181,7 @@ struct afb_binding_interface_v1 *afb_export_get_interface_v1(struct afb_export *
 int afb_export_unshare_session(struct afb_export *export)
 {
        if (export->session == common_session) {
-               export->session = afb_session_create (NULL, 0);
+               export->session = afb_session_create (0);
                if (export->session)
                        afb_session_unref(common_session);
                else {
index 3c8922c..ce7d75d 100644 (file)
@@ -37,6 +37,7 @@
 #include "afb-ws.h"
 #include "afb-msg-json.h"
 #include "afb-proto-ws.h"
+#include "jobs.h"
 
 struct afb_proto_ws;
 
@@ -190,6 +191,27 @@ struct afb_proto_ws
        void (*on_hangup)(void *closure);
 };
 
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX  32
+struct writebuf
+{
+       struct iovec iovec[WRITEBUF_COUNT_MAX];
+       uint32_t uints[WRITEBUF_COUNT_MAX];
+       int count;
+};
+
+struct readbuf
+{
+       char *base, *head, *end;
+};
+
+struct binary
+{
+       struct afb_proto_ws *protows;
+       struct readbuf rb;
+};
+
 /******************* common useful tools **********************************/
 
 /**
@@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr)
 
 /******************* serialisation part **********************************/
 
-struct readbuf
-{
-       char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX  32
-struct writebuf
-{
-       struct iovec iovec[WRITEBUF_COUNT_MAX];
-       uint32_t uints[WRITEBUF_COUNT_MAX];
-       int count;
-};
-
 static char *readbuf_get(struct readbuf *rb, uint32_t length)
 {
        char *before = rb->head;
@@ -343,12 +352,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, info ?: "")
         && writebuf_object(&wb, obj)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -362,12 +374,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, status)
         && writebuf_string(&wb, info ? : "")) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -391,7 +406,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                sc->callback = callback;
                sc->closure = cb_closure;
 
-               pthread_mutex_unlock(&protows->mutex);
+               pthread_mutex_lock(&protows->mutex);
                sc->subcallid = ptr2id(sc);
                do {
                        sc->subcallid++;
@@ -409,7 +424,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                 && writebuf_string(&wb, api)
                 && writebuf_string(&wb, verb)
                 && writebuf_object(&wb, args)) {
+                       pthread_mutex_lock(&protows->mutex);
                        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+                       pthread_mutex_unlock(&protows->mutex);
                        if (rc >= 0) {
                                rc = 0;
                                goto success;
@@ -424,12 +441,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -443,12 +463,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -461,7 +484,7 @@ success:
 /******************* client part **********************************/
 
 /* search a memorized call */
-static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
 {
        struct client_call *call;
 
@@ -472,11 +495,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint
        return call;
 }
 
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+{
+       struct client_call *result;
+
+       pthread_mutex_lock(&protows->mutex);
+       result = client_call_search_locked(protows, callid);
+       pthread_mutex_unlock(&protows->mutex);
+       return result;
+}
+
 /* free and release the memorizing call */
 static void client_call_destroy(struct client_call *call)
 {
        struct client_call **prv;
+       struct afb_proto_ws *protows = call->protows;
 
+       pthread_mutex_lock(&protows->mutex);
        prv = &call->protows->calls;
        while (*prv != NULL) {
                if (*prv == call) {
@@ -485,6 +520,7 @@ static void client_call_destroy(struct client_call *call)
                }
                prv = &(*prv)->next;
        }
+       pthread_mutex_unlock(&protows->mutex);
        free(call);
 }
 
@@ -505,7 +541,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb,
        }
 
        /* get the call */
-       *call = client_call_search(protows, callid);
+       *call = client_call_search_unlocked(protows, callid);
        if (*call == NULL) {
                return 0;
        }
@@ -600,6 +636,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r
 
        if (!client_msg_call_get(protows, rb, &call))
                return;
+       
 
        if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
                protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
@@ -614,12 +651,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc
 {
        struct writebuf wb = { .count = 0 };
        char ie = status < 0;
+       int rc;
 
-       return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+       if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
         && writebuf_uint32(&wb, subcallid)
         && writebuf_char(&wb, ie)
-        && writebuf_object(&wb, object)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+        && writebuf_object(&wb, object)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 /* callback for subcall reply */
@@ -682,11 +726,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
        struct json_object *object;
 
        if (readbuf_uint32(rb, &descid)) {
+               pthread_mutex_lock(&protows->mutex);
                prv = &protows->describes;
                while ((desc = *prv) && desc->descid != descid)
                        prv = &desc->next;
-               if (desc) {
+               if (!desc)
+                       pthread_mutex_unlock(&protows->mutex);
+               else {
                        *prv = desc->next;
+                       pthread_mutex_unlock(&protows->mutex);
                        if (!readbuf_object(rb, &object))
                                object = NULL;
                        desc->callback(desc->closure, object);
@@ -696,56 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
 }
 
 /* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               pthread_mutex_lock(&protows->mutex);
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_ANSWER_SUCCESS: /* success */
-                       client_on_reply_success(protows, &rb);
+                       client_on_reply_success(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_ANSWER_FAIL: /* fail */
-                       client_on_reply_fail(protows, &rb);
+                       client_on_reply_fail(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_BROADCAST: /* broadcast */
-                       client_on_event_broadcast(protows, &rb);
+                       client_on_event_broadcast(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_ADD: /* creates the event */
-                       client_on_event_create(protows, &rb);
+                       client_on_event_create(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_DEL: /* removes the event */
-                       client_on_event_remove(protows, &rb);
+                       client_on_event_remove(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_PUSH: /* pushs the event */
-                       client_on_event_push(protows, &rb);
+                       client_on_event_push(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
-                       client_on_event_subscribe(protows, &rb);
+                       client_on_event_subscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
-                       client_on_event_unsubscribe(protows, &rb);
+                       client_on_event_unsubscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_CALL: /* subcall */
-                       client_on_subcall(protows, &rb);
+                       client_on_subcall(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIPTION: /* description */
-                       client_on_description(protows, &rb);
+                       client_on_description(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
-               pthread_mutex_unlock(&protows->mutex);
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 int afb_proto_ws_client_call(
@@ -771,11 +836,12 @@ int afb_proto_ws_client_call(
        /* init call data */
        pthread_mutex_lock(&protows->mutex);
        call->callid = ptr2id(call);
-       while(client_call_search(protows, call->callid) != NULL)
+       while(client_call_search_locked(protows, call->callid) != NULL)
                call->callid++;
        call->protows = protows;
        call->next = protows->calls;
        protows->calls = call;
+       pthread_mutex_unlock(&protows->mutex);
 
        /* creates the call message */
        if (!writebuf_char(&wb, CHAR_FOR_CALL)
@@ -788,7 +854,9 @@ int afb_proto_ws_client_call(
        }
 
        /* send */
+       pthread_mutex_lock(&protows->mutex);
        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+       pthread_mutex_unlock(&protows->mutex);
        if (rc >= 0) {
                rc = 0;
                goto end;
@@ -797,7 +865,6 @@ int afb_proto_ws_client_call(
 clean:
        client_call_destroy(call);
 end:
-       pthread_mutex_unlock(&protows->mutex);
        return rc;
 }
 
@@ -830,15 +897,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
        desc->protows = protows;
        desc->next = protows->describes;
        protows->describes = desc;
-       pthread_mutex_unlock(&protows->mutex);
 
        /* send */
        if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
         && writebuf_uint32(&wb, desc->descid)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0)
+        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
+               pthread_mutex_unlock(&protows->mutex);
                return 0;
+       }
 
-       pthread_mutex_lock(&protows->mutex);
        d = protows->describes;
        if (d == desc)
                protows->describes = desc->next;
@@ -848,8 +915,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
                if (d)
                        d->next = desc->next;
        }
-       free(desc);
        pthread_mutex_unlock(&protows->mutex);
+       free(desc);
 error:
        /* TODO? callback(closure, NULL); */
        return -1;
@@ -931,18 +998,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf
 
 static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
 {
+       int rc;
        struct writebuf wb = { .count = 0 };
 
-       return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
-                && writebuf_uint32(&wb, descid)
-                && writebuf_object(&wb, descobj)
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+        && writebuf_uint32(&wb, descid)
+        && writebuf_object(&wb, descobj)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
 {
        int rc = server_send_description(describe->protows, describe->descid, description);
-       afb_proto_ws_addref(describe->protows);
+       afb_proto_ws_unref(describe->protows);
        free(describe);
        return rc;
 }
@@ -971,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
 }
 
 /* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_CALL:
-                       server_on_call(protows, &rb);
+                       server_on_call(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_REPLY:
-                       server_on_subcall_reply(protows, &rb);
+                       server_on_subcall_reply(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIBE:
-                       server_on_describe(protows, &rb);
+                       server_on_describe(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 /******************* server part: manage events **********************************/
@@ -1005,12 +1097,19 @@ static void server_on_binary(void *closure, char *data, size_t size)
 static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
 {
        struct writebuf wb = { .count = 0 };
+       int rc;
 
-       return -!(writebuf_char(&wb, order)
-                && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
-                && writebuf_string(&wb, event_name)
-                && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, order)
+        && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+        && writebuf_string(&wb, event_name)
+        && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
@@ -1059,6 +1158,7 @@ static void on_hangup(void *closure)
        }
 
        if (protows->fd >= 0) {
+               close(protows->fd);
                protows->fd = -1;
                if (protows->on_hangup)
                        protows->on_hangup(protows->closure);
index 61bce09..6b6ad63 100644 (file)
 #include "afb-session.h"
 #include "verbose.h"
 
-#define COOKEYCOUNT  8
-#define COOKEYMASK   (COOKEYCOUNT - 1)
+#define SIZEUUID       37
+#define HEADCOUNT      16
+#define COOKEYCOUNT    8
+#define COOKEYMASK     (COOKEYCOUNT - 1)
 
-#define NOW (time(NULL))
+#define _MAXEXP_       ((time_t)(~(time_t)0))
+#define _MAXEXP2_      ((time_t)((((unsigned long long)_MAXEXP_) >> 1)))
+#define MAX_EXPIRATION (_MAXEXP_ >= 0 ? _MAXEXP_ : _MAXEXP2_)
+#define NOW            (time(NULL))
 
 struct cookie
 {
@@ -46,40 +51,30 @@ struct cookie
 
 struct afb_session
 {
+       struct afb_session *next; /* link to the next */
        unsigned refcount;
        int timeout;
-       time_t expiration;    // expiration time of the token
-       time_t access;
+       time_t expiration;      // expiration time of the token
        pthread_mutex_t mutex;
-       char uuid[37];        // long term authentication of remote client
-       char token[37];       // short term authentication of remote client
        struct cookie *cookies[COOKEYCOUNT];
+       char autoclose;
+       char idx;
+       char uuid[SIZEUUID];    // long term authentication of remote client
+       char token[SIZEUUID];   // short term authentication of remote client
 };
 
 // Session UUID are store in a simple array [for 10 sessions this should be enough]
 static struct {
-       pthread_mutex_t mutex;          // declare a mutex to protect hash table
-       struct afb_session **store;          // sessions store
-       int count;                      // current number of sessions
+       pthread_mutex_t mutex;  // declare a mutex to protect hash table
+       struct afb_session *heads[HEADCOUNT]; // sessions
+       int count;      // current number of sessions
        int max;
        int timeout;
-       char initok[37];
+       char initok[SIZEUUID];
 } sessions;
 
-/**
- * Get the index of the 'key' in the cookies array.
- * @param key the key to scan
- * @return the index of the list for key within cookies
- */
-static int cookeyidx(const void *key)
-{
-       intptr_t x = (intptr_t)key;
-       unsigned r = (unsigned)((x >> 5) ^ (x >> 15));
-       return r & COOKEYMASK;
-}
-
 /* generate a uuid */
-static void new_uuid(char uuid[37])
+static void new_uuid(char uuid[SIZEUUID])
 {
        uuid_t newuuid;
        uuid_generate(newuuid);
@@ -97,40 +92,51 @@ static inline void unlock(struct afb_session *session)
 }
 
 // Free context [XXXX Should be protected again memory abort XXXX]
-static void free_data (struct afb_session *session)
+static void close_session(struct afb_session *session)
 {
        int idx;
-       struct cookie *cookie, *next;
+       struct cookie *cookie;
 
-       // free cookies
+       /* free cookies */
        for (idx = 0 ; idx < COOKEYCOUNT ; idx++) {
-               cookie = session->cookies[idx];
-               session->cookies[idx] = NULL;
-               while (cookie != NULL) {
-                       next = cookie->next;
+               while ((cookie = session->cookies[idx])) {
+                       session->cookies[idx] = cookie->next;
                        if (cookie->freecb != NULL)
                                cookie->freecb(cookie->value);
                        free(cookie);
-                       cookie = next;
                }
        }
 }
 
+/* tiny hash function inspired from pearson */
+static int pearson4(const char *text)
+{
+       static uint8_t T[16] = {
+                4,  1,  6,  0,  9, 14, 11,  5,
+                2,  3, 12, 15, 10,  7,  8, 13
+       };
+       uint8_t r, c;
+
+       for (r = 0; (c = (uint8_t)*text) ; text++) {
+               r = T[r ^ (15 & c)];
+               r = T[r ^ (c >> 4)];
+       }
+       return r; // % HEADCOUNT;
+}
+
 // Create a new store in RAM, not that is too small it will be automatically extended
 void afb_session_init (int max_session_count, int timeout, const char *initok)
 {
-       // let's create as store as hashtable does not have any
-       sessions.store = calloc (1 + (unsigned)max_session_count, sizeof *sessions.store);
        pthread_mutex_init(&sessions.mutex, NULL);
        sessions.max = max_session_count;
        sessions.timeout = timeout;
        if (initok == NULL)
                /* without token, a secret is made to forbid creation of sessions */
                new_uuid(sessions.initok);
-       else if (strlen(initok) < sizeof(sessions.store[0]->token))
+       else if (strlen(initok) < sizeof sessions.initok)
                strcpy(sessions.initok, initok);
        else {
-               ERROR("initial token '%s' too long (max length 36)", initok);
+               ERROR("initial token '%s' too long (max length %d)", initok, ((int)(sizeof sessions.initok)) - 1);
                exit(1);
        }
 }
@@ -140,196 +146,205 @@ const char *afb_session_initial_token()
        return sessions.initok;
 }
 
-static struct afb_session *search (const char* uuid)
+static struct afb_session *search (const char* uuid, int idx)
 {
-       int  idx;
        struct afb_session *session;
 
-       assert (uuid != NULL);
-
-       pthread_mutex_lock(&sessions.mutex);
-
-       for (idx=0; idx < sessions.max; idx++) {
-               session = sessions.store[idx];
-               if (session && (0 == strcmp (uuid, session->uuid)))
-                       goto found;
-       }
-       session = NULL;
+       session = sessions.heads[idx];
+       while (session && strcmp(uuid, session->uuid))
+               session = session->next;
 
-found:
-       pthread_mutex_unlock(&sessions.mutex);
        return session;
 }
 
-static int destroy (struct afb_session *session)
+static void destroy (struct afb_session *session)
 {
-       int idx;
-       int status;
+       struct afb_session **prv;
 
        assert (session != NULL);
 
+       close_session(session);
        pthread_mutex_lock(&sessions.mutex);
-
-       for (idx=0; idx < sessions.max; idx++) {
-               if (sessions.store[idx] == session) {
-                       sessions.store[idx] = NULL;
+       prv = &sessions.heads[(int)session->idx];
+       while (*prv)
+               if (*prv != session)
+                       prv = &((*prv)->next);
+               else {
+                       *prv = session->next;
                        sessions.count--;
-                       status = 1;
-                       goto deleted;
+                       pthread_mutex_destroy(&session->mutex);
+                       free(session);
+                       break;
                }
-       }
-       status = 0;
-deleted:
        pthread_mutex_unlock(&sessions.mutex);
-       return status;
 }
 
-static int add (struct afb_session *session)
+// Loop on every entry and remove old context sessions.hash
+static time_t cleanup ()
 {
+       struct afb_session *session, *next;
        int idx;
-       int status;
-
-       assert (session != NULL);
-
-       pthread_mutex_lock(&sessions.mutex);
+       time_t now;
 
-       for (idx=0; idx < sessions.max; idx++) {
-               if (NULL == sessions.store[idx]) {
-                       sessions.store[idx] = session;
-                       sessions.count++;
-                       status = 1;
-                       goto added;
+       // Loop on Sessions Table and remove anything that is older than timeout
+       now = NOW;
+       for (idx = 0 ; idx < HEADCOUNT; idx++) {
+               session = sessions.heads[idx];
+               while (session) {
+                       next = session->next;
+                       if (session->expiration < now)
+                               afb_session_close(session);
+                       session = next;
                }
        }
-       status = 0;
-added:
-       pthread_mutex_unlock(&sessions.mutex);
-       return status;
+       return now;
 }
 
-// Check if context timeout or not
-static int is_expired (struct afb_session *ctx, time_t now)
+static void update_timeout(struct afb_session *session, time_t now, int timeout)
 {
-       assert (ctx != NULL);
-       return ctx->expiration < now;
+       time_t expiration;
+
+       /* compute expiration */
+       if (timeout == AFB_SESSION_TIMEOUT_INFINITE)
+               expiration = MAX_EXPIRATION;
+       else {
+               if (timeout == AFB_SESSION_TIMEOUT_DEFAULT)
+                       expiration = now + sessions.timeout;
+               else
+                       expiration = now + timeout;
+               if (expiration < 0)
+                       expiration = MAX_EXPIRATION;
+       }
+
+       /* record the values */
+       session->timeout = timeout;
+       session->expiration = expiration;
 }
 
-// Check if context is active or not
-static int is_active (struct afb_session *ctx, time_t now)
+static void update_expiration(struct afb_session *session, time_t now)
 {
-       assert (ctx != NULL);
-       return ctx->uuid[0] != 0 && ctx->expiration >= now;
+       update_timeout(session, now, session->timeout);
 }
 
-// Loop on every entry and remove old context sessions.hash
-static void cleanup (time_t now)
+static struct afb_session *add_session (const char *uuid, int timeout, time_t now, int idx)
 {
-       struct afb_session *ctx;
-       long idx;
+       struct afb_session *session;
 
-       // Loop on Sessions Table and remove anything that is older than timeout
-       for (idx=0; idx < sessions.max; idx++) {
-               ctx = sessions.store[idx];
-               if (ctx != NULL && is_expired(ctx, now)) {
-                       afb_session_close (ctx);
-               }
+       /* check arguments */
+       if (!AFB_SESSION_TIMEOUT_IS_VALID(timeout)
+        || (uuid && strlen(uuid) >= sizeof session->uuid)) {
+               errno = EINVAL;
+               return NULL;
        }
-}
 
-static struct afb_session *make_session (const char *uuid, int timeout, time_t now)
-{
-       struct afb_session *session;
+       /* check session count */
+       if (sessions.count >= sessions.max) {
+               errno = EBUSY;
+               return NULL;
+       }
 
        /* allocates a new one */
        session = calloc(1, sizeof *session);
        if (session == NULL) {
                errno = ENOMEM;
-               goto error;
-       }
-       pthread_mutex_init(&session->mutex, NULL);
-
-       /* generate the uuid */
-       if (uuid == NULL) {
-               new_uuid(session->uuid);
-       } else {
-               if (strlen(uuid) >= sizeof session->uuid) {
-                       errno = EINVAL;
-                       goto error2;
-               }
-               strcpy(session->uuid, uuid);
+               return NULL;
        }
 
-       /* init the token */
+       /* initialize */
+       pthread_mutex_init(&session->mutex, NULL);
+       session->refcount = 1;
+       strcpy(session->uuid, uuid);
        strcpy(session->token, sessions.initok);
-       session->timeout = timeout;
-       if (timeout != 0)
-               session->expiration = now + timeout;
-       else {
-               session->expiration = (time_t)(~(time_t)0);
-               if (session->expiration < 0)
-                       session->expiration = (time_t)(((unsigned long long)session->expiration) >> 1);
-       }
-       if (!add (session)) {
-               errno = ENOMEM;
-               goto error2;
-       }
+       update_timeout(session, now, timeout);
+
+       /* link */
+       session->idx = (char)idx;
+       session->next = sessions.heads[idx];
+       sessions.heads[idx] = session;
+       sessions.count++;
 
-       session->access = now;
-       session->refcount = 1;
        return session;
+}
+
+/* create a new session for the given timeout */
+static struct afb_session *new_session (int timeout, time_t now)
+{
+       int idx;
+       char uuid[SIZEUUID];
 
-error2:
-       free(session);
-error:
-       return NULL;
+       do {
+               new_uuid(uuid);
+               idx = pearson4(uuid);
+       } while(search(uuid, idx));
+       return add_session(uuid, timeout, now, idx);
 }
 
-struct afb_session *afb_session_create (const char *uuid, int timeout)
+/* Creates a new session with 'timeout' */
+struct afb_session *afb_session_create (int timeout)
 {
        time_t now;
+       struct afb_session *session;
 
        /* cleaning */
-       now = NOW;
-       cleanup (now);
+       pthread_mutex_lock(&sessions.mutex);
+       now = cleanup();
+       session = new_session(timeout, now);
+       pthread_mutex_unlock(&sessions.mutex);
 
-       /* search for an existing one not too old */
-       if (uuid != NULL && search(uuid) != NULL) {
-               errno = EEXIST;
-               return NULL;
-       }
+       return session;
+}
+
+/* Searchs the session of 'uuid' */
+struct afb_session *afb_session_search (const char *uuid)
+{
+       struct afb_session *session;
+
+       /* cleaning */
+       pthread_mutex_lock(&sessions.mutex);
+       cleanup();
+       session = search(uuid, pearson4(uuid));
+       if (session)
+               __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED);
+       pthread_mutex_unlock(&sessions.mutex);
+       return session;
 
-       return make_session(uuid, timeout, now);
 }
 
-// This function will return exiting session or newly created session
-struct afb_session *afb_session_get (const char *uuid, int *created)
+/* This function will return exiting session or newly created session */
+struct afb_session *afb_session_get (const char *uuid, int timeout, int *created)
 {
+       int idx;
        struct afb_session *session;
        time_t now;
 
        /* cleaning */
-       now = NOW;
-       cleanup (now);
+       pthread_mutex_lock(&sessions.mutex);
+       now = cleanup();
 
        /* search for an existing one not too old */
-       if (uuid != NULL) {
-               session = search(uuid);
-               if (!created)
-                       return session;
-               if (session != NULL) {
-                       *created = 0;
-                       session->access = now;
-                       session->refcount++;
+       if (!uuid)
+               session = new_session(timeout, now);
+       else {
+               idx = pearson4(uuid);
+               session = search(uuid, idx);
+               if (session) {
+                       __atomic_add_fetch(&session->refcount, 1, __ATOMIC_RELAXED);
+                       pthread_mutex_unlock(&sessions.mutex);
+                       if (created)
+                               *created = 0;
                        return session;
                }
+               session = add_session (uuid, timeout, now, idx);
        }
+       pthread_mutex_unlock(&sessions.mutex);
 
        if (created)
-               *created = 1;
+               *created = !!session;
 
-       return make_session(uuid, sessions.timeout, now);
+       return session;
 }
 
+/* increase the use count on the session */
 struct afb_session *afb_session_addref(struct afb_session *session)
 {
        if (session != NULL)
@@ -337,32 +352,57 @@ struct afb_session *afb_session_addref(struct afb_session *session)
        return session;
 }
 
+/* decrease the use count of the session */
 void afb_session_unref(struct afb_session *session)
 {
        if (session != NULL) {
                assert(session->refcount != 0);
                if (!__atomic_sub_fetch(&session->refcount, 1, __ATOMIC_RELAXED)) {
-                       if (session->uuid[0] == 0) {
+                       pthread_mutex_lock(&session->mutex);
+                       if (session->autoclose || session->uuid[0] == 0)
                                destroy (session);
-                               pthread_mutex_destroy(&session->mutex);
-                               free(session);
-                       }
+                       else
+                               pthread_mutex_unlock(&session->mutex);
                }
        }
 }
 
-// Free Client Session Context
+// close Client Session Context
 void afb_session_close (struct afb_session *session)
 {
        assert(session != NULL);
+       pthread_mutex_lock(&session->mutex);
        if (session->uuid[0] != 0) {
                session->uuid[0] = 0;
-               free_data (session);
-               if (session->refcount == 0) {
+               if (session->refcount)
+                       close_session(session);
+               else {
                        destroy (session);
-                       free(session);
+                       return;
                }
        }
+       pthread_mutex_unlock(&session->mutex);
+}
+
+/* set the autoclose flag */
+void afb_session_set_autoclose(struct afb_session *session, int autoclose)
+{
+       assert(session != NULL);
+       session->autoclose = (char)!!autoclose;
+}
+
+// is the session active?
+int afb_session_is_active (struct afb_session *session)
+{
+       assert(session != NULL);
+       return !!session->uuid[0];
+}
+
+// is the session closed?
+int afb_session_is_closed (struct afb_session *session)
+{
+       assert(session != NULL);
+       return !session->uuid[0];
 }
 
 // Sample Generic Ping Debug API
@@ -371,8 +411,10 @@ int afb_session_check_token (struct afb_session *session, const char *token)
        assert(session != NULL);
        assert(token != NULL);
 
-       // compare current token with previous one
-       if (!is_active (session, NOW))
+       if (!session->uuid[0])
+               return 0;
+
+       if (session->expiration < NOW)
                return 0;
 
        if (session->token[0] && strcmp (token, session->token) != 0)
@@ -390,111 +432,135 @@ void afb_session_new_token (struct afb_session *session)
        new_uuid(session->token);
 
        // keep track of time for session timeout and further clean up
-       if (session->timeout != 0)
-               session->expiration = NOW + session->timeout;
+       update_expiration(session, NOW);
 }
 
+/* Returns the uuid of 'session' */
 const char *afb_session_uuid (struct afb_session *session)
 {
        assert(session != NULL);
        return session->uuid;
 }
 
+/* Returns the token of 'session' */
 const char *afb_session_token (struct afb_session *session)
 {
        assert(session != NULL);
        return session->token;
 }
 
-static struct cookie *cookie_search(struct afb_session *session, const void *key, int *idx)
-{
-       struct cookie *cookie;
-
-       cookie = session->cookies[*idx = cookeyidx(key)];
-       while(cookie != NULL && cookie->key != key)
-               cookie = cookie->next;
-       return cookie;
-}
-
-static struct cookie *cookie_add(struct afb_session *session, int idx, const void *key, void *value, void (*freecb)(void*))
+/**
+ * Get the index of the 'key' in the cookies array.
+ * @param key the key to scan
+ * @return the index of the list for key within cookies
+ */
+static int cookeyidx(const void *key)
 {
-       struct cookie *cookie;
-
-       cookie = malloc(sizeof *cookie);
-       if (!cookie)
-               errno = ENOMEM;
-       else {
-               cookie->key = key;
-               cookie->value = value;
-               cookie->freecb = freecb;
-               cookie->next = session->cookies[idx];
-               session->cookies[idx] = cookie;
-       }
-       return cookie;
+       intptr_t x = (intptr_t)key;
+       unsigned r = (unsigned)((x >> 5) ^ (x >> 15));
+       return r & COOKEYMASK;
 }
 
+/**
+ * Set, get, replace, remove a cookie of 'key' for the 'session'
+ *
+ * The behaviour of this function depends on its parameters:
+ *
+ * @param session      the session
+ * @param key          the key of the cookie
+ * @param makecb       the creation function or NULL
+ * @param freecb       the release function or NULL
+ * @param closure      an argument for makecb or the value if makecb==NULL
+ * @param replace      a boolean enforcing replecement of the previous value
+ *
+ * @return the value of the cookie
+ *
+ * The 'key' is a pointer and compared as pointers.
+ *
+ * For getting the current value of the cookie:
+ *
+ *   afb_session_cookie(session, key, NULL, NULL, NULL, 0)
+ *
+ * For storing the value of the cookie
+ *
+ *   afb_session_cookie(session, key, NULL, NULL, value, 1)
+ */
 void *afb_session_cookie(struct afb_session *session, const void *key, void *(*makecb)(void *closure), void (*freecb)(void *item), void *closure, int replace)
 {
        int idx;
        void *value;
-       struct cookie *cookie;
+       struct cookie *cookie, **prv;
 
+       /* get key hashed index */
+       idx = cookeyidx(key);
+
+       /* lock session and search for the cookie of 'key' */
        lock(session);
-       cookie = cookie_search(session, key, &idx);
-       if (cookie) {
-               if (!replace)
-                       value = cookie->value;
-               else {
+       prv = &session->cookies[idx];
+       for (;;) {
+               cookie = *prv;
+               if (!cookie) {
+                       /* 'key' not found, create value using 'closure' and 'makecb' */
                        value = makecb ? makecb(closure) : closure;
-                       if (cookie->value != value && cookie->freecb)
-                               cookie->freecb(cookie->value);
-                       cookie->value = value;
-                       cookie->freecb = freecb;
-               }
-       } else {
-               value = makecb ? makecb(closure) : closure;
-               if (replace || makecb || freecb) {
-                       cookie = cookie_add(session, idx, key, value, freecb);
-                       if (!cookie) {
-                               if (makecb && freecb)
-                                       freecb(value);
-                               value = NULL;
+                       /* store the the only if it has some meaning */
+                       if (replace || makecb || freecb) {
+                               cookie = malloc(sizeof *cookie);
+                               if (!cookie) {
+                                       errno = ENOMEM;
+                                       /* calling freecb if there is no makecb may have issue */
+                                       if (makecb && freecb)
+                                               freecb(value);
+                                       value = NULL;
+                               } else {
+                                       cookie->key = key;
+                                       cookie->value = value;
+                                       cookie->freecb = freecb;
+                                       cookie->next = NULL;
+                                       *prv = cookie;
+                               }
+                       }
+                       break;
+               } else if (cookie->key == key) {
+                       /* cookie of key found */
+                       if (!replace)
+                               /* not replacing, get the value */
+                               value = cookie->value;
+                       else {
+                               /* create value using 'closure' and 'makecb' */
+                               value = makecb ? makecb(closure) : closure;
+
+                               /* free previous value is needed */
+                               if (cookie->value != value && cookie->freecb)
+                                       cookie->freecb(cookie->value);
+
+                               /* store the value and its releaser */
+                               cookie->value = value;
+                               cookie->freecb = freecb;
+
+                               /* but if both are NULL drop the cookie */
+                               if (!value && !freecb) {
+                                       *prv = cookie->next;
+                                       free(cookie);
+                               }
                        }
+                       break;
+               } else {
+                       prv = &(cookie->next);
                }
        }
+
+       /* unlock the session and return the value */
        unlock(session);
        return value;
 }
 
 void *afb_session_get_cookie(struct afb_session *session, const void *key)
 {
-       int idx;
-       void *value;
-       struct cookie *cookie;
-
-       lock(session);
-       cookie = cookie_search(session, key, &idx);
-       value = cookie ? cookie->value : NULL;
-       unlock(session);
-       return value;
+       return afb_session_cookie(session, key, NULL, NULL, NULL, 0);
 }
 
 int afb_session_set_cookie(struct afb_session *session, const void *key, void *value, void (*freecb)(void*))
 {
-       int idx;
-       struct cookie *cookie;
-
-       lock(session);
-       cookie = cookie_search(session, key, &idx);
-       if (!cookie)
-               cookie = cookie_add(session, idx, key, value, freecb);
-       else {
-               if (cookie->value != value && cookie->freecb)
-                       cookie->freecb(cookie->value);
-               cookie->value = value;
-               cookie->freecb = freecb;
-       }
-       unlock(session);
-       return -!cookie;
+       return -(value != afb_session_cookie(session, key, NULL, freecb, value, 1));
 }
 
index b5dc394..d79ec41 100644 (file)
 
 struct afb_session;
 
+#define AFB_SESSION_TIMEOUT_INFINITE  -1
+#define AFB_SESSION_TIMEOUT_DEFAULT   -2
+#define AFB_SESSION_TIMEOUT_IS_VALID(x) ((x) >= AFB_SESSION_TIMEOUT_DEFAULT)
+
 extern void afb_session_init(int max_session_count, int timeout, const char *initok);
 extern const char *afb_session_initial_token();
 
-extern struct afb_session *afb_session_create (const char *uuid, int timeout);
-extern struct afb_session *afb_session_get (const char *uuid, int *created);
+extern struct afb_session *afb_session_create (int timeout);
+extern struct afb_session *afb_session_search (const char *uuid);
+extern struct afb_session *afb_session_get (const char *uuid, int timeout, int *created);
 extern const char *afb_session_uuid (struct afb_session *session);
 
 extern struct afb_session *afb_session_addref(struct afb_session *session);
 extern void afb_session_unref(struct afb_session *session);
+extern void afb_session_set_autoclose(struct afb_session *session, int autoclose);
 
 extern void afb_session_close(struct afb_session *session);
+extern int afb_session_is_active (struct afb_session *session);
+extern int afb_session_is_closed (struct afb_session *session);
 
 extern int afb_session_check_token(struct afb_session *session, const char *token);
 extern void afb_session_new_token(struct afb_session *session);
index 693a0d0..740a857 100644 (file)
@@ -125,6 +125,15 @@ struct server_describe
        struct afb_proto_ws_describe *describe;
 };
 
+/*
+ * structure for recording sessions
+ */
+struct server_session
+{
+       struct server_session *next;
+       struct afb_session *session;
+};
+
 /******************* stub description for client or servers ******************/
 
 struct afb_stub_ws
@@ -147,6 +156,9 @@ struct afb_stub_ws
        /* credentials (server side) */
        struct afb_cred *cred;
 
+       /* sessions (server side) */
+       struct server_session *sessions;
+
        /* apiset */
        struct afb_apiset *apiset;
 
@@ -460,6 +472,46 @@ static void on_subcall(void *closure, struct afb_proto_ws_subcall *subcall, void
 
 /*****************************************************/
 
+static void record_session(struct afb_stub_ws *stubws, struct afb_session *session)
+{
+       struct server_session *s, **prv;
+
+       /* search */
+       prv = &stubws->sessions;
+       while ((s = *prv)) {
+               if (s->session == session)
+                       return;
+               if (afb_session_is_active(s->session))
+                       prv = &s->next;
+               else {
+                       *prv = s->next;
+                       afb_session_addref(s->session);
+                       free(s);
+               }
+       }
+
+       /* create */
+       s = malloc(sizeof *s);
+       if (s) {
+               s->session = afb_session_addref(session);
+               s->next = stubws->sessions;
+               stubws->sessions = s;
+       }
+}
+
+static void release_sessions(struct afb_stub_ws *stubws)
+{
+       struct server_session *s;
+
+       while((s = stubws->sessions)) {
+               stubws->sessions = s->next;
+               afb_session_unref(s->session);
+               free(s);
+       }
+}
+
+/*****************************************************/
+
 static void on_call(void *closure, struct afb_proto_ws_call *call, const char *verb, struct json_object *args, const char *sessionid)
 {
        struct afb_stub_ws *stubws = closure;
@@ -480,6 +532,9 @@ static void on_call(void *closure, struct afb_proto_ws_call *call, const char *v
        if (afb_context_connect(&wreq->xreq.context, sessionid, NULL) < 0)
                goto unconnected;
        wreq->xreq.context.validated = 1;
+       record_session(stubws, wreq->xreq.context.session);
+       if (wreq->xreq.context.created)
+               afb_session_set_autoclose(wreq->xreq.context.session, 1);
 
        /* makes the call */
        wreq->xreq.cred = afb_cred_addref(stubws->cred);
@@ -598,6 +653,8 @@ static void on_hangup(void *closure)
 
        if (stubws->on_hangup)
                stubws->on_hangup(stubws);
+
+       release_sessions(stubws);
 }
 
 /*****************************************************/
@@ -651,6 +708,7 @@ void afb_stub_ws_unref(struct afb_stub_ws *stubws)
        if (!__atomic_sub_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED)) {
                drop_all_events(stubws);
                afb_evt_listener_unref(stubws->listener);
+               release_sessions(stubws);
                afb_proto_ws_unref(stubws->proto);
                afb_cred_unref(stubws->cred);
                afb_apiset_unref(stubws->apiset);
index fb07795..021ab5a 100644 (file)
@@ -76,10 +76,9 @@ struct event {
 };
 
 /* struct for sessions */
-struct session {
-       struct session *next;           /* link to the next session */
-       struct afb_session *session;    /* the session */
-       struct afb_trace *trace;        /* the tracer */
+struct cookie {
+       struct afb_session *session;    /* the session */
+       struct afb_trace *trace;        /* the tracer */
 };
 
 /* struct for recording hooks */
@@ -88,7 +87,7 @@ struct hook {
        void *handler;                  /* the handler of the hook */
        struct event *event;            /* the associated event */
        struct tag *tag;                /* the associated tag */
-       struct session *session;        /* the associated session */
+       struct afb_session *session;    /* the associated session */
 };
 
 /* types of hooks */
@@ -111,7 +110,6 @@ struct afb_trace
        struct afb_session *bound;              /* bound to session */
        struct event *events;                   /* list of events */
        struct tag *tags;                       /* list of tags */
-       struct session *sessions;               /* list of tags */
        struct hook *hooks[Trace_Type_Count];   /* hooks */
 };
 
@@ -975,7 +973,7 @@ abstracting[Trace_Type_Count] =
 /*******************************************************************************/
 
 /* drop hooks of 'trace' matching 'tag' and 'event' and 'session' */
-static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct session *session)
+static void trace_unhook(struct afb_trace *trace, struct tag *tag, struct event *event, struct afb_session *session)
 {
        int i;
        struct hook *hook, **prev;
@@ -1004,24 +1002,7 @@ static void trace_cleanup(struct afb_trace *trace)
        struct hook *hook;
        struct tag *tag, **ptag;
        struct event *event, **pevent;
-       struct session *session, **psession;
 
-       /* clean sessions */
-       psession = &trace->sessions;
-       while ((session = *psession)) {
-               /* search for session */
-               for (hook = NULL, i = 0 ; !hook && i < Trace_Type_Count ; i++)
-                       for (hook = trace->hooks[i] ; hook && hook->session != session ; hook = hook->next);
-               /* keep or free whether used or not */
-               if (hook)
-                       psession = &session->next;
-               else {
-                       *psession = session->next;
-                       if (__atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED))
-                               afb_session_set_cookie(session->session, session, NULL, NULL);
-                       free(session);
-               }
-       }
        /* clean tags */
        ptag = &trace->tags;
        while ((tag = *ptag)) {
@@ -1053,19 +1034,6 @@ static void trace_cleanup(struct afb_trace *trace)
        }
 }
 
-/* callback at end of traced session */
-static void free_session_cookie(void *cookie)
-{
-       struct session *session = cookie;
-       struct afb_trace *trace = __atomic_exchange_n(&session->trace, NULL, __ATOMIC_RELAXED);
-       if (trace) {
-               pthread_mutex_lock(&trace->mutex);
-               trace_unhook(trace, NULL, NULL, session);
-               trace_cleanup(trace);
-               pthread_mutex_unlock(&trace->mutex);
-       }
-}
-
 /*
  * Get the tag of 'name' within 'trace'.
  * If 'alloc' isn't zero, create the tag and add it.
@@ -1123,41 +1091,48 @@ static struct event *trace_get_event(struct afb_trace *trace, const char *name,
 }
 
 /*
- * Get the session of 'value' within 'trace'.
- * If 'alloc' isn't zero, create the session and add it.
+ * called on session closing
  */
-static struct session *trace_get_session(struct afb_trace *trace, struct afb_session *value, int alloc)
-{
-       struct session *session;
-
-       /* search the session */
-       session = trace->sessions;
-       while (session && session->session != value)
-               session = session->next;
-
-       if (!session && alloc) {
-               session = malloc(sizeof * session);
-               if (session) {
-                       session->session = value;
-                       session->trace = NULL;
-                       session->next = trace->sessions;
-                       trace->sessions = session;
-               }
-       }
-       return session;
+static void session_closed(void *item)
+{
+       struct cookie *cookie = item;
+
+       pthread_mutex_lock(&cookie->trace->mutex);
+       trace_unhook(cookie->trace, NULL, NULL, cookie->session);
+       pthread_mutex_unlock(&cookie->trace->mutex);
+       free(cookie);
+}
+
+/*
+ * records the cookie of session for tracking close
+ */
+static void *session_open(void *closure)
+{
+       struct cookie *param = closure, *cookie;
+       cookie = malloc(sizeof *cookie);
+       if (cookie)
+               *cookie = *param;
+       return cookie;
 }
 
 /*
  * Get the session of 'uuid' within 'trace'.
  * If 'alloc' isn't zero, create the session and add it.
  */
-static struct session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc)
+static struct afb_session *trace_get_session_by_uuid(struct afb_trace *trace, const char *uuid, int alloc)
 {
-       struct afb_session *session;
-       int created;
+       struct cookie cookie;
 
-       session = afb_session_get(uuid, alloc ? &created : NULL);
-       return session ? trace_get_session(trace, session, alloc) : NULL;
+       if (!alloc)
+               cookie.session = afb_session_search(uuid);
+       else {
+               cookie.session = afb_session_get(uuid, AFB_SESSION_TIMEOUT_DEFAULT, NULL);
+               if (cookie.session) {
+                       cookie.trace = trace;
+                       afb_session_cookie(cookie.session, cookie.trace, session_open, session_closed, &cookie, 0);
+               }
+       }
+       return cookie.session;
 }
 
 static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char *event, const char *tag)
@@ -1178,13 +1153,8 @@ static struct hook *trace_make_detached_hook(struct afb_trace *trace, const char
 
 static void trace_attach_hook(struct afb_trace *trace, struct hook *hook, enum trace_type type)
 {
-       struct session *session = hook->session;
        hook->next = trace->hooks[type];
        trace->hooks[type] = hook;
-       if (session && !session->trace) {
-               session->trace = trace;
-               afb_session_set_cookie(session->session, session, session, free_session_cookie);
-       }
 }
 
 /*******************************************************************************/
@@ -1214,7 +1184,7 @@ struct desc
 static void addhook(struct desc *desc, enum trace_type type)
 {
        struct hook *hook;
-       struct session *session;
+       struct afb_session *session;
        struct afb_session *bind;
        struct afb_trace *trace = desc->context->trace;
 
@@ -1241,17 +1211,19 @@ static void addhook(struct desc *desc, enum trace_type type)
        /* create the hook handler */
        switch (type) {
        case Trace_Type_Xreq:
-               if (desc->session) {
+               if (!desc->session)
+                       session = afb_session_addref(bind);
+               else {
                        session = trace_get_session_by_uuid(trace, desc->session, 1);
                        if (!session) {
                                ctxt_error(&desc->context->errors, "allocation of session failed");
                                free(hook);
                                return;
                        }
-                       bind = session->session;
                }
-               hook->handler = afb_hook_create_xreq(desc->api, desc->verb, bind,
+               hook->handler = afb_hook_create_xreq(desc->api, desc->verb, session,
                                desc->flags[type], &hook_xreq_itf, hook);
+               afb_session_unref(session);
                break;
        case Trace_Type_Ditf:
                hook->handler = afb_hook_create_ditf(desc->api, desc->flags[type], &hook_ditf_itf, hook);
@@ -1443,7 +1415,7 @@ static void drop_session(void *closure, struct json_object *object)
 {
        int rc;
        struct context *context = closure;
-       struct session *session;
+       struct afb_session *session;
        const char *uuid;
 
        rc = wrap_json_unpack(object, "s", &uuid);
@@ -1453,8 +1425,10 @@ static void drop_session(void *closure, struct json_object *object)
                session = trace_get_session_by_uuid(context->trace, uuid, 0);
                if (!session)
                        ctxt_error(&context->errors, "session %s not found", uuid);
-               else
+               else {
                        trace_unhook(context->trace, NULL, NULL, session);
+                       afb_session_unref(session);
+               }
        }
 }
 
diff --git a/src/jobs-fake.c b/src/jobs-fake.c
new file mode 100644 (file)
index 0000000..3c1c273
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <systemd/sd-event.h>
+
+#include "jobs.h"
+#include "sig-monitor.h"
+#include "verbose.h"
+
+#include "jobs.h"
+
+struct jobloop;
+
+struct job
+{
+       struct job *next;
+       const void *group;
+       int timeout;
+       void (*callback)(int signum, void* arg);
+       void *closure;
+};
+
+static struct job *first, *last;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure)
+{
+       struct job *j;
+
+       j = malloc(sizeof*j);
+       if (!j) {
+               errno = ENOMEM;
+               return -1;
+       }
+
+       j->next = 0;
+       j->group = group;
+       j->timeout = timeout;
+       j->callback = callback;
+       j->closure = closure;
+
+       pthread_mutex_lock(&mutex);
+       if (first)
+               last->next = j;
+       else
+               first = j;
+       last = j;
+       pthread_mutex_unlock(&mutex);   
+       return 0;
+}
+
+static void *thrrun(void *arg)
+{
+       struct job *j;
+
+       pthread_mutex_lock(&mutex);
+       j = first;
+       if (j)
+               first = j->next;
+       pthread_mutex_unlock(&mutex);   
+       if (j) {
+               j->callback(0, j->closure);
+               free(j);
+       }
+       return 0;
+}
+
+int jobs_queue(
+       const void *group,
+       int timeout,
+       void (*callback)(int signum, void* arg),
+       void *arg)
+{
+       pthread_t tid;
+       int rc = add_job(group, timeout, callback, arg);
+       if (!rc) {
+               rc = pthread_create(&tid, NULL, thrrun, NULL);
+               if (rc)
+                       rc = -1;
+       }
+       return rc;
+}
+
+#if 0
+int jobs_enter(
+       const void *group,
+       int timeout,
+       void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+       void *closure)
+{
+       return 0;
+}
+
+int jobs_leave(struct jobloop *jobloop)
+{
+       return 0;
+}
+
+int jobs_call(
+       const void *group,
+       int timeout,
+       void (*callback)(int, void*),
+       void *arg)
+{
+       return 0;
+}
+
+struct sd_event *jobs_get_sd_event()
+{
+       struct sd_event *r;
+       int rc = sd_event_default(&r);
+       return rc < 0 ? NULL : r;
+}
+
+void jobs_terminate()
+{
+}
+
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
+{
+       start(0);
+       return 0;
+}
+#endif
index 93e864f..508d1b8 100644 (file)
@@ -64,8 +64,11 @@ struct events
        struct events *next;
        struct sd_event *event;
        uint64_t timeout;
-       unsigned used: 1;
-       unsigned runs: 1;
+       enum {
+               Available,
+               Modifiable,
+               Locked
+       } state;
 };
 
 /** Description of threads */
@@ -74,10 +77,8 @@ struct thread
        struct thread *next;   /**< next thread of the list */
        struct thread *upper;  /**< upper same thread */
        struct job *job;       /**< currently processed job */
-       struct events *events; /**< currently processed job */
        pthread_t tid;         /**< the thread id */
        unsigned stop: 1;      /**< stop requested */
-       unsigned lowered: 1;   /**< has a lower same thread */
        unsigned waits: 1;     /**< is waiting? */
 };
 
@@ -109,7 +110,8 @@ static int nevents = 0; /** count of events */
 
 /* list of threads */
 static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
 
 /* queue of pending jobs */
 static struct job *first_job;
@@ -204,7 +206,7 @@ static inline struct job *job_get()
 static inline struct events *events_get()
 {
        struct events *events = first_events;
-       while (events && events->used)
+       while (events && events->state != Available)
                events = events->next;
        return events;
 }
@@ -272,9 +274,34 @@ static void job_cancel(int signum, void *arg)
  */
 static void events_call(int signum, void *arg)
 {
+       int rc;
+       struct sd_event *se;
        struct events *events = arg;
-       if (!signum)
-               sd_event_run(events->event, events->timeout);
+
+       if (!signum) {
+               se = events->event;
+               rc = sd_event_prepare(se);
+               if (rc < 0) {
+                       errno = -rc;
+                       ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+               } else {
+                       if (rc == 0) {
+                               rc = sd_event_wait(se, events->timeout);
+                               if (rc < 0) {
+                                       errno = -rc;
+                                       ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+                               }
+                       }
+
+                       if (rc > 0) {
+                               rc = sd_event_dispatch(se);
+                               if (rc < 0) {
+                                       errno = -rc;
+                                       ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+                               }
+                       }
+               }
+       }
 }
 
 /**
@@ -286,7 +313,7 @@ static void events_call(int signum, void *arg)
  */
 static void thread_run(volatile struct thread *me)
 {
-       struct thread **prv, *thr;
+       struct thread **prv;
        struct job *job;
        struct events *events;
        uint64_t evto;
@@ -294,22 +321,18 @@ static void thread_run(volatile struct thread *me)
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
        me->stop = 0;
-       me->lowered = 0;
        me->waits = 0;
-       me->upper = current;
-       if (current) {
-               current->lowered = 1;
+       me->upper = current_thread;
+       if (current_thread) {
                evto = EVENT_TIMEOUT_CHILD;
-               me->events = current->events;
        } else {
                started++;
                sig_monitor_init_timeouts();
                evto = EVENT_TIMEOUT_TOP;
-               me->events = NULL;
        }
        me->next = threads;
        threads = (struct thread*)me;
-       current = (struct thread*)me;
+       current_thread = (struct thread*)me;
 
        /* loop until stopped */
        while (!me->stop) {
@@ -326,37 +349,34 @@ static void thread_run(volatile struct thread *me)
                        sig_monitor(job->timeout, job->callback, job->arg);
                        pthread_mutex_lock(&mutex);
 
-                       /* release the run job */
-                       job_release(job);
-
                        /* release event if any */
-                       events = me->events;
-                       if (events) {
-                               events->used = 0;
-                               me->events = NULL;
+                       events = current_events;
+                       if (events && events->state == Modifiable) {
+                               current_events = NULL;
+                               events->state = Available;
                        }
+
+                       /* release the run job */
+                       job_release(job);
                } else {
                        /* no job, check events */
-                       events = me->events;
-                       if (!events || events->runs)
+                       events = current_events;
+                       if (!events)
                                events = events_get();
+                       else if (events->state == Locked) {
+                               events = 0;
+                               WARNING("Loosing an event loop because reentering");
+                       }
                        if (events) {
                                /* run the events */
-                               events->used = 1;
-                               events->runs = 1;
+                               events->state = Locked;
                                events->timeout = evto;
-                               me->events = events;
+                               current_events = events;
                                pthread_mutex_unlock(&mutex);
                                sig_monitor(0, events_call, events);
                                pthread_mutex_lock(&mutex);
-                               events->used = 0;
-                               events->runs = 0;
-                               me->events = NULL;
-                               thr = me->upper;
-                               while (thr && thr->events == events) {
-                                       thr->events = NULL;
-                                       thr = thr->upper;
-                               }
+                               current_events = NULL;
+                               events->state = Available;
                        } else {
                                /* no job and not events */
                                waiting++;
@@ -373,10 +393,8 @@ static void thread_run(volatile struct thread *me)
        while (*prv != me)
                prv = &(*prv)->next;
        *prv = me->next;
-       current = me->upper;
-       if (current) {
-               current->lowered = 0;
-       } else {
+       current_thread = me->upper;
+       if (!current_thread) {
                sig_monitor_clean_timeouts();
                started--;
        }
@@ -631,19 +649,13 @@ int jobs_call(
 struct sd_event *jobs_get_sd_event()
 {
        struct events *events;
-       struct thread *me;
        int rc;
 
        pthread_mutex_lock(&mutex);
 
        /* search events on stack */
-       me = current;
-       while (me && !me->events)
-               me = me->upper;
-       if (me)
-               /* return the stacked events */
-               events = me->events;
-       else {
+       events = current_events;
+       if (!events) {
                /* search an available events */
                events = events_get();
                if (!events) {
@@ -655,8 +667,7 @@ struct sd_event *jobs_get_sd_event()
                                events = malloc(sizeof *events);
                                if (events && (rc = sd_event_new(&events->event)) >= 0) {
                                        if (nevents < started || start_one_thread() >= 0) {
-                                               events->used = 0;
-                                               events->runs = 0;
+                                               events->state = Available;
                                                events->next = first_events;
                                                first_events = events;
                                        } else {
@@ -679,13 +690,10 @@ struct sd_event *jobs_get_sd_event()
                        }
                }
                if (events) {
-                       me = current;
-                       if (me) {
-                               events->used = 1;
-                               me->events = events;
-                       } else {
+                       events->state = Modifiable;
+                       if (!current_thread)
                                WARNING("event returned for unknown thread!");
-                       }
+                       current_events = events;
                }
        }
        pthread_mutex_unlock(&mutex);
@@ -715,7 +723,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
        pthread_mutex_lock(&mutex);
 
        /* check whether already running */
-       if (current || allowed) {
+       if (current_thread || allowed) {
                ERROR("thread already started");
                errno = EINVAL;
                goto error;
@@ -822,7 +830,7 @@ void jobs_terminate()
                head = job->next;
 
                /* search if job is stacked for current */
-               t = current;
+               t = current_thread;
                while (t && t->job != job)
                        t = t->upper;
                if (t) {
index b5b0023..150b781 100644 (file)
@@ -525,7 +525,7 @@ static void run_startup_calls()
        list = config->calls;
        if (list) {
                sreq = calloc(1, sizeof *sreq);
-               sreq->session = afb_session_create("startup", 3600);
+               sreq->session = afb_session_create(3600);
                sreq->current = list;
                startup_call_current(sreq);
        }
index f330c6f..e96627e 100644 (file)
@@ -85,6 +85,7 @@ void verbose_set_name(const char *name, int authority)
 #include <errno.h>
 #include <string.h>
 #include <sys/uio.h>
+#include <pthread.h>
 
 static const char *appname;
 
@@ -105,6 +106,8 @@ static int tty;
 
 static const char chars[] = { '\n', '?', ':', ' ', '[', ',', ']' };
 
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
 static void _vverbose_(int loglevel, const char *file, int line, const char *function, const char *fmt, va_list args)
 {
        char buffer[4000];
@@ -169,7 +172,9 @@ static void _vverbose_(int loglevel, const char *file, int line, const char *fun
        iov[n].iov_base = (void*)&chars[0];
        iov[n++].iov_len = 1;
 
+       pthread_mutex_lock(&mutex);
        writev(STDERR_FILENO, iov, n);
+       pthread_mutex_unlock(&mutex);
 
        errno = saverr;
 }
index 59e68ab..ed2ffc2 100644 (file)
@@ -21,7 +21,7 @@ if (typeof base != "object")
 
 var initial = {
        base: base.base || "api",
-       token: base.token || initialtoken || "hello",
+       token: base.token || initialtoken || "HELLO",
        host: base.host || window.location.host,
        url: base.url || undefined
 };
@@ -105,8 +105,7 @@ var AFB_websocket;
 
        function onclose(event) {
                for (var id in this.pendings) {
-                       var ferr = this.pendings[id].onerror;
-                       ferr && ferr(null, this);
+                       try { this.pendings[id][1](); } catch (x) {/*TODO?*/}
                }
                this.pendings = {};
                this.onclose && this.onclose();
@@ -131,8 +130,7 @@ var AFB_websocket;
                if (id in pendings) {
                        var p = pendings[id];
                        delete pendings[id];
-                       var f = p[offset];
-                       f(ans);
+                       try { p[offset](ans); } catch (x) {/*TODO?*/}
                }
        }
 
@@ -166,12 +164,18 @@ var AFB_websocket;
                this.onabort = function(){};
        }
 
-       function call(method, request) {
+       function call(method, request, callid) {
                return new Promise((function(resolve, reject){
                        var id, arr;
-                       do {
-                               id = String(this.counter = 4095 & (this.counter + 1));
-                       } while (id in this.pendings);
+                       if (callid) {
+                               id = String(callid);
+                               if (id in this.pendings)
+                                       throw new Error("pending callid("+id+") exists");
+                       } else {
+                               do {
+                                       id = String(this.counter = 4095 & (this.counter + 1));
+                               } while (id in this.pendings);
+                       }
                        this.pendings[id] = [ resolve, reject ];
                        arr = [CALL, id, method, request ];
                        if (AFB_context.token) arr.push(AFB_context.token);
index 59e68ab..ed2ffc2 100644 (file)
@@ -21,7 +21,7 @@ if (typeof base != "object")
 
 var initial = {
        base: base.base || "api",
-       token: base.token || initialtoken || "hello",
+       token: base.token || initialtoken || "HELLO",
        host: base.host || window.location.host,
        url: base.url || undefined
 };
@@ -105,8 +105,7 @@ var AFB_websocket;
 
        function onclose(event) {
                for (var id in this.pendings) {
-                       var ferr = this.pendings[id].onerror;
-                       ferr && ferr(null, this);
+                       try { this.pendings[id][1](); } catch (x) {/*TODO?*/}
                }
                this.pendings = {};
                this.onclose && this.onclose();
@@ -131,8 +130,7 @@ var AFB_websocket;
                if (id in pendings) {
                        var p = pendings[id];
                        delete pendings[id];
-                       var f = p[offset];
-                       f(ans);
+                       try { p[offset](ans); } catch (x) {/*TODO?*/}
                }
        }
 
@@ -166,12 +164,18 @@ var AFB_websocket;
                this.onabort = function(){};
        }
 
-       function call(method, request) {
+       function call(method, request, callid) {
                return new Promise((function(resolve, reject){
                        var id, arr;
-                       do {
-                               id = String(this.counter = 4095 & (this.counter + 1));
-                       } while (id in this.pendings);
+                       if (callid) {
+                               id = String(callid);
+                               if (id in this.pendings)
+                                       throw new Error("pending callid("+id+") exists");
+                       } else {
+                               do {
+                                       id = String(this.counter = 4095 & (this.counter + 1));
+                               } while (id in this.pendings);
+                       }
                        this.pendings[id] = [ resolve, reject ];
                        arr = [CALL, id, method, request ];
                        if (AFB_context.token) arr.push(AFB_context.token);
index d5cf3f6..839f574 100644 (file)
@@ -266,7 +266,7 @@ body {
 /*******************************************************************/
 /* json format */
 
-.json.string { color: lightskyblue; } 
+.json.string { color: teal; } 
 .json.number { color: darkorange; } 
 .json.boolean { color: deepskyblue; } 
 .json.null { color: magenta; } 
index 2c07c1b..5a41887 100644 (file)
@@ -57,7 +57,7 @@
       <div id="params" class="clearfix">
         <div>host: <input type="text" id="param-host" size="50" value="localhost"></input></div>
         <div>port: <input type="text" id="param-port" size="10" value="1234"></input></div>
-        <div>token: <input type="text" id="param-token" size="33" value="hello"></input></div>
+        <div>token: <input type="text" id="param-token" size="33" value="HELLO"></input></div>
       </div>
       <div class="-flex-fill -box-out">
         <div id="trace-events" class="-box-in">
index 5b9dc0e..3c64ab3 100644 (file)
@@ -131,7 +131,7 @@ function init() {
        at("param-host").value = document.location.hostname;
        at("param-port").value = document.location.port;
        var args = new URLSearchParams(document.location.search.substring(1));
-       at("param-token").value = args.get("x-afb-token") || args.get("token") || "hello";
+       at("param-token").value = args.get("x-afb-token") || args.get("token") || "HELLO";
 
        document.onbeforeunload = on_disconnect;