X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-api-ws.c;h=56682ddc89fa623ae2695df9b487e0201d4ae821;hb=01534912a8e32468b62d848ec1fe23004df1dd19;hp=8be74c03cd54e89fbf0929bc19a4cf2c902a97e8;hpb=a05138e6bf1257b0e7b1ad90c974fb9e12f1d040;p=src%2Fapp-framework-binder.git diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 8be74c03..56682ddc 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -41,7 +42,8 @@ #include "afb-cred.h" #include "afb-ws.h" #include "afb-msg-json.h" -#include "afb-apis.h" +#include "afb-api.h" +#include "afb-apiset.h" #include "afb-api-so.h" #include "afb-context.h" #include "afb-evt.h" @@ -53,7 +55,17 @@ struct api_ws_memo; struct api_ws_event; struct api_ws_client; - +#define CHAR_FOR_CALL 'C' +#define CHAR_FOR_ANSWER_SUCCESS 'T' +#define CHAR_FOR_ANSWER_FAIL 'F' +#define CHAR_FOR_EVT_BROADCAST '*' +#define CHAR_FOR_EVT_ADD '+' +#define CHAR_FOR_EVT_DEL '-' +#define CHAR_FOR_EVT_PUSH '!' +#define CHAR_FOR_EVT_SUBSCRIBE 'S' +#define CHAR_FOR_EVT_UNSUBSCRIBE 'U' +#define CHAR_FOR_SUBCALL_CALL 'B' +#define CHAR_FOR_SUBCALL_REPLY 'R' /* */ @@ -62,16 +74,16 @@ struct api_ws char *path; /* path of the object for the API */ char *api; /* api name of the interface */ int fd; /* file descriptor */ + pthread_mutex_t mutex; /**< resource control */ union { struct { - uint32_t id; struct afb_ws *ws; struct api_ws_event *events; struct api_ws_memo *memos; } client; struct { - sd_event_source *listensrc; - struct afb_evt_listener *listener; /* listener for broadcasted events */ + sd_event_source *listensrc; /**< systemd source for server socket */ + struct afb_apiset *apiset; } server; }; }; @@ -80,6 +92,18 @@ struct api_ws #define RETERR 2 #define RETRAW 3 +/******************* common usefull tools **********************************/ + +/** + * translate a pointer to some integer + * @param ptr the pointer to translate + * @return an integer + */ +static inline uint32_t ptr2id(void *ptr) +{ + return (uint32_t)(((intptr_t)ptr) >> 6); +} + /******************* websocket interface for client part **********************************/ static void api_ws_client_on_binary(void *closure, char *data, size_t size); @@ -108,6 +132,29 @@ static const struct afb_evt_itf api_ws_server_evt_itf = { .remove = api_ws_server_event_remove }; +/******************* handling subcalls *****************************/ + +/** + * Structure on server side for recording pending + * subcalls. + */ +struct api_ws_subcall +{ + struct api_ws_subcall *next; /**< next subcall for the client */ + uint32_t subcallid; /**< the subcallid */ + void (*callback)(void*, int, struct json_object*); /**< callback on completion */ + void *closure; /**< closure of the callback */ +}; + +/** + * Structure for sending back replies on client side + */ +struct api_ws_reply +{ + struct api_ws *apiws; /**< api descriptor */ + uint32_t subcallid; /**< subcallid for the reply */ +}; + /******************* client description part for server *****************************/ struct api_ws_client @@ -118,17 +165,26 @@ struct api_ws_client /* count of references */ int refcount; - /* listener for events */ - struct afb_evt_listener *listener; - /* file descriptor */ int fd; + /* resource control */ + pthread_mutex_t mutex; + + /* listener for events */ + struct afb_evt_listener *listener; + /* websocket */ struct afb_ws *ws; /* credentials */ struct afb_cred *cred; + + /* pending subcalls */ + struct api_ws_subcall *subcalls; + + /* apiset */ + struct afb_apiset *apiset; }; /******************* websocket interface for client part **********************************/ @@ -153,22 +209,21 @@ static const struct afb_ws_itf api_ws_server_ws_itf = struct api_ws_server_req { struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ - char *rcvdata; /* the received data to free */ - const char *request; /* the readen request as string */ - size_t lenreq; /* the length of the request */ uint32_t msgid; /* the incoming request msgid */ }; -static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info); -static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info); -static void api_ws_server_req_destroy_cb(void *closure); -static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event); -static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event); +static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info); +static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info); +static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq); +static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event); +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { .success = api_ws_server_req_success_cb, .fail = api_ws_server_req_fail_cb, .unref = api_ws_server_req_destroy_cb, + .subcall = api_ws_server_req_subcall_cb, .subscribe = api_ws_server_req_subscribe_cb, .unsubscribe = api_ws_server_req_unsubscribe_cb }; @@ -199,10 +254,11 @@ static struct api_ws *api_ws_make(const char *path) while (length && path[length - 1] != '/' && path[length - 1] != ':') length = length - 1; api->api = &api->path[length]; - if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) { + if (api->api == NULL || !afb_api_is_valid_name(api->api)) { errno = EINVAL; goto error2; } + pthread_mutex_init(&api->mutex, NULL); api->fd = -1; return api; @@ -351,6 +407,14 @@ static char *api_ws_read_get(struct readbuf *rb, uint32_t length) return before; } +static int api_ws_read_char(struct readbuf *rb, char *value) +{ + if (rb->head >= rb->end) + return 0; + *value = *rb->head++; + return 1; +} + static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value) { char *after = rb->head + sizeof *value; @@ -374,9 +438,16 @@ static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *le static int api_ws_read_object(struct readbuf *rb, struct json_object **object) { - size_t length; const char *string; - return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0); + struct json_object *o; + int rc = api_ws_read_string(rb, &string, NULL); + if (rc) { + o = json_tokener_parse(string); + if (o == NULL && strcmp(string, "null")) + o = json_object_new_string(string); + *object = o; + } + return rc; } static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length) @@ -431,22 +502,15 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) return string != NULL && api_ws_write_string(wb, string); } - - - /******************* client part **********************************/ /* * structure for recording query data */ struct api_ws_memo { - struct api_ws_memo *next; /* the next memo */ + struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ struct afb_xreq *xreq; /* the request handle */ -#if 0 - struct afb_req req; /* the request handle */ - struct afb_context *context; /* the context of the query */ -#endif uint32_t msgid; /* the message identifier */ }; @@ -492,7 +556,9 @@ static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct af if (memo != NULL) { afb_xreq_addref(xreq); memo->xreq = xreq; - do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL); + memo->msgid = ptr2id(memo); + while(api_ws_client_memo_search(api, memo->msgid) != NULL) + memo->msgid++; memo->api = api; memo->next = api->client.memos; api->client.memos = memo; @@ -568,7 +634,7 @@ static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, st } /* read a subscrition message */ -static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo) +static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev) { return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev); } @@ -642,7 +708,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* subscribe the request from the event */ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); @@ -655,7 +721,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * struct api_ws_event *ev; struct api_ws_memo *memo; - if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { + if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) { /* unsubscribe the request from the event */ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); @@ -731,39 +797,102 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) api_ws_client_memo_destroy(memo); } +/* send a subcall reply */ +static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object) +{ + int rc; + struct writebuf wb = { .count = 0 }; + char ie = (char)!!iserror; + + if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY) + || !api_ws_write_uint32(&wb, reply->subcallid) + || !api_ws_write_char(&wb, ie) + || !api_ws_write_object(&wb, object)) { + /* write error ? */ + return; + } + + rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + return; + ERROR("error while sending subcall reply"); +} + +/* callback for subcall reply */ +static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object) +{ + api_ws_client_send_subcall_reply(closure, iserror, object); + free(closure); +} + +/* received a subcall request */ +static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb) +{ + struct api_ws_reply *reply; + struct api_ws_memo *memo; + const char *api, *verb; + uint32_t subcallid; + struct json_object *object; + + reply = malloc(sizeof *reply); + if (!reply) + return; + + /* retrieve the message data */ + if (!api_ws_client_msg_memo_get(apiws, rb, &memo)) + return; + + if (api_ws_read_uint32(rb, &subcallid) + && api_ws_read_string(rb, &api, NULL) + && api_ws_read_string(rb, &verb, NULL) + && api_ws_read_object(rb, &object)) { + reply->apiws = apiws; + reply->subcallid = subcallid; + afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply); + } +} + /* callback when receiving binary data */ static void api_ws_client_on_binary(void *closure, char *data, size_t size) { if (size > 0) { + struct api_ws *apiws = closure; struct readbuf rb = { .head = data, .end = data + size }; + + pthread_mutex_lock(&apiws->mutex); switch (*rb.head++) { - case 'T': /* success */ - api_ws_client_reply_success(closure, &rb); + case CHAR_FOR_ANSWER_SUCCESS: /* success */ + api_ws_client_reply_success(apiws, &rb); + break; + case CHAR_FOR_ANSWER_FAIL: /* fail */ + api_ws_client_reply_fail(apiws, &rb); break; - case 'F': /* fail */ - api_ws_client_reply_fail(closure, &rb); + case CHAR_FOR_EVT_BROADCAST: /* broadcast */ + api_ws_client_event_broadcast(apiws, &rb); break; - case '*': /* broadcast */ - api_ws_client_event_broadcast(closure, &rb); + case CHAR_FOR_EVT_ADD: /* creates the event */ + api_ws_client_event_create(apiws, &rb); break; - case '+': /* creates the event */ - api_ws_client_event_create(closure, &rb); + case CHAR_FOR_EVT_DEL: /* drops the event */ + api_ws_client_event_drop(apiws, &rb); break; - case '-': /* drops the event */ - api_ws_client_event_drop(closure, &rb); + case CHAR_FOR_EVT_PUSH: /* pushs the event */ + api_ws_client_event_push(apiws, &rb); break; - case '!': /* pushs the event */ - api_ws_client_event_push(closure, &rb); + case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */ + api_ws_client_event_subscribe(apiws, &rb); break; - case 'S': /* subscribe event for a request */ - api_ws_client_event_subscribe(closure, &rb); + case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */ + api_ws_client_event_unsubscribe(apiws, &rb); break; - case 'U': /* unsubscribe event for a request */ - api_ws_client_event_unsubscribe(closure, &rb); + case CHAR_FOR_SUBCALL_CALL: /* subcall */ + api_ws_client_subcall(apiws, &rb); break; default: /* unexpected message */ + /* TODO: close the connection */ break; } + pthread_mutex_unlock(&apiws->mutex); } free(data); } @@ -776,20 +905,23 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) struct writebuf wb = { .count = 0 }; const char *raw; size_t szraw; - struct api_ws *api = closure; + struct api_ws *apiws = closure; + + pthread_mutex_lock(&apiws->mutex); /* create the recording data */ - memo = api_ws_client_memo_make(api, xreq); + memo = api_ws_client_memo_make(apiws, xreq); if (memo == NULL) { afb_xreq_fail_f(xreq, "error", "out of memory"); - return; + goto end; } /* creates the call message */ raw = afb_xreq_raw(xreq, &szraw); if (raw == NULL) goto internal_error; - if (!api_ws_write_uint32(&wb, memo->msgid) + if (!api_ws_write_char(&wb, CHAR_FOR_CALL) + || !api_ws_write_uint32(&wb, memo->msgid) || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags) || !api_ws_write_string(&wb, xreq->verb) || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session)) @@ -797,12 +929,10 @@ static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq) goto overflow; /* send */ - rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count); - if (rc < 0) - goto ws_send_error; - return; + rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count); + if (rc >= 0) + goto end; -ws_send_error: afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; @@ -815,19 +945,8 @@ overflow: clean_memo: api_ws_client_memo_destroy(memo); -} - -static int api_ws_service_start_cb(void *closure, int share_session, int onneed) -{ - struct api_ws *api = closure; - - /* not an error when onneed */ - if (onneed != 0) - return 0; - - /* already started: it is an error */ - ERROR("The WS binding %s is not a startable service", api->path); - return -1; +end: + pthread_mutex_unlock(&apiws->mutex); } /* */ @@ -861,12 +980,11 @@ static int api_ws_client_connect(struct api_ws *api) } static struct afb_api_itf ws_api_itf = { - .call = api_ws_client_call_cb, - .service_start = api_ws_service_start_cb + .call = api_ws_client_call_cb }; /* adds a afb-ws-service client api */ -int afb_api_ws_add_client(const char *path) +int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -887,7 +1005,7 @@ int afb_api_ws_add_client(const char *path) /* record it as an API */ afb_api.closure = api; afb_api.itf = &ws_api_itf; - if (afb_apis_add(api->api, afb_api) < 0) + if (afb_apiset_add(apiset, api->api, afb_api) < 0) goto error3; return 0; @@ -904,71 +1022,132 @@ error: static void api_ws_server_client_unref(struct api_ws_client *client) { - if (!--client->refcount) { + struct api_ws_subcall *sc, *nsc; + + if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) { afb_evt_listener_unref(client->listener); afb_ws_destroy(client->ws); + nsc = client->subcalls; + while (nsc) { + sc= nsc; + nsc = sc->next; + sc->callback(sc->closure, 1, NULL); + free(sc); + } afb_cred_unref(client->cred); + afb_apiset_unref(client->apiset); free(client); } } +static void api_ws_server_client_addref(struct api_ws_client *client) +{ + __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED); +} + /* on call, propagate it to the ws service */ -static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size) +static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb) { struct api_ws_server_req *wreq; + char *cverb; const char *uuid, *verb; - uint32_t flags; - - client->refcount++; - - /* create the request */ - wreq = calloc(1 , sizeof *wreq); - if (wreq == NULL) - goto out_of_memory; + uint32_t flags, msgid; + size_t lenverb; + struct json_object *object; - wreq->client = client; - wreq->rcvdata = data; + api_ws_server_client_addref(client); /* reads the call message data */ - if (!api_ws_read_uint32(rb, &wreq->msgid) + if (!api_ws_read_uint32(rb, &msgid) || !api_ws_read_uint32(rb, &flags) - || !api_ws_read_string(rb, &verb, NULL) + || !api_ws_read_string(rb, &verb, &lenverb) || !api_ws_read_string(rb, &uuid, NULL) - || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) + || !api_ws_read_object(rb, &object)) goto overflow; - wreq->xreq.json = json_tokener_parse(wreq->request); - if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) { - wreq->xreq.json = json_object_new_string(wreq->request); - } + /* create the request */ + wreq = malloc(++lenverb + sizeof *wreq); + if (wreq == NULL) + goto out_of_memory; + + afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf); + wreq->client = client; + wreq->msgid = msgid; + cverb = (char*)&wreq[1]; + memcpy(cverb, verb, lenverb); /* init the context */ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) - goto out_of_memory; + goto unconnected; wreq->xreq.context.flags = flags; /* makes the call */ - wreq->xreq.refcount = 1; + wreq->xreq.cred = afb_cred_addref(client->cred); wreq->xreq.api = client->api; - wreq->xreq.verb = verb; - wreq->xreq.query = wreq; - wreq->xreq.queryitf = &afb_api_ws_xreq_itf; - afb_apis_call(&wreq->xreq); - afb_xreq_unref(&wreq->xreq); + wreq->xreq.verb = cverb; + wreq->xreq.json = object; + afb_xreq_process(&wreq->xreq, client->apiset); return; +unconnected: + free(wreq); out_of_memory: + json_object_put(object); overflow: - free(wreq); - free(data); api_ws_server_client_unref(client); } +/* on subcall reply */ +static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb) +{ + char iserror; + uint32_t subcallid; + struct json_object *object; + struct api_ws_subcall *sc, **psc; + + /* reads the call message data */ + if (!api_ws_read_uint32(rb, &subcallid) + || !api_ws_read_char(rb, &iserror) + || !api_ws_read_object(rb, &object)) { + /* TODO bad protocol */ + return; + } + + /* search the subcall and unlink it */ + pthread_mutex_lock(&client->mutex); + psc = &client->subcalls; + while ((sc = *psc) && sc->subcallid != subcallid) + psc = &sc->next; + if (!sc) { + pthread_mutex_unlock(&client->mutex); + /* TODO subcall not found */ + } else { + *psc = sc->next; + pthread_mutex_unlock(&client->mutex); + sc->callback(sc->closure, (int)iserror, object); + free(sc); + } + json_object_put(object); +} + /* callback when receiving binary data */ static void api_ws_server_on_binary(void *closure, char *data, size_t size) { - struct readbuf rb = { .head = data, .end = data + size }; - api_ws_server_called(closure, &rb, data, size); + if (size > 0) { + struct readbuf rb = { .head = data, .end = data + size }; + switch (*rb.head++) { + case CHAR_FOR_CALL: + api_ws_server_on_call(closure, &rb); + break; + case CHAR_FOR_SUBCALL_REPLY: + api_ws_server_on_subcall_reply(closure, &rb); + break; + default: /* unexpected message */ + /* TODO: close the connection */ + break; + } + } + free(data); } /* callback when receiving a hangup */ @@ -1005,7 +1184,9 @@ static void api_ws_server_accept(struct api_ws *api) client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client); if (client->ws != NULL) { client->api = api->api; + client->apiset = afb_apiset_addref(api->server.apiset); client->refcount = 1; + client->subcalls = NULL; return; } afb_cred_unref(client->cred); @@ -1037,18 +1218,18 @@ static void api_ws_server_event_send(struct api_ws_client *client, char order, c static void api_ws_server_event_add(void *closure, const char *event, int eventid) { - api_ws_server_event_send(closure, '+', event, eventid, NULL); + api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL); } static void api_ws_server_event_remove(void *closure, const char *event, int eventid) { - api_ws_server_event_send(closure, '-', event, eventid, NULL); + api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL); } static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object) { const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN); - api_ws_server_event_send(closure, '!', event, eventid, data ? : "null"); + api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null"); json_object_put(object); } @@ -1059,7 +1240,7 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int struct writebuf wb = { .count = 0 }; - if (api_ws_write_char(&wb, '*') && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) { + if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) { rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count); if (rc < 0) ERROR("error while broadcasting event %s", event); @@ -1071,24 +1252,24 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int /******************* ws request part for server *****************/ /* decrement the reference count of the request and free/release it on falling to null */ -static void api_ws_server_req_destroy_cb(void *closure) +static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq) { - struct api_ws_server_req *wreq = closure; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); afb_context_disconnect(&wreq->xreq.context); + afb_cred_unref(wreq->xreq.cred); json_object_put(wreq->xreq.json); - free(wreq->rcvdata); api_ws_server_client_unref(wreq->client); free(wreq); } -static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info) +static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info) { int rc; struct writebuf wb = { .count = 0 }; - struct api_ws_server_req *wreq = closure; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'T') + if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, info ? : "") @@ -1102,13 +1283,13 @@ success: json_object_put(obj); } -static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info) +static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info) { int rc; struct writebuf wb = { .count = 0 }; - struct api_ws_server_req *wreq = closure; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'F') + if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, status) @@ -1120,17 +1301,58 @@ static void api_ws_server_req_fail_cb(void *closure, const char *status, const c ERROR("error while sending fail"); } -static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event) +static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) +{ + int rc; + struct writebuf wb = { .count = 0 }; + struct api_ws_subcall *sc, *osc; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); + struct api_ws_client *client = wreq->client; + + sc = malloc(sizeof *sc); + if (!sc) { + + } else { + sc->callback = callback; + sc->closure = cb_closure; + + pthread_mutex_unlock(&client->mutex); + sc->subcallid = ptr2id(sc); + do { + sc->subcallid++; + osc = client->subcalls; + while(osc && osc->subcallid != sc->subcallid) + osc = osc->next; + } while (osc); + sc->next = client->subcalls; + client->subcalls = sc; + pthread_mutex_unlock(&client->mutex); + + if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL) + && api_ws_write_uint32(&wb, wreq->msgid) + && api_ws_write_uint32(&wb, sc->subcallid) + && api_ws_write_string(&wb, api) + && api_ws_write_string(&wb, verb) + && api_ws_write_object(&wb, args)) { + rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); + if (rc >= 0) + return; + } + ERROR("error while sending fail"); + } +} + +static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; struct writebuf wb = { .count = 0 }; - struct api_ws_server_req *wreq = closure; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); rc = afb_evt_add_watch(wreq->client->listener, event); if (rc < 0) return rc; - if (api_ws_write_char(&wb, 'S') + if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event)) && api_ws_write_string(&wb, afb_evt_event_name(event))) { @@ -1143,13 +1365,13 @@ success: return rc; } -static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event) +static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event) { int rc, rc2; struct writebuf wb = { .count = 0 }; - struct api_ws_server_req *wreq = closure; + struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq); - if (api_ws_write_char(&wb, 'U') + if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE) && api_ws_write_uint32(&wb, wreq->msgid) && api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event)) && api_ws_write_string(&wb, afb_evt_event_name(event))) { @@ -1212,7 +1434,7 @@ static int api_ws_server_connect(struct api_ws *api) } /* create the service */ -int afb_api_ws_add_server(const char *path) +int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset) { int rc; struct api_ws *api; @@ -1227,6 +1449,7 @@ int afb_api_ws_add_server(const char *path) if (rc < 0) goto error2; + api->server.apiset = afb_apiset_addref(apiset); return 0; error2: