X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-api-ws.c;h=56682ddc89fa623ae2695df9b487e0201d4ae821;hb=01534912a8e32468b62d848ec1fe23004df1dd19;hp=9b1d8cc71c73c16c6279221d0cc75c91e9352aa9;hpb=a06a69b1cb23283095b3b7f959d2d6c5ec4a6432;p=src%2Fapp-framework-binder.git diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 9b1d8cc7..56682ddc 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -41,7 +42,8 @@ #include "afb-cred.h" #include "afb-ws.h" #include "afb-msg-json.h" -#include "afb-apis.h" +#include "afb-api.h" +#include "afb-apiset.h" #include "afb-api-so.h" #include "afb-context.h" #include "afb-evt.h" @@ -62,6 +64,8 @@ struct api_ws_client; #define CHAR_FOR_EVT_PUSH '!' #define CHAR_FOR_EVT_SUBSCRIBE 'S' #define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_SUBCALL_CALL 'B' +#define CHAR_FOR_SUBCALL_REPLY 'R' /* */ @@ -70,16 +74,16 @@ struct api_ws char *path; /* path of the object for the API */ char *api; /* api name of the interface */ int fd; /* file descriptor */ + pthread_mutex_t mutex; /**< resource control */ union { struct { - uint32_t id; struct afb_ws *ws; struct api_ws_event *events; struct api_ws_memo *memos; } client; struct { - sd_event_source *listensrc; - struct afb_evt_listener *listener; /* listener for broadcasted events */ + sd_event_source *listensrc; /**< systemd source for server socket */ + struct afb_apiset *apiset; } server; }; }; @@ -88,6 +92,18 @@ struct api_ws #define RETERR 2 #define RETRAW 3 +/******************* common usefull tools **********************************/ + +/** + * translate a pointer to some integer + * @param ptr the pointer to translate + * @return an integer + */ +static inline uint32_t ptr2id(void *ptr) +{ + return (uint32_t)(((intptr_t)ptr) >> 6); +} + /******************* websocket interface for client part **********************************/ static void api_ws_client_on_binary(void *closure, char *data, size_t size); @@ -116,6 +132,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = { .remove = api_ws_server_event_remove }; +/******************* handling subcalls *****************************/ + +/** + * Structure on server side for recording pending + * subcalls. + */ +struct api_ws_subcall +{ + struct api_ws_subcall *next; /**< next subcall for the client */ + uint32_t subcallid; /**< the subcallid */ + void (*callback)(void*, int, struct json_object*); /**< callback on completion */ + void *closure; /**< closure of the callback */ +}; + +/** + * Structure for sending back replies on client side + */ +struct api_ws_reply +{ + struct api_ws *apiws; /**< api descriptor */ + uint32_t subcallid; /**< subcallid for the reply */ +}; + /******************* client description part for server *****************************/ struct api_ws_client @@ -126,17 +165,26 @@ struct api_ws_client /* count of references */ int refcount; - /* listener for events */ - struct afb_evt_listener *listener; - /* file descriptor */ int fd; + /* resource control */ + pthread_mutex_t mutex; + + /* listener for events */ + struct afb_evt_listener *listener; + /* websocket */ struct afb_ws *ws; /* credentials */ struct afb_cred *cred; + + /* pending subcalls */ + struct api_ws_subcall *subcalls; + + /* apiset */ + struct afb_apiset *apiset; }; /******************* websocket interface for client part **********************************/ @@ -161,8 +209,6 @@ static const struct afb_ws_itf api_ws_server_ws_itf = struct api_ws_server_req { struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ - const char *request; /* the readen request as string */ - size_t lenreq; /* the length of the request */ uint32_t msgid; /* the incoming request msgid */ }; @@ -171,11 +217,13 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq); static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event); static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { .success = api_ws_server_req_success_cb, .fail = api_ws_server_req_fail_cb, .unref = api_ws_server_req_destroy_cb, + .subcall = api_ws_server_req_subcall_cb, .subscribe = api_ws_server_req_subscribe_cb, .unsubscribe = api_ws_server_req_unsubscribe_cb }; @@ -206,10 +254,11 @@ static struct api_ws *api_ws_make(const char *path) while (length && path[length - 1] != '/' && path[length - 1] != ':') length = length - 1; api->api = &api->path[length]; - if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) { + if (api->api == NULL || !afb_api_is_valid_name(api->api)) { errno = EINVAL; goto error2; } + pthread_mutex_init(&api->mutex, NULL); api->fd = -1; return api; @@ -358,6 +407,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length) return before; } +static int api_ws_read_char(struct readbuf *rb, char *value) +{ + if (rb->head >= rb->end) + return 0; + *value = *rb->head++; + return 1; +} + static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value) { char *after = rb->head + sizeof *value; @@ -381,9 +438,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le static int api_ws_read_object(struct readbuf *rb, struct json_object **object) { - size_t length; const char *string; - return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0); + struct json_object *o; + int rc = api_ws_read_string(rb, &string, NULL); + if (rc) { + o = json_tokener_parse(string); + if (o == NULL && strcmp(string, "null")) + o = json_object_new_string(string); + *object = o; + } + return rc; } static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length) @@ -438,22 +502,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) return string != NULL && api_ws_write_string(wb, string); } - - - /******************* client part **********************************/ /* * structure for recording query data */ struct api_ws_memo { - struct api_ws_memo *next; /* the next memo */ + struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ struct afb_xreq *xreq; /* the request handle */ -#if 0 - struct afb_req req; /* the request handle */ - struct afb_context *context; /* the context of the query */ -#endif uint32_t msgid; /* the message identifier */ }; @@ -499,7 +556,9 @@ static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct af if (memo != NULL) { afb_xreq_addref(xreq); memo->xreq = xreq; - do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL); + memo->msgid = ptr2id(memo); + while(api_ws_client_memo_search(api, memo->msgid) != NULL) + memo->msgid++; memo->api = api; memo->next = api->client.memos; api->client.memos = memo; @@ -575,7 +634,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st } /* read a subscrition message */ -static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo) +static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev) { return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev); } @@ -649,7 +708,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* subscribe the request from the event */ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); @@ -662,7 +721,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* unsubscribe the request from the event */ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); @@ -738,39 +797,102 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) api_ws_client_memo_destroy(memo); } +/* send a subcall reply */ +static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object) +{ + int rc; + struct writebuf wb = { .count = 0 }; + char ie = (char)!!iserror; + + if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY) + || !api_ws_write_uint32(&wb, reply->subcallid) + || !api_ws_write_char(&wb, ie) + || !api_ws_write_object(&wb, object)) { + /* write error ? */ + return; + } + + rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + return; + ERROR("error while sending subcall reply"); +} + +/* callback for subcall reply */ +static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object) +{ + api_ws_client_send_subcall_reply(closure, iserror, object); + free(closure); +} + +/* received a subcall request */ +static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb) +{ + struct api_ws_reply *reply; + struct api_ws_memo *memo; + const char *api, *verb; + uint32_t subcallid; + struct json_object *object; + + reply = malloc(sizeof *reply); + if (!reply) + return; + + /* retrieve the message data */ + if (!api_ws_client_msg_memo_get(apiws, rb, &memo)) + return; + + if (api_ws_read_uint32(rb, &subcallid) + && api_ws_read_string(rb, &api, NULL) + && api_ws_read_string(rb, &verb, NULL) + && api_ws_read_object(rb, &object)) { + reply->apiws = apiws; + reply->subcallid = subcallid; + afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply); + } +} + /* callback when receiving binary data */ static void api_ws_client_on_binary(void *closure, char *data, size_t size) { if (size > 0) { + struct api_ws *apiws = closure; struct readbuf rb = { .head = data, .end = data + size }; + + pthread_mutex_lock(&apiws->mutex); switch (*rb.head++) { case CHAR_FOR_ANSWER_SUCCESS: /* success */ - api_ws_client_reply_success(closure, &rb); + api_ws_client_reply_success(apiws, &rb); break; case CHAR_FOR_ANSWER_FAIL: /* fail */ - api_ws_client_reply_fail(closure, &rb); + api_ws_client_reply_fail(apiws, &rb); break; case CHAR_FOR_EVT_BROADCAST: /* broadcast */ - api_ws_client_event_broadcast(closure, &rb); + api_ws_client_event_broadcast(apiws, &rb); break; case CHAR_FOR_EVT_ADD: /* creates the event */ - api_ws_client_event_create(closure, &rb); + api_ws_client_event_create(apiws, &rb); break; case CHAR_FOR_EVT_DEL: /* drops the event */ - api_ws_client_event_drop(closure, &rb); + api_ws_client_event_drop(apiws, &rb); break; case CHAR_FOR_EVT_PUSH: /* pushs the event */ - api_ws_client_event_push(closure, &rb); + api_ws_client_event_push(apiws, &rb); break; case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ - api_ws_client_event_subscribe(closure, &rb); + api_ws_client_event_subscribe(apiws, &rb); break; case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ - api_ws_client_event_unsubscribe(closure, &rb); + api_ws_client_event_unsubscribe(apiws, &rb); + break; + case CHAR_FOR_SUBCALL_CALL: /* subcall */ + api_ws_client_subcall(apiws, &rb); break; default: /* unexpected message */ + /* TODO: close the connection */ break; } + pthread_mutex_unlock(&apiws->mutex); } free(data); } @@ -783,13 +905,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) struct writebuf wb = { .count = 0 }; const char *raw; size_t szraw; - struct api_ws *api = closure; + struct api_ws *apiws = closure; + + pthread_mutex_lock(&apiws->mutex); /* create the recording data */ - memo = api_ws_client_memo_make(api, xreq); + memo = api_ws_client_memo_make(apiws, xreq); if (memo == NULL) { afb_xreq_fail_f(xreq, "error", "out of memory"); - return; + goto end; } /* creates the call message */ @@ -805,12 +929,10 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) goto overflow; /* send */ - rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count); - if (rc < 0) - goto ws_send_error; - return; + rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + goto end; -ws_send_error: afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; @@ -823,19 +945,8 @@ overflow: clean_memo: api_ws_client_memo_destroy(memo); -} - -static int api_ws_service_start_cb(void *closure, int share_session, int onneed) -{ - struct api_ws *api = closure; - - /* not an error when onneed */ - if (onneed != 0) - return 0; - - /* already started: it is an error */ - ERROR("The WS binding %s is not a startable service", api->path); - return -1; +end: + pthread_mutex_unlock(&apiws->mutex); } /* */ @@ -869,12 +980,11 @@ static int api_ws_client_connect(struct api_ws *api) } static struct afb_api_itf ws_api_itf = { - .call = api_ws_client_call_cb, - .service_start = api_ws_service_start_cb + .call = api_ws_client_call_cb }; /* adds a afb-ws-service client api */ -int afb_api_ws_add_client(const char *path) +int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -895,7 +1005,7 @@ int afb_api_ws_add_client(const char *path) /* record it as an API */ afb_api.closure = api; afb_api.itf = &ws_api_itf; - if (afb_apis_add(api->api, afb_api) < 0) + if (afb_apiset_add(apiset, api->api, afb_api) < 0) goto error3; return 0; @@ -912,63 +1022,114 @@ error: static void api_ws_server_client_unref(struct api_ws_client *client) { - if (!--client->refcount) { + struct api_ws_subcall *sc, *nsc; + + if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(client->listener); afb_ws_destroy(client->ws); + nsc = client->subcalls; + while (nsc) { + sc= nsc; + nsc = sc->next; + sc->callback(sc->closure, 1, NULL); + free(sc); + } afb_cred_unref(client->cred); + afb_apiset_unref(client->apiset); free(client); } } +static void api_ws_server_client_addref(struct api_ws_client *client) +{ + __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED); +} + /* on call, propagate it to the ws service */ static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb) { struct api_ws_server_req *wreq; + char *cverb; const char *uuid, *verb; - uint32_t flags; - - client->refcount++; - - /* create the request */ - wreq = calloc(1 , sizeof *wreq); - if (wreq == NULL) - goto out_of_memory; + uint32_t flags, msgid; + size_t lenverb; + struct json_object *object; - wreq->client = client; + api_ws_server_client_addref(client); /* reads the call message data */ - if (!api_ws_read_uint32(rb, &wreq->msgid) + if (!api_ws_read_uint32(rb, &msgid) || !api_ws_read_uint32(rb, &flags) - || !api_ws_read_string(rb, &verb, NULL) + || !api_ws_read_string(rb, &verb, &lenverb) || !api_ws_read_string(rb, &uuid, NULL) - || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) + || !api_ws_read_object(rb, &object)) goto overflow; + /* create the request */ + wreq = malloc(++lenverb + sizeof *wreq); + if (wreq == NULL) + goto out_of_memory; + afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf); - wreq->xreq.json = json_tokener_parse(wreq->request); - if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) { - wreq->xreq.json = json_object_new_string(wreq->request); - } + wreq->client = client; + wreq->msgid = msgid; + cverb = (char*)&wreq[1]; + memcpy(cverb, verb, lenverb); /* init the context */ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) - goto out_of_memory; + goto unconnected; wreq->xreq.context.flags = flags; /* makes the call */ wreq->xreq.cred = afb_cred_addref(client->cred); wreq->xreq.api = client->api; - wreq->xreq.verb = verb; - afb_apis_call(&wreq->xreq); - afb_xreq_unref(&wreq->xreq); + wreq->xreq.verb = cverb; + wreq->xreq.json = object; + afb_xreq_process(&wreq->xreq, client->apiset); return; +unconnected: + free(wreq); out_of_memory: + json_object_put(object); overflow: - free(wreq); api_ws_server_client_unref(client); } +/* on subcall reply */ +static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb) +{ + char iserror; + uint32_t subcallid; + struct json_object *object; + struct api_ws_subcall *sc, **psc; + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &subcallid) + || !api_ws_read_char(rb, &iserror) + || !api_ws_read_object(rb, &object)) { + /* TODO bad protocol */ + return; + } + + /* search the subcall and unlink it */ + pthread_mutex_lock(&client->mutex); + psc = &client->subcalls; + while ((sc = *psc) && sc->subcallid != subcallid) + psc = &sc->next; + if (!sc) { + pthread_mutex_unlock(&client->mutex); + /* TODO subcall not found */ + } else { + *psc = sc->next; + pthread_mutex_unlock(&client->mutex); + sc->callback(sc->closure, (int)iserror, object); + free(sc); + } + json_object_put(object); +} + /* callback when receiving binary data */ static void api_ws_server_on_binary(void *closure, char *data, size_t size) { @@ -978,6 +1139,9 @@ static void api_ws_server_on_binary(void *closure, char *data, size_t size) case CHAR_FOR_CALL: api_ws_server_on_call(closure, &rb); break; + case CHAR_FOR_SUBCALL_REPLY: + api_ws_server_on_subcall_reply(closure, &rb); + break; default: /* unexpected message */ /* TODO: close the connection */ break; @@ -1020,7 +1184,9 @@ static void api_ws_server_accept(struct api_ws *api) client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client); if (client->ws != NULL) { client->api = api->api; + client->apiset = afb_apiset_addref(api->server.apiset); client->refcount = 1; + client->subcalls = NULL; return; } afb_cred_unref(client->cred); @@ -1135,6 +1301,47 @@ static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, ERROR("error while sending fail"); } +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) +{ + int rc; + struct writebuf wb = { .count = 0 }; + struct api_ws_subcall *sc, *osc; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); + struct api_ws_client *client = wreq->client; + + sc = malloc(sizeof *sc); + if (!sc) { + + } else { + sc->callback = callback; + sc->closure = cb_closure; + + pthread_mutex_unlock(&client->mutex); + sc->subcallid = ptr2id(sc); + do { + sc->subcallid++; + osc = client->subcalls; + while(osc && osc->subcallid != sc->subcallid) + osc = osc->next; + } while (osc); + sc->next = client->subcalls; + client->subcalls = sc; + pthread_mutex_unlock(&client->mutex); + + if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL) + && api_ws_write_uint32(&wb, wreq->msgid) + && api_ws_write_uint32(&wb, sc->subcallid) + && api_ws_write_string(&wb, api) + && api_ws_write_string(&wb, verb) + && api_ws_write_object(&wb, args)) { + rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); + if (rc >= 0) + return; + } + ERROR("error while sending fail"); + } +} + static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; @@ -1227,7 +1434,7 @@ static int api_ws_server_connect(struct api_ws *api) } /* create the service */ -int afb_api_ws_add_server(const char *path) +int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -1242,6 +1449,7 @@ int afb_api_ws_add_server(const char *path) if (rc < 0) goto error2; + api->server.apiset = afb_apiset_addref(apiset); return 0; error2: