afb-proto-ws: Fix autolock in proto-ws
[src/app-framework-binder.git] / src / afb-proto-ws.c
index 24f70e5..ce7d75d 100644 (file)
 #include <pthread.h>
 
 #include <json-c/json.h>
-#include <systemd/sd-event.h>
-
-#include "afb-common.h"
 
 #include "afb-ws.h"
 #include "afb-msg-json.h"
 #include "afb-proto-ws.h"
-#include "verbose.h"
+#include "jobs.h"
 
 struct afb_proto_ws;
 
@@ -194,6 +191,27 @@ struct afb_proto_ws
        void (*on_hangup)(void *closure);
 };
 
+/******************* streaming objects **********************************/
+
+#define WRITEBUF_COUNT_MAX  32
+struct writebuf
+{
+       struct iovec iovec[WRITEBUF_COUNT_MAX];
+       uint32_t uints[WRITEBUF_COUNT_MAX];
+       int count;
+};
+
+struct readbuf
+{
+       char *base, *head, *end;
+};
+
+struct binary
+{
+       struct afb_proto_ws *protows;
+       struct readbuf rb;
+};
+
 /******************* common useful tools **********************************/
 
 /**
@@ -208,19 +226,6 @@ static inline uint32_t ptr2id(void *ptr)
 
 /******************* serialisation part **********************************/
 
-struct readbuf
-{
-       char *base, *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX  32
-struct writebuf
-{
-       struct iovec iovec[WRITEBUF_COUNT_MAX];
-       uint32_t uints[WRITEBUF_COUNT_MAX];
-       int count;
-};
-
 static char *readbuf_get(struct readbuf *rb, uint32_t length)
 {
        char *before = rb->head;
@@ -347,18 +352,20 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, info ?: "")
         && writebuf_object(&wb, obj)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
                }
        }
-       ERROR("error while sending success");
 success:
        return rc;
 }
@@ -367,18 +374,20 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, status)
         && writebuf_string(&wb, info ? : "")) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
                }
        }
-       ERROR("error while sending fail");
 success:
        return rc;
 }
@@ -397,7 +406,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                sc->callback = callback;
                sc->closure = cb_closure;
 
-               pthread_mutex_unlock(&protows->mutex);
+               pthread_mutex_lock(&protows->mutex);
                sc->subcallid = ptr2id(sc);
                do {
                        sc->subcallid++;
@@ -415,14 +424,15 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                 && writebuf_string(&wb, api)
                 && writebuf_string(&wb, verb)
                 && writebuf_object(&wb, args)) {
+                       pthread_mutex_lock(&protows->mutex);
                        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+                       pthread_mutex_unlock(&protows->mutex);
                        if (rc >= 0) {
                                rc = 0;
                                goto success;
                        }
                }
        }
-       ERROR("error while sending subcall");
 success:
        return rc;
 }
@@ -431,18 +441,20 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
                }
        }
-       ERROR("error while subscribing event");
 success:
        return rc;
 }
@@ -451,18 +463,20 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
                }
        }
-       ERROR("error while subscribing event");
 success:
        return rc;
 }
@@ -470,7 +484,7 @@ success:
 /******************* client part **********************************/
 
 /* search a memorized call */
-static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
 {
        struct client_call *call;
 
@@ -481,11 +495,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint
        return call;
 }
 
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+{
+       struct client_call *result;
+
+       pthread_mutex_lock(&protows->mutex);
+       result = client_call_search_locked(protows, callid);
+       pthread_mutex_unlock(&protows->mutex);
+       return result;
+}
+
 /* free and release the memorizing call */
 static void client_call_destroy(struct client_call *call)
 {
        struct client_call **prv;
+       struct afb_proto_ws *protows = call->protows;
 
+       pthread_mutex_lock(&protows->mutex);
        prv = &call->protows->calls;
        while (*prv != NULL) {
                if (*prv == call) {
@@ -494,6 +520,7 @@ static void client_call_destroy(struct client_call *call)
                }
                prv = &(*prv)->next;
        }
+       pthread_mutex_unlock(&protows->mutex);
        free(call);
 }
 
@@ -510,14 +537,12 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb,
 
        /* get event data from the message */
        if (!readbuf_uint32(rb, &callid)) {
-               ERROR("Invalid message");
                return 0;
        }
 
        /* get the call */
-       *call = client_call_search(protows, callid);
+       *call = client_call_search_unlocked(protows, callid);
        if (*call == NULL) {
-               ERROR("message not found");
                return 0;
        }
 
@@ -611,6 +636,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r
 
        if (!client_msg_call_get(protows, rb, &call))
                return;
+       
 
        if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
                protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
