#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <systemd/sd-event.h>
#define CHAR_FOR_EVT_PUSH '!'
#define CHAR_FOR_EVT_SUBSCRIBE 'S'
#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
+#define CHAR_FOR_SUBCALL_CALL 'B'
+#define CHAR_FOR_SUBCALL_REPLY 'R'
/*
*/
char *path; /* path of the object for the API */
char *api; /* api name of the interface */
int fd; /* file descriptor */
+ pthread_mutex_t mutex; /**< resource control */
union {
struct {
uint32_t id;
struct api_ws_memo *memos;
} client;
struct {
- sd_event_source *listensrc;
- struct afb_evt_listener *listener; /* listener for broadcasted events */
+ sd_event_source *listensrc; /**< systemd source for server socket */
} server;
};
};
.remove = api_ws_server_event_remove
};
+/******************* handling subcalls *****************************/
+
+/**
+ * Structure on server side for recording pending
+ * subcalls.
+ */
+struct api_ws_subcall
+{
+ struct api_ws_subcall *next; /**< next subcall for the client */
+ uint32_t subcallid; /**< the subcallid */
+ void (*callback)(void*, int, struct json_object*); /**< callback on completion */
+ void *closure; /**< closure of the callback */
+};
+
+/**
+ * Structure for sending back replies on client side
+ */
+struct api_ws_reply
+{
+ struct api_ws *apiws; /**< api descriptor */
+ uint32_t subcallid; /**< subcallid for the reply */
+};
+
/******************* client description part for server *****************************/
struct api_ws_client
/* count of references */
int refcount;
- /* listener for events */
- struct afb_evt_listener *listener;
-
/* file descriptor */
int fd;
+ /* resource control */
+ pthread_mutex_t mutex;
+
+ /* listener for events */
+ struct afb_evt_listener *listener;
+
/* websocket */
struct afb_ws *ws;
/* credentials */
struct afb_cred *cred;
+
+ /* pending subcalls */
+ struct api_ws_subcall *subcalls;
};
/******************* websocket interface for client part **********************************/
struct api_ws_server_req {
struct afb_xreq xreq; /* the xreq */
struct api_ws_client *client; /* the client of the request */
- const char *request; /* the readen request as string */
- size_t lenreq; /* the length of the request */
uint32_t msgid; /* the incoming request msgid */
};
static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
.success = api_ws_server_req_success_cb,
.fail = api_ws_server_req_fail_cb,
.unref = api_ws_server_req_destroy_cb,
+ .subcall = api_ws_server_req_subcall_cb,
.subscribe = api_ws_server_req_subscribe_cb,
.unsubscribe = api_ws_server_req_unsubscribe_cb
};
errno = EINVAL;
goto error2;
}
+ pthread_mutex_init(&api->mutex, NULL);
api->fd = -1;
return api;
return before;
}
+static int api_ws_read_char(struct readbuf *rb, char *value)
+{
+ if (rb->head >= rb->end)
+ return 0;
+ *value = *rb->head++;
+ return 1;
+}
+
static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
{
char *after = rb->head + sizeof *value;
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
- size_t length;
const char *string;
- return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0);
+ struct json_object *o;
+ int rc = api_ws_read_string(rb, &string, NULL);
+ if (rc) {
+ o = json_tokener_parse(string);
+ if (o == NULL && strcmp(string, "null"))
+ o = json_object_new_string(string);
+ *object = o;
+ }
+ return rc;
}
static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
return string != NULL && api_ws_write_string(wb, string);
}
-
-
-
/******************* client part **********************************/
/*
* structure for recording query data
*/
struct api_ws_memo {
- struct api_ws_memo *next; /* the next memo */
+ struct api_ws_memo *next; /* the next memo */
struct api_ws *api; /* the ws api */
struct afb_xreq *xreq; /* the request handle */
-#if 0
- struct afb_req req; /* the request handle */
- struct afb_context *context; /* the context of the query */
-#endif
uint32_t msgid; /* the message identifier */
};
}
/* read a subscrition message */
-static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo)
+static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
{
return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
}
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* subscribe the request from the event */
if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
ERROR("can't subscribe: %m");
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* unsubscribe the request from the event */
if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
ERROR("can't unsubscribe: %m");
api_ws_client_memo_destroy(memo);
}
+/* send a subcall reply */
+static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
+{
+ int rc;
+ struct writebuf wb = { .count = 0 };
+ char ie = (char)!!iserror;
+
+ if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+ || !api_ws_write_uint32(&wb, reply->subcallid)
+ || !api_ws_write_char(&wb, ie)
+ || !api_ws_write_object(&wb, object)) {
+ /* write error ? */
+ return;
+ }
+
+ rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ ERROR("error while sending subcall reply");
+}
+
+/* callback for subcall reply */
+static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
+{
+ api_ws_client_send_subcall_reply(closure, iserror, object);
+ free(closure);
+}
+
+/* received a subcall request */
+static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
+{
+ struct api_ws_reply *reply;
+ struct api_ws_memo *memo;
+ const char *api, *verb;
+ uint32_t subcallid;
+ struct json_object *object;
+
+ reply = malloc(sizeof *reply);
+ if (!reply)
+ return;
+
+ /* retrieve the message data */
+ if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
+ return;
+
+ if (api_ws_read_uint32(rb, &subcallid)
+ && api_ws_read_string(rb, &api, NULL)
+ && api_ws_read_string(rb, &verb, NULL)
+ && api_ws_read_object(rb, &object)) {
+ reply->apiws = apiws;
+ reply->subcallid = subcallid;
+ afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
+ }
+}
+
/* callback when receiving binary data */
static void api_ws_client_on_binary(void *closure, char *data, size_t size)
{
if (size > 0) {
+ struct api_ws *apiws = closure;
struct readbuf rb = { .head = data, .end = data + size };
+
+ pthread_mutex_lock(&apiws->mutex);
switch (*rb.head++) {
case CHAR_FOR_ANSWER_SUCCESS: /* success */
- api_ws_client_reply_success(closure, &rb);
+ api_ws_client_reply_success(apiws, &rb);
break;
case CHAR_FOR_ANSWER_FAIL: /* fail */
- api_ws_client_reply_fail(closure, &rb);
+ api_ws_client_reply_fail(apiws, &rb);
break;
case CHAR_FOR_EVT_BROADCAST: /* broadcast */
- api_ws_client_event_broadcast(closure, &rb);
+ api_ws_client_event_broadcast(apiws, &rb);
break;
case CHAR_FOR_EVT_ADD: /* creates the event */
- api_ws_client_event_create(closure, &rb);
+ api_ws_client_event_create(apiws, &rb);
break;
case CHAR_FOR_EVT_DEL: /* drops the event */
- api_ws_client_event_drop(closure, &rb);
+ api_ws_client_event_drop(apiws, &rb);
break;
case CHAR_FOR_EVT_PUSH: /* pushs the event */
- api_ws_client_event_push(closure, &rb);
+ api_ws_client_event_push(apiws, &rb);
break;
case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
- api_ws_client_event_subscribe(closure, &rb);
+ api_ws_client_event_subscribe(apiws, &rb);
break;
case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
- api_ws_client_event_unsubscribe(closure, &rb);
+ api_ws_client_event_unsubscribe(apiws, &rb);
+ break;
+ case CHAR_FOR_SUBCALL_CALL: /* subcall */
+ api_ws_client_subcall(apiws, &rb);
break;
default: /* unexpected message */
+ /* TODO: close the connection */
break;
}
+ pthread_mutex_unlock(&apiws->mutex);
}
free(data);
}
struct writebuf wb = { .count = 0 };
const char *raw;
size_t szraw;
- struct api_ws *api = closure;
+ struct api_ws *apiws = closure;
+
+ pthread_mutex_lock(&apiws->mutex);
/* create the recording data */
- memo = api_ws_client_memo_make(api, xreq);
+ memo = api_ws_client_memo_make(apiws, xreq);
if (memo == NULL) {
afb_xreq_fail_f(xreq, "error", "out of memory");
- return;
+ goto end;
}
/* creates the call message */
goto overflow;
/* send */
- rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count);
- if (rc < 0)
- goto ws_send_error;
- return;
+ rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ goto end;
-ws_send_error:
afb_xreq_fail(xreq, "error", "websocket sending error");
goto clean_memo;
clean_memo:
api_ws_client_memo_destroy(memo);
+end:
+ pthread_mutex_unlock(&apiws->mutex);
}
static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
static void api_ws_server_client_unref(struct api_ws_client *client)
{
- if (!--client->refcount) {
+ struct api_ws_subcall *sc, *nsc;
+
+ if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
afb_evt_listener_unref(client->listener);
afb_ws_destroy(client->ws);
+ nsc = client->subcalls;
+ while (nsc) {
+ sc= nsc;
+ nsc = sc->next;
+ sc->callback(sc->closure, 1, NULL);
+ free(sc);
+ }
afb_cred_unref(client->cred);
free(client);
}
}
+static void api_ws_server_client_addref(struct api_ws_client *client)
+{
+ __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
+}
+
/* on call, propagate it to the ws service */
static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
{
struct api_ws_server_req *wreq;
+ char *cverb;
const char *uuid, *verb;
- uint32_t flags;
-
- client->refcount++;
-
- /* create the request */
- wreq = calloc(1 , sizeof *wreq);
- if (wreq == NULL)
- goto out_of_memory;
+ uint32_t flags, msgid;
+ size_t lenverb;
+ struct json_object *object;
- wreq->client = client;
+ api_ws_server_client_addref(client);
/* reads the call message data */
- if (!api_ws_read_uint32(rb, &wreq->msgid)
+ if (!api_ws_read_uint32(rb, &msgid)
|| !api_ws_read_uint32(rb, &flags)
- || !api_ws_read_string(rb, &verb, NULL)
+ || !api_ws_read_string(rb, &verb, &lenverb)
|| !api_ws_read_string(rb, &uuid, NULL)
- || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
+ || !api_ws_read_object(rb, &object))
goto overflow;
+ /* create the request */
+ wreq = malloc(++lenverb + sizeof *wreq);
+ if (wreq == NULL)
+ goto out_of_memory;
+
afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
- wreq->xreq.json = json_tokener_parse(wreq->request);
- if (wreq->xreq.json == NULL && strcmp(wreq->request, "null")) {
- wreq->xreq.json = json_object_new_string(wreq->request);
- }
+ wreq->client = client;
+ wreq->msgid = msgid;
+ cverb = (char*)&wreq[1];
+ memcpy(cverb, verb, lenverb);
/* init the context */
if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
- goto out_of_memory;
+ goto unconnected;
wreq->xreq.context.flags = flags;
/* makes the call */
wreq->xreq.cred = afb_cred_addref(client->cred);
wreq->xreq.api = client->api;
- wreq->xreq.verb = verb;
+ wreq->xreq.verb = cverb;
+ wreq->xreq.json = object;
afb_apis_call(&wreq->xreq);
afb_xreq_unref(&wreq->xreq);
return;
+unconnected:
+ free(wreq);
out_of_memory:
+ json_object_put(object);
overflow:
- free(wreq);
api_ws_server_client_unref(client);
}
+/* on subcall reply */
+static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
+{
+ char iserror;
+ uint32_t subcallid;
+ struct json_object *object;
+ struct api_ws_subcall *sc, **psc;
+
+ /* reads the call message data */
+ if (!api_ws_read_uint32(rb, &subcallid)
+ || !api_ws_read_char(rb, &iserror)
+ || !api_ws_read_object(rb, &object)) {
+ /* TODO bad protocol */
+ return;
+ }
+
+ /* search the subcall and unlink it */
+ pthread_mutex_lock(&client->mutex);
+ psc = &client->subcalls;
+ while ((sc = *psc) && sc->subcallid != subcallid)
+ psc = &sc->next;
+ if (!sc) {
+ pthread_mutex_unlock(&client->mutex);
+ /* TODO subcall not found */
+ } else {
+ *psc = sc->next;
+ pthread_mutex_unlock(&client->mutex);
+ sc->callback(sc->closure, (int)iserror, object);
+ free(sc);
+ }
+ json_object_put(object);
+}
+
/* callback when receiving binary data */
static void api_ws_server_on_binary(void *closure, char *data, size_t size)
{
case CHAR_FOR_CALL:
api_ws_server_on_call(closure, &rb);
break;
+ case CHAR_FOR_SUBCALL_REPLY:
+ api_ws_server_on_subcall_reply(closure, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
if (client->ws != NULL) {
client->api = api->api;
client->refcount = 1;
+ client->subcalls = NULL;
return;
}
afb_cred_unref(client->cred);
ERROR("error while sending fail");
}
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
+{
+ int rc;
+ struct writebuf wb = { .count = 0 };
+ struct api_ws_subcall *sc, *osc;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
+ struct api_ws_client *client = wreq->client;
+
+ sc = malloc(sizeof *sc);
+ if (!sc) {
+
+ } else {
+ sc->callback = callback;
+ sc->closure = cb_closure;
+
+ pthread_mutex_unlock(&client->mutex);
+ sc->subcallid = (uint32_t)(((intptr_t)sc) >> 6);
+ do {
+ sc->subcallid++;
+ osc = client->subcalls;
+ while(osc && osc->subcallid != sc->subcallid)
+ osc = osc->next;
+ } while (osc);
+ sc->next = client->subcalls;
+ client->subcalls = sc;
+ pthread_mutex_unlock(&client->mutex);
+
+ if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
+ && api_ws_write_uint32(&wb, wreq->msgid)
+ && api_ws_write_uint32(&wb, sc->subcallid)
+ && api_ws_write_string(&wb, api)
+ && api_ws_write_string(&wb, verb)
+ && api_ws_write_object(&wb, args)) {
+ rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ }
+ ERROR("error while sending fail");
+ }
+}
+
static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
{
int rc, rc2;