From c63291ad456fd8584e0fed454baa5f0322beb27e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Mon, 3 Apr 2017 17:35:03 +0200 Subject: [PATCH] Switch API websocket to xreq MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: I57600d8dc99bf37f207b126a0e3ab5731ad08ced Signed-off-by: José Bollo --- src/afb-api-ws.c | 214 ++++++++++++++----------------------------------------- 1 file changed, 53 insertions(+), 161 deletions(-) diff --git a/src/afb-api-ws.c b/src/afb-api-ws.c index 89b83bc5..82671875 100644 --- a/src/afb-api-ws.c +++ b/src/afb-api-ws.c @@ -44,7 +44,7 @@ #include "afb-api-so.h" #include "afb-context.h" #include "afb-evt.h" -#include "afb-subcall.h" +#include "afb-xreq.h" #include "verbose.h" #include "sd-fds.h" @@ -147,47 +147,26 @@ static const struct afb_ws_itf api_ws_server_ws_itf = * structure for a ws request */ struct api_ws_server_req { - struct afb_context context; /* the context, should be THE FIRST */ + struct afb_xreq xreq; /* the xreq */ struct api_ws_client *client; /* the client of the request */ char *rcvdata; /* the received data to free */ - struct json_object *json; /* the readen request as object */ const char *request; /* the readen request as string */ size_t lenreq; /* the length of the request */ - int refcount; /* reference count of the request */ uint32_t msgid; /* the incoming request msgid */ }; -static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq); -static void api_ws_server_req_unref(struct api_ws_server_req *wreq); - -static struct json_object *api_ws_server_req_json_cb(void *closure); -static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name); static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info); static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info); -static const char *api_ws_server_req_raw_cb(void *closure, size_t *size); -static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size); -static void api_ws_server_req_addref_cb(void *closure); -static void api_ws_server_req_unref_cb(void *closure); +static void api_ws_server_req_destroy_cb(void *closure); static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event); static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event); -static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure); -const struct afb_req_itf afb_api_ws_req_itf = { - .json = api_ws_server_req_json_cb, - .get = api_ws_server_req_get_cb, +const struct afb_xreq_query_itf afb_api_ws_xreq_itf = { .success = api_ws_server_req_success_cb, .fail = api_ws_server_req_fail_cb, - .raw = api_ws_server_req_raw_cb, - .send = api_ws_server_req_send_cb, - .context_get = (void*)afb_context_get, - .context_set = (void*)afb_context_set, - .addref = api_ws_server_req_addref_cb, - .unref = api_ws_server_req_unref_cb, - .session_close = (void*)afb_context_close, - .session_set_LOA = (void*)afb_context_change_loa, + .unref = api_ws_server_req_destroy_cb, .subscribe = api_ws_server_req_subscribe_cb, - .unsubscribe = api_ws_server_req_unsubscribe_cb, - .subcall = api_ws_server_req_subcall_cb + .unsubscribe = api_ws_server_req_unsubscribe_cb }; /******************* common part **********************************/ @@ -216,7 +195,7 @@ static struct api_ws *api_ws_make(const char *path) while (length && path[length - 1] != '/' && path[length - 1] != ':') length = length - 1; api->api = &api->path[length]; - if (api->api == NULL || !afb_apis_is_valid_api_name(++api->api)) { + if (api->api == NULL || !afb_apis_is_valid_api_name(api->api)) { errno = EINVAL; goto error2; } @@ -459,8 +438,11 @@ static int api_ws_write_object(struct writebuf *wb, struct json_object *object) struct api_ws_memo { struct api_ws_memo *next; /* the next memo */ struct api_ws *api; /* the ws api */ + struct afb_xreq *xreq; /* the request handle */ +#if 0 struct afb_req req; /* the request handle */ struct afb_context *context; /* the context of the query */ +#endif uint32_t msgid; /* the message identifier */ }; @@ -498,15 +480,14 @@ static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint3 /* allocates and init the memorizing data */ -static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_req req, struct afb_context *context) +static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq) { struct api_ws_memo *memo; memo = malloc(sizeof *memo); if (memo != NULL) { - afb_req_addref(req); - memo->req = req; - memo->context = context; + afb_xreq_addref(xreq); + memo->xreq = xreq; do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL); memo->api = api; memo->next = api->client.memos; @@ -529,7 +510,7 @@ static void api_ws_client_memo_destroy(struct api_ws_memo *memo) prv = &(*prv)->next; } - afb_req_unref(memo->req); + afb_xreq_unref(memo->xreq); free(memo); } @@ -659,7 +640,7 @@ static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { /* subscribe the request from the event */ - if (afb_req_subscribe(memo->req, ev->event) < 0) + if (afb_xreq_subscribe(memo->xreq, ev->event) < 0) ERROR("can't subscribe: %m"); } } @@ -672,7 +653,7 @@ static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf * if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) { /* unsubscribe the request from the event */ - if (afb_req_unsubscribe(memo->req, ev->event) < 0) + if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0) ERROR("can't unsubscribe: %m"); } } @@ -715,11 +696,11 @@ static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb) if (api_ws_read_uint32(rb, &flags) && api_ws_read_string(rb, &info, NULL) && api_ws_read_object(rb, &object)) { - memo->context->flags = (unsigned)flags; - afb_req_success(memo->req, object, *info ? info : NULL); + memo->xreq->context.flags = (unsigned)flags; + afb_xreq_success(memo->xreq, object, *info ? info : NULL); } else { /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); + afb_xreq_fail(memo->xreq, "error", "ws error"); } api_ws_client_memo_destroy(memo); } @@ -737,33 +718,11 @@ static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb) if (api_ws_read_uint32(rb, &flags) && api_ws_read_string(rb, &status, NULL) && api_ws_read_string(rb, &info, NULL)) { - memo->context->flags = (unsigned)flags; - afb_req_fail(memo->req, status, *info ? info : NULL); + memo->xreq->context.flags = (unsigned)flags; + afb_xreq_fail(memo->xreq, status, *info ? info : NULL); } else { /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); - } - api_ws_client_memo_destroy(memo); -} - -static void api_ws_client_reply_send(struct api_ws *api, struct readbuf *rb) -{ - struct api_ws_memo *memo; - const char *data; - size_t length; - uint32_t flags; - - /* retrieve the message data */ - if (!api_ws_client_msg_memo_get(api, rb, &memo)) - return; - - if (api_ws_read_uint32(rb, &flags) - && api_ws_read_string(rb, &data, &length)) { - memo->context->flags = (unsigned)flags; - afb_req_send(memo->req, data, length); - } else { - /* failing to have the answer */ - afb_req_fail(memo->req, "error", "ws error"); + afb_xreq_fail(memo->xreq, "error", "ws error"); } api_ws_client_memo_destroy(memo); } @@ -780,9 +739,6 @@ static void api_ws_client_on_binary(void *closure, char *data, size_t size) case 'F': /* fail */ api_ws_client_reply_fail(closure, &rb); break; - case 'X': /* send */ - api_ws_client_reply_send(closure, &rb); - break; case '*': /* broadcast */ api_ws_client_event_broadcast(closure, &rb); break; @@ -809,7 +765,7 @@ static void api_ws_client_on_binary(void *closure, char *data, size_t size) } /* on call, propagate it to the ws service */ -static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb_context *context, const char *verb) +static void api_ws_client_xcall_cb(void * closure, struct afb_xreq *xreq) { int rc; struct api_ws_memo *memo; @@ -819,20 +775,20 @@ static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb struct api_ws *api = closure; /* create the recording data */ - memo = api_ws_client_memo_make(api, req, context); + memo = api_ws_client_memo_make(api, xreq); if (memo == NULL) { - afb_req_fail(req, "error", "out of memory"); + afb_xreq_fail_f(xreq, "error", "out of memory"); return; } /* creates the call message */ - raw = afb_req_raw(req, &szraw); + raw = afb_xreq_raw(xreq, &szraw); if (raw == NULL) goto internal_error; if (!api_ws_write_uint32(&wb, memo->msgid) - || !api_ws_write_uint32(&wb, (uint32_t)context->flags) - || !api_ws_write_string(&wb, verb) - || !api_ws_write_string(&wb, afb_session_uuid(context->session)) + || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags) + || !api_ws_write_string(&wb, xreq->verb) + || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session)) || !api_ws_write_string_length(&wb, raw, szraw)) goto overflow; @@ -843,15 +799,15 @@ static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb return; ws_send_error: - afb_req_fail(req, "error", "websocket sending error"); + afb_xreq_fail(xreq, "error", "websocket sending error"); goto clean_memo; internal_error: - afb_req_fail(req, "error", "internal: raw is NULL!"); + afb_xreq_fail(xreq, "error", "internal: raw is NULL!"); goto clean_memo; overflow: - afb_req_fail(req, "error", "overflow: size doesn't match 32 bits!"); + afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!"); clean_memo: api_ws_client_memo_destroy(memo); @@ -921,7 +877,7 @@ int afb_api_ws_add_client(const char *path) /* record it as an API */ afb_api.closure = api; - afb_api.call = api_ws_client_call_cb; + afb_api.xcall = api_ws_client_xcall_cb; afb_api.service_start = api_ws_service_start_cb; if (afb_apis_add(api->api, afb_api) < 0) goto error3; @@ -951,7 +907,6 @@ static void api_ws_server_client_unref(struct api_ws_client *client) static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size) { struct api_ws_server_req *wreq; - struct afb_req areq; const char *uuid, *verb; uint32_t flags; @@ -964,7 +919,6 @@ static void api_ws_server_called(struct api_ws_client *client, struct readbuf *r wreq->client = client; wreq->rcvdata = data; - wreq->refcount = 1; /* reads the call message data */ if (!api_ws_read_uint32(rb, &wreq->msgid) @@ -974,16 +928,24 @@ static void api_ws_server_called(struct api_ws_client *client, struct readbuf *r || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq)) goto overflow; + wreq->xreq.json = json_tokener_parse(wreq->request); + if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) { + wreq->xreq.json = json_object_new_string(wreq->request); + } + /* init the context */ - if (afb_context_connect(&wreq->context, uuid, NULL) < 0) + if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0) goto out_of_memory; - wreq->context.flags = flags; + wreq->xreq.context.flags = flags; /* makes the call */ - areq.itf = &afb_api_ws_req_itf; - areq.closure = wreq; - afb_apis_call(areq, &wreq->context, client->api, verb); - api_ws_server_req_unref(wreq); + wreq->xreq.refcount = 1; + wreq->xreq.api = client->api; + wreq->xreq.verb = verb; + wreq->xreq.query = wreq; + wreq->xreq.queryitf = &afb_api_ws_xreq_itf; + afb_apis_xcall(&wreq->xreq); + afb_xreq_unref(&wreq->xreq); return; out_of_memory: @@ -1097,56 +1059,18 @@ static void api_ws_server_event_broadcast(void *closure, const char *event, int /******************* ws request part for server *****************/ -/* increment the reference count of the request */ -static void api_ws_server_req_addref_cb(void *closure) -{ - struct api_ws_server_req *wreq = closure; - wreq->refcount++; -} - /* decrement the reference count of the request and free/release it on falling to null */ -static void api_ws_server_req_unref_cb(void *closure) +static void api_ws_server_req_destroy_cb(void *closure) { - api_ws_server_req_unref(closure); -} - -static void api_ws_server_req_unref(struct api_ws_server_req *wreq) -{ - if (wreq == NULL || --wreq->refcount) - return; + struct api_ws_server_req *wreq = closure; - afb_context_disconnect(&wreq->context); - json_object_put(wreq->json); + afb_context_disconnect(&wreq->xreq.context); + json_object_put(wreq->xreq.json); free(wreq->rcvdata); api_ws_server_client_unref(wreq->client); free(wreq); } -/* get the object of the request */ -static struct json_object *api_ws_server_req_json_cb(void *closure) -{ - return api_ws_server_req_json(closure); -} - -static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq) -{ - if (wreq->json == NULL) { - wreq->json = json_tokener_parse(wreq->request); - if (wreq->json == NULL && strcmp(wreq->request, "null")) { - /* lazy error detection of json request. Is it to improve? */ - wreq->json = json_object_new_string(wreq->request); - } - } - return wreq->json; -} - -/* get the argument of the request of 'name' */ -static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name) -{ - struct api_ws_server_req *wreq = closure; - return afb_msg_json_get_arg(api_ws_server_req_json(wreq), name); -} - static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info) { int rc; @@ -1155,7 +1079,7 @@ static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, if (api_ws_write_char(&wb, 'T') && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) + && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, info ? : "") && api_ws_write_object(&wb, obj)) { rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); @@ -1175,7 +1099,7 @@ static void api_ws_server_req_fail_cb(void *closure, const char *status, const c if (api_ws_write_char(&wb, 'F') && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) + && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags) && api_ws_write_string(&wb, status) && api_ws_write_string(&wb, info ? : "")) { rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); @@ -1185,32 +1109,6 @@ static void api_ws_server_req_fail_cb(void *closure, const char *status, const c ERROR("error while sending fail"); } -static const char *api_ws_server_req_raw_cb(void *closure, size_t *size) -{ - struct api_ws_server_req *wreq = closure; - if (size != NULL) - *size = wreq->lenreq; - return wreq->request; -} - -static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size) -{ - /* TODO: how to put sized buffer as strings? things aren't clear here!!! */ - int rc; - struct writebuf wb = { .count = 0 }; - struct api_ws_server_req *wreq = closure; - - if (api_ws_write_char(&wb, 'X') - && api_ws_write_uint32(&wb, wreq->msgid) - && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags) - && api_ws_write_string_length(&wb, buffer, size)) { - rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count); - if (rc >= 0) - return; - } - ERROR("error while sending raw"); -} - static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event) { int rc, rc2; @@ -1254,12 +1152,6 @@ success: return rc; } -static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure) -{ - struct api_ws_server_req *wreq = closure; - afb_subcall(&wreq->context, api, verb, args, callback, cb_closure, (struct afb_req){ .itf = &afb_api_ws_req_itf, .closure = wreq }); -} - /******************* server part **********************************/ static int api_ws_server_connect(struct api_ws *api); -- 2.16.6