X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?p=src%2Fapp-framework-binder.git;a=blobdiff_plain;f=src%2Fafb-wsj1.c;h=485099cd778f8608a7bd40a0dde05099b66c95c0;hp=cf31a57ab6f83fa6908f414ecc48fb5bd46bd29d;hb=65353dce81a629e042800bb7b86fcd869a76727e;hpb=c7e9786d408f13d8f8f43c6b68da916bbb1ed5f3 diff --git a/src/afb-wsj1.c b/src/afb-wsj1.c index cf31a57a..485099cd 100644 --- a/src/afb-wsj1.c +++ b/src/afb-wsj1.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 "IoT.bzh" + * Copyright (C) 2015-2020 "IoT.bzh" * Author: José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,19 +23,28 @@ #include #include #include +#include #include +#if !defined(JSON_C_TO_STRING_NOSLASHESCAPE) +#define JSON_C_TO_STRING_NOSLASHESCAPE 0 +#endif #include "afb-ws.h" #include "afb-wsj1.h" +#include "fdev.h" #define CALL 2 #define RETOK 3 #define RETERR 4 #define EVENT 5 +#define WEBSOCKET_CODE_POLICY_VIOLATION 1008 +#define WEBSOCKET_CODE_INTERNAL_ERROR 1011 + static void wsj1_on_hangup(struct afb_wsj1 *wsj1); static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size); +static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size); static struct afb_ws_itf wsj1_itf = { .on_hangup = (void*)wsj1_on_hangup, @@ -57,13 +66,13 @@ struct afb_wsj1_msg struct afb_wsj1_msg *next, *previous; char *text; int code; - char *id; - char *api; - char *verb; - char *event; - char *object_s; + const char *id; + const char *api; + const char *verb; + const char *event; + const char *object_s; size_t object_s_length; - char *token; + const char *token; struct json_object *object_j; }; @@ -77,13 +86,16 @@ struct afb_wsj1 struct afb_ws *ws; struct afb_wsj1_msg *messages; struct wsj1_call *calls; + pthread_mutex_t mutex; }; -struct afb_wsj1 *afb_wsj1_create(int fd, struct afb_wsj1_itf *itf, void *closure) +struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure) { struct afb_wsj1 *result; - assert(fd >= 0); + assert(fdev); + assert(itf); + assert(itf->on_call); result = calloc(1, sizeof * result); if (result == NULL) @@ -92,12 +104,13 @@ struct afb_wsj1 *afb_wsj1_create(int fd, struct afb_wsj1_itf *itf, void *closure result->refcount = 1; result->itf = itf; result->closure = closure; + pthread_mutex_init(&result->mutex, NULL); result->tokener = json_tokener_new(); if (result->tokener == NULL) goto error2; - result->ws = afb_ws_create(fd, &wsj1_itf, result); + result->ws = afb_ws_create(fdev, &wsj1_itf, result); if (result->ws == NULL) goto error3; @@ -108,19 +121,19 @@ error3: error2: free(result); error: - close(fd); + fdev_unref(fdev); return NULL; } void afb_wsj1_addref(struct afb_wsj1 *wsj1) { - if (wsj1 != NULL) - wsj1->refcount++; + if (wsj1) + __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED); } void afb_wsj1_unref(struct afb_wsj1 *wsj1) { - if (wsj1 != NULL && !--wsj1->refcount) { + if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) { afb_ws_destroy(wsj1->ws); json_tokener_free(wsj1->tokener); free(wsj1); @@ -129,14 +142,41 @@ void afb_wsj1_unref(struct afb_wsj1 *wsj1) static void wsj1_on_hangup(struct afb_wsj1 *wsj1) { + struct wsj1_call *call, *ncall; + struct afb_wsj1_msg *msg; + char *text; + int len; + + static const char error_object_str[] = "{" + "\"jtype\":\"afb-reply\"," + "\"request\":{" + "\"status\":\"disconnected\"," + "\"info\":\"server hung up\"}}"; + + ncall = __atomic_exchange_n(&wsj1->calls, NULL, __ATOMIC_RELAXED); + while (ncall) { + call = ncall; + ncall = call->next; + len = asprintf(&text, "[%d,\"%s\",%s]", RETERR, call->id, error_object_str); + if (len > 0) { + msg = wsj1_msg_make(wsj1, text, (size_t)len); + if (msg != NULL) { + call->callback(call->closure, msg); + afb_wsj1_msg_unref(msg); + } + } + free(call); + } + if (wsj1->itf->on_hangup != NULL) wsj1->itf->on_hangup(wsj1->closure, wsj1); } -static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove) +static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove) { struct wsj1_call *r, **p; + p = &wsj1->calls; while((r = *p) != NULL) { if (strcmp(r->id, id) == 0) { @@ -146,6 +186,18 @@ static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, } p = &r->next; } + + return r; +} + +static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove) +{ + struct wsj1_call *r; + + pthread_mutex_lock(&wsj1->mutex); + r = wsj1_locked_call_search(wsj1, id, remove); + pthread_mutex_unlock(&wsj1->mutex); + return r; } @@ -155,15 +207,17 @@ static struct wsj1_call *wsj1_call_create(struct afb_wsj1 *wsj1, void (*on_reply if (call == NULL) errno = ENOMEM; else { + pthread_mutex_lock(&wsj1->mutex); do { if (wsj1->genid == 0) wsj1->genid = 999999; sprintf(call->id, "%d", wsj1->genid--); - } while (wsj1_call_search(wsj1, call->id, 0) != NULL); + } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL); call->callback = on_reply; call->closure = closure; call->next = wsj1->calls; wsj1->calls = call; + pthread_mutex_unlock(&wsj1->mutex); } return call; } @@ -237,25 +291,29 @@ static char *wsj1_msg_parse_string(char *text, size_t offset, size_t size) return wsj1_msg_parse_extract(text, offset, size); } -static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) +static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size) { size_t items[10][2]; int n; struct afb_wsj1_msg *msg; - struct wsj1_call *call; + char *verb; /* allocate */ msg = calloc(1, sizeof *msg); - if (msg == NULL) + if (msg == NULL) { + errno = ENOMEM; goto alloc_error; + } /* scan */ n = wsj1_msg_scan(text, items); - if (n < 0) + if (n <= 0) goto bad_header; /* scans code: 2|3|4|5 */ - if (items[0][1] != 1) goto bad_header; + if (items[0][1] != 1) + goto bad_header; + switch (text[items[0][0]]) { case '2': msg->code = CALL; break; case '3': msg->code = RETOK; break; @@ -270,9 +328,11 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) if (n != 4 && n != 5) goto bad_header; msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]); msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]); - msg->verb = strchr(msg->api, '/'); - if (msg->verb == NULL) goto bad_header; - *msg->verb++ = 0; + verb = strchr(msg->api, '/'); + if (verb == NULL) goto bad_header; + *verb++ = 0; + if (!*verb || *verb == '/') goto bad_header; + msg->verb = verb; msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]); msg->object_s_length = items[3][1]; msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL; @@ -281,8 +341,6 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) case RETERR: if (n != 3 && n != 4) goto bad_header; msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]); - call = wsj1_call_search(wsj1, msg->id, 1); - if (call == NULL) goto bad_header; msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]); msg->object_s_length = items[2][1]; msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL; @@ -301,51 +359,78 @@ static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) msg->refcount = 1; afb_wsj1_addref(wsj1); msg->wsj1 = wsj1; + pthread_mutex_lock(&wsj1->mutex); msg->next = wsj1->messages; if (msg->next != NULL) msg->next->previous = msg; wsj1->messages = msg; + pthread_mutex_unlock(&wsj1->mutex); + + return msg; + +bad_header: + errno = EBADMSG; + free(msg); + +alloc_error: + free(text); + return NULL; +} + +static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size) +{ + struct wsj1_call *call; + struct afb_wsj1_msg *msg; + + /* allocate */ + msg = wsj1_msg_make(wsj1, text, size); + if (msg == NULL) { + afb_ws_close(wsj1->ws, errno == EBADMSG + ? WEBSOCKET_CODE_POLICY_VIOLATION + : WEBSOCKET_CODE_INTERNAL_ERROR, NULL); + return; + } - /* incoke the handler */ + /* handle the message */ switch (msg->code) { case CALL: wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg); break; case RETOK: case RETERR: - call->callback(call->closure, msg); + call = wsj1_call_search(wsj1, msg->id, 1); + if (call == NULL) + afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL); + else + call->callback(call->closure, msg); free(call); break; case EVENT: - wsj1->itf->on_event(wsj1->closure, msg->event, msg); + if (wsj1->itf->on_event != NULL) + wsj1->itf->on_event(wsj1->closure, msg->event, msg); break; } afb_wsj1_msg_unref(msg); - return; - -bad_header: - free(msg); -alloc_error: - free(text); - afb_ws_close(wsj1->ws, 1008, NULL); } void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg) { if (msg != NULL) - msg->refcount++; + __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED); } void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg) { - if (msg != NULL && --msg->refcount == 0) { + if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) { /* unlink the message */ + pthread_mutex_lock(&msg->wsj1->mutex); if (msg->next != NULL) msg->next->previous = msg->previous; if (msg->previous == NULL) msg->wsj1->messages = msg->next; else msg->previous->next = msg->next; + pthread_mutex_unlock(&msg->wsj1->mutex); /* free ressources */ afb_wsj1_unref(msg->wsj1); json_object_put(msg->object_j); @@ -361,11 +446,15 @@ const char *afb_wsj1_msg_object_s(struct afb_wsj1_msg *msg) struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg) { + enum json_tokener_error jerr; struct json_object *object = msg->object_j; if (object == NULL) { + pthread_mutex_lock(&msg->wsj1->mutex); json_tokener_reset(msg->wsj1->tokener); - object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length); - if (object == NULL) { + object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length); + jerr = json_tokener_get_error(msg->wsj1->tokener); + pthread_mutex_unlock(&msg->wsj1->mutex); + if (jerr != json_tokener_success) { /* lazy error detection of json request. Is it to improve? */ object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length); } @@ -424,21 +513,29 @@ struct afb_wsj1 *afb_wsj1_msg_wsj1(struct afb_wsj1_msg *msg) return msg->wsj1; } +int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text) +{ + return afb_ws_close(wsj1->ws, code, text); +} + static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1) { char code[2] = { (char)('0' + i1), 0 }; - return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL); + return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL); } static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1) { char code[2] = { (char)('0' + i1), 0 }; - return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL); + return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL); } int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object) { - return afb_wsj1_send_event_s(wsj1, event, json_object_to_json_string(object)); + const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE); + int rc = afb_wsj1_send_event_s(wsj1, event, objstr); + json_object_put(object); + return rc; } int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object) @@ -448,7 +545,10 @@ int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char * int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure) { - return afb_wsj1_call_s(wsj1, api, verb, json_object_to_json_string(object), on_reply, closure); + const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE); + int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure); + json_object_put(object); + return rc; } int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure) @@ -477,24 +577,16 @@ int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, co return rc; } - -int afb_wsj1_reply_ok_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token) -{ - return afb_wsj1_reply_ok_s(msg, json_object_to_json_string(object), token); -} - -int afb_wsj1_reply_ok_s(struct afb_wsj1_msg *msg, const char *object, const char *token) +int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror) { - return wsj1_send_isot(msg->wsj1, RETOK, msg->id, object, token); -} - -int afb_wsj1_reply_error_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token) -{ - return afb_wsj1_reply_error_s(msg, json_object_to_json_string(object), token); + const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE); + int rc = afb_wsj1_reply_s(msg, objstr, token, iserror); + json_object_put(object); + return rc; } -int afb_wsj1_reply_error_s(struct afb_wsj1_msg *msg, const char *object, const char *token) +int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror) { - return wsj1_send_isot(msg->wsj1, RETERR, msg->id, object, token); + return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token); }