X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-ws-json1.c;h=88fd1c113c4ae2d03ce181ac6686efaa3dd533ff;hb=65353dce81a629e042800bb7b86fcd869a76727e;hp=9d295e7806bf1613a276d983f6ed4b44fb4444fd;hpb=d8ef25780bffa6f91f013ef71b1ede908325e59d;p=src%2Fapp-framework-binder.git diff --git a/src/afb-ws-json1.c b/src/afb-ws-json1.c index 9d295e78..88fd1c11 100644 --- a/src/afb-ws-json1.c +++ b/src/afb-ws-json1.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 "IoT.bzh" + * Copyright (C) 2015-2020 "IoT.bzh" * Author: José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,39 +25,36 @@ #include -#include - #include "afb-wsj1.h" #include "afb-ws-json1.h" #include "afb-msg-json.h" -#include "session.h" -#include "afb-apis.h" +#include "afb-session.h" +#include "afb-cred.h" +#include "afb-apiset.h" +#include "afb-xreq.h" #include "afb-context.h" #include "afb-evt.h" -#include "afb-subcall.h" +#include "afb-token.h" + +#include "systemd.h" #include "verbose.h" +#include "fdev.h" /* predeclaration of structures */ struct afb_ws_json1; struct afb_wsreq; /* predeclaration of websocket callbacks */ -static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1); -static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg); -static void aws_on_event(struct afb_ws_json1 *ws, const char *event, int eventid, struct json_object *object); +static void aws_on_hangup_cb(void *closure, struct afb_wsj1 *wsj1); +static void aws_on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg); +static void aws_on_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object); +static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop); /* predeclaration of wsreq callbacks */ -static void wsreq_addref(struct afb_wsreq *wsreq); -static void wsreq_unref(struct afb_wsreq *wsreq); -static struct json_object *wsreq_json(struct afb_wsreq *wsreq); -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name); -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info); -static void wsreq_success(struct afb_wsreq *wsreq, struct json_object *obj, const char *info); -static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size); -static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size); -static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event); -static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event); -static void wsreq_subcall(struct afb_wsreq *wsreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure); +static void wsreq_destroy(struct afb_xreq *xreq); +static void wsreq_reply(struct afb_xreq *xreq, struct json_object *object, const char *error, const char *info); +static int wsreq_subscribe(struct afb_xreq *xreq, struct afb_event_x2 *event); +static int wsreq_unsubscribe(struct afb_xreq *xreq, struct afb_event_x2 *event); /* declaration of websocket structure */ struct afb_ws_json1 @@ -65,21 +62,18 @@ struct afb_ws_json1 int refcount; void (*cleanup)(void*); void *cleanup_closure; - struct AFB_clientCtx *session; + struct afb_session *session; + struct afb_token *token; struct afb_evt_listener *listener; struct afb_wsj1 *wsj1; - int new_session; + struct afb_cred *cred; + struct afb_apiset *apiset; }; /* declaration of wsreq structure */ struct afb_wsreq { - /* - * CAUTION: 'context' field should be the first because there - * is an implicit convertion to struct afb_context - */ - struct afb_context context; - int refcount; + struct afb_xreq xreq; struct afb_ws_json1 *aws; struct afb_wsreq *next; struct afb_wsj1_msg *msgj1; @@ -87,33 +81,22 @@ struct afb_wsreq /* interface for afb_ws_json1 / afb_wsj1 */ static struct afb_wsj1_itf wsj1_itf = { - .on_hangup = (void*)aws_on_hangup, - .on_call = (void*)aws_on_call + .on_hangup = aws_on_hangup_cb, + .on_call = aws_on_call_cb }; -/* interface for wsreq / afb_req */ -const struct afb_req_itf afb_ws_json1_req_itf = { - .json = (void*)wsreq_json, - .get = (void*)wsreq_get, - .success = (void*)wsreq_success, - .fail = (void*)wsreq_fail, - .raw = (void*)wsreq_raw, - .send = (void*)wsreq_send, - .context_get = (void*)afb_context_get, - .context_set = (void*)afb_context_set, - .addref = (void*)wsreq_addref, - .unref = (void*)wsreq_unref, - .session_close = (void*)afb_context_close, - .session_set_LOA = (void*)afb_context_change_loa, - .subscribe = (void*)wsreq_subscribe, - .unsubscribe = (void*)wsreq_unsubscribe, - .subcall = (void*)wsreq_subcall +/* interface for xreq */ +const struct afb_xreq_query_itf afb_ws_json1_xreq_itf = { + .reply = wsreq_reply, + .subscribe = wsreq_subscribe, + .unsubscribe = wsreq_unsubscribe, + .unref = wsreq_destroy }; /* the interface for events */ static const struct afb_evt_itf evt_itf = { - .broadcast = (void*)aws_on_event, - .push = (void*)aws_on_event + .broadcast = aws_on_broadcast_cb, + .push = aws_on_push_cb }; /*************************************************************** @@ -124,11 +107,11 @@ static const struct afb_evt_itf evt_itf = { **************************************************************** ***************************************************************/ -struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure) +struct afb_ws_json1 *afb_ws_json1_create(struct fdev *fdev, struct afb_apiset *apiset, struct afb_context *context, void (*cleanup)(void*), void *cleanup_closure) { struct afb_ws_json1 *result; - assert(fd >= 0); + assert(fdev); assert(context != NULL); result = malloc(sizeof * result); @@ -138,12 +121,12 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo result->refcount = 1; result->cleanup = cleanup; result->cleanup_closure = cleanup_closure; - result->session = ctxClientAddRef(context->session); - result->new_session = context->created != 0; + result->session = afb_session_addref(context->session); + result->token = afb_token_addref(context->token); if (result->session == NULL) goto error2; - result->wsj1 = afb_wsj1_create(fd, &wsj1_itf, result); + result->wsj1 = afb_wsj1_create(fdev, &wsj1_itf, result); if (result->wsj1 == NULL) goto error3; @@ -151,49 +134,76 @@ struct afb_ws_json1 *afb_ws_json1_create(int fd, struct afb_context *context, vo if (result->listener == NULL) goto error4; + result->cred = afb_cred_create_for_socket(fdev_fd(fdev)); + result->apiset = afb_apiset_addref(apiset); return result; error4: afb_wsj1_unref(result->wsj1); error3: - ctxClientUnref(result->session); + afb_session_unref(result->session); + afb_token_unref(result->token); error2: free(result); error: - close(fd); + fdev_unref(fdev); return NULL; } -static struct afb_ws_json1 *aws_addref(struct afb_ws_json1 *ws) +struct afb_ws_json1 *afb_ws_json1_addref(struct afb_ws_json1 *ws) { - ws->refcount++; + __atomic_add_fetch(&ws->refcount, 1, __ATOMIC_RELAXED); return ws; } -static void aws_unref(struct afb_ws_json1 *ws) +void afb_ws_json1_unref(struct afb_ws_json1 *ws) { - if (--ws->refcount == 0) { + if (!__atomic_sub_fetch(&ws->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(ws->listener); afb_wsj1_unref(ws->wsj1); if (ws->cleanup != NULL) ws->cleanup(ws->cleanup_closure); - ctxClientUnref(ws->session); + afb_token_unref(ws->token); + afb_session_unref(ws->session); + afb_cred_unref(ws->cred); + afb_apiset_unref(ws->apiset); free(ws); } } -static void aws_on_hangup(struct afb_ws_json1 *ws, struct afb_wsj1 *wsj1) +static void aws_on_hangup_cb(void *closure, struct afb_wsj1 *wsj1) { - aws_unref(ws); + struct afb_ws_json1 *ws = closure; + afb_ws_json1_unref(ws); } -static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *verb, struct afb_wsj1_msg *msg) +static int aws_new_token(struct afb_ws_json1 *ws, const char *new_token_string) { - struct afb_req r; + int rc; + struct afb_token *newtok, *oldtok; + + rc = afb_token_get(&newtok, new_token_string); + if (rc >= 0) { + oldtok = ws->token; + ws->token = newtok; + afb_token_unref(oldtok); + } + return rc; +} + +static void aws_on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg) +{ + struct afb_ws_json1 *ws = closure; struct afb_wsreq *wsreq; + const char *tok; DEBUG("received websocket request for %s/%s: %s", api, verb, afb_wsj1_msg_object_s(msg)); + /* handle new tokens */ + tok = afb_wsj1_msg_token(msg); + if (tok) + aws_new_token(ws, tok); + /* allocate */ wsreq = calloc(1, sizeof *wsreq); if (wsreq == NULL) { @@ -202,32 +212,36 @@ static void aws_on_call(struct afb_ws_json1 *ws, const char *api, const char *ve } /* init the context */ - afb_context_init(&wsreq->context, ws->session, afb_wsj1_msg_token(msg)); - if (!wsreq->context.invalidated) - wsreq->context.validated = 1; - if (ws->new_session != 0) { - wsreq->context.created = 1; - ws->new_session = 0; - } + afb_xreq_init(&wsreq->xreq, &afb_ws_json1_xreq_itf); + afb_context_init(&wsreq->xreq.context, ws->session, ws->token, ws->cred); /* fill and record the request */ afb_wsj1_msg_addref(msg); wsreq->msgj1 = msg; - wsreq->refcount = 1; - wsreq->aws = aws_addref(ws); + wsreq->xreq.request.called_api = api; + wsreq->xreq.request.called_verb = verb; + wsreq->xreq.json = afb_wsj1_msg_object_j(wsreq->msgj1); + wsreq->aws = afb_ws_json1_addref(ws); /* emits the call */ - r.closure = wsreq; - r.itf = &afb_ws_json1_req_itf; - afb_apis_call_(r, &wsreq->context, api, verb); - wsreq_unref(wsreq); + afb_xreq_process(&wsreq->xreq, ws->apiset); } -static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventid, struct json_object *object) +static void aws_on_event(struct afb_ws_json1 *aws, const char *event, struct json_object *object) { afb_wsj1_send_event_j(aws->wsj1, event, afb_msg_json_event(event, object)); } +static void aws_on_push_cb(void *closure, const char *event, uint16_t eventid, struct json_object *object) +{ + aws_on_event(closure, event, object); +} + +static void aws_on_broadcast_cb(void *closure, const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop) +{ + aws_on_event(closure, event, afb_msg_json_event(event, object)); +} + /*************************************************************** **************************************************************** ** @@ -236,75 +250,42 @@ static void aws_on_event(struct afb_ws_json1 *aws, const char *event, int eventi **************************************************************** ***************************************************************/ -static void wsreq_addref(struct afb_wsreq *wsreq) +static void wsreq_destroy(struct afb_xreq *xreq) { - wsreq->refcount++; -} + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); -static void wsreq_unref(struct afb_wsreq *wsreq) -{ - if (--wsreq->refcount == 0) { - afb_context_disconnect(&wsreq->context); - afb_wsj1_msg_unref(wsreq->msgj1); - aws_unref(wsreq->aws); - free(wsreq); - } + afb_context_disconnect(&wsreq->xreq.context); + afb_wsj1_msg_unref(wsreq->msgj1); + afb_ws_json1_unref(wsreq->aws); + free(wsreq); } -static struct json_object *wsreq_json(struct afb_wsreq *wsreq) -{ - return afb_wsj1_msg_object_j(wsreq->msgj1); -} - -static struct afb_arg wsreq_get(struct afb_wsreq *wsreq, const char *name) -{ - return afb_msg_json_get_arg(wsreq_json(wsreq), name); -} - -static void wsreq_fail(struct afb_wsreq *wsreq, const char *status, const char *info) -{ - int rc; - rc = afb_wsj1_reply_error_j(wsreq->msgj1, afb_msg_json_reply_error(status, info, &wsreq->context, NULL), afb_context_sent_token(&wsreq->context)); - if (rc) - ERROR("Can't send fail reply: %m"); -} - -static void wsreq_success(struct afb_wsreq *wsreq, json_object *obj, const char *info) +static void wsreq_reply(struct afb_xreq *xreq, struct json_object *object, const char *error, const char *info) { + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); int rc; - rc = afb_wsj1_reply_ok_j(wsreq->msgj1, afb_msg_json_reply_ok(info, obj, &wsreq->context, NULL), afb_context_sent_token(&wsreq->context)); - if (rc) - ERROR("Can't send success reply: %m"); -} + struct json_object *reply; -static const char *wsreq_raw(struct afb_wsreq *wsreq, size_t *size) -{ - const char *result = afb_wsj1_msg_object_s(wsreq->msgj1); - if (size != NULL) - *size = strlen(result); - return result; -} + /* create the reply */ + reply = afb_msg_json_reply(object, error, info, &xreq->context); -static void wsreq_send(struct afb_wsreq *wsreq, const char *buffer, size_t size) -{ - int rc; - rc = afb_wsj1_reply_ok_s(wsreq->msgj1, buffer, afb_context_sent_token(&wsreq->context)); + rc = (error ? afb_wsj1_reply_error_j : afb_wsj1_reply_ok_j)( + wsreq->msgj1, reply, NULL); if (rc) - ERROR("Can't send raw reply: %m"); + ERROR("Can't send reply: %m"); } -static int wsreq_subscribe(struct afb_wsreq *wsreq, struct afb_event event) +static int wsreq_subscribe(struct afb_xreq *xreq, struct afb_event_x2 *event) { - return afb_evt_add_watch(wsreq->aws->listener, event); -} + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); -static int wsreq_unsubscribe(struct afb_wsreq *wsreq, struct afb_event event) -{ - return afb_evt_remove_watch(wsreq->aws->listener, event); + return afb_evt_listener_watch_x2(wsreq->aws->listener, event); } -static void wsreq_subcall(struct afb_wsreq *wsreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure) +static int wsreq_unsubscribe(struct afb_xreq *xreq, struct afb_event_x2 *event) { - afb_subcall(&wsreq->context, api, verb, args, callback, closure, (struct afb_req){ .itf = &afb_ws_json1_req_itf, .closure = wsreq }); + struct afb_wsreq *wsreq = CONTAINER_OF_XREQ(struct afb_wsreq, xreq); + + return afb_evt_listener_unwatch_x2(wsreq->aws->listener, event); }