@@ -625,12 +651,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc
 {
        struct writebuf wb = { .count = 0 };
        char ie = status < 0;
+       int rc;
 
-       return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+       if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
         && writebuf_uint32(&wb, subcallid)
         && writebuf_char(&wb, ie)
-        && writebuf_object(&wb, object)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+        && writebuf_object(&wb, object)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 /* callback for subcall reply */
@@ -693,11 +726,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
        struct json_object *object;
 
        if (readbuf_uint32(rb, &descid)) {
+               pthread_mutex_lock(&protows->mutex);
                prv = &protows->describes;
                while ((desc = *prv) && desc->descid != descid)
                        prv = &desc->next;
-               if (desc) {
+               if (!desc)
+                       pthread_mutex_unlock(&protows->mutex);
+               else {
                        *prv = desc->next;
+                       pthread_mutex_unlock(&protows->mutex);
                        if (!readbuf_object(rb, &object))
                                object = NULL;
                        desc->callback(desc->closure, object);
@@ -707,56 +744,73 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
 }
 
 /* callback when receiving binary data */
-static void client_on_binary(void *closure, char *data, size_t size)
+static void client_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               pthread_mutex_lock(&protows->mutex);
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_ANSWER_SUCCESS: /* success */
-                       client_on_reply_success(protows, &rb);
+                       client_on_reply_success(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_ANSWER_FAIL: /* fail */
-                       client_on_reply_fail(protows, &rb);
+                       client_on_reply_fail(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_BROADCAST: /* broadcast */
-                       client_on_event_broadcast(protows, &rb);
+                       client_on_event_broadcast(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_ADD: /* creates the event */
-                       client_on_event_create(protows, &rb);
+                       client_on_event_create(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_DEL: /* removes the event */
-                       client_on_event_remove(protows, &rb);
+                       client_on_event_remove(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_PUSH: /* pushs the event */
-                       client_on_event_push(protows, &rb);
+                       client_on_event_push(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
-                       client_on_event_subscribe(protows, &rb);
+                       client_on_event_subscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
-                       client_on_event_unsubscribe(protows, &rb);
+                       client_on_event_unsubscribe(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_CALL: /* subcall */
-                       client_on_subcall(protows, &rb);
+                       client_on_subcall(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIPTION: /* description */
-                       client_on_description(protows, &rb);
+                       client_on_description(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
-               pthread_mutex_unlock(&protows->mutex);
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+/* callback when receiving binary data */
+static void client_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, client_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 int afb_proto_ws_client_call(
@@ -782,11 +836,12 @@ int afb_proto_ws_client_call(
        /* init call data */
        pthread_mutex_lock(&protows->mutex);
        call->callid = ptr2id(call);
-       while(client_call_search(protows, call->callid) != NULL)
+       while(client_call_search_locked(protows, call->callid) != NULL)
                call->callid++;
        call->protows = protows;
        call->next = protows->calls;
        protows->calls = call;
+       pthread_mutex_unlock(&protows->mutex);
 
        /* creates the call message */
        if (!writebuf_char(&wb, CHAR_FOR_CALL)
@@ -799,7 +854,9 @@ int afb_proto_ws_client_call(
        }
 
        /* send */
+       pthread_mutex_lock(&protows->mutex);
        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+       pthread_mutex_unlock(&protows->mutex);
        if (rc >= 0) {
                rc = 0;
                goto end;
@@ -808,7 +865,6 @@ int afb_proto_ws_client_call(
 clean:
        client_call_destroy(call);
 end:
-       pthread_mutex_unlock(&protows->mutex);
        return rc;
 }
 
@@ -841,15 +897,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
        desc->protows = protows;
        desc->next = protows->describes;
        protows->describes = desc;
-       pthread_mutex_unlock(&protows->mutex);
 
        /* send */
        if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
         && writebuf_uint32(&wb, desc->descid)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0)
+        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
+               pthread_mutex_unlock(&protows->mutex);
                return 0;
+       }
 
-       pthread_mutex_lock(&protows->mutex);
        d = protows->describes;
        if (d == desc)
                protows->describes = desc->next;
@@ -859,8 +915,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
                if (d)
                        d->next = desc->next;
        }
-       free(desc);
        pthread_mutex_unlock(&protows->mutex);
+       free(desc);
 error:
        /* TODO? callback(closure, NULL); */
        return -1;
@@ -942,18 +998,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf
 
 static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
 {
+       int rc;
        struct writebuf wb = { .count = 0 };
 
-       return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
-                && writebuf_uint32(&wb, descid)
-                && writebuf_object(&wb, descobj)
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+        && writebuf_uint32(&wb, descid)
+        && writebuf_object(&wb, descobj)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
 {
        int rc = server_send_description(describe->protows, describe->descid, description);
-       afb_proto_ws_addref(describe->protows);
+       afb_proto_ws_unref(describe->protows);
        free(describe);
        return rc;
 }
@@ -982,33 +1045,51 @@ static void server_on_describe(struct afb_proto_ws *protows, struct readbuf *rb)
 }
 
 /* callback when receiving binary data */
-static void server_on_binary(void *closure, char *data, size_t size)
+static void server_on_binary_job(int sig, void *closure)
 {
-       struct afb_proto_ws *protows;
-       struct readbuf rb;
-
-       rb.base = data;
-       if (size > 0) {
-               rb.head = data;
-               rb.end = data + size;
-               protows = closure;
+       struct binary *binary = closure;
 
-               switch (*rb.head++) {
+       if (!sig) {
+               switch (*binary->rb.head++) {
                case CHAR_FOR_CALL:
-                       server_on_call(protows, &rb);
+                       server_on_call(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_SUBCALL_REPLY:
-                       server_on_subcall_reply(protows, &rb);
+                       server_on_subcall_reply(binary->protows, &binary->rb);
                        break;
                case CHAR_FOR_DESCRIBE:
-                       server_on_describe(protows, &rb);
+                       server_on_describe(binary->protows, &binary->rb);
                        break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
                }
        }
-       free(rb.base);
+       free(binary->rb.base);
+       free(binary);
+}
+
+static void server_on_binary(void *closure, char *data, size_t size)
+{
+       int rc;
+       struct binary *binary;
+
+       if (size) {
+               binary = malloc(sizeof *binary);
+               if (!binary) {
+                       errno = ENOMEM;
+               } else {
+                       binary->protows = closure;
+                       binary->rb.base = data;
+                       binary->rb.head = data;
+                       binary->rb.end = data + size;
+                       rc = jobs_queue(NULL, 0, server_on_binary_job, binary);
+                       if (rc >= 0)
+                               return;
+                       free(binary);
+               }
+       }
+       free(data);
 }
 
 /******************* server part: manage events **********************************/
@@ -1016,12 +1097,19 @@ static void server_on_binary(void *closure, char *data, size_t size)
 static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
 {
        struct writebuf wb = { .count = 0 };
+       int rc;
 
-       return -!(writebuf_char(&wb, order)
-                && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
-                && writebuf_string(&wb, event_name)
-                && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, order)
+        && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+        && writebuf_string(&wb, event_name)
+        && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)
@@ -1070,6 +1158,7 @@ static void on_hangup(void *closure)
        }
 
        if (protows->fd >= 0) {
+               close(protows->fd);
                protows->fd = -1;
                if (protows->on_hangup)
                        protows->on_hangup(protows->closure);
@@ -1098,7 +1187,7 @@ static const struct afb_ws_itf server_ws_itf =
 
 /*****************************************************/
 
-static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
+static struct afb_proto_ws *afb_proto_ws_create(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itfs, const struct afb_proto_ws_client_itf *itfc, void *closure, const struct afb_ws_itf *itf)
 {
        struct afb_proto_ws *protows;
 
@@ -1108,7 +1197,7 @@ static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_w
        else {
                fcntl(fd, F_SETFD, FD_CLOEXEC);
                fcntl(fd, F_SETFL, O_NONBLOCK);
-               protows->ws = afb_ws_create(afb_common_get_event_loop(), fd, itf, protows);
+               protows->ws = afb_ws_create(eloop, fd, itf, protows);
                if (protows->ws != NULL) {
                        protows->fd = fd;
                        protows->refcount = 1;
@@ -1124,14 +1213,14 @@ static struct afb_proto_ws *afb_proto_ws_create(int fd, const struct afb_proto_w
        return NULL;
 }
 
-struct afb_proto_ws *afb_proto_ws_create_client(int fd, const struct afb_proto_ws_client_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_client(struct sd_event *eloop, int fd, const struct afb_proto_ws_client_itf *itf, void *closure)
 {
-       return afb_proto_ws_create(fd, NULL, itf, closure, &proto_ws_client_ws_itf);
+       return afb_proto_ws_create(eloop, fd, NULL, itf, closure, &proto_ws_client_ws_itf);
 }
 
-struct afb_proto_ws *afb_proto_ws_create_server(int fd, const struct afb_proto_ws_server_itf *itf, void *closure)
+struct afb_proto_ws *afb_proto_ws_create_server(struct sd_event *eloop, int fd, const struct afb_proto_ws_server_itf *itf, void *closure)
 {
-       return afb_proto_ws_create(fd, itf, NULL, closure, &server_ws_itf);
+       return afb_proto_ws_create(eloop, fd, itf, NULL, closure, &server_ws_itf);
 }
 
 void afb_proto_ws_unref(struct afb_proto_ws *protows)