/*
- * Copyright 2016 IoT.bzh
+ * Copyright (C) 2015-2020 "IoT.bzh"
* Author: José Bollo <jose.bollo@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
#include <errno.h>
#include <string.h>
#include <stdio.h>
+#include <pthread.h>
#include <json-c/json.h>
+#if !defined(JSON_C_TO_STRING_NOSLASHESCAPE)
+#define JSON_C_TO_STRING_NOSLASHESCAPE 0
+#endif
#include "afb-ws.h"
#include "afb-wsj1.h"
+#include "fdev.h"
#define CALL 2
#define RETOK 3
#define RETERR 4
#define EVENT 5
+#define WEBSOCKET_CODE_POLICY_VIOLATION 1008
+#define WEBSOCKET_CODE_INTERNAL_ERROR 1011
+
static void wsj1_on_hangup(struct afb_wsj1 *wsj1);
static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size);
+static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size);
static struct afb_ws_itf wsj1_itf = {
.on_hangup = (void*)wsj1_on_hangup,
struct afb_wsj1_msg *next, *previous;
char *text;
int code;
- char *id;
- char *api;
- char *verb;
- char *event;
- char *object_s;
+ const char *id;
+ const char *api;
+ const char *verb;
+ const char *event;
+ const char *object_s;
size_t object_s_length;
- char *token;
+ const char *token;
struct json_object *object_j;
};
struct afb_ws *ws;
struct afb_wsj1_msg *messages;
struct wsj1_call *calls;
+ pthread_mutex_t mutex;
};
-struct afb_wsj1 *afb_wsj1_create(int fd, struct afb_wsj1_itf *itf, void *closure)
+struct afb_wsj1 *afb_wsj1_create(struct fdev *fdev, struct afb_wsj1_itf *itf, void *closure)
{
struct afb_wsj1 *result;
- assert(fd >= 0);
+ assert(fdev);
+ assert(itf);
+ assert(itf->on_call);
result = calloc(1, sizeof * result);
if (result == NULL)
result->refcount = 1;
result->itf = itf;
result->closure = closure;
+ pthread_mutex_init(&result->mutex, NULL);
result->tokener = json_tokener_new();
if (result->tokener == NULL)
goto error2;
- result->ws = afb_ws_create(fd, &wsj1_itf, result);
+ result->ws = afb_ws_create(fdev, &wsj1_itf, result);
if (result->ws == NULL)
goto error3;
error2:
free(result);
error:
- close(fd);
+ fdev_unref(fdev);
return NULL;
}
void afb_wsj1_addref(struct afb_wsj1 *wsj1)
{
- if (wsj1 != NULL)
- wsj1->refcount++;
+ if (wsj1)
+ __atomic_add_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED);
}
void afb_wsj1_unref(struct afb_wsj1 *wsj1)
{
- if (wsj1 != NULL && !--wsj1->refcount) {
+ if (wsj1 && !__atomic_sub_fetch(&wsj1->refcount, 1, __ATOMIC_RELAXED)) {
afb_ws_destroy(wsj1->ws);
json_tokener_free(wsj1->tokener);
free(wsj1);
static void wsj1_on_hangup(struct afb_wsj1 *wsj1)
{
+ struct wsj1_call *call, *ncall;
+ struct afb_wsj1_msg *msg;
+ char *text;
+ int len;
+
+ static const char error_object_str[] = "{"
+ "\"jtype\":\"afb-reply\","
+ "\"request\":{"
+ "\"status\":\"disconnected\","
+ "\"info\":\"server hung up\"}}";
+
+ ncall = __atomic_exchange_n(&wsj1->calls, NULL, __ATOMIC_RELAXED);
+ while (ncall) {
+ call = ncall;
+ ncall = call->next;
+ len = asprintf(&text, "[%d,\"%s\",%s]", RETERR, call->id, error_object_str);
+ if (len > 0) {
+ msg = wsj1_msg_make(wsj1, text, (size_t)len);
+ if (msg != NULL) {
+ call->callback(call->closure, msg);
+ afb_wsj1_msg_unref(msg);
+ }
+ }
+ free(call);
+ }
+
if (wsj1->itf->on_hangup != NULL)
wsj1->itf->on_hangup(wsj1->closure, wsj1);
}
-static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+static struct wsj1_call *wsj1_locked_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
{
struct wsj1_call *r, **p;
+
p = &wsj1->calls;
while((r = *p) != NULL) {
if (strcmp(r->id, id) == 0) {
}
p = &r->next;
}
+
+ return r;
+}
+
+static struct wsj1_call *wsj1_call_search(struct afb_wsj1 *wsj1, const char *id, int remove)
+{
+ struct wsj1_call *r;
+
+ pthread_mutex_lock(&wsj1->mutex);
+ r = wsj1_locked_call_search(wsj1, id, remove);
+ pthread_mutex_unlock(&wsj1->mutex);
+
return r;
}
if (call == NULL)
errno = ENOMEM;
else {
+ pthread_mutex_lock(&wsj1->mutex);
do {
if (wsj1->genid == 0)
wsj1->genid = 999999;
sprintf(call->id, "%d", wsj1->genid--);
- } while (wsj1_call_search(wsj1, call->id, 0) != NULL);
+ } while (wsj1_locked_call_search(wsj1, call->id, 0) != NULL);
call->callback = on_reply;
call->closure = closure;
call->next = wsj1->calls;
wsj1->calls = call;
+ pthread_mutex_unlock(&wsj1->mutex);
}
return call;
}
return wsj1_msg_parse_extract(text, offset, size);
}
-static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
+static struct afb_wsj1_msg *wsj1_msg_make(struct afb_wsj1 *wsj1, char *text, size_t size)
{
size_t items[10][2];
int n;
struct afb_wsj1_msg *msg;
- struct wsj1_call *call;
+ char *verb;
/* allocate */
msg = calloc(1, sizeof *msg);
- if (msg == NULL)
+ if (msg == NULL) {
+ errno = ENOMEM;
goto alloc_error;
+ }
/* scan */
n = wsj1_msg_scan(text, items);
- if (n < 0)
+ if (n <= 0)
goto bad_header;
/* scans code: 2|3|4|5 */
- if (items[0][1] != 1) goto bad_header;
+ if (items[0][1] != 1)
+ goto bad_header;
+
switch (text[items[0][0]]) {
case '2': msg->code = CALL; break;
case '3': msg->code = RETOK; break;
if (n != 4 && n != 5) goto bad_header;
msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
msg->api = wsj1_msg_parse_string(text, items[2][0], items[2][1]);
- msg->verb = strchr(msg->api, '/');
- if (msg->verb == NULL) goto bad_header;
- *msg->verb++ = 0;
+ verb = strchr(msg->api, '/');
+ if (verb == NULL) goto bad_header;
+ *verb++ = 0;
+ if (!*verb || *verb == '/') goto bad_header;
+ msg->verb = verb;
msg->object_s = wsj1_msg_parse_extract(text, items[3][0], items[3][1]);
msg->object_s_length = items[3][1];
msg->token = n == 5 ? wsj1_msg_parse_string(text, items[4][0], items[4][1]) : NULL;
case RETERR:
if (n != 3 && n != 4) goto bad_header;
msg->id = wsj1_msg_parse_string(text, items[1][0], items[1][1]);
- call = wsj1_call_search(wsj1, msg->id, 1);
- if (call == NULL) goto bad_header;
msg->object_s = wsj1_msg_parse_extract(text, items[2][0], items[2][1]);
msg->object_s_length = items[2][1];
msg->token = n == 5 ? wsj1_msg_parse_string(text, items[3][0], items[3][1]) : NULL;
msg->refcount = 1;
afb_wsj1_addref(wsj1);
msg->wsj1 = wsj1;
+ pthread_mutex_lock(&wsj1->mutex);
msg->next = wsj1->messages;
if (msg->next != NULL)
msg->next->previous = msg;
wsj1->messages = msg;
+ pthread_mutex_unlock(&wsj1->mutex);
+
+ return msg;
+
+bad_header:
+ errno = EBADMSG;
+ free(msg);
+
+alloc_error:
+ free(text);
+ return NULL;
+}
+
+static void wsj1_on_text(struct afb_wsj1 *wsj1, char *text, size_t size)
+{
+ struct wsj1_call *call;
+ struct afb_wsj1_msg *msg;
+
+ /* allocate */
+ msg = wsj1_msg_make(wsj1, text, size);
+ if (msg == NULL) {
+ afb_ws_close(wsj1->ws, errno == EBADMSG
+ ? WEBSOCKET_CODE_POLICY_VIOLATION
+ : WEBSOCKET_CODE_INTERNAL_ERROR, NULL);
+ return;
+ }
- /* incoke the handler */
+ /* handle the message */
switch (msg->code) {
case CALL:
wsj1->itf->on_call(wsj1->closure, msg->api, msg->verb, msg);
break;
case RETOK:
case RETERR:
- call->callback(call->closure, msg);
+ call = wsj1_call_search(wsj1, msg->id, 1);
+ if (call == NULL)
+ afb_ws_close(wsj1->ws, WEBSOCKET_CODE_POLICY_VIOLATION, NULL);
+ else
+ call->callback(call->closure, msg);
free(call);
break;
case EVENT:
- wsj1->itf->on_event(wsj1->closure, msg->event, msg);
+ if (wsj1->itf->on_event != NULL)
+ wsj1->itf->on_event(wsj1->closure, msg->event, msg);
break;
}
afb_wsj1_msg_unref(msg);
- return;
-
-bad_header:
- free(msg);
-alloc_error:
- free(text);
- afb_ws_close(wsj1->ws, 1008, NULL);
}
void afb_wsj1_msg_addref(struct afb_wsj1_msg *msg)
{
if (msg != NULL)
- msg->refcount++;
+ __atomic_add_fetch(&msg->refcount, 1, __ATOMIC_RELAXED);
}
void afb_wsj1_msg_unref(struct afb_wsj1_msg *msg)
{
- if (msg != NULL && --msg->refcount == 0) {
+ if (msg != NULL && !__atomic_sub_fetch(&msg->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the message */
+ pthread_mutex_lock(&msg->wsj1->mutex);
if (msg->next != NULL)
msg->next->previous = msg->previous;
if (msg->previous == NULL)
msg->wsj1->messages = msg->next;
else
msg->previous->next = msg->next;
+ pthread_mutex_unlock(&msg->wsj1->mutex);
/* free ressources */
afb_wsj1_unref(msg->wsj1);
json_object_put(msg->object_j);
struct json_object *afb_wsj1_msg_object_j(struct afb_wsj1_msg *msg)
{
+ enum json_tokener_error jerr;
struct json_object *object = msg->object_j;
if (object == NULL) {
+ pthread_mutex_lock(&msg->wsj1->mutex);
json_tokener_reset(msg->wsj1->tokener);
- object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, (int)msg->object_s_length);
- if (object == NULL) {
+ object = json_tokener_parse_ex(msg->wsj1->tokener, msg->object_s, 1 + (int)msg->object_s_length);
+ jerr = json_tokener_get_error(msg->wsj1->tokener);
+ pthread_mutex_unlock(&msg->wsj1->mutex);
+ if (jerr != json_tokener_success) {
/* lazy error detection of json request. Is it to improve? */
object = json_object_new_string_len(msg->object_s, (int)msg->object_s_length);
}
return msg->wsj1;
}
+int afb_wsj1_close(struct afb_wsj1 *wsj1, uint16_t code, const char *text)
+{
+ return afb_ws_close(wsj1->ws, code, text);
+}
+
static int wsj1_send_isot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *o1, const char *t1)
{
char code[2] = { (char)('0' + i1), 0 };
- return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
+ return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
}
static int wsj1_send_issot(struct afb_wsj1 *wsj1, int i1, const char *s1, const char *s2, const char *o1, const char *t1)
{
char code[2] = { (char)('0' + i1), 0 };
- return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
+ return afb_ws_texts(wsj1->ws, "[", code, ",\"", s1, "\",\"", s2, "\",", o1 == NULL ? "null" : o1, t1 != NULL ? ",\"" : "]", t1, "\"]", NULL);
}
int afb_wsj1_send_event_j(struct afb_wsj1 *wsj1, const char *event, struct json_object *object)
{
- return afb_wsj1_send_event_s(wsj1, event, json_object_to_json_string(object));
+ const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+ int rc = afb_wsj1_send_event_s(wsj1, event, objstr);
+ json_object_put(object);
+ return rc;
}
int afb_wsj1_send_event_s(struct afb_wsj1 *wsj1, const char *event, const char *object)
int afb_wsj1_call_j(struct afb_wsj1 *wsj1, const char *api, const char *verb, struct json_object *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
{
- return afb_wsj1_call_s(wsj1, api, verb, json_object_to_json_string(object), on_reply, closure);
+ const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+ int rc = afb_wsj1_call_s(wsj1, api, verb, objstr, on_reply, closure);
+ json_object_put(object);
+ return rc;
}
int afb_wsj1_call_s(struct afb_wsj1 *wsj1, const char *api, const char *verb, const char *object, void (*on_reply)(void *closure, struct afb_wsj1_msg *msg), void *closure)
return rc;
}
-
-int afb_wsj1_reply_ok_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token)
-{
- return afb_wsj1_reply_ok_s(msg, json_object_to_json_string(object), token);
-}
-
-int afb_wsj1_reply_ok_s(struct afb_wsj1_msg *msg, const char *object, const char *token)
+int afb_wsj1_reply_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token, int iserror)
{
- return wsj1_send_isot(msg->wsj1, RETOK, msg->id, object, token);
-}
-
-int afb_wsj1_reply_error_j(struct afb_wsj1_msg *msg, struct json_object *object, const char *token)
-{
- return afb_wsj1_reply_error_s(msg, json_object_to_json_string(object), token);
+ const char *objstr = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN|JSON_C_TO_STRING_NOSLASHESCAPE);
+ int rc = afb_wsj1_reply_s(msg, objstr, token, iserror);
+ json_object_put(object);
+ return rc;
}
-int afb_wsj1_reply_error_s(struct afb_wsj1_msg *msg, const char *object, const char *token)
+int afb_wsj1_reply_s(struct afb_wsj1_msg *msg, const char *object, const char *token, int iserror)
{
- return wsj1_send_isot(msg->wsj1, RETERR, msg->id, object, token);
+ return wsj1_send_isot(msg->wsj1, iserror ? RETERR : RETOK, msg->id, object, token);
}