Make websocket concurrent
authorJosé Bollo <jose.bollo@iot.bzh>
Thu, 6 Apr 2017 14:24:46 +0000 (16:24 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Thu, 6 Apr 2017 14:26:13 +0000 (16:26 +0200)
Stress tests shown that the module wsj1
wasn't ready to concurrency.

Change-Id: Ia54196f97e9712adf0920b59b188d570f39a6b4f
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/afb-wsj1.c

index 42d842e..7a8e023 100644 (file)
@@ -23,6 +23,7 @@
 #include <errno.h>
 #include <string.h>
 #include <stdio.h>
+#include <pthread.h>
 
 #include <json-c/json.h>
 
@@ -77,6 +78,7 @@ struct afb_wsj1
        struct afb_ws *ws;
        struct afb_wsj1_msg *messages;
        struct wsj1_call *calls;
+       pthread_mutex_t mutex;
 };
 
 struct afb_wsj1 *afb_wsj1_create(struct sd_event *eloop, int fd, struct afb_wsj1_itf *itf, void *closure)
@@ -92,6 +94,7 @@ struct afb_wsj1 *afb_wsj1_create(struct sd_event *eloop, int fd, struct afb_wsj1
        result->refcount = 1;
        result->itf = itf;
        result->closure = closure;
+       pthread_mutex_init(&result->mutex, NULL);
 
        result->tokener = json_tokener_new();
        if (result->tokener == NULL)
@@ -114,13 +117,13 @@ error:
 
 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
 {
-       if (wsj1 != NULL)
-               wsj1->refcount++;
+       if (wsj1)
+               __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
 }
 
 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
 {
-       if (wsj1 != NULL && !--wsj1->refcount) {
+       if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
                afb_ws_destroy(wsj1->ws);
                json_tokener_free(wsj1->tokener);
                free(wsj1);
@@ -134,9 +137,10 @@ static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
 }
 
 
-static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
 {
        struct wsj1_call *r, **p;
+
        p = &wsj1->calls;
        while((r = *p) != NULL) {
                if (strcmp(r->id, id) == 0) {
@@ -146,6 +150,18 @@ static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id,
                }
                p = &r->next;
        }
+
+       return r;
+}
+
+static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+{
+       struct wsj1_call *r;
+
+       pthread_mutex_lock(&wsj1->mutex);
+       r = wsj1_locked_call_search(wsj1, id, remove);
+       pthread_mutex_unlock(&wsj1->mutex);
+
        return r;
 }
 
@@ -155,15 +171,17 @@ static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply
        if (call == NULL)
                errno = ENOMEM;
        else {
+               pthread_mutex_lock(&wsj1->mutex);
                do {
                        if (wsj1->genid == 0)
                                wsj1->genid = 999999;
                        sprintf(call->id, "%d", wsj1->genid--);
-               } while (wsj1_call_search(wsj1, call->id, 0) != NULL);
+               } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
                call->callback = on_reply;
                call->closure = closure;
                call->next = wsj1->calls;
                wsj1->calls = call;
+               pthread_mutex_unlock(&wsj1->mutex);
        }
        return call;
 }
@@ -301,10 +319,12 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
        msg->refcount = 1;
        afb_wsj1_addref(wsj1);
        msg->wsj1 = wsj1;
+       pthread_mutex_lock(&wsj1->mutex);
        msg->next = wsj1->messages;
        if (msg->next != NULL)
                msg->next->previous = msg;
        wsj1->messages = msg;
+       pthread_mutex_unlock(&wsj1->mutex);
 
        /* incoke the handler */
        switch (msg->code) {
@@ -333,19 +353,21 @@ alloc_error:
 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
 {
        if (msg != NULL)
-               msg->refcount++;
+               __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
 }
 
 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
 {
-       if (msg != NULL && --msg->refcount == 0) {
+       if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
                /* unlink the message */
+               pthread_mutex_lock(&msg->wsj1->mutex);
                if (msg->next != NULL)
                        msg->next->previous = msg->previous;
                if (msg->previous == NULL)
                        msg->wsj1->messages = msg->next;
                else
                        msg->previous->next = msg->next;
+               pthread_mutex_unlock(&msg->wsj1->mutex);
                /* free ressources */
                afb_wsj1_unref(msg->wsj1);
                json_object_put(msg->object_j);
@@ -363,8 +385,10 @@ struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
 {
        struct json_object *object = msg->object_j;
        if (object == NULL) {
+               pthread_mutex_lock(&msg->wsj1->mutex);
                json_tokener_reset(msg->wsj1->tokener);
                object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
+               pthread_mutex_unlock(&msg->wsj1->mutex);
                if (object == NULL) {
                        /* lazy error detection of json request. Is it to improve? */
                        object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);