Update copyright dates
[src/app-framework-binder.git] / src / afb-wsj1.c
index 0a7dfd8..485099c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 IoT.bzh
+ * Copyright (C) 2015-2020 "IoT.bzh"
  * Author: José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
 #include <errno.h>
 #include <string.h>
 #include <stdio.h>
+#include <pthread.h>
 
 #include <json-c/json.h>
+#if !defined(JSON_C_TO_STRING_NOSLASHESCAPE)
+#define JSON_C_TO_STRING_NOSLASHESCAPE 0
+#endif
 
 #include "afb-ws.h"
 #include "afb-wsj1.h"
+#include "fdev.h"
 
 #define CALL 2
 #define RETOK 3
 #define RETERR 4
 #define EVENT 5
 
-static void wsj1_on_hangup(struct afb_wsj1 *ws);
-static void wsj1_on_text(struct afb_wsj1 *ws, char *text, size_t size);
+#define WEBSOCKET_CODE_POLICY_VIOLATION  1008
+#define WEBSOCKET_CODE_INTERNAL_ERROR    1011
+
+static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
+static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
+static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size);
 
 static struct afb_ws_itf wsj1_itf = {
        .on_hangup = (void*)wsj1_on_hangup,
@@ -57,13 +66,13 @@ struct afb_wsj1_msg
        struct afb_wsj1_msg *next, *previous;
        char *text;
        int code;
-       char *id;
-       char *api;
-       char *verb;
-       char *event;
-       char *object_s;
+       const char *id;
+       const char *api;
+       const char *verb;
+       const char *event;
+       const char *object_s;
        size_t object_s_length;
-       char *token;
+       const char *token;
        struct json_object *object_j;
 };
 
@@ -77,13 +86,16 @@ struct afb_wsj1
        struct afb_ws *ws;
        struct afb_wsj1_msg *messages;
        struct wsj1_call *calls;
+       pthread_mutex_t mutex;
 };
 
-struct afb_wsj1 *afb_wsj1_create(int fd, struct afb_wsj1_itf *itf, void *closure)
+struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
 {
        struct afb_wsj1 *result;
 
-       assert(fd >= 0);
+       assert(fdev);
+       assert(itf);
+       assert(itf->on_call);
 
        result = calloc(1, sizeof * result);
        if (result == NULL)
@@ -92,12 +104,13 @@ struct afb_wsj1 *afb_wsj1_create(int fd, struct afb_wsj1_itf *itf, void *closure
        result->refcount = 1;
        result->itf = itf;
        result->closure = closure;
+       pthread_mutex_init(&result->mutex, NULL);
 
        result->tokener = json_tokener_new();
        if (result->tokener == NULL)
                goto error2;
 
-       result->ws = afb_ws_create(fd, &wsj1_itf, result);
+       result->ws = afb_ws_create(fdev, &wsj1_itf, result);
        if (result->ws == NULL)
                goto error3;
 
@@ -108,35 +121,62 @@ error3:
 error2:
        free(result);
 error:
-       close(fd);
+       fdev_unref(fdev);
        return NULL;
 }
 
 void afb_wsj1_addref(struct afb_wsj1 *wsj1)
 {
-       if (wsj1 != NULL)
-               wsj1->refcount++;
+       if (wsj1)
+               __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
 }
 
 void afb_wsj1_unref(struct afb_wsj1 *wsj1)
 {
-       if (wsj1 != NULL && !--wsj1->refcount) {
+       if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
                afb_ws_destroy(wsj1->ws);
                json_tokener_free(wsj1->tokener);
                free(wsj1);
        }
 }
 
-static void wsj1_on_hangup(struct afb_wsj1 *ws)
+static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
 {
-       if (ws->itf->on_hangup != NULL)
-               ws->itf->on_hangup(ws->closure);
+       struct wsj1_call *call, *ncall;
+       struct afb_wsj1_msg *msg;
+       char *text;
+       int len;
+
+       static const char error_object_str[] = "{"
+               "\"jtype\":\"afb-reply\","
+               "\"request\":{"
+                       "\"status\":\"disconnected\","
+                       "\"info\":\"server hung up\"}}";
+
+       ncall = __atomic_exchange_n(&wsj1->calls, NULL, __ATOMIC_RELAXED);
+       while (ncall) {
+               call = ncall;
+               ncall = call->next;
+               len = asprintf(&text, "[%d,\"%s\",%s]", RETERR, call->id, error_object_str);
+               if (len > 0) {
+                       msg = wsj1_msg_make(wsj1, text, (size_t)len);
+                       if (msg != NULL) {
+                               call->callback(call->closure, msg);
+                               afb_wsj1_msg_unref(msg);
+                       }
+               }
+               free(call);
+       }
+
+       if (wsj1->itf->on_hangup != NULL)
+               wsj1->itf->on_hangup(wsj1->closure, wsj1);
 }
 
 
-static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
 {
        struct wsj1_call *r, **p;
+
        p = &wsj1->calls;
        while((r = *p) != NULL) {
                if (strcmp(r->id, id) == 0) {
@@ -146,6 +186,18 @@ static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id,
                }
                p = &r->next;
        }
+
+       return r;
+}
+
+static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+{
+       struct wsj1_call *r;
+
+       pthread_mutex_lock(&wsj1->mutex);
+       r = wsj1_locked_call_search(wsj1, id, remove);
+       pthread_mutex_unlock(&wsj1->mutex);
+
        return r;
 }
 
@@ -155,15 +207,17 @@ static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply
        if (call == NULL)
                errno = ENOMEM;
        else {
+               pthread_mutex_lock(&wsj1->mutex);
                do {
                        if (wsj1->genid == 0)
                                wsj1->genid = 999999;
                        sprintf(call->id, "%d", wsj1->genid--);
-               } while (wsj1_call_search(wsj1, call->id, 0) != NULL);
+               } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
                call->callback = on_reply;
                call->closure = closure;
                call->next = wsj1->calls;
                wsj1->calls = call;
+               pthread_mutex_unlock(&wsj1->mutex);
        }
        return call;
 }
