afb-proto-ws: fix self locking issue
authorJosé Bollo <jose.bollo@iot.bzh>
Sat, 11 Nov 2017 21:36:05 +0000 (22:36 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Sat, 11 Nov 2017 21:36:05 +0000 (22:36 +0100)
Calling synchronously a verb on an event of the same
API was blocking.

Change-Id: I58a988c6df8c60cd3a38c3cdff23d7be8b6be54e
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/afb-proto-ws.c

index 0903976..90c3b05 100644 (file)
@@ -343,12 +343,15 @@ int afb_proto_ws_call_success(struct afb_proto_ws_call *call, struct json_object
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, info ?: "")
         && writebuf_object(&wb, obj)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -362,12 +365,15 @@ int afb_proto_ws_call_fail(struct afb_proto_ws_call *call, const char *status, c
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_ANSWER_FAIL)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_string(&wb, status)
         && writebuf_string(&wb, info ? : "")) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -391,7 +397,7 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                sc->callback = callback;
                sc->closure = cb_closure;
 
-               pthread_mutex_unlock(&protows->mutex);
+               pthread_mutex_lock(&protows->mutex);
                sc->subcallid = ptr2id(sc);
                do {
                        sc->subcallid++;
@@ -409,7 +415,9 @@ int afb_proto_ws_call_subcall(struct afb_proto_ws_call *call, const char *api, c
                 && writebuf_string(&wb, api)
                 && writebuf_string(&wb, verb)
                 && writebuf_object(&wb, args)) {
+                       pthread_mutex_lock(&protows->mutex);
                        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+                       pthread_mutex_unlock(&protows->mutex);
                        if (rc >= 0) {
                                rc = 0;
                                goto success;
@@ -424,12 +432,15 @@ int afb_proto_ws_call_subscribe(struct afb_proto_ws_call *call, const char *even
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -443,12 +454,15 @@ int afb_proto_ws_call_unsubscribe(struct afb_proto_ws_call *call, const char *ev
 {
        int rc = -1;
        struct writebuf wb = { .count = 0 };
+       struct afb_proto_ws *protows = call->protows;
 
        if (writebuf_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
         && writebuf_uint32(&wb, call->callid)
         && writebuf_uint32(&wb, (uint32_t)event_id)
         && writebuf_string(&wb, event_name)) {
-               rc = afb_ws_binary_v(call->protows->ws, wb.iovec, wb.count);
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
                if (rc >= 0) {
                        rc = 0;
                        goto success;
@@ -461,7 +475,7 @@ success:
 /******************* client part **********************************/
 
 /* search a memorized call */
-static struct client_call *client_call_search(struct afb_proto_ws *protows, uint32_t callid)
+static struct client_call *client_call_search_locked(struct afb_proto_ws *protows, uint32_t callid)
 {
        struct client_call *call;
 
@@ -472,11 +486,23 @@ static struct client_call *client_call_search(struct afb_proto_ws *protows, uint
        return call;
 }
 
+static struct client_call *client_call_search_unlocked(struct afb_proto_ws *protows, uint32_t callid)
+{
+       struct client_call *result;
+
+       pthread_mutex_lock(&protows->mutex);
+       result = client_call_search_locked(protows, callid);
+       pthread_mutex_unlock(&protows->mutex);
+       return result;
+}
+
 /* free and release the memorizing call */
 static void client_call_destroy(struct client_call *call)
 {
        struct client_call **prv;
+       struct afb_proto_ws *protows = call->protows;
 
+       pthread_mutex_lock(&protows->mutex);
        prv = &call->protows->calls;
        while (*prv != NULL) {
                if (*prv == call) {
@@ -485,6 +511,7 @@ static void client_call_destroy(struct client_call *call)
                }
                prv = &(*prv)->next;
        }
+       pthread_mutex_unlock(&protows->mutex);
        free(call);
 }
 
@@ -505,7 +532,7 @@ static int client_msg_call_get(struct afb_proto_ws *protows, struct readbuf *rb,
        }
 
        /* get the call */
-       *call = client_call_search(protows, callid);
+       *call = client_call_search_unlocked(protows, callid);
        if (*call == NULL) {
                return 0;
        }
@@ -600,6 +627,7 @@ static void client_on_reply_fail(struct afb_proto_ws *protows, struct readbuf *r
 
        if (!client_msg_call_get(protows, rb, &call))
                return;
+       
 
        if (readbuf_string(rb, &status, NULL) && readbuf_string(rb, &info, NULL)) {
                protows->client_itf->on_reply_fail(protows->closure, call->request, status, info);
@@ -614,12 +642,19 @@ static int client_send_subcall_reply(struct afb_proto_ws *protows, uint32_t subc
 {
        struct writebuf wb = { .count = 0 };
        char ie = status < 0;
+       int rc;
 
-       return -!(writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+       if (writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
         && writebuf_uint32(&wb, subcallid)
         && writebuf_char(&wb, ie)
-        && writebuf_object(&wb, object)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+        && writebuf_object(&wb, object)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 /* callback for subcall reply */
@@ -682,11 +717,15 @@ static void client_on_description(struct afb_proto_ws *protows, struct readbuf *
        struct json_object *object;
 
        if (readbuf_uint32(rb, &descid)) {
+               pthread_mutex_lock(&protows->mutex);
                prv = &protows->describes;
                while ((desc = *prv) && desc->descid != descid)
                        prv = &desc->next;
-               if (desc) {
+               if (!desc)
+                       pthread_mutex_unlock(&protows->mutex);
+               else {
                        *prv = desc->next;
+                       pthread_mutex_unlock(&protows->mutex);
                        if (!readbuf_object(rb, &object))
                                object = NULL;
                        desc->callback(desc->closure, object);
@@ -707,7 +746,6 @@ static void client_on_binary(void *closure, char *data, size_t size)
                rb.end = data + size;
                protows = closure;
 
-               pthread_mutex_lock(&protows->mutex);
                switch (*rb.head++) {
                case CHAR_FOR_ANSWER_SUCCESS: /* success */
                        client_on_reply_success(protows, &rb);
@@ -743,7 +781,6 @@ static void client_on_binary(void *closure, char *data, size_t size)
                        /* TODO: close the connection */
                        break;
                }
-               pthread_mutex_unlock(&protows->mutex);
        }
        free(rb.base);
 }
@@ -771,11 +808,12 @@ int afb_proto_ws_client_call(
        /* init call data */
        pthread_mutex_lock(&protows->mutex);
        call->callid = ptr2id(call);
-       while(client_call_search(protows, call->callid) != NULL)
+       while(client_call_search_locked(protows, call->callid) != NULL)
                call->callid++;
        call->protows = protows;
        call->next = protows->calls;
        protows->calls = call;
+       pthread_mutex_unlock(&protows->mutex);
 
        /* creates the call message */
        if (!writebuf_char(&wb, CHAR_FOR_CALL)
@@ -788,7 +826,9 @@ int afb_proto_ws_client_call(
        }
 
        /* send */
+       pthread_mutex_lock(&protows->mutex);
        rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+       pthread_mutex_unlock(&protows->mutex);
        if (rc >= 0) {
                rc = 0;
                goto end;
@@ -797,7 +837,6 @@ int afb_proto_ws_client_call(
 clean:
        client_call_destroy(call);
 end:
-       pthread_mutex_unlock(&protows->mutex);
        return rc;
 }
 
@@ -830,15 +869,15 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
        desc->protows = protows;
        desc->next = protows->describes;
        protows->describes = desc;
-       pthread_mutex_unlock(&protows->mutex);
 
        /* send */
        if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
         && writebuf_uint32(&wb, desc->descid)
-        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0)
+        && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0) {
+               pthread_mutex_unlock(&protows->mutex);
                return 0;
+       }
 
-       pthread_mutex_lock(&protows->mutex);
        d = protows->describes;
        if (d == desc)
                protows->describes = desc->next;
@@ -848,8 +887,8 @@ int afb_proto_ws_client_describe(struct afb_proto_ws *protows, void (*callback)(
                if (d)
                        d->next = desc->next;
        }
-       free(desc);
        pthread_mutex_unlock(&protows->mutex);
+       free(desc);
 error:
        /* TODO? callback(closure, NULL); */
        return -1;
@@ -931,18 +970,25 @@ static void server_on_subcall_reply(struct afb_proto_ws *protows, struct readbuf
 
 static int server_send_description(struct afb_proto_ws *protows, uint32_t descid, struct json_object *descobj)
 {
+       int rc;
        struct writebuf wb = { .count = 0 };
 
-       return -!(writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
-                && writebuf_uint32(&wb, descid)
-                && writebuf_object(&wb, descobj)
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+        && writebuf_uint32(&wb, descid)
+        && writebuf_object(&wb, descobj)) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_describe_put(struct afb_proto_ws_describe *describe, struct json_object *description)
 {
        int rc = server_send_description(describe->protows, describe->descid, description);
-       afb_proto_ws_addref(describe->protows);
+       afb_proto_ws_unref(describe->protows);
        free(describe);
        return rc;
 }
@@ -1005,12 +1051,19 @@ static void server_on_binary(void *closure, char *data, size_t size)
 static int server_event_send(struct afb_proto_ws *protows, char order, const char *event_name, int event_id, struct json_object *data)
 {
        struct writebuf wb = { .count = 0 };
+       int rc;
 
-       return -!(writebuf_char(&wb, order)
-                && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
-                && writebuf_string(&wb, event_name)
-                && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))
-                && afb_ws_binary_v(protows->ws, wb.iovec, wb.count) >= 0);
+       if (writebuf_char(&wb, order)
+        && (order == CHAR_FOR_EVT_BROADCAST || writebuf_uint32(&wb, event_id))
+        && writebuf_string(&wb, event_name)
+        && (order == CHAR_FOR_EVT_ADD || order == CHAR_FOR_EVT_DEL || writebuf_object(&wb, data))) {
+               pthread_mutex_lock(&protows->mutex);
+               rc = afb_ws_binary_v(protows->ws, wb.iovec, wb.count);
+               pthread_mutex_unlock(&protows->mutex);
+               if (rc >= 0)
+                       return 0;
+       }
+       return -1;
 }
 
 int afb_proto_ws_server_event_create(struct afb_proto_ws *protows, const char *event_name, int event_id)