X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-api-ws.c;h=56682ddc89fa623ae2695df9b487e0201d4ae821;hb=01534912a8e32468b62d848ec1fe23004df1dd19;hp=663170eee6119986b30e954bc23084bc1e15e1dc;hpb=09f78b70e5debb683e455f6c2de3a5ea0fd93d44;p=src%2Fapp-framework-binder.git diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 663170ee..56682ddc 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015, 2016 "IoT.bzh" + * Copyright (C) 2015, 2016, 2017 "IoT.bzh" * Author José Bollo * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -37,21 +38,34 @@ #include "afb-common.h" -#include "session.h" +#include "afb-session.h" +#include "afb-cred.h" #include "afb-ws.h" #include "afb-msg-json.h" -#include "afb-apis.h" +#include "afb-api.h" +#include "afb-apiset.h" #include "afb-api-so.h" #include "afb-context.h" #include "afb-evt.h" -#include "afb-subcall.h" +#include "afb-xreq.h" #include "verbose.h" +#include "sd-fds.h" struct api_ws_memo; struct api_ws_event; struct api_ws_client; - +#define CHAR_FOR_CALL 'C' +#define CHAR_FOR_ANSWER_SUCCESS 'T' +#define CHAR_FOR_ANSWER_FAIL 'F' +#define CHAR_FOR_EVT_BROADCAST '*' +#define CHAR_FOR_EVT_ADD '+' +#define CHAR_FOR_EVT_DEL '-' +#define CHAR_FOR_EVT_PUSH '!' +#define CHAR_FOR_EVT_SUBSCRIBE 'S' +#define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_SUBCALL_CALL 'B' +#define CHAR_FOR_SUBCALL_REPLY 'R' /* */ @@ -60,16 +74,16 @@ struct api_ws char *path; /* path of the object for the API */ char *api; /* api name of the interface */ int fd; /* file descriptor */ + pthread_mutex_t mutex; /**< resource control */ union { struct { - uint32_t id; struct afb_ws *ws; struct api_ws_event *events; struct api_ws_memo *memos; } client; struct { - sd_event_source *listensrc; - struct afb_evt_listener *listener; /* listener for broadcasted events */ + sd_event_source *listensrc; /**< systemd source for server socket */ + struct afb_apiset *apiset; } server; }; }; @@ -78,6 +92,18 @@ struct api_ws #define RETERR 2 #define RETRAW 3 +/******************* common usefull tools **********************************/ + +/** + * translate a pointer to some integer + * @param ptr the pointer to translate + * @return an integer + */ +static inline uint32_t ptr2id(void *ptr) +{ + return (uint32_t)(((intptr_t)ptr) >> 6); +} + /******************* websocket interface for client part **********************************/ static void api_ws_client_on_binary(void *closure, char *data, size_t size); @@ -106,6 +132,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = { .remove = api_ws_server_event_remove }; +/******************* handling subcalls *****************************/ + +/** + * Structure on server side for recording pending + * subcalls. + */ +struct api_ws_subcall +{ + struct api_ws_subcall *next; /**< next subcall for the client */ + uint32_t subcallid; /**< the subcallid */ + void (*callback)(void*, int, struct json_object*); /**< callback on completion */ + void *closure; /**< closure of the callback */ +}; + +/** + * Structure for sending back replies on client side + */ +struct api_ws_reply +{ + struct api_ws *apiws; /**< api descriptor */ + uint32_t subcallid; /**< subcallid for the reply */ +}; + /******************* client description part for server *****************************/ struct api_ws_client @@ -116,14 +165,26 @@ struct api_ws_client /* count of references */ int refcount; - /* listener for events */ - struct afb_evt_listener *listener; - /* file descriptor */ int fd; + /* resource control */ + pthread_mutex_t mutex; + + /* listener for events */ + struct afb_evt_listener *listener; + /* websocket */ struct afb_ws *ws; + + /* credentials */ + struct afb_cred *cred; + + /* pending subcalls */ + struct api_ws_subcall *subcalls; + + /* apiset */ + struct afb_apiset *apiset; }; /******************* websocket interface for client part **********************************/ @@ -146,44 +207,25 @@ static const struct afb_ws_itf api_ws_server_ws_itf = * structure for a ws request */ struct api_ws_server_req { - struct afb_context context; /* the context, should be THE FIRST */ + struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ - char *rcvdata; /* the received data to free */ - struct json_object *json; /* the readen request as object */ - const char *request; /* the readen request as string */ - size_t lenreq; /* the length of the request */ - int refcount; /* reference count of the request */ uint32_t msgid; /* the incoming request msgid */ }; -static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq); -static struct afb_arg api_ws_server_req_get(struct api_ws_server_req *wreq, const char *name); -static void api_ws_server_req_success(struct api_ws_server_req *wreq, struct json_object *obj, const char *info); -static void api_ws_server_req_fail(struct api_ws_server_req *wreq, const char *status, const char *info); -static const char *api_ws_server_req_raw(struct api_ws_server_req *wreq, size_t *size); -static void api_ws_server_req_send(struct api_ws_server_req *wreq, const char *buffer, size_t size); -static void api_ws_server_req_addref(struct api_ws_server_req *wreq); -static void api_ws_server_req_unref(struct api_ws_server_req *wreq); -static int api_ws_server_req_subscribe(struct api_ws_server_req *wreq, struct afb_event event); -static int api_ws_server_req_unsubscribe(struct api_ws_server_req *wreq, struct afb_event event); -static void api_ws_server_req_subcall(struct api_ws_server_req *wreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure); - -const struct afb_req_itf afb_api_ws_req_itf = { - .json = (void*)api_ws_server_req_json, - .get = (void*)api_ws_server_req_get, - .success = (void*)api_ws_server_req_success, - .fail = (void*)api_ws_server_req_fail, - .raw = (void*)api_ws_server_req_raw, - .send = (void*)api_ws_server_req_send, - .context_get = (void*)afb_context_get, - .context_set = (void*)afb_context_set, - .addref = (void*)api_ws_server_req_addref, - .unref = (void*)api_ws_server_req_unref, - .session_close = (void*)afb_context_close, - .session_set_LOA = (void*)afb_context_change_loa, - .subscribe = (void*)api_ws_server_req_subscribe, - .unsubscribe = (void*)api_ws_server_req_unsubscribe, - .subcall = (void*)api_ws_server_req_subcall +static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info); +static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info); +static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq); +static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); + +const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { + .success = api_ws_server_req_success_cb, + .fail = api_ws_server_req_fail_cb, + .unref = api_ws_server_req_destroy_cb, + .subcall = api_ws_server_req_subcall_cb, + .subscribe = api_ws_server_req_subscribe_cb, + .unsubscribe = api_ws_server_req_unsubscribe_cb }; /******************* common part **********************************/ @@ -205,15 +247,18 @@ static struct api_ws *api_ws_make(const char *path) } /* path is copied after the struct */ - api->path = (void*)(api+1); + api->path = (char*)(api+1); memcpy(api->path, path, length + 1); /* api name is at the end of the path */ - api->api = strrchr(api->path, '/'); - if (api->api == NULL || !afb_apis_is_valid_api_name(++api->api)) { + while (length && path[length - 1] != '/' && path[length - 1] != ':') + length = length - 1; + api->api = &api->path[length]; + if (api->api == NULL || !afb_api_is_valid_name(api->api)) { errno = EINVAL; goto error2; } + pthread_mutex_init(&api->mutex, NULL); api->fd = -1; return api; @@ -281,7 +326,7 @@ static int api_ws_socket_inet(const char *path, int server) rc = getaddrinfo(host, service, &hint, &rai); if (rc != 0) { errno = EINVAL; - return NULL; + return -1; } /* get the socket */ @@ -311,21 +356,29 @@ static int api_ws_socket(const char *path, int server) { int fd, rc; - /* check for unix socket */ - if (0 == strncmp(path, "unix:", 5)) - fd = api_ws_socket_unix(path + 5, server); - else - fd = api_ws_socket_inet(path, server); - - if (fd >= 0) { - fcntl(fd, F_SETFD, FD_CLOEXEC); - fcntl(fd, F_SETFL, O_NONBLOCK); - if (server) { + /* check for systemd socket */ + if (0 == strncmp(path, "sd:", 3)) + fd = sd_fds_for(path + 3); + else { + /* check for unix socket */ + if (0 == strncmp(path, "unix:", 5)) + /* unix socket */ + fd = api_ws_socket_unix(path + 5, server); + else + /* inet socket */ + fd = api_ws_socket_inet(path, server); + + if (fd >= 0 && server) { rc = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof rc); rc = listen(fd, 5); } } + /* configure the socket */ + if (fd >= 0) { + fcntl(fd, F_SETFD, FD_CLOEXEC); + fcntl(fd, F_SETFL, O_NONBLOCK); + } return fd; } @@ -354,6 +407,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length) return before; } +static int api_ws_read_char(struct readbuf *rb, char *value) +{ + if (rb->head >= rb->end) + return 0; + *value = *rb->head++; + return 1; +} + static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value) { char *after = rb->head + sizeof *value; @@ -377,9 +438,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le static int api_ws_read_object(struct readbuf *rb, struct json_object **object) { - size_t length; const char *string; - return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0); + struct json_object *o; + int rc = api_ws_read_string(rb, &string, NULL); + if (rc) { + o = json_tokener_parse(string); + if (o == NULL && strcmp(string, "null")) + o = json_object_new_string(string); + *object = o; + } + return rc; } static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length) @@ -417,12 +485,6 @@ static int api_ws_write_uint32(struct writebuf *wb, uint32_t value) return 1; } -static int api_ws_write_string_nz(struct writebuf *wb, const char *value, size_t length) -{ - uint32_t len = (uint32_t)length; - return (size_t)len == length && ++len && api_ws_write_uint32(wb, len) && api_ws_write_put(wb, value, length) && api_ws_write_char(wb, '\0'); -} - static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length) { uint32_t len = (uint32_t)++length; @@ -440,19 +502,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) return string != NULL && api_ws_write_string(wb, string); } - - - /******************* client part **********************************/ /* * structure for recording query data */ struct api_ws_memo { - struct api_ws_memo *next; /* the next memo */ + struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ - struct afb_req req; /* the request handle */ - struct afb_context *context; /* the context of the query */ + struct afb_xreq *xreq; /* the request handle */ uint32_t msgid; /* the message identifier */ }; @@ -490,16 +548,17 @@ static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint3 /* allocates and init the memorizing data */ -static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_req req, struct afb_context *context) +static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq) { struct api_ws_memo *memo; memo = malloc(sizeof *memo); if (memo != NULL) { - afb_req_addref(req); - memo->req = req; - memo->context = context; - do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL); + afb_xreq_addref(xreq); + memo->xreq = xreq; + memo->msgid = ptr2id(memo); + while(api_ws_client_memo_search(api, memo->msgid) != NULL) + memo->msgid++; memo->api = api; memo->next = api->client.memos; api->client.memos = memo; @@ -521,7 +580,7 @@ static void api_ws_client_memo_destroy(struct api_ws_memo *memo) prv = &(*prv)->next; } - afb_req_unref(memo->req); + afb_xreq_unref(memo->xreq); free(memo); } @@ -575,7 +634,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st } /* read a subscrition message */ -static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo) +static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev) { return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev); } @@ -649,9 +708,9 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* subscribe the request from the event */ - if (afb_req_subscribe(memo->req, ev->event) < 0) + if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); } } @@ -662,9 +721,9 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* unsubscribe the request from the event */ - if (afb_req_unsubscribe(memo->req, ev->event) < 0) + if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); } } @@ -707,11 +766,11 @@ static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb) if (api_ws_read_uint32(rb, &flags) && api_ws_read_string(rb, &info, NULL) && api_ws_read_object(rb, &object)) { - memo->context->flags = (unsigned)flags; - afb_req_success(memo->req, object, *info ? info : NULL); + memo->xreq->context.flags = (unsigned)flags; + afb_xreq_success(memo->xreq, object, *info ? info : NULL); } else { /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); + afb_xreq_fail(memo->xreq, "error", "ws error"); } api_ws_client_memo_destroy(memo); } @@ -729,133 +788,165 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) if (api_ws_read_uint32(rb, &flags) && api_ws_read_string(rb, &status, NULL) && api_ws_read_string(rb, &info, NULL)) { - memo->context->flags = (unsigned)flags; - afb_req_fail(memo->req, status, *info ? info : NULL); + memo->xreq->context.flags = (unsigned)flags; + afb_xreq_fail(memo->xreq, status, *info ? info : NULL); } else { /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); + afb_xreq_fail(memo->xreq, "error", "ws error"); } api_ws_client_memo_destroy(memo); } -static void api_ws_client_reply_send(struct api_ws *api, struct readbuf *rb) +/* send a subcall reply */ +static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object) +{ + int rc; + struct writebuf wb = { .count = 0 }; + char ie = (char)!!iserror; + + if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY) + || !api_ws_write_uint32(&wb, reply->subcallid) + || !api_ws_write_char(&wb, ie) + || !api_ws_write_object(&wb, object)) { + /* write error ? */ + return; + } + + rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + return; + ERROR("error while sending subcall reply"); +} + +/* callback for subcall reply */ +static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object) { + api_ws_client_send_subcall_reply(closure, iserror, object); + free(closure); +} + +/* received a subcall request */ +static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb) +{ + struct api_ws_reply *reply; struct api_ws_memo *memo; - const char *data; - size_t length; - uint32_t flags; + const char *api, *verb; + uint32_t subcallid; + struct json_object *object; + + reply = malloc(sizeof *reply); + if (!reply) + return; /* retrieve the message data */ - if (!api_ws_client_msg_memo_get(api, rb, &memo)) + if (!api_ws_client_msg_memo_get(apiws, rb, &memo)) return; - if (api_ws_read_uint32(rb, &flags) - && api_ws_read_string(rb, &data, &length)) { - memo->context->flags = (unsigned)flags; - afb_req_send(memo->req, data, length); - } else { - /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); + if (api_ws_read_uint32(rb, &subcallid) + && api_ws_read_string(rb, &api, NULL) + && api_ws_read_string(rb, &verb, NULL) + && api_ws_read_object(rb, &object)) { + reply->apiws = apiws; + reply->subcallid = subcallid; + afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply); } - api_ws_client_memo_destroy(memo); } /* callback when receiving binary data */ static void api_ws_client_on_binary(void *closure, char *data, size_t size) { if (size > 0) { + struct api_ws *apiws = closure; struct readbuf rb = { .head = data, .end = data + size }; + + pthread_mutex_lock(&apiws->mutex); switch (*rb.head++) { - case 'T': /* success */ - api_ws_client_reply_success(closure, &rb); + case CHAR_FOR_ANSWER_SUCCESS: /* success */ + api_ws_client_reply_success(apiws, &rb); break; - case 'F': /* fail */ - api_ws_client_reply_fail(closure, &rb); + case CHAR_FOR_ANSWER_FAIL: /* fail */ + api_ws_client_reply_fail(apiws, &rb); break; - case 'X': /* send */ - api_ws_client_reply_send(closure, &rb); + case CHAR_FOR_EVT_BROADCAST: /* broadcast */ + api_ws_client_event_broadcast(apiws, &rb); break; - case '*': /* broadcast */ - api_ws_client_event_broadcast(closure, &rb); + case CHAR_FOR_EVT_ADD: /* creates the event */ + api_ws_client_event_create(apiws, &rb); break; - case '+': /* creates the event */ - api_ws_client_event_create(closure, &rb); + case CHAR_FOR_EVT_DEL: /* drops the event */ + api_ws_client_event_drop(apiws, &rb); break; - case '-': /* drops the event */ - api_ws_client_event_drop(closure, &rb); + case CHAR_FOR_EVT_PUSH: /* pushs the event */ + api_ws_client_event_push(apiws, &rb); break; - case '!': /* pushs the event */ - api_ws_client_event_push(closure, &rb); + case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ + api_ws_client_event_subscribe(apiws, &rb); break; - case 'S': /* subscribe event for a request */ - api_ws_client_event_subscribe(closure, &rb); + case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ + api_ws_client_event_unsubscribe(apiws, &rb); break; - case 'U': /* unsubscribe event for a request */ - api_ws_client_event_unsubscribe(closure, &rb); + case CHAR_FOR_SUBCALL_CALL: /* subcall */ + api_ws_client_subcall(apiws, &rb); break; default: /* unexpected message */ + /* TODO: close the connection */ break; } + pthread_mutex_unlock(&apiws->mutex); } + free(data); } /* on call, propagate it to the ws service */ -static void api_ws_client_call(struct api_ws *api, struct afb_req req, struct afb_context *context, const char *verb, size_t lenverb) +static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) { int rc; struct api_ws_memo *memo; struct writebuf wb = { .count = 0 }; const char *raw; size_t szraw; + struct api_ws *apiws = closure; + + pthread_mutex_lock(&apiws->mutex); /* create the recording data */ - memo = api_ws_client_memo_make(api, req, context); + memo = api_ws_client_memo_make(apiws, xreq); if (memo == NULL) { - afb_req_fail(req, "error", "out of memory"); - return; + afb_xreq_fail_f(xreq, "error", "out of memory"); + goto end; } /* creates the call message */ - raw = afb_req_raw(req, &szraw); + raw = afb_xreq_raw(xreq, &szraw); if (raw == NULL) goto internal_error; - if (!api_ws_write_uint32(&wb, memo->msgid) - || !api_ws_write_uint32(&wb, (uint32_t)context->flags) - || !api_ws_write_string_nz(&wb, verb, lenverb) - || !api_ws_write_string(&wb, ctxClientGetUuid(context->session)) + if (!api_ws_write_char(&wb, CHAR_FOR_CALL) + || !api_ws_write_uint32(&wb, memo->msgid) + || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags) + || !api_ws_write_string(&wb, xreq->verb) + || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session)) || !api_ws_write_string_length(&wb, raw, szraw)) goto overflow; /* send */ - rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count); - if (rc < 0) - goto ws_send_error; - return; + rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + goto end; -ws_send_error: - afb_req_fail(req, "error", "websocket sending error"); + afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; internal_error: - afb_req_fail(req, "error", "internal: raw is NULL!"); + afb_xreq_fail(xreq, "error", "internal: raw is NULL!"); goto clean_memo; overflow: - afb_req_fail(req, "error", "overflow: size doesn't match 32 bits!"); + afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!"); clean_memo: api_ws_client_memo_destroy(memo); -} - -static int api_ws_service_start(struct api_ws *api, int share_session, int onneed) -{ - /* not an error when onneed */ - if (onneed != 0) - return 0; - - /* already started: it is an error */ - ERROR("The WS binding %s is not a startable service", api->path); - return -1; +end: + pthread_mutex_unlock(&apiws->mutex); } /* */ @@ -888,8 +979,12 @@ static int api_ws_client_connect(struct api_ws *api) return -1; } +static struct afb_api_itf ws_api_itf = { + .call = api_ws_client_call_cb +}; + /* adds a afb-ws-service client api */ -int afb_api_ws_add_client(const char *path) +int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -909,9 +1004,8 @@ int afb_api_ws_add_client(const char *path) /* record it as an API */ afb_api.closure = api; - afb_api.call = (void*)api_ws_client_call; - afb_api.service_start = (void*)api_ws_service_start; - if (afb_apis_add(api->api, afb_api) < 0) + afb_api.itf = &ws_api_itf; + if (afb_apiset_add(apiset, api->api, afb_api) < 0) goto error3; return 0; @@ -928,64 +1022,132 @@ error: static void api_ws_server_client_unref(struct api_ws_client *client) { - if (!--client->refcount) { + struct api_ws_subcall *sc, *nsc; + + if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(client->listener); afb_ws_destroy(client->ws); + nsc = client->subcalls; + while (nsc) { + sc= nsc; + nsc = sc->next; + sc->callback(sc->closure, 1, NULL); + free(sc); + } + afb_cred_unref(client->cred); + afb_apiset_unref(client->apiset); free(client); } } +static void api_ws_server_client_addref(struct api_ws_client *client) +{ + __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED); +} + /* on call, propagate it to the ws service */ -static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size) +static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb) { struct api_ws_server_req *wreq; - struct afb_req areq; + char *cverb; const char *uuid, *verb; - uint32_t flags; + uint32_t flags, msgid; + size_t lenverb; + struct json_object *object; - client->refcount++; + api_ws_server_client_addref(client); + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &msgid) + || !api_ws_read_uint32(rb, &flags) + || !api_ws_read_string(rb, &verb, &lenverb) + || !api_ws_read_string(rb, &uuid, NULL) + || !api_ws_read_object(rb, &object)) + goto overflow; /* create the request */ - wreq = calloc(1 , sizeof *wreq); + wreq = malloc(++lenverb + sizeof *wreq); if (wreq == NULL) goto out_of_memory; + afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf); wreq->client = client; - wreq->rcvdata = data; - wreq->refcount = 1; - - /* reads the call message data */ - if (!api_ws_read_uint32(rb, &wreq->msgid) - || !api_ws_read_uint32(rb, &flags) - || !api_ws_read_string(rb, &verb, NULL) - || !api_ws_read_string(rb, &uuid, NULL) - || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) - goto overflow; + wreq->msgid = msgid; + cverb = (char*)&wreq[1]; + memcpy(cverb, verb, lenverb); /* init the context */ - if (afb_context_connect(&wreq->context, uuid, NULL) < 0) - goto out_of_memory; - wreq->context.flags = flags; + if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) + goto unconnected; + wreq->xreq.context.flags = flags; /* makes the call */ - areq.itf = &afb_api_ws_req_itf; - areq.closure = wreq; - afb_apis_call_(areq, &wreq->context, client->api, verb); - api_ws_server_req_unref(wreq); + wreq->xreq.cred = afb_cred_addref(client->cred); + wreq->xreq.api = client->api; + wreq->xreq.verb = cverb; + wreq->xreq.json = object; + afb_xreq_process(&wreq->xreq, client->apiset); return; +unconnected: + free(wreq); out_of_memory: + json_object_put(object); overflow: - free(wreq); - free(data); api_ws_server_client_unref(client); } +/* on subcall reply */ +static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb) +{ + char iserror; + uint32_t subcallid; + struct json_object *object; + struct api_ws_subcall *sc, **psc; + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &subcallid) + || !api_ws_read_char(rb, &iserror) + || !api_ws_read_object(rb, &object)) { + /* TODO bad protocol */ + return; + } + + /* search the subcall and unlink it */ + pthread_mutex_lock(&client->mutex); + psc = &client->subcalls; + while ((sc = *psc) && sc->subcallid != subcallid) + psc = &sc->next; + if (!sc) { + pthread_mutex_unlock(&client->mutex); + /* TODO subcall not found */ + } else { + *psc = sc->next; + pthread_mutex_unlock(&client->mutex); + sc->callback(sc->closure, (int)iserror, object); + free(sc); + } + json_object_put(object); +} + /* callback when receiving binary data */ static void api_ws_server_on_binary(void *closure, char *data, size_t size) { - struct readbuf rb = { .head = data, .end = data + size }; - api_ws_server_called(closure, &rb, data, size); + if (size > 0) { + struct readbuf rb = { .head = data, .end = data + size }; + switch (*rb.head++) { + case CHAR_FOR_CALL: + api_ws_server_on_call(closure, &rb); + break; + case CHAR_FOR_SUBCALL_REPLY: + api_ws_server_on_subcall_reply(closure, &rb); + break; + default: /* unexpected message */ + /* TODO: close the connection */ + break; + } + } + free(data); } /* callback when receiving a hangup */ @@ -1016,12 +1178,18 @@ static void api_ws_server_accept(struct api_ws *api) lenaddr = (socklen_t)sizeof addr; client->fd = accept(api->fd, &addr, &lenaddr); if (client->fd >= 0) { + client->cred = afb_cred_create_for_socket(client->fd); + fcntl(client->fd, F_SETFD, FD_CLOEXEC); + fcntl(client->fd, F_SETFL, O_NONBLOCK); client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client); if (client->ws != NULL) { client->api = api->api; + client->apiset = afb_apiset_addref(api->server.apiset); client->refcount = 1; + client->subcalls = NULL; return; } + afb_cred_unref(client->cred); close(client->fd); } afb_evt_listener_unref(client->listener); @@ -1050,18 +1218,18 @@ static void api_ws_server_event_send(struct api_ws_client *client, char order, c static void api_ws_server_event_add(void *closure, const char *event, int eventid) { - api_ws_server_event_send(closure, '+', event, eventid, NULL); + api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL); } static void api_ws_server_event_remove(void *closure, const char *event, int eventid) { - api_ws_server_event_send(closure, '-', event, eventid, NULL); + api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL); } static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object) { const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); - api_ws_server_event_send(closure, '!', event, eventid, data ? : "null"); + api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null"); json_object_put(object); } @@ -1072,7 +1240,7 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int struct writebuf wb = { .count = 0 }; - if (api_ws_write_char(&wb, '*') && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) { + if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) { rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count); if (rc < 0) ERROR("error while broadcasting event %s", event); @@ -1083,52 +1251,27 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int /******************* ws request part for server *****************/ -/* increment the reference count of the request */ -static void api_ws_server_req_addref(struct api_ws_server_req *wreq) -{ - wreq->refcount++; -} - /* decrement the reference count of the request and free/release it on falling to null */ -static void api_ws_server_req_unref(struct api_ws_server_req *wreq) +static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq) { - if (wreq == NULL || --wreq->refcount) - return; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - afb_context_disconnect(&wreq->context); - json_object_put(wreq->json); - free(wreq->rcvdata); + afb_context_disconnect(&wreq->xreq.context); + afb_cred_unref(wreq->xreq.cred); + json_object_put(wreq->xreq.json); api_ws_server_client_unref(wreq->client); free(wreq); } -/* get the object of the request */ -static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq) -{ - if (wreq->json == NULL) { - wreq->json = json_tokener_parse(wreq->request); - if (wreq->json == NULL && strcmp(wreq->request, "null")) { - /* lazy error detection of json request. Is it to improve? */ - wreq->json = json_object_new_string(wreq->request); - } - } - return wreq->json; -} - -/* get the argument of the request of 'name' */ -static struct afb_arg api_ws_server_req_get(struct api_ws_server_req *wreq, const char *name) -{ - return afb_msg_json_get_arg(api_ws_server_req_json(wreq), name); -} - -static void api_ws_server_req_success(struct api_ws_server_req *wreq, struct json_object *obj, const char *info) +static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info) { int rc; struct writebuf wb = { .count = 0 }; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'T') + if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS) && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) + && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, info ? : "") && api_ws_write_object(&wb, obj)) { rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); @@ -1140,14 +1283,15 @@ success: json_object_put(obj); } -static void api_ws_server_req_fail(struct api_ws_server_req *wreq, const char *status, const char *info) +static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info) { int rc; struct writebuf wb = { .count = 0 }; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'F') + if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL) && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) + && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, status) && api_ws_write_string(&wb, info ? : "")) { rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); @@ -1157,40 +1301,58 @@ static void api_ws_server_req_fail(struct api_ws_server_req *wreq, const char *s ERROR("error while sending fail"); } -static const char *api_ws_server_req_raw(struct api_ws_server_req *wreq, size_t *size) +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) { - if (size != NULL) - *size = wreq->lenreq; - return wreq->request; -} - -static void api_ws_server_req_send(struct api_ws_server_req *wreq, const char *buffer, size_t size) -{ - /* TODO: how to put sized buffer as strings? things aren't clear here!!! */ int rc; struct writebuf wb = { .count = 0 }; + struct api_ws_subcall *sc, *osc; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); + struct api_ws_client *client = wreq->client; - if (api_ws_write_char(&wb, 'X') - && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) - && api_ws_write_string_length(&wb, buffer, size)) { - rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); - if (rc >= 0) - return; + sc = malloc(sizeof *sc); + if (!sc) { + + } else { + sc->callback = callback; + sc->closure = cb_closure; + + pthread_mutex_unlock(&client->mutex); + sc->subcallid = ptr2id(sc); + do { + sc->subcallid++; + osc = client->subcalls; + while(osc && osc->subcallid != sc->subcallid) + osc = osc->next; + } while (osc); + sc->next = client->subcalls; + client->subcalls = sc; + pthread_mutex_unlock(&client->mutex); + + if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL) + && api_ws_write_uint32(&wb, wreq->msgid) + && api_ws_write_uint32(&wb, sc->subcallid) + && api_ws_write_string(&wb, api) + && api_ws_write_string(&wb, verb) + && api_ws_write_object(&wb, args)) { + rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); + if (rc >= 0) + return; + } + ERROR("error while sending fail"); } - ERROR("error while sending raw"); } -static int api_ws_server_req_subscribe(struct api_ws_server_req *wreq, struct afb_event event) +static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; struct writebuf wb = { .count = 0 }; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); rc = afb_evt_add_watch(wreq->client->listener, event); if (rc < 0) return rc; - if (api_ws_write_char(&wb, 'S') + if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event)) && api_ws_write_string(&wb, afb_evt_event_name(event))) { @@ -1203,12 +1365,13 @@ success: return rc; } -static int api_ws_server_req_unsubscribe(struct api_ws_server_req *wreq, struct afb_event event) +static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; struct writebuf wb = { .count = 0 }; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'U') + if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event)) && api_ws_write_string(&wb, afb_evt_event_name(event))) { @@ -1222,11 +1385,6 @@ success: return rc; } -static void api_ws_server_req_subcall(struct api_ws_server_req *wreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *closure) -{ - afb_subcall(&wreq->context, api, verb, args, callback, closure, (struct afb_req){ .itf = &afb_api_ws_req_itf, .closure = wreq }); -} - /******************* server part **********************************/ static int api_ws_server_connect(struct api_ws *api); @@ -1276,7 +1434,7 @@ static int api_ws_server_connect(struct api_ws *api) } /* create the service */ -int afb_api_ws_add_server(const char *path) +int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -1291,6 +1449,7 @@ int afb_api_ws_add_server(const char *path) if (rc < 0) goto error2; + api->server.apiset = afb_apiset_addref(apiset); return 0; error2: