- ERROR("can't create event %s, out of memory", name);
-}
-
-/* removes an event */
-static void api_ws_client_event_drop(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev, **prv;
-
- /* retrieves the event */
- if (!api_ws_client_msg_event_get(api, rb, &ev))
- return;
-
- /* decrease the reference count */
- if (--ev->refcount)
- return;
-
- /* unlinks the event */
- prv = &api->client.events;
- while (*prv != ev)
- prv = &(*prv)->next;
- *prv = ev->next;
-
- /* destroys the event */
- afb_event_drop(ev->event);
- free(ev);
-}
-
-/* subscribes an event */
-static void api_ws_client_event_subscribe(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct api_ws_memo *memo;
-
- if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
- /* subscribe the request from the event */
- if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
- ERROR("can't subscribe: %m");
- }
-}
-
-/* unsubscribes an event */
-static void api_ws_client_event_unsubscribe(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct api_ws_memo *memo;
-
- if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
- /* unsubscribe the request from the event */
- if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
- ERROR("can't unsubscribe: %m");
- }
-}
-
-/* receives broadcasted events */
-static void api_ws_client_event_broadcast(struct api_ws *api, struct readbuf *rb)
-{
- struct json_object *object;
- const char *event;
-
- if (api_ws_read_string(rb, &event, NULL) && api_ws_read_object(rb, &object))
- afb_evt_broadcast(event, object);
- else
- ERROR("unreadable broadcasted event");
-}
-
-/* pushs an event */
-static void api_ws_client_event_push(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_event *ev;
- struct json_object *object;
-
- if (api_ws_client_msg_event_get(api, rb, &ev) && api_ws_read_object(rb, &object))
- afb_event_push(ev->event, object);
- else
- ERROR("unreadable push event");
-}
-
-static void api_ws_client_reply_success(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_memo *memo;
- struct json_object *object;
- const char *info;
- uint32_t flags;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &info, NULL)
- && api_ws_read_object(rb, &object)) {
- memo->xreq->context.flags = (unsigned)flags;
- afb_xreq_success(memo->xreq, object, *info ? info : NULL);
- } else {
- /* failing to have the answer */
- afb_xreq_fail(memo->xreq, "error", "ws error");
- }
- api_ws_client_memo_destroy(memo);
-}
-
-static void api_ws_client_reply_fail(struct api_ws *api, struct readbuf *rb)
-{
- struct api_ws_memo *memo;
- const char *info, *status;
- uint32_t flags;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &status, NULL)
- && api_ws_read_string(rb, &info, NULL)) {
- memo->xreq->context.flags = (unsigned)flags;
- afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
- } else {
- /* failing to have the answer */
- afb_xreq_fail(memo->xreq, "error", "ws error");
- }
- api_ws_client_memo_destroy(memo);
-}
-
-/* send a subcall reply */
-static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
-{
- int rc;
- struct writebuf wb = { .count = 0 };
- char ie = (char)!!iserror;
-
- if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
- || !api_ws_write_uint32(&wb, reply->subcallid)
- || !api_ws_write_char(&wb, ie)
- || !api_ws_write_object(&wb, object)) {
- /* write error ? */
- return;
- }
-
- rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
- if (rc >= 0)
- return;
- ERROR("error while sending subcall reply");
-}
-
-/* callback for subcall reply */
-static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
-{
- api_ws_client_send_subcall_reply(closure, iserror, object);
- free(closure);
-}
-
-/* received a subcall request */
-static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
-{
- struct api_ws_reply *reply;
- struct api_ws_memo *memo;
- const char *api, *verb;
- uint32_t subcallid;
- struct json_object *object;
-
- reply = malloc(sizeof *reply);
- if (!reply)
- return;
-
- /* retrieve the message data */
- if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
- return;
-
- if (api_ws_read_uint32(rb, &subcallid)
- && api_ws_read_string(rb, &api, NULL)
- && api_ws_read_string(rb, &verb, NULL)
- && api_ws_read_object(rb, &object)) {
- reply->apiws = apiws;
- reply->subcallid = subcallid;
- afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
- }
-}
-
-/* callback when receiving binary data */
-static void api_ws_client_on_binary(void *closure, char *data, size_t size)
-{
- if (size > 0) {
- struct api_ws *apiws = closure;
- struct readbuf rb = { .head = data, .end = data + size };
-
- pthread_mutex_lock(&apiws->mutex);
- switch (*rb.head++) {
- case CHAR_FOR_ANSWER_SUCCESS: /* success */
- api_ws_client_reply_success(apiws, &rb);
- break;
- case CHAR_FOR_ANSWER_FAIL: /* fail */
- api_ws_client_reply_fail(apiws, &rb);
- break;
- case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- api_ws_client_event_broadcast(apiws, &rb);
- break;
- case CHAR_FOR_EVT_ADD: /* creates the event */
- api_ws_client_event_create(apiws, &rb);
- break;
- case CHAR_FOR_EVT_DEL: /* drops the event */
- api_ws_client_event_drop(apiws, &rb);
- break;
- case CHAR_FOR_EVT_PUSH: /* pushs the event */
- api_ws_client_event_push(apiws, &rb);
- break;
- case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- api_ws_client_event_subscribe(apiws, &rb);
- break;
- case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- api_ws_client_event_unsubscribe(apiws, &rb);
- break;
- case CHAR_FOR_SUBCALL_CALL: /* subcall */
- api_ws_client_subcall(apiws, &rb);
- break;
- default: /* unexpected message */
- /* TODO: close the connection */
- break;
- }
- pthread_mutex_unlock(&apiws->mutex);
- }
- free(data);
-}
-
-/* on call, propagate it to the ws service */
-static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
-{
- int rc;
- struct api_ws_memo *memo;
- struct writebuf wb = { .count = 0 };
- const char *raw;
- size_t szraw;
- struct api_ws *apiws = closure;
-
- pthread_mutex_lock(&apiws->mutex);
-
- /* create the recording data */
- memo = api_ws_client_memo_make(apiws, xreq);
- if (memo == NULL) {
- afb_xreq_fail_f(xreq, "error", "out of memory");
- goto end;
- }
-
- /* creates the call message */
- raw = afb_xreq_raw(xreq, &szraw);
- if (raw == NULL)
- goto internal_error;
- if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
- || !api_ws_write_uint32(&wb, memo->msgid)
- || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
- || !api_ws_write_string(&wb, xreq->verb)
- || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
- || !api_ws_write_string_length(&wb, raw, szraw))
- goto overflow;
-
- /* send */
- rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
- if (rc >= 0)
- goto end;
-
- afb_xreq_fail(xreq, "error", "websocket sending error");
- goto clean_memo;
-
-internal_error:
- afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
- goto clean_memo;
-
-overflow:
- afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
-
-clean_memo:
- api_ws_client_memo_destroy(memo);
-end:
- pthread_mutex_unlock(&apiws->mutex);
-}
-
-/* */
-static void api_ws_client_disconnect(struct api_ws *api)
-{
- if (api->fd >= 0) {
- afb_ws_destroy(api->client.ws);
- api->client.ws = NULL;
- close(api->fd);
- api->fd = -1;
- }
-}
-
-/* */
-static int api_ws_client_connect(struct api_ws *api)
-{
- struct afb_ws *ws;
- int fd;
-
- fd = api_ws_socket(api->path, 0);
- if (fd >= 0) {
- ws = afb_ws_create(afb_common_get_event_loop(), fd, &api_ws_client_ws_itf, api);
- if (ws != NULL) {
- api->client.ws = ws;
- api->fd = fd;
- return 0;
- }
- close(fd);
- }
- return -1;
-}
-
-static struct afb_api_itf ws_api_itf = {
- .call = api_ws_client_call_cb
-};
-
-/* adds a afb-ws-service client api */
-int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset)
-{
- int rc;
- struct api_ws *api;
- struct afb_api afb_api;
-
- /* create the ws client api */
- api = api_ws_make(path);
- if (api == NULL)
- goto error;
-
- /* connect to the service */
- rc = api_ws_client_connect(api);
- if (rc < 0) {
- ERROR("can't connect to ws service %s", api->path);
- goto error2;
- }
-
- /* record it as an API */
- afb_api.closure = api;
- afb_api.itf = &ws_api_itf;
- if (afb_apiset_add(apiset, api->api, afb_api) < 0)
- goto error3;
-
- return 0;
-
-error3:
- api_ws_client_disconnect(api);
-error2:
- free(api);