afb-proto-ws: Fix autolock in proto-ws
authorJosé Bollo <jose.bollo@iot.bzh>
Fri, 17 Nov 2017 15:51:02 +0000 (16:51 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Fri, 17 Nov 2017 16:02:33 +0000 (17:02 +0100)
Because a systemd event loop can not be reentered
while evaluating an event callback, the event loop
was removed from the threads. It had the effect to
enter in deadlock when calling a synchronous call
while in an event callback.

Queueing a job solves the issue.

But because using queued job has implications on
libafbws, a fake job manager is added for libafbws.

Change-Id: Id793bea55743790082eaab48cd4cc87f7993772a
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/CMakeLists.txt
src/afb-proto-ws.c
src/jobs-fake.c [new file with mode: 0644]
src/jobs.c

index d531ca2..338f1ce 100644 (file)
@@ -93,13 +93,14 @@ INSTALL(TARGETS afb-daemon
 ###########################################
 # build and install libafbwsc
 ###########################################
-ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c)
+ADD_LIBRARY(afbwsc SHARED afb-ws.c afb-ws-client.c afb-wsj1.c websock.c afb-proto-ws.c jobs-fake.c)
 SET_TARGET_PROPERTIES(afbwsc PROPERTIES
        VERSION ${LIBAFBWSC_VERSION}
        SOVERSION ${LIBAFBWSC_SOVERSION})
 TARGET_LINK_LIBRARIES(afbwsc
        ${libsystemd_LDFLAGS}
        ${json-c_LDFLAGS}
+       -lpthread
        -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/export-afbwsc.map
        -Wl,--as-needed
        -Wl,--gc-sections
index 90c3b05..ce7d75d 100644 (file)
@@ -37,6 +37,7 @@
 #include "afb-ws.h"
 #include "afb-msg-json.h"
 #include "afb-proto-ws.h"
+#include "jobs.h"
 
 struct afb_proto_ws;
 
@@ -190,6 +191,27 @@ struct afb_proto_ws
        void (*on_hangup)(void *closure);
 };
 
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX  32
+struct writebuf
+{
+       struct iovec iovec[WRITEBUF_COUNT_MAX];
+       uint32_t uints[WRITEBUF_COUNT_MAX];
+       int count;
+};
+
+struct readbuf
+{
+       char *base, *head, *end;
+};
+
+struct binary
+{
+       struct afb_proto_ws *protows;
+       struct readbuf rb;
+};
+
 /******************* common useful tools **********************************/
 
 /**
@@ -204,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr)
 
 /******************* serialisation part **********************************/
 
-struct readbuf
-{
-       char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX  32
-struct writebuf
-{
-       struct iovec iovec[WRITEBUF_COUNT_MAX];
-       uint32_t uints[WRITEBUF_COUNT_MAX];
-       int count;
-};
-
 static char *readbuf_get(struct readbuf *rb, uint32_t length)
 {
        char *before = rb->head;
@@ -735,54 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
 }
 
 /* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_ANSWER_SUCCESS: /* success */
-                       client_on_reply_success(protows, &rb);
+                       client_on_reply_success(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_ANSWER_FAIL: /* fail */
-                       client_on_reply_fail(protows, &rb);
+                       client_on_reply_fail(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_BROADCAST: /* broadcast */
-                       client_on_event_broadcast(protows, &rb);
+                       client_on_event_broadcast(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_ADD: /* creates the event */
-                       client_on_event_create(protows, &rb);
+                       client_on_event_create(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_DEL: /* removes the event */
-                       client_on_event_remove(protows, &rb);
+                       client_on_event_remove(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_PUSH: /* pushs the event */
-                       client_on_event_push(protows, &rb);
+                       client_on_event_push(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
-                       client_on_event_subscribe(protows, &rb);
+                       client_on_event_subscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
-                       client_on_event_unsubscribe(protows, &rb);
+                       client_on_event_unsubscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_CALL: /* subcall */
-                       client_on_subcall(protows, &rb);
+                       client_on_subcall(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIPTION: /* description */
-                       client_on_description(protows, &rb);
+                       client_on_description(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 int afb_proto_ws_client_call(
@@ -1017,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
 }
 
 /* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_CALL:
-                       server_on_call(protows, &rb);
+                       server_on_call(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_REPLY:
-                       server_on_subcall_reply(protows, &rb);
+                       server_on_subcall_reply(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIBE:
-                       server_on_describe(protows, &rb);
+                       server_on_describe(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 /******************* server part: manage events **********************************/
diff --git a/src/jobs-fake.c b/src/jobs-fake.c
new file mode 100644 (file)
index 0000000..3c1c273
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <systemd/sd-event.h>
+
+#include "jobs.h"
+#include "sig-monitor.h"
+#include "verbose.h"
+
+#include "jobs.h"
+
+struct jobloop;
+
+struct job
+{
+       struct job *next;
+       const void *group;
+       int timeout;
+       void (*callback)(int signum, void* arg);
+       void *closure;
+};
+
+static struct job *first, *last;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static int add_job(const void *group, int timeout, void (*callback)(int signum, void *closure), void *closure)
+{
+       struct job *j;
+
+       j = malloc(sizeof*j);
+       if (!j) {
+               errno = ENOMEM;
+               return -1;
+       }
+
+       j->next = 0;
+       j->group = group;
+       j->timeout = timeout;
+       j->callback = callback;
+       j->closure = closure;
+
+       pthread_mutex_lock(&mutex);
+       if (first)
+               last->next = j;
+       else
+               first = j;
+       last = j;
+       pthread_mutex_unlock(&mutex);   
+       return 0;
+}
+
+static void *thrrun(void *arg)
+{
+       struct job *j;
+
+       pthread_mutex_lock(&mutex);
+       j = first;
+       if (j)
+               first = j->next;
+       pthread_mutex_unlock(&mutex);   
+       if (j) {
+               j->callback(0, j->closure);
+               free(j);
+       }
+       return 0;
+}
+
+int jobs_queue(
+       const void *group,
+       int timeout,
+       void (*callback)(int signum, void* arg),
+       void *arg)
+{
+       pthread_t tid;
+       int rc = add_job(group, timeout, callback, arg);
+       if (!rc) {
+               rc = pthread_create(&tid, NULL, thrrun, NULL);
+               if (rc)
+                       rc = -1;
+       }
+       return rc;
+}
+
+#if 0
+int jobs_enter(
+       const void *group,
+       int timeout,
+       void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+       void *closure)
+{
+       return 0;
+}
+
+int jobs_leave(struct jobloop *jobloop)
+{
+       return 0;
+}
+
+int jobs_call(
+       const void *group,
+       int timeout,
+       void (*callback)(int, void*),
+       void *arg)
+{
+       return 0;
+}
+
+struct sd_event *jobs_get_sd_event()
+{
+       struct sd_event *r;
+       int rc = sd_event_default(&r);
+       return rc < 0 ? NULL : r;
+}
+
+void jobs_terminate()
+{
+}
+
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
+{
+       start(0);
+       return 0;
+}
+#endif
index b7d1611..78131fc 100644 (file)
@@ -363,8 +363,10 @@ static void thread_run(volatile struct thread *me)
                        events = current_events;
                        if (!events)
                                events = events_get();
-                       else if (events->state == Locked)
+                       else if (events->state == Locked) {
                                events = 0;
+                               AFB_WARNING("Loosing an event loop because reentering");
+                       }
                        if (events) {
                                /* run the events */
                                events->state = Locked;