- /* configure the socket */
- if (fd >= 0) {
- fcntl(fd, F_SETFD, FD_CLOEXEC);
- fcntl(fd, F_SETFL, O_NONBLOCK);
- }
- return fd;
-}
-
-/******************* serialisation part **********************************/
-
-struct readbuf
-{
- char *head, *end;
-};
-
-#define WRITEBUF_COUNT_MAX 32
-struct writebuf
-{
- struct iovec iovec[WRITEBUF_COUNT_MAX];
- uint32_t uints[WRITEBUF_COUNT_MAX];
- int count;
-};
-
-static char *api_ws_read_get(struct readbuf *rb, uint32_t length)
-{
- char *before = rb->head;
- char *after = before + length;
- if (after > rb->end)
- return 0;
- rb->head = after;
- return before;
-}
-
-static int api_ws_read_char(struct readbuf *rb, char *value)
-{
- if (rb->head >= rb->end)
- return 0;
- *value = *rb->head++;
- return 1;
-}
-
-static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
-{
- char *after = rb->head + sizeof *value;
- if (after > rb->end)
- return 0;
- memcpy(value, rb->head, sizeof *value);
- rb->head = after;
- *value = le32toh(*value);
- return 1;
-}
-
-static int api_ws_read_string(struct readbuf *rb, const char **value, size_t *length)
-{
- uint32_t len;
- if (!api_ws_read_uint32(rb, &len) || !len)
- return 0;
- if (length)
- *length = (size_t)(len - 1);
- return (*value = api_ws_read_get(rb, len)) != NULL && rb->head[-1] == 0;
-}
-
-static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
-{
- const char *string;
- struct json_object *o;
- int rc = api_ws_read_string(rb, &string, NULL);
- if (rc) {
- o = json_tokener_parse(string);
- if (o == NULL && strcmp(string, "null"))
- o = json_object_new_string(string);
- *object = o;
- }
- return rc;
-}
-
-static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
-{
- int i = wb->count;
- if (i == WRITEBUF_COUNT_MAX)
- return 0;
- wb->iovec[i].iov_base = (void*)value;
- wb->iovec[i].iov_len = length;
- wb->count = i + 1;
- return 1;
-}
-
-static int api_ws_write_char(struct writebuf *wb, char value)
-{
- int i = wb->count;
- if (i == WRITEBUF_COUNT_MAX)
- return 0;
- *(char*)&wb->uints[i] = value;
- wb->iovec[i].iov_base = &wb->uints[i];
- wb->iovec[i].iov_len = 1;
- wb->count = i + 1;
- return 1;
-}
-
-static int api_ws_write_uint32(struct writebuf *wb, uint32_t value)
-{
- int i = wb->count;
- if (i == WRITEBUF_COUNT_MAX)
- return 0;
- wb->uints[i] = htole32(value);
- wb->iovec[i].iov_base = &wb->uints[i];
- wb->iovec[i].iov_len = sizeof wb->uints[i];
- wb->count = i + 1;
- return 1;
-}
-
-static int api_ws_write_string_length(struct writebuf *wb, const char *value, size_t length)
-{
- uint32_t len = (uint32_t)++length;
- return (size_t)len == length && len && api_ws_write_uint32(wb, len) && api_ws_write_put(wb, value, length);
-}
-
-static int api_ws_write_string(struct writebuf *wb, const char *value)
-{
- return api_ws_write_string_length(wb, value, strlen(value));
-}
-
-static int api_ws_write_object(struct writebuf *wb, struct json_object *object)
-{
- const char *string = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
- return string != NULL && api_ws_write_string(wb, string);
-}
-
-/******************* client part **********************************/
-
-/*
- * structure for recording query data
- */
-struct api_ws_memo {
- struct api_ws_memo *next; /* the next memo */
- struct api_ws *api; /* the ws api */
- struct afb_xreq *xreq; /* the request handle */
- uint32_t msgid; /* the message identifier */
-};
-
-struct api_ws_event
-{
- struct api_ws_event *next;
- struct afb_event event;
- int eventid;
- int refcount;
-};
-
-/* search a memorized request */
-static struct api_ws_memo *api_ws_client_memo_search(struct api_ws *api, uint32_t msgid)
-{
- struct api_ws_memo *memo;
-
- memo = api->client.memos;
- while (memo != NULL && memo->msgid != msgid)
- memo = memo->next;
-
- return memo;
-}
-
-/* search the event */
-static struct api_ws_event *api_ws_client_event_search(struct api_ws *api, uint32_t eventid, const char *name)
-{
- struct api_ws_event *ev;
-
- ev = api->client.events;
- while (ev != NULL && (ev->eventid != eventid || 0 != strcmp(afb_evt_event_name(ev->event), name)))
- ev = ev->next;
-
- return ev;
-}
-
-
-/* allocates and init the memorizing data */
-static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
-{
- struct api_ws_memo *memo;
-
- memo = malloc(sizeof *memo);
- if (memo != NULL) {
- afb_xreq_addref(xreq);
- memo->xreq = xreq;
- do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
- memo->api = api;
- memo->next = api->client.memos;
- api->client.memos = memo;
- }
- return memo;
-}
-
-/* free and release the memorizing data */
-static void api_ws_client_memo_destroy(struct api_ws_memo *memo)
-{
- struct api_ws_memo **prv;
-
- prv = &memo->api->client.memos;
- while (*prv != NULL) {
- if (*prv == memo) {
- *prv = memo->next;
- break;
- }
- prv = &(*prv)->next;
- }
-
- afb_xreq_unref(memo->xreq);
- free(memo);
-}
-
-/* get event data from the message */
-static int api_ws_client_msg_event_read(struct readbuf *rb, uint32_t *eventid, const char **name)
-{
- return api_ws_read_uint32(rb, eventid) && api_ws_read_string(rb, name, NULL);
-}
-
-/* get event from the message */
-static int api_ws_client_msg_event_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev)
-{
- const char *name;
- uint32_t eventid;
-
- /* get event data from the message */
- if (!api_ws_client_msg_event_read(rb, &eventid, &name)) {
- ERROR("Invalid message");
- return 0;
- }
-
- /* check conflicts */
- *ev = api_ws_client_event_search(api, eventid, name);
- if (*ev == NULL) {
- ERROR("event %s not found", name);
- return 0;
- }
-
- return 1;
-}
-
-/* get event from the message */
-static int api_ws_client_msg_memo_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo)
-{
- uint32_t msgid;
-
- /* get event data from the message */
- if (!api_ws_read_uint32(rb, &msgid)) {
- ERROR("Invalid message");
- return 0;
- }
-
- /* get the memo */
- *memo = api_ws_client_memo_search(api, msgid);
- if (*memo == NULL) {
- ERROR("message not found");
- return 0;
- }
-
- return 1;
-}
-
-/* read a subscrition message */
-static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
-{
- return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
-}
-
-/* adds an event */
-static void api_ws_client_event_create(struct api_ws *api, struct readbuf *rb)
-{
- size_t offset;
- const char *name;
- uint32_t eventid;
- struct api_ws_event *ev;
-
- /* get event data from the message */
- offset = api_ws_client_msg_event_read(rb, &eventid, &name);
- if (offset == 0) {
- ERROR("Invalid message");
- return;
- }
-
- /* check conflicts */
- ev = api_ws_client_event_search(api, eventid, name);
- if (ev != NULL) {
- ev->refcount++;
- return;
- }
-
- /* no conflict, try to add it */
- ev = malloc(sizeof *ev);
- if (ev != NULL) {
- ev->event = afb_evt_create_event(name);
- if (ev->event.closure == NULL)
- free(ev);
- else {
- ev->refcount = 1;
- ev->eventid = eventid;
- ev->next = api->client.events;
- api->client.events = ev;
- return;
- }
- }
- ERROR("can't create event %s, out of memory", name);
-}
-
-/* removes an event */
-static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev, **prv;
-
- /* retrieves the event */
- if (!api_ws_client_msg_event_get(api, rb, &ev))
- return;
-
- /* decrease the reference count */
- if (--ev->refcount)
- return;
-
- /* unlinks the event */
- prv = &api->client.events;
- while (*prv != ev)
- prv = &(*prv)->next;
- *prv = ev->next;
-
- /* destroys the event */
- afb_event_drop(ev->event);
- free(ev);
-}
-
-/* subscribes an event */
-static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct api_ws_memo *memo;
-
- if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
- /* subscribe the request from the event */
- if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
- ERROR("can't subscribe: %m");
- }
-}
-
-/* unsubscribes an event */
-static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct api_ws_memo *memo;
-
- if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
- /* unsubscribe the request from the event */
- if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
- ERROR("can't unsubscribe: %m");
- }
-}
-
-/* receives broadcasted events */
-static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
-{
- struct json_object *object;
- const char *event;
-
- if (api_ws_read_string(rb, &event, NULL) && api_ws_read_object(rb, &object))
- afb_evt_broadcast(event, object);
- else
- ERROR("unreadable broadcasted event");
-}
-
-/* pushs an event */
-static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct json_object *object;
-
- if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
- afb_event_push(ev->event, object);
- else
- ERROR("unreadable push event");
-}
-
-static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_memo *memo;
- struct json_object *object;
- const char *info;
- uint32_t flags;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &info, NULL)
- && api_ws_read_object(rb, &object)) {
- memo->xreq->context.flags = (unsigned)flags;
- afb_xreq_success(memo->xreq, object, *info ? info : NULL);
- } else {
- /* failing to have the answer */
- afb_xreq_fail(memo->xreq, "error", "ws error");
- }
- api_ws_client_memo_destroy(memo);
-}
-
-static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_memo *memo;
- const char *info, *status;
- uint32_t flags;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &status, NULL)
- && api_ws_read_string(rb, &info, NULL)) {
- memo->xreq->context.flags = (unsigned)flags;
- afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
- } else {
- /* failing to have the answer */
- afb_xreq_fail(memo->xreq, "error", "ws error");
- }
- api_ws_client_memo_destroy(memo);
-}
-
-/* send a subcall reply */
-static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
-{
- int rc;
- struct writebuf wb = { .count = 0 };
- char ie = (char)!!iserror;
-
- if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
- || !api_ws_write_uint32(&wb, reply->subcallid)
- || !api_ws_write_char(&wb, ie)
- || !api_ws_write_object(&wb, object)) {
- /* write error ? */
- return;
- }
-
- rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
- if (rc >= 0)
- return;
- ERROR("error while sending subcall reply");
-}
-
-/* callback for subcall reply */
-static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
-{
- api_ws_client_send_subcall_reply(closure, iserror, object);
- free(closure);
-}
-
-/* received a subcall request */
-static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
-{
- struct api_ws_reply *reply;
- struct api_ws_memo *memo;
- const char *api, *verb;
- uint32_t subcallid;
- struct json_object *object;
-
- reply = malloc(sizeof *reply);
- if (!reply)
- return;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &subcallid)
- && api_ws_read_string(rb, &api, NULL)
- && api_ws_read_string(rb, &verb, NULL)
- && api_ws_read_object(rb, &object)) {
- reply->apiws = apiws;
- reply->subcallid = subcallid;
- afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
- }
-}
-
-/* callback when receiving binary data */
-static void api_ws_client_on_binary(void *closure, char *data, size_t size)
-{
- if (size > 0) {
- struct api_ws *apiws = closure;
- struct readbuf rb = { .head = data, .end = data + size };
-
- pthread_mutex_lock(&apiws->mutex);
- switch (*rb.head++) {
- case CHAR_FOR_ANSWER_SUCCESS: /* success */
- api_ws_client_reply_success(apiws, &rb);
- break;
- case CHAR_FOR_ANSWER_FAIL: /* fail */
- api_ws_client_reply_fail(apiws, &rb);
- break;
- case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- api_ws_client_event_broadcast(apiws, &rb);
- break;
- case CHAR_FOR_EVT_ADD: /* creates the event */
- api_ws_client_event_create(apiws, &rb);
- break;
- case CHAR_FOR_EVT_DEL: /* drops the event */
- api_ws_client_event_drop(apiws, &rb);
- break;
- case CHAR_FOR_EVT_PUSH: /* pushs the event */
- api_ws_client_event_push(apiws, &rb);
- break;
- case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- api_ws_client_event_subscribe(apiws, &rb);
- break;
- case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- api_ws_client_event_unsubscribe(apiws, &rb);
- break;
- case CHAR_FOR_SUBCALL_CALL: /* subcall */
- api_ws_client_subcall(apiws, &rb);
- break;
- default: /* unexpected message */
- /* TODO: close the connection */
- break;
- }
- pthread_mutex_unlock(&apiws->mutex);
- }
- free(data);
-}
-
-/* on call, propagate it to the ws service */
-static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
-{
- int rc;
- struct api_ws_memo *memo;
- struct writebuf wb = { .count = 0 };
- const char *raw;
- size_t szraw;
- struct api_ws *apiws = closure;
-
- pthread_mutex_lock(&apiws->mutex);
-
- /* create the recording data */
- memo = api_ws_client_memo_make(apiws, xreq);
- if (memo == NULL) {
- afb_xreq_fail_f(xreq, "error", "out of memory");
- goto end;
- }
-
- /* creates the call message */
- raw = afb_xreq_raw(xreq, &szraw);
- if (raw == NULL)
- goto internal_error;
- if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
- || !api_ws_write_uint32(&wb, memo->msgid)
- || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
- || !api_ws_write_string(&wb, xreq->verb)
- || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
- || !api_ws_write_string_length(&wb, raw, szraw))
- goto overflow;
-
- /* send */
- rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
- if (rc >= 0)
- goto end;
-
- afb_xreq_fail(xreq, "error", "websocket sending error");
- goto clean_memo;
-
-internal_error:
- afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
- goto clean_memo;
-
-overflow:
- afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
-
-clean_memo:
- api_ws_client_memo_destroy(memo);
-end:
- pthread_mutex_unlock(&apiws->mutex);
-}
-
-static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
-{
- struct api_ws *api = closure;
-
- /* not an error when onneed */
- if (onneed != 0)
- return 0;
-
- /* already started: it is an error */
- ERROR("The WS binding %s is not a startable service", api->path);
- return -1;
-}
-
-/* */
-static void api_ws_client_disconnect(struct api_ws *api)
-{
- if (api->fd >= 0) {
- afb_ws_destroy(api->client.ws);
- api->client.ws = NULL;
- close(api->fd);
- api->fd = -1;
- }
-}
-
-/* */
-static int api_ws_client_connect(struct api_ws *api)
-{
- struct afb_ws *ws;
- int fd;
-
- fd = api_ws_socket(api->path, 0);
- if (fd >= 0) {
- ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
- if (ws != NULL) {
- api->client.ws = ws;
- api->fd = fd;
- return 0;
- }
- close(fd);
- }
- return -1;
-}
-
-static struct afb_api_itf ws_api_itf = {
- .call = api_ws_client_call_cb,
- .service_start = api_ws_service_start_cb
-};
-
-/* adds a afb-ws-service client api */
-int afb_api_ws_add_client(const char *path)
-{
- int rc;
- struct api_ws *api;
- struct afb_api afb_api;
-
- /* create the ws client api */
- api = api_ws_make(path);
- if (api == NULL)
- goto error;
-
- /* connect to the service */
- rc = api_ws_client_connect(api);
- if (rc < 0) {
- ERROR("can't connect to ws service %s", api->path);
- goto error2;
- }
-
- /* record it as an API */
- afb_api.closure = api;
- afb_api.itf = &ws_api_itf;
- if (afb_apis_add(api->api, afb_api) < 0)
- goto error3;
-
- return 0;
-
-error3:
- api_ws_client_disconnect(api);
-error2:
- free(api);