X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-ws-json1.c;h=88fd1c113c4ae2d03ce181ac6686efaa3dd533ff;hb=65353dce81a629e042800bb7b86fcd869a76727e;hp=78b8c2100cdd63c5c6f953e17f2772f1b2d02fba;hpb=28158192742ead144454e071720d10bf5218a20b;p=src%2Fapp-framework-binder.git diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c index 78b8c210..88fd1c11 100644 --- a/src/afb-ws-json1.c +++ b/src/afb-ws-json1.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 "IoT.bzh" + * Copyright (C) 2015-2020 "IoT.bzh" * Author: José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,427 +25,267 @@ #include -#include "afb-ws.h" +#include "afb-wsj1.h" #include "afb-ws-json1.h" #include "afb-msg-json.h" -#include "session.h" -#include -#include "afb-apis.h" +#include "afb-session.h" +#include "afb-cred.h" +#include "afb-apiset.h" +#include "afb-xreq.h" #include "afb-context.h" +#include "afb-evt.h" +#include "afb-token.h" -static void aws_on_hangup(struct afb_ws_json1 *ws); -static void aws_on_text(struct afb_ws_json1 *ws, char *text, size_t size); - -static struct afb_ws_itf aws_itf = { - .on_hangup = (void*)aws_on_hangup, - .on_text = (void*)aws_on_text -}; +#include "systemd.h" +#include "verbose.h" +#include "fdev.h" +/* predeclaration of structures */ +struct afb_ws_json1; struct afb_wsreq; +/* predeclaration of websocket callbacks */ +static void aws_on_hangup_cb(void *closure, struct afb_wsj1 *wsj1); +static void aws_on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg); +static void aws_on_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object); +static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop); + +/* predeclaration of wsreq callbacks */ +static void wsreq_destroy(struct afb_xreq *xreq); +static void wsreq_reply(struct afb_xreq *xreq, struct json_object *object, const char *error, const char *info); +static int wsreq_subscribe(struct afb_xreq *xreq, struct afb_event_x2 *event); +static int wsreq_unsubscribe(struct afb_xreq *xreq, struct afb_event_x2 *event); + +/* declaration of websocket structure */ struct afb_ws_json1 { + int refcount; void (*cleanup)(void*); void *cleanup_closure; - struct afb_wsreq *requests; - struct AFB_clientCtx *session; - struct json_tokener *tokener; - struct afb_ws *ws; - int new_session; + struct afb_session *session; + struct afb_token *token; + struct afb_evt_listener *listener; + struct afb_wsj1 *wsj1; + struct afb_cred *cred; + struct afb_apiset *apiset; +}; + +/* declaration of wsreq structure */ +struct afb_wsreq +{ + struct afb_xreq xreq; + struct afb_ws_json1 *aws; + struct afb_wsreq *next; + struct afb_wsj1_msg *msgj1; }; -static void aws_send_event(struct afb_ws_json1 *ws, const char *event, struct json_object *object); +/* interface for afb_ws_json1 / afb_wsj1 */ +static struct afb_wsj1_itf wsj1_itf = { + .on_hangup = aws_on_hangup_cb, + .on_call = aws_on_call_cb +}; -static const struct afb_event_listener_itf event_listener_itf = { - .send = (void*)aws_send_event, - .expects = NULL +/* interface for xreq */ +const struct afb_xreq_query_itf afb_ws_json1_xreq_itf = { + .reply = wsreq_reply, + .subscribe = wsreq_subscribe, + .unsubscribe = wsreq_unsubscribe, + .unref = wsreq_destroy }; -static inline struct afb_event_listener listener_for(struct afb_ws_json1 *aws) -{ - return (struct afb_event_listener){ .itf = &event_listener_itf, .closure = aws }; -} +/* the interface for events */ +static const struct afb_evt_itf evt_itf = { + .broadcast = aws_on_broadcast_cb, + .push = aws_on_push_cb +}; + +/*************************************************************** +**************************************************************** +** +** functions of afb_ws_json1 / afb_wsj1 +** +**************************************************************** +***************************************************************/ -struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure) +struct afb_ws_json1 *afb_ws_json1_create(struct fdev *fdev, struct afb_apiset *apiset, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure) { struct afb_ws_json1 *result; - assert(fd >= 0); + assert(fdev); assert(context != NULL); result = malloc(sizeof * result); if (result == NULL) goto error; + result->refcount = 1; result->cleanup = cleanup; result->cleanup_closure = cleanup_closure; - result->requests = NULL; - result->session = ctxClientAddRef(context->session); - result->new_session = context->created != 0; + result->session = afb_session_addref(context->session); + result->token = afb_token_addref(context->token); if (result->session == NULL) goto error2; - result->tokener = json_tokener_new(); - if (result->tokener == NULL) + result->wsj1 = afb_wsj1_create(fdev, &wsj1_itf, result); + if (result->wsj1 == NULL) goto error3; - result->ws = afb_ws_create(fd, &aws_itf, result); - if (result->ws == NULL) + result->listener = afb_evt_listener_create(&evt_itf, result); + if (result->listener == NULL) goto error4; - if (0 > ctxClientEventListenerAdd(result->session, listener_for(result))) - goto error5; - + result->cred = afb_cred_create_for_socket(fdev_fd(fdev)); + result->apiset = afb_apiset_addref(apiset); return result; -error5: - afb_ws_destroy(result->ws); error4: - json_tokener_free(result->tokener); + afb_wsj1_unref(result->wsj1); error3: - ctxClientUnref(result->session); + afb_session_unref(result->session); + afb_token_unref(result->token); error2: free(result); error: - close(fd); + fdev_unref(fdev); return NULL; } -static void aws_on_hangup(struct afb_ws_json1 *ws) +struct afb_ws_json1 *afb_ws_json1_addref(struct afb_ws_json1 *ws) { - ctxClientEventListenerRemove(ws->session, listener_for(ws)); - afb_ws_destroy(ws->ws); - json_tokener_free(ws->tokener); - if (ws->cleanup != NULL) - ws->cleanup(ws->cleanup_closure); - ctxClientUnref(ws->session); - free(ws); + __atomic_add_fetch(&ws->refcount, 1, __ATOMIC_RELAXED); + return ws; } -#define CALL 2 -#define RETOK 3 -#define RETERR 4 -#define EVENT 5 - -struct afb_wsreq -{ - struct afb_context context; - int refcount; - struct afb_ws_json1 *aws; - struct afb_wsreq *next; - char *text; - size_t size; - int code; - char *id; - size_t idlen; - char *api; - size_t apilen; - char *verb; - size_t verblen; - char *obj; - size_t objlen; - char *tok; - size_t toklen; - struct json_object *root; -}; - -static void wsreq_addref(struct afb_wsreq *wsreq); -static void wsreq_unref(struct afb_wsreq *wsreq); -static struct json_object *wsreq_json(struct afb_wsreq *wsreq); -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name); -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info); -static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info); -static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size); -static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size); - - -static const struct afb_req_itf wsreq_itf = { - .json = (void*)wsreq_json, - .get = (void*)wsreq_get, - .success = (void*)wsreq_success, - .fail = (void*)wsreq_fail, - .raw = (void*)wsreq_raw, - .send = (void*)wsreq_send, - .context_get = (void*)afb_context_get, - .context_set = (void*)afb_context_set, - .addref = (void*)wsreq_addref, - .unref = (void*)wsreq_unref -}; - -static int aws_wsreq_parse(struct afb_wsreq *r, char *text, size_t size) +void afb_ws_json1_unref(struct afb_ws_json1 *ws) { - char *pos, *end, c; - int aux; - - /* scan */ - pos = text; - end = text + size; - - /* scans: [ */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '[') goto bad_header; - - /* scans code: 2|3|4 */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - switch (*pos++) { - case '2': r->code = CALL; break; - case '3': r->code = RETOK; break; - case '4': r->code = RETERR; break; - default: goto bad_header; + if (!__atomic_sub_fetch(&ws->refcount, 1, __ATOMIC_RELAXED)) { + afb_evt_listener_unref(ws->listener); + afb_wsj1_unref(ws->wsj1); + if (ws->cleanup != NULL) + ws->cleanup(ws->cleanup_closure); + afb_token_unref(ws->token); + afb_session_unref(ws->session); + afb_cred_unref(ws->cred); + afb_apiset_unref(ws->apiset); + free(ws); } +} - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - - /* scans id: "id" */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - r->id = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->idlen = (size_t)(pos++ - r->id); - - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - - /* scans the method if needed */ - if (r->code == CALL) { - /* scans: " */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - - /* scans: api/ */ - r->api = pos; - while(pos < end && *pos != '"' && *pos != '/') pos++; - if (pos == end) goto bad_header; - if (*pos != '/') goto bad_header; - r->apilen = (size_t)(pos++ - r->api); - if (r->apilen && r->api[r->apilen - 1] == '\\') - r->apilen--; - - /* scans: verb" */ - r->verb = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->verblen = (size_t)(pos++ - r->verb); - - /* scans: , */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != ',') goto bad_header; - } +static void aws_on_hangup_cb(void *closure, struct afb_wsj1 *wsj1) +{ + struct afb_ws_json1 *ws = closure; + afb_ws_json1_unref(ws); +} - /* scan obj */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - aux = 0; - r->obj = pos; - while (pos < end && (aux != 0 || (*pos != ',' && *pos != ']'))) { - if (pos == end) goto bad_header; - switch(*pos) { - case '{': case '[': aux++; break; - case '}': case ']': if (!aux--) goto bad_header; break; - case '"': - do { - pos += 1 + (*pos == '\\'); - } while(pos < end && *pos != '"'); - default: - break; - } - pos++; - } - if (pos > end) goto bad_header; - if (pos == end && aux != 0) goto bad_header; - c = *pos; - r->objlen = (size_t)(pos++ - r->obj); - while (r->objlen && r->obj[r->objlen - 1] == ' ') - r->objlen--; - - /* scan the token (if any) */ - if (c == ',') { - /* scans token: "token" */ - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - if (*pos++ != '"') goto bad_header; - r->tok = pos; - while(pos < end && *pos != '"') pos++; - if (pos == end) goto bad_header; - r->toklen = (size_t)(pos++ - r->tok); - while(pos < end && *pos == ' ') pos++; - if (pos == end) goto bad_header; - c = *pos++; +static int aws_new_token(struct afb_ws_json1 *ws, const char *new_token_string) +{ + int rc; + struct afb_token *newtok, *oldtok; + + rc = afb_token_get(&newtok, new_token_string); + if (rc >= 0) { + oldtok = ws->token; + ws->token = newtok; + afb_token_unref(oldtok); } - - /* scan: ] */ - if (c != ']') goto bad_header; - while(pos < end && *pos == ' ') pos++; - if (pos != end) goto bad_header; - - /* done */ - r->text = text; - r->size = size; - return 1; - -bad_header: - return 0; + return rc; } -static void aws_on_text(struct afb_ws_json1 *ws, char *text, size_t size) +static void aws_on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) { - struct afb_req r; + struct afb_ws_json1 *ws = closure; struct afb_wsreq *wsreq; + const char *tok; + + DEBUG("received websocket request for %s/%s: %s", api, verb, afb_wsj1_msg_object_s(msg)); + + /* handle new tokens */ + tok = afb_wsj1_msg_token(msg); + if (tok) + aws_new_token(ws, tok); /* allocate */ wsreq = calloc(1, sizeof *wsreq); - if (wsreq == NULL) - goto alloc_error; + if (wsreq == NULL) { + afb_wsj1_close(ws->wsj1, 1008, NULL); + return; + } - /* init */ - if (!aws_wsreq_parse(wsreq, text, size)) - goto bad_header; + /* init the context */ + afb_xreq_init(&wsreq->xreq, &afb_ws_json1_xreq_itf); + afb_context_init(&wsreq->xreq.context, ws->session, ws->token, ws->cred); /* fill and record the request */ - if (wsreq->tok != NULL) - wsreq->tok[wsreq->toklen] = 0; - afb_context_init(&wsreq->context, ws->session, wsreq->tok); - if (!wsreq->context.invalidated) - wsreq->context.validated = 1; - if (ws->new_session != 0) { - wsreq->context.created = 1; - ws->new_session = 0; - } - wsreq->refcount = 1; - wsreq->aws = ws; - wsreq->next = ws->requests; - ws->requests = wsreq; - - r.closure = wsreq; - r.itf = &wsreq_itf; - afb_apis_call(r, &wsreq->context, wsreq->api, wsreq->apilen, wsreq->verb, wsreq->verblen); - wsreq_unref(wsreq); - return; - -bad_header: - free(wsreq); -alloc_error: - free(text); - afb_ws_close(ws->ws, 1008, NULL); - return; + afb_wsj1_msg_addref(msg); + wsreq->msgj1 = msg; + wsreq->xreq.request.called_api = api; + wsreq->xreq.request.called_verb = verb; + wsreq->xreq.json = afb_wsj1_msg_object_j(wsreq->msgj1); + wsreq->aws = afb_ws_json1_addref(ws); + + /* emits the call */ + afb_xreq_process(&wsreq->xreq, ws->apiset); } -static void wsreq_addref(struct afb_wsreq *wsreq) +static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object) { - wsreq->refcount++; + afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object)); } -static void wsreq_unref(struct afb_wsreq *wsreq) +static void aws_on_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object) { - if (--wsreq->refcount == 0) { - struct afb_wsreq **prv = &wsreq->aws->requests; - while(*prv != NULL) { - if (*prv == wsreq) { - *prv = wsreq->next; - break; - } - prv = &(*prv)->next; - } - afb_context_disconnect(&wsreq->context); - json_object_put(wsreq->root); - free(wsreq->text); - free(wsreq); - } + aws_on_event(closure, event, object); } -static struct json_object *wsreq_json(struct afb_wsreq *wsreq) +static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) { - struct json_object *root = wsreq->root; - if (root == NULL) { - json_tokener_reset(wsreq->aws->tokener); - root = json_tokener_parse_ex(wsreq->aws->tokener, wsreq->obj, (int)wsreq->objlen); - if (root == NULL) { - /* lazy error detection of json request. Is it to improve? */ - root = json_object_new_string_len(wsreq->obj, (int)wsreq->objlen); - } - wsreq->root = root; - } - return root; + aws_on_event(closure, event, afb_msg_json_event(event, object)); } -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name) -{ - struct afb_arg arg; - struct json_object *value, *root; - - root = wsreq_json(wsreq); - if (json_object_object_get_ex(root, name, &value)) { - arg.name = name; - arg.value = json_object_get_string(value); - } else { - arg.name = NULL; - arg.value = NULL; - } - arg.path = NULL; - return arg; -} +/*************************************************************** +**************************************************************** +** +** functions of wsreq / afb_req +** +**************************************************************** +***************************************************************/ -static void aws_emit(struct afb_ws_json1 *aws, int code, const char *id, size_t idlen, struct json_object *data, const char *token) +static void wsreq_destroy(struct afb_xreq *xreq) { - json_object *msg; - const char *txt; - - /* pack the message */ - msg = json_object_new_array(); - json_object_array_add(msg, json_object_new_int(code)); - json_object_array_add(msg, json_object_new_string_len(id, (int)idlen)); - json_object_array_add(msg, data); - if (token) - json_object_array_add(msg, json_object_new_string(token)); - - /* emits the reply */ - txt = json_object_to_json_string(msg); - afb_ws_text(aws->ws, txt, strlen(txt)); - json_object_put(msg); -} + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); -static void wsreq_reply(struct afb_wsreq *wsreq, int retcode, const char *status, const char *info, json_object *resp) -{ - const char *uuid = afb_context_sent_uuid(&wsreq->context); - const char *token = afb_context_sent_token(&wsreq->context); - struct json_object *reply = afb_msg_json_reply(status, info, resp, token, uuid); - aws_emit(wsreq->aws, retcode, wsreq->id, wsreq->idlen, reply, token); + afb_context_disconnect(&wsreq->xreq.context); + afb_wsj1_msg_unref(wsreq->msgj1); + afb_ws_json1_unref(wsreq->aws); + free(wsreq); } -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info) +static void wsreq_reply(struct afb_xreq *xreq, struct json_object *object, const char *error, const char *info) { - wsreq_reply(wsreq, RETERR, status, info, NULL); -} + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); + int rc; + struct json_object *reply; -static void wsreq_success(struct afb_wsreq *wsreq, json_object *obj, const char *info) -{ - wsreq_reply(wsreq, RETOK, "success", info, obj); -} + /* create the reply */ + reply = afb_msg_json_reply(object, error, info, &xreq->context); -static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size) -{ - *size = wsreq->objlen; - return wsreq->obj; + rc = (error ? afb_wsj1_reply_error_j : afb_wsj1_reply_ok_j)( + wsreq->msgj1, reply, NULL); + if (rc) + ERROR("Can't send reply: %m"); } -static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size) +static int wsreq_subscribe(struct afb_xreq *xreq, struct afb_event_x2 *event) { - afb_ws_text(wsreq->aws->ws, buffer, size); + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); + + return afb_evt_listener_watch_x2(wsreq->aws->listener, event); } -static void aws_send_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object) +static int wsreq_unsubscribe(struct afb_xreq *xreq, struct afb_event_x2 *event) { - aws_emit(aws, EVENT, event, strlen(event), afb_msg_json_event(event, object), NULL); + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); + + return afb_evt_listener_unwatch_x2(wsreq->aws->listener, event); }