@@ -237,25 +291,29 @@ static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size)
        return wsj1_msg_parse_extract(text, offset, size);
 }
 
-static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
+static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size)
 {
        size_t items[10][2];
        int n;
        struct afb_wsj1_msg *msg;
-       struct wsj1_call *call;
+       char *verb;
 
        /* allocate */
        msg = calloc(1, sizeof *msg);
-       if (msg == NULL)
+       if (msg == NULL) {
+               errno = ENOMEM;
                goto alloc_error;
+       }
 
        /* scan */
        n = wsj1_msg_scan(text, items);
-       if (n < 0)
+       if (n <= 0)
                goto bad_header;
 
        /* scans code: 2|3|4|5 */
-       if (items[0][1] != 1) goto bad_header;
+       if (items[0][1] != 1)
+               goto bad_header;
+
        switch (text[items[0][0]]) {
        case '2': msg->code = CALL; break;
        case '3': msg->code = RETOK; break;
@@ -270,9 +328,11 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
                if (n != 4 && n != 5) goto bad_header;
                msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
                msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
-               msg->verb = strchr(msg->api, '/');
-               if (msg->verb == NULL) goto bad_header;
-               *msg->verb++ = 0;
+               verb = strchr(msg->api, '/');
+               if (verb == NULL) goto bad_header;
+               *verb++ = 0;
+               if (!*verb || *verb == '/') goto bad_header;
+               msg->verb = verb;
                msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
                msg->object_s_length = items[3][1];
                msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
@@ -281,8 +341,6 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
        case RETERR:
                if (n != 3 && n != 4) goto bad_header;
                msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
-               call = wsj1_call_search(wsj1, msg->id, 1);
-               if (call == NULL) goto bad_header;
                msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
                msg->object_s_length = items[2][1];
                msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
@@ -301,50 +359,78 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
        msg->refcount = 1;
        afb_wsj1_addref(wsj1);
        msg->wsj1 = wsj1;
+       pthread_mutex_lock(&wsj1->mutex);
        msg->next = wsj1->messages;
-       msg->next->previous = msg;
+       if (msg->next != NULL)
+               msg->next->previous = msg;
        wsj1->messages = msg;
+       pthread_mutex_unlock(&wsj1->mutex);
 
-       /* incoke the handler */
+       return msg;
+
+bad_header:
+       errno = EBADMSG;
+       free(msg);
+
+alloc_error:
+       free(text);
+       return NULL;
+}
+
+static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
+{
+       struct wsj1_call *call;
+       struct afb_wsj1_msg *msg;
+
+       /* allocate */
+       msg = wsj1_msg_make(wsj1, text, size);
+       if (msg == NULL) {
+               afb_ws_close(wsj1->ws, errno == EBADMSG
+                       ? WEBSOCKET_CODE_POLICY_VIOLATION
+                       : WEBSOCKET_CODE_INTERNAL_ERROR, NULL);
+               return;
+       }
+
+       /* handle the message */
        switch (msg->code) {
        case CALL:
                wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
                break;
        case RETOK:
        case RETERR:
-               call->callback(call->closure, msg);
+               call = wsj1_call_search(wsj1, msg->id, 1);
+               if (call == NULL)
+                       afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL);
+               else
+                       call->callback(call->closure, msg);
                free(call);
                break;
        case EVENT:
-               wsj1->itf->on_event(wsj1->closure, msg->event, msg);
+               if (wsj1->itf->on_event != NULL)
+                       wsj1->itf->on_event(wsj1->closure, msg->event, msg);
                break;
        }
        afb_wsj1_msg_unref(msg);
-       return;
-
-bad_header:
-       free(msg);
-alloc_error:
-       free(text);
-       afb_ws_close(wsj1->ws, 1008, NULL);
 }
 
 void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
 {
        if (msg != NULL)
-               msg->refcount++;
+               __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
 }
 
 void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
 {
-       if (msg != NULL && --msg->refcount == 0) {
+       if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
                /* unlink the message */
+               pthread_mutex_lock(&msg->wsj1->mutex);
                if (msg->next != NULL)
                        msg->next->previous = msg->previous;
                if (msg->previous == NULL)
                        msg->wsj1->messages = msg->next;
                else
                        msg->previous->next = msg->next;
+               pthread_mutex_unlock(&msg->wsj1->mutex);
                /* free ressources */
                afb_wsj1_unref(msg->wsj1);
                json_object_put(msg->object_j);
@@ -360,11 +446,15 @@ const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg)
 
 struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
 {
+       enum json_tokener_error jerr;
        struct json_object *object = msg->object_j;
        if (object == NULL) {
+               pthread_mutex_lock(&msg->wsj1->mutex);
                json_tokener_reset(msg->wsj1->tokener);
-               object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
-               if (object == NULL) {
+               object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length);
+               jerr = json_tokener_get_error(msg->wsj1->tokener);
+               pthread_mutex_unlock(&msg->wsj1->mutex);
+               if (jerr != json_tokener_success) {
                        /* lazy error detection of json request. Is it to improve? */
                        object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
                }
@@ -418,105 +508,34 @@ const char *afb_wsj1_msg_token(struct afb_wsj1_msg *msg)
        return msg->token;
 }
 
-
-
-
-
-
-
-
-
-
-#if 0
-
-
-
-
-
-static void wsj1_emit(struct afb_wsj1 *wsj1, int code, const char *id, size_t idlen, struct json_object *data, const char *token)
-{
-       json_object *msg;
-       const char *txt;
-
-       /* pack the message */
-       msg = json_object_new_array();
-       json_object_array_add(msg, json_object_new_int(code));
-       json_object_array_add(msg, json_object_new_string_len(id, (int)idlen));
-       json_object_array_add(msg, data);
-       if (token)
-               json_object_array_add(msg, json_object_new_string(token));
-
-       /* emits the reply */
-       txt = json_object_to_json_string(msg);
-       afb_ws_text(wsj1->ws, txt, strlen(txt));
-       json_object_put(msg);
-}
-
-static void wsj1_msg_reply(struct afb_wsj1_msg *msg, int retcode, const char *status, const char *info, json_object *resp)
-{
-       const char *token = afb_context_sent_token(&msg->context);
-       wsj1_emit(msg->wsj1, retcode, msg->id, msg->idlen, afb_msg_json_reply(status, info, resp, token, NULL), token);
-}
-
-static void wsj1_msg_fail(struct afb_wsj1_msg *msg, const char *status, const char *info)
-{
-       wsj1_msg_reply(msg, RETERR, status, info, NULL);
-}
-
-static void wsj1_msg_success(struct afb_wsj1_msg *msg, json_object *obj, const char *info)
+struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg)
 {
-       wsj1_msg_reply(msg, RETOK, "success", info, obj);
+       return msg->wsj1;
 }
 
-static const char *wsj1_msg_raw(struct afb_wsj1_msg *msg, size_t *size)
+int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
 {
-       *size = msg->objlen;
-       return msg->obj;
+       return afb_ws_close(wsj1->ws, code, text);
 }
 
-static void wsj1_msg_send(struct afb_wsj1_msg *msg, const char *buffer, size_t size)
-{
-       afb_ws_text(msg->wsj1->ws, buffer, size);
-}
-
-static void wsj1_send_event(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
-{
-       wsj1_emit(wsj1, EVENT, event, strlen(event), afb_msg_json_event(event, object), NULL);
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-#endif
-
-
 static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
 {
        char code[2] = { (char)('0' + i1), 0 };
-       return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
+       return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
 }
 
 static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
 {
        char code[2] = { (char)('0' + i1), 0 };
-       return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
+       return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
 }
 
 int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
 {
-       return afb_wsj1_send_event_s(wsj1, event, json_object_to_json_string(object));
+       const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+       int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
+       json_object_put(object);
+       return rc;
 }
 
 int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
@@ -526,7 +545,10 @@ int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *
 
 int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
 {
-       return afb_wsj1_call_s(wsj1, api, verb, json_object_to_json_string(object), on_reply, closure);
+       const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+       int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
+       json_object_put(object);
+       return rc;
 }
 
 int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
@@ -555,24 +577,16 @@ int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, co
        return rc;
 }
 
-
-int afb_wsj1_reply_ok_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token)
+int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
 {
-       return afb_wsj1_reply_ok_s(msg, json_object_to_json_string(object), token);
-}
-
-int afb_wsj1_reply_ok_s(struct afb_wsj1_msg *msg, const char *object, const char *token)
-{
-       return wsj1_send_isot(msg->wsj1, RETOK, msg->id, object, token);
-}
-
-int afb_wsj1_reply_error_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token)
-{
-       return afb_wsj1_reply_error_s(msg, json_object_to_json_string(object), token);
+       const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+       int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
+       json_object_put(object);
+       return rc;
 }
 
-int afb_wsj1_reply_error_s(struct afb_wsj1_msg *msg, const char *object, const char *token)
+int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
 {
-       return wsj1_send_isot(msg->wsj1, RETERR, msg->id, object, token);
+       return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
 }