#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <systemd/sd-event.h>
#include "afb-common.h"
#include "afb-session.h"
+#include "afb-cred.h"
#include "afb-ws.h"
#include "afb-msg-json.h"
-#include "afb-apis.h"
+#include "afb-api.h"
+#include "afb-apiset.h"
#include "afb-api-so.h"
#include "afb-context.h"
#include "afb-evt.h"
-#include "afb-subcall.h"
+#include "afb-xreq.h"
#include "verbose.h"
#include "sd-fds.h"
struct api_ws_event;
struct api_ws_client;
-
+#define CHAR_FOR_CALL 'C'
+#define CHAR_FOR_ANSWER_SUCCESS 'T'
+#define CHAR_FOR_ANSWER_FAIL 'F'
+#define CHAR_FOR_EVT_BROADCAST '*'
+#define CHAR_FOR_EVT_ADD '+'
+#define CHAR_FOR_EVT_DEL '-'
+#define CHAR_FOR_EVT_PUSH '!'
+#define CHAR_FOR_EVT_SUBSCRIBE 'S'
+#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
+#define CHAR_FOR_SUBCALL_CALL 'B'
+#define CHAR_FOR_SUBCALL_REPLY 'R'
/*
*/
char *path; /* path of the object for the API */
char *api; /* api name of the interface */
int fd; /* file descriptor */
+ pthread_mutex_t mutex; /**< resource control */
union {
struct {
- uint32_t id;
struct afb_ws *ws;
struct api_ws_event *events;
struct api_ws_memo *memos;
} client;
struct {
- sd_event_source *listensrc;
- struct afb_evt_listener *listener; /* listener for broadcasted events */
+ sd_event_source *listensrc; /**< systemd source for server socket */
+ struct afb_apiset *apiset;
} server;
};
};
#define RETERR 2
#define RETRAW 3
+/******************* common usefull tools **********************************/
+
+/**
+ * translate a pointer to some integer
+ * @param ptr the pointer to translate
+ * @return an integer
+ */
+static inline uint32_t ptr2id(void *ptr)
+{
+ return (uint32_t)(((intptr_t)ptr) >> 6);
+}
+
/******************* websocket interface for client part **********************************/
static void api_ws_client_on_binary(void *closure, char *data, size_t size);
.remove = api_ws_server_event_remove
};
+/******************* handling subcalls *****************************/
+
+/**
+ * Structure on server side for recording pending
+ * subcalls.
+ */
+struct api_ws_subcall
+{
+ struct api_ws_subcall *next; /**< next subcall for the client */
+ uint32_t subcallid; /**< the subcallid */
+ void (*callback)(void*, int, struct json_object*); /**< callback on completion */
+ void *closure; /**< closure of the callback */
+};
+
+/**
+ * Structure for sending back replies on client side
+ */
+struct api_ws_reply
+{
+ struct api_ws *apiws; /**< api descriptor */
+ uint32_t subcallid; /**< subcallid for the reply */
+};
+
/******************* client description part for server *****************************/
struct api_ws_client
/* count of references */
int refcount;
- /* listener for events */
- struct afb_evt_listener *listener;
-
/* file descriptor */
int fd;
+ /* resource control */
+ pthread_mutex_t mutex;
+
+ /* listener for events */
+ struct afb_evt_listener *listener;
+
/* websocket */
struct afb_ws *ws;
+
+ /* credentials */
+ struct afb_cred *cred;
+
+ /* pending subcalls */
+ struct api_ws_subcall *subcalls;
+
+ /* apiset */
+ struct afb_apiset *apiset;
};
/******************* websocket interface for client part **********************************/
* structure for a ws request
*/
struct api_ws_server_req {
- struct afb_context context; /* the context, should be THE FIRST */
+ struct afb_xreq xreq; /* the xreq */
struct api_ws_client *client; /* the client of the request */
- char *rcvdata; /* the received data to free */
- struct json_object *json; /* the readen request as object */
- const char *request; /* the readen request as string */
- size_t lenreq; /* the length of the request */
- int refcount; /* reference count of the request */
uint32_t msgid; /* the incoming request msgid */
};
-static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq);
-static void api_ws_server_req_unref(struct api_ws_server_req *wreq);
-
-static struct json_object *api_ws_server_req_json_cb(void *closure);
-static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name);
-static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info);
-static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info);
-static const char *api_ws_server_req_raw_cb(void *closure, size_t *size);
-static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size);
-static void api_ws_server_req_addref_cb(void *closure);
-static void api_ws_server_req_unref_cb(void *closure);
-static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event);
-static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event);
-static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
-
-const struct afb_req_itf afb_api_ws_req_itf = {
- .json = api_ws_server_req_json_cb,
- .get = api_ws_server_req_get_cb,
+static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info);
+static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info);
+static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq);
+static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event);
+static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event);
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure);
+
+const struct afb_xreq_query_itf afb_api_ws_xreq_itf = {
.success = api_ws_server_req_success_cb,
.fail = api_ws_server_req_fail_cb,
- .raw = api_ws_server_req_raw_cb,
- .send = api_ws_server_req_send_cb,
- .context_get = (void*)afb_context_get,
- .context_set = (void*)afb_context_set,
- .addref = api_ws_server_req_addref_cb,
- .unref = api_ws_server_req_unref_cb,
- .session_close = (void*)afb_context_close,
- .session_set_LOA = (void*)afb_context_change_loa,
+ .unref = api_ws_server_req_destroy_cb,
+ .subcall = api_ws_server_req_subcall_cb,
.subscribe = api_ws_server_req_subscribe_cb,
- .unsubscribe = api_ws_server_req_unsubscribe_cb,
- .subcall = api_ws_server_req_subcall_cb
+ .unsubscribe = api_ws_server_req_unsubscribe_cb
};
/******************* common part **********************************/
while (length && path[length - 1] != '/' && path[length - 1] != ':')
length = length - 1;
api->api = &api->path[length];
- if (api->api == NULL || !afb_apis_is_valid_api_name(++api->api)) {
+ if (api->api == NULL || !afb_api_is_valid_name(api->api)) {
errno = EINVAL;
goto error2;
}
+ pthread_mutex_init(&api->mutex, NULL);
api->fd = -1;
return api;
return before;
}
+static int api_ws_read_char(struct readbuf *rb, char *value)
+{
+ if (rb->head >= rb->end)
+ return 0;
+ *value = *rb->head++;
+ return 1;
+}
+
static int api_ws_read_uint32(struct readbuf *rb, uint32_t *value)
{
char *after = rb->head + sizeof *value;
static int api_ws_read_object(struct readbuf *rb, struct json_object **object)
{
- size_t length;
const char *string;
- return api_ws_read_string(rb, &string, &length) && ((*object = json_tokener_parse(string)) != NULL) == (strcmp(string, "null") != 0);
+ struct json_object *o;
+ int rc = api_ws_read_string(rb, &string, NULL);
+ if (rc) {
+ o = json_tokener_parse(string);
+ if (o == NULL && strcmp(string, "null"))
+ o = json_object_new_string(string);
+ *object = o;
+ }
+ return rc;
}
static int api_ws_write_put(struct writebuf *wb, const void *value, size_t length)
return string != NULL && api_ws_write_string(wb, string);
}
-
-
-
/******************* client part **********************************/
/*
* structure for recording query data
*/
struct api_ws_memo {
- struct api_ws_memo *next; /* the next memo */
+ struct api_ws_memo *next; /* the next memo */
struct api_ws *api; /* the ws api */
- struct afb_req req; /* the request handle */
- struct afb_context *context; /* the context of the query */
+ struct afb_xreq *xreq; /* the request handle */
uint32_t msgid; /* the message identifier */
};
/* allocates and init the memorizing data */
-static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_req req, struct afb_context *context)
+static struct api_ws_memo *api_ws_client_memo_make(struct api_ws *api, struct afb_xreq *xreq)
{
struct api_ws_memo *memo;
memo = malloc(sizeof *memo);
if (memo != NULL) {
- afb_req_addref(req);
- memo->req = req;
- memo->context = context;
- do { memo->msgid = ++api->client.id; } while(api_ws_client_memo_search(api, memo->msgid) != NULL);
+ afb_xreq_addref(xreq);
+ memo->xreq = xreq;
+ memo->msgid = ptr2id(memo);
+ while(api_ws_client_memo_search(api, memo->msgid) != NULL)
+ memo->msgid++;
memo->api = api;
memo->next = api->client.memos;
api->client.memos = memo;
prv = &(*prv)->next;
}
- afb_req_unref(memo->req);
+ afb_xreq_unref(memo->xreq);
free(memo);
}
}
/* read a subscrition message */
-static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_event **ev, struct api_ws_memo **memo)
+static int api_ws_client_msg_subscription_get(struct api_ws *api, struct readbuf *rb, struct api_ws_memo **memo, struct api_ws_event **ev)
{
return api_ws_client_msg_memo_get(api, rb, memo) && api_ws_client_msg_event_get(api, rb, ev);
}
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* subscribe the request from the event */
- if (afb_req_subscribe(memo->req, ev->event) < 0)
+ if (afb_xreq_subscribe(memo->xreq, ev->event) < 0)
ERROR("can't subscribe: %m");
}
}
struct api_ws_event *ev;
struct api_ws_memo *memo;
- if (api_ws_client_msg_subscription_get(api, rb, &ev, &memo)) {
+ if (api_ws_client_msg_subscription_get(api, rb, &memo, &ev)) {
/* unsubscribe the request from the event */
- if (afb_req_unsubscribe(memo->req, ev->event) < 0)
+ if (afb_xreq_unsubscribe(memo->xreq, ev->event) < 0)
ERROR("can't unsubscribe: %m");
}
}
if (api_ws_read_uint32(rb, &flags)
&& api_ws_read_string(rb, &info, NULL)
&& api_ws_read_object(rb, &object)) {
- memo->context->flags = (unsigned)flags;
- afb_req_success(memo->req, object, *info ? info : NULL);
+ memo->xreq->context.flags = (unsigned)flags;
+ afb_xreq_success(memo->xreq, object, *info ? info : NULL);
} else {
/* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
+ afb_xreq_fail(memo->xreq, "error", "ws error");
}
api_ws_client_memo_destroy(memo);
}
if (api_ws_read_uint32(rb, &flags)
&& api_ws_read_string(rb, &status, NULL)
&& api_ws_read_string(rb, &info, NULL)) {
- memo->context->flags = (unsigned)flags;
- afb_req_fail(memo->req, status, *info ? info : NULL);
+ memo->xreq->context.flags = (unsigned)flags;
+ afb_xreq_fail(memo->xreq, status, *info ? info : NULL);
} else {
/* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
+ afb_xreq_fail(memo->xreq, "error", "ws error");
}
api_ws_client_memo_destroy(memo);
}
-static void api_ws_client_reply_send(struct api_ws *api, struct readbuf *rb)
+/* send a subcall reply */
+static void api_ws_client_send_subcall_reply(struct api_ws_reply *reply, int iserror, json_object *object)
{
+ int rc;
+ struct writebuf wb = { .count = 0 };
+ char ie = (char)!!iserror;
+
+ if (!api_ws_write_char(&wb, CHAR_FOR_SUBCALL_REPLY)
+ || !api_ws_write_uint32(&wb, reply->subcallid)
+ || !api_ws_write_char(&wb, ie)
+ || !api_ws_write_object(&wb, object)) {
+ /* write error ? */
+ return;
+ }
+
+ rc = afb_ws_binary_v(reply->apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ ERROR("error while sending subcall reply");
+}
+
+/* callback for subcall reply */
+static void api_ws_client_subcall_reply_cb(void *closure, int iserror, json_object *object)
+{
+ api_ws_client_send_subcall_reply(closure, iserror, object);
+ free(closure);
+}
+
+/* received a subcall request */
+static void api_ws_client_subcall(struct api_ws *apiws, struct readbuf *rb)
+{
+ struct api_ws_reply *reply;
struct api_ws_memo *memo;
- const char *data;
- size_t length;
- uint32_t flags;
+ const char *api, *verb;
+ uint32_t subcallid;
+ struct json_object *object;
+
+ reply = malloc(sizeof *reply);
+ if (!reply)
+ return;
/* retrieve the message data */
- if (!api_ws_client_msg_memo_get(api, rb, &memo))
+ if (!api_ws_client_msg_memo_get(apiws, rb, &memo))
return;
- if (api_ws_read_uint32(rb, &flags)
- && api_ws_read_string(rb, &data, &length)) {
- memo->context->flags = (unsigned)flags;
- afb_req_send(memo->req, data, length);
- } else {
- /* failing to have the answer */
- afb_req_fail(memo->req, "error", "ws error");
+ if (api_ws_read_uint32(rb, &subcallid)
+ && api_ws_read_string(rb, &api, NULL)
+ && api_ws_read_string(rb, &verb, NULL)
+ && api_ws_read_object(rb, &object)) {
+ reply->apiws = apiws;
+ reply->subcallid = subcallid;
+ afb_xreq_subcall(memo->xreq, api, verb, object, api_ws_client_subcall_reply_cb, reply);
}
- api_ws_client_memo_destroy(memo);
}
/* callback when receiving binary data */
static void api_ws_client_on_binary(void *closure, char *data, size_t size)
{
if (size > 0) {
+ struct api_ws *apiws = closure;
struct readbuf rb = { .head = data, .end = data + size };
+
+ pthread_mutex_lock(&apiws->mutex);
switch (*rb.head++) {
- case 'T': /* success */
- api_ws_client_reply_success(closure, &rb);
+ case CHAR_FOR_ANSWER_SUCCESS: /* success */
+ api_ws_client_reply_success(apiws, &rb);
break;
- case 'F': /* fail */
- api_ws_client_reply_fail(closure, &rb);
+ case CHAR_FOR_ANSWER_FAIL: /* fail */
+ api_ws_client_reply_fail(apiws, &rb);
break;
- case 'X': /* send */
- api_ws_client_reply_send(closure, &rb);
+ case CHAR_FOR_EVT_BROADCAST: /* broadcast */
+ api_ws_client_event_broadcast(apiws, &rb);
break;
- case '*': /* broadcast */
- api_ws_client_event_broadcast(closure, &rb);
+ case CHAR_FOR_EVT_ADD: /* creates the event */
+ api_ws_client_event_create(apiws, &rb);
break;
- case '+': /* creates the event */
- api_ws_client_event_create(closure, &rb);
+ case CHAR_FOR_EVT_DEL: /* drops the event */
+ api_ws_client_event_drop(apiws, &rb);
break;
- case '-': /* drops the event */
- api_ws_client_event_drop(closure, &rb);
+ case CHAR_FOR_EVT_PUSH: /* pushs the event */
+ api_ws_client_event_push(apiws, &rb);
break;
- case '!': /* pushs the event */
- api_ws_client_event_push(closure, &rb);
+ case CHAR_FOR_EVT_SUBSCRIBE: /* subscribe event for a request */
+ api_ws_client_event_subscribe(apiws, &rb);
break;
- case 'S': /* subscribe event for a request */
- api_ws_client_event_subscribe(closure, &rb);
+ case CHAR_FOR_EVT_UNSUBSCRIBE: /* unsubscribe event for a request */
+ api_ws_client_event_unsubscribe(apiws, &rb);
break;
- case 'U': /* unsubscribe event for a request */
- api_ws_client_event_unsubscribe(closure, &rb);
+ case CHAR_FOR_SUBCALL_CALL: /* subcall */
+ api_ws_client_subcall(apiws, &rb);
break;
default: /* unexpected message */
+ /* TODO: close the connection */
break;
}
+ pthread_mutex_unlock(&apiws->mutex);
}
free(data);
}
/* on call, propagate it to the ws service */
-static void api_ws_client_call_cb(void * closure, struct afb_req req, struct afb_context *context, const char *verb)
+static void api_ws_client_call_cb(void * closure, struct afb_xreq *xreq)
{
int rc;
struct api_ws_memo *memo;
struct writebuf wb = { .count = 0 };
const char *raw;
size_t szraw;
- struct api_ws *api = closure;
+ struct api_ws *apiws = closure;
+
+ pthread_mutex_lock(&apiws->mutex);
/* create the recording data */
- memo = api_ws_client_memo_make(api, req, context);
+ memo = api_ws_client_memo_make(apiws, xreq);
if (memo == NULL) {
- afb_req_fail(req, "error", "out of memory");
- return;
+ afb_xreq_fail_f(xreq, "error", "out of memory");
+ goto end;
}
/* creates the call message */
- raw = afb_req_raw(req, &szraw);
+ raw = afb_xreq_raw(xreq, &szraw);
if (raw == NULL)
goto internal_error;
- if (!api_ws_write_uint32(&wb, memo->msgid)
- || !api_ws_write_uint32(&wb, (uint32_t)context->flags)
- || !api_ws_write_string(&wb, verb)
- || !api_ws_write_string(&wb, afb_session_uuid(context->session))
+ if (!api_ws_write_char(&wb, CHAR_FOR_CALL)
+ || !api_ws_write_uint32(&wb, memo->msgid)
+ || !api_ws_write_uint32(&wb, (uint32_t)xreq->context.flags)
+ || !api_ws_write_string(&wb, xreq->verb)
+ || !api_ws_write_string(&wb, afb_session_uuid(xreq->context.session))
|| !api_ws_write_string_length(&wb, raw, szraw))
goto overflow;
/* send */
- rc = afb_ws_binary_v(api->client.ws, wb.iovec, wb.count);
- if (rc < 0)
- goto ws_send_error;
- return;
+ rc = afb_ws_binary_v(apiws->client.ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ goto end;
-ws_send_error:
- afb_req_fail(req, "error", "websocket sending error");
+ afb_xreq_fail(xreq, "error", "websocket sending error");
goto clean_memo;
internal_error:
- afb_req_fail(req, "error", "internal: raw is NULL!");
+ afb_xreq_fail(xreq, "error", "internal: raw is NULL!");
goto clean_memo;
overflow:
- afb_req_fail(req, "error", "overflow: size doesn't match 32 bits!");
+ afb_xreq_fail(xreq, "error", "overflow: size doesn't match 32 bits!");
clean_memo:
api_ws_client_memo_destroy(memo);
-}
-
-static int api_ws_service_start_cb(void *closure, int share_session, int onneed)
-{
- struct api_ws *api = closure;
-
- /* not an error when onneed */
- if (onneed != 0)
- return 0;
-
- /* already started: it is an error */
- ERROR("The WS binding %s is not a startable service", api->path);
- return -1;
+end:
+ pthread_mutex_unlock(&apiws->mutex);
}
/* */
return -1;
}
+static struct afb_api_itf ws_api_itf = {
+ .call = api_ws_client_call_cb
+};
+
/* adds a afb-ws-service client api */
-int afb_api_ws_add_client(const char *path)
+int afb_api_ws_add_client(const char *path, struct afb_apiset *apiset)
{
int rc;
struct api_ws *api;
/* record it as an API */
afb_api.closure = api;
- afb_api.call = api_ws_client_call_cb;
- afb_api.service_start = api_ws_service_start_cb;
- if (afb_apis_add(api->api, afb_api) < 0)
+ afb_api.itf = &ws_api_itf;
+ if (afb_apiset_add(apiset, api->api, afb_api) < 0)
goto error3;
return 0;
static void api_ws_server_client_unref(struct api_ws_client *client)
{
- if (!--client->refcount) {
+ struct api_ws_subcall *sc, *nsc;
+
+ if (!__atomic_sub_fetch(&client->refcount, 1, __ATOMIC_RELAXED)) {
afb_evt_listener_unref(client->listener);
afb_ws_destroy(client->ws);
+ nsc = client->subcalls;
+ while (nsc) {
+ sc= nsc;
+ nsc = sc->next;
+ sc->callback(sc->closure, 1, NULL);
+ free(sc);
+ }
+ afb_cred_unref(client->cred);
+ afb_apiset_unref(client->apiset);
free(client);
}
}
+static void api_ws_server_client_addref(struct api_ws_client *client)
+{
+ __atomic_add_fetch(&client->refcount, 1, __ATOMIC_RELAXED);
+}
+
/* on call, propagate it to the ws service */
-static void api_ws_server_called(struct api_ws_client *client, struct readbuf *rb, char *data, size_t size)
+static void api_ws_server_on_call(struct api_ws_client *client, struct readbuf *rb)
{
struct api_ws_server_req *wreq;
- struct afb_req areq;
+ char *cverb;
const char *uuid, *verb;
- uint32_t flags;
+ uint32_t flags, msgid;
+ size_t lenverb;
+ struct json_object *object;
- client->refcount++;
+ api_ws_server_client_addref(client);
+
+ /* reads the call message data */
+ if (!api_ws_read_uint32(rb, &msgid)
+ || !api_ws_read_uint32(rb, &flags)
+ || !api_ws_read_string(rb, &verb, &lenverb)
+ || !api_ws_read_string(rb, &uuid, NULL)
+ || !api_ws_read_object(rb, &object))
+ goto overflow;
/* create the request */
- wreq = calloc(1 , sizeof *wreq);
+ wreq = malloc(++lenverb + sizeof *wreq);
if (wreq == NULL)
goto out_of_memory;
+ afb_xreq_init(&wreq->xreq, &afb_api_ws_xreq_itf);
wreq->client = client;
- wreq->rcvdata = data;
- wreq->refcount = 1;
-
- /* reads the call message data */
- if (!api_ws_read_uint32(rb, &wreq->msgid)
- || !api_ws_read_uint32(rb, &flags)
- || !api_ws_read_string(rb, &verb, NULL)
- || !api_ws_read_string(rb, &uuid, NULL)
- || !api_ws_read_string(rb, &wreq->request, &wreq->lenreq))
- goto overflow;
+ wreq->msgid = msgid;
+ cverb = (char*)&wreq[1];
+ memcpy(cverb, verb, lenverb);
/* init the context */
- if (afb_context_connect(&wreq->context, uuid, NULL) < 0)
- goto out_of_memory;
- wreq->context.flags = flags;
+ if (afb_context_connect(&wreq->xreq.context, uuid, NULL) < 0)
+ goto unconnected;
+ wreq->xreq.context.flags = flags;
/* makes the call */
- areq.itf = &afb_api_ws_req_itf;
- areq.closure = wreq;
- afb_apis_call(areq, &wreq->context, client->api, verb);
- api_ws_server_req_unref(wreq);
+ wreq->xreq.cred = afb_cred_addref(client->cred);
+ wreq->xreq.api = client->api;
+ wreq->xreq.verb = cverb;
+ wreq->xreq.json = object;
+ afb_xreq_process(&wreq->xreq, client->apiset);
return;
+unconnected:
+ free(wreq);
out_of_memory:
+ json_object_put(object);
overflow:
- free(wreq);
- free(data);
api_ws_server_client_unref(client);
}
+/* on subcall reply */
+static void api_ws_server_on_subcall_reply(struct api_ws_client *client, struct readbuf *rb)
+{
+ char iserror;
+ uint32_t subcallid;
+ struct json_object *object;
+ struct api_ws_subcall *sc, **psc;
+
+ /* reads the call message data */
+ if (!api_ws_read_uint32(rb, &subcallid)
+ || !api_ws_read_char(rb, &iserror)
+ || !api_ws_read_object(rb, &object)) {
+ /* TODO bad protocol */
+ return;
+ }
+
+ /* search the subcall and unlink it */
+ pthread_mutex_lock(&client->mutex);
+ psc = &client->subcalls;
+ while ((sc = *psc) && sc->subcallid != subcallid)
+ psc = &sc->next;
+ if (!sc) {
+ pthread_mutex_unlock(&client->mutex);
+ /* TODO subcall not found */
+ } else {
+ *psc = sc->next;
+ pthread_mutex_unlock(&client->mutex);
+ sc->callback(sc->closure, (int)iserror, object);
+ free(sc);
+ }
+ json_object_put(object);
+}
+
/* callback when receiving binary data */
static void api_ws_server_on_binary(void *closure, char *data, size_t size)
{
- struct readbuf rb = { .head = data, .end = data + size };
- api_ws_server_called(closure, &rb, data, size);
+ if (size > 0) {
+ struct readbuf rb = { .head = data, .end = data + size };
+ switch (*rb.head++) {
+ case CHAR_FOR_CALL:
+ api_ws_server_on_call(closure, &rb);
+ break;
+ case CHAR_FOR_SUBCALL_REPLY:
+ api_ws_server_on_subcall_reply(closure, &rb);
+ break;
+ default: /* unexpected message */
+ /* TODO: close the connection */
+ break;
+ }
+ }
+ free(data);
}
/* callback when receiving a hangup */
lenaddr = (socklen_t)sizeof addr;
client->fd = accept(api->fd, &addr, &lenaddr);
if (client->fd >= 0) {
+ client->cred = afb_cred_create_for_socket(client->fd);
fcntl(client->fd, F_SETFD, FD_CLOEXEC);
fcntl(client->fd, F_SETFL, O_NONBLOCK);
client->ws = afb_ws_create(afb_common_get_event_loop(), client->fd, &api_ws_server_ws_itf, client);
if (client->ws != NULL) {
client->api = api->api;
+ client->apiset = afb_apiset_addref(api->server.apiset);
client->refcount = 1;
+ client->subcalls = NULL;
return;
}
+ afb_cred_unref(client->cred);
close(client->fd);
}
afb_evt_listener_unref(client->listener);
static void api_ws_server_event_add(void *closure, const char *event, int eventid)
{
- api_ws_server_event_send(closure, '+', event, eventid, NULL);
+ api_ws_server_event_send(closure, CHAR_FOR_EVT_ADD, event, eventid, NULL);
}
static void api_ws_server_event_remove(void *closure, const char *event, int eventid)
{
- api_ws_server_event_send(closure, '-', event, eventid, NULL);
+ api_ws_server_event_send(closure, CHAR_FOR_EVT_DEL, event, eventid, NULL);
}
static void api_ws_server_event_push(void *closure, const char *event, int eventid, struct json_object *object)
{
const char *data = json_object_to_json_string_ext(object, JSON_C_TO_STRING_PLAIN);
- api_ws_server_event_send(closure, '!', event, eventid, data ? : "null");
+ api_ws_server_event_send(closure, CHAR_FOR_EVT_PUSH, event, eventid, data ? : "null");
json_object_put(object);
}
struct writebuf wb = { .count = 0 };
- if (api_ws_write_char(&wb, '*') && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
+ if (api_ws_write_char(&wb, CHAR_FOR_EVT_BROADCAST) && api_ws_write_string(&wb, event) && api_ws_write_object(&wb, object)) {
rc = afb_ws_binary_v(client->ws, wb.iovec, wb.count);
if (rc < 0)
ERROR("error while broadcasting event %s", event);
/******************* ws request part for server *****************/
-/* increment the reference count of the request */
-static void api_ws_server_req_addref_cb(void *closure)
-{
- struct api_ws_server_req *wreq = closure;
- wreq->refcount++;
-}
-
/* decrement the reference count of the request and free/release it on falling to null */
-static void api_ws_server_req_unref_cb(void *closure)
+static void api_ws_server_req_destroy_cb(struct afb_xreq *xreq)
{
- api_ws_server_req_unref(closure);
-}
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
-static void api_ws_server_req_unref(struct api_ws_server_req *wreq)
-{
- if (wreq == NULL || --wreq->refcount)
- return;
-
- afb_context_disconnect(&wreq->context);
- json_object_put(wreq->json);
- free(wreq->rcvdata);
+ afb_context_disconnect(&wreq->xreq.context);
+ afb_cred_unref(wreq->xreq.cred);
+ json_object_put(wreq->xreq.json);
api_ws_server_client_unref(wreq->client);
free(wreq);
}
-/* get the object of the request */
-static struct json_object *api_ws_server_req_json_cb(void *closure)
-{
- return api_ws_server_req_json(closure);
-}
-
-static struct json_object *api_ws_server_req_json(struct api_ws_server_req *wreq)
-{
- if (wreq->json == NULL) {
- wreq->json = json_tokener_parse(wreq->request);
- if (wreq->json == NULL && strcmp(wreq->request, "null")) {
- /* lazy error detection of json request. Is it to improve? */
- wreq->json = json_object_new_string(wreq->request);
- }
- }
- return wreq->json;
-}
-
-/* get the argument of the request of 'name' */
-static struct afb_arg api_ws_server_req_get_cb(void *closure, const char *name)
-{
- struct api_ws_server_req *wreq = closure;
- return afb_msg_json_get_arg(api_ws_server_req_json(wreq), name);
-}
-
-static void api_ws_server_req_success_cb(void *closure, struct json_object *obj, const char *info)
+static void api_ws_server_req_success_cb(struct afb_xreq *xreq, struct json_object *obj, const char *info)
{
int rc;
struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
- if (api_ws_write_char(&wb, 'T')
+ if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_SUCCESS)
&& api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
+ && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
&& api_ws_write_string(&wb, info ? : "")
&& api_ws_write_object(&wb, obj)) {
rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
json_object_put(obj);
}
-static void api_ws_server_req_fail_cb(void *closure, const char *status, const char *info)
+static void api_ws_server_req_fail_cb(struct afb_xreq *xreq, const char *status, const char *info)
{
int rc;
struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
- if (api_ws_write_char(&wb, 'F')
+ if (api_ws_write_char(&wb, CHAR_FOR_ANSWER_FAIL)
&& api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
+ && api_ws_write_uint32(&wb, (uint32_t)wreq->xreq.context.flags)
&& api_ws_write_string(&wb, status)
&& api_ws_write_string(&wb, info ? : "")) {
rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
ERROR("error while sending fail");
}
-static const char *api_ws_server_req_raw_cb(void *closure, size_t *size)
+static void api_ws_server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
{
- struct api_ws_server_req *wreq = closure;
- if (size != NULL)
- *size = wreq->lenreq;
- return wreq->request;
-}
-
-static void api_ws_server_req_send_cb(void *closure, const char *buffer, size_t size)
-{
- /* TODO: how to put sized buffer as strings? things aren't clear here!!! */
int rc;
struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
+ struct api_ws_subcall *sc, *osc;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
+ struct api_ws_client *client = wreq->client;
- if (api_ws_write_char(&wb, 'X')
- && api_ws_write_uint32(&wb, wreq->msgid)
- && api_ws_write_uint32(&wb, (uint32_t)wreq->context.flags)
- && api_ws_write_string_length(&wb, buffer, size)) {
- rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
- if (rc >= 0)
- return;
+ sc = malloc(sizeof *sc);
+ if (!sc) {
+
+ } else {
+ sc->callback = callback;
+ sc->closure = cb_closure;
+
+ pthread_mutex_unlock(&client->mutex);
+ sc->subcallid = ptr2id(sc);
+ do {
+ sc->subcallid++;
+ osc = client->subcalls;
+ while(osc && osc->subcallid != sc->subcallid)
+ osc = osc->next;
+ } while (osc);
+ sc->next = client->subcalls;
+ client->subcalls = sc;
+ pthread_mutex_unlock(&client->mutex);
+
+ if (api_ws_write_char(&wb, CHAR_FOR_SUBCALL_CALL)
+ && api_ws_write_uint32(&wb, wreq->msgid)
+ && api_ws_write_uint32(&wb, sc->subcallid)
+ && api_ws_write_string(&wb, api)
+ && api_ws_write_string(&wb, verb)
+ && api_ws_write_object(&wb, args)) {
+ rc = afb_ws_binary_v(wreq->client->ws, wb.iovec, wb.count);
+ if (rc >= 0)
+ return;
+ }
+ ERROR("error while sending fail");
}
- ERROR("error while sending raw");
}
-static int api_ws_server_req_subscribe_cb(void *closure, struct afb_event event)
+static int api_ws_server_req_subscribe_cb(struct afb_xreq *xreq, struct afb_event event)
{
int rc, rc2;
struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
rc = afb_evt_add_watch(wreq->client->listener, event);
if (rc < 0)
return rc;
- if (api_ws_write_char(&wb, 'S')
+ if (api_ws_write_char(&wb, CHAR_FOR_EVT_SUBSCRIBE)
&& api_ws_write_uint32(&wb, wreq->msgid)
&& api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
&& api_ws_write_string(&wb, afb_evt_event_name(event))) {
return rc;
}
-static int api_ws_server_req_unsubscribe_cb(void *closure, struct afb_event event)
+static int api_ws_server_req_unsubscribe_cb(struct afb_xreq *xreq, struct afb_event event)
{
int rc, rc2;
struct writebuf wb = { .count = 0 };
- struct api_ws_server_req *wreq = closure;
+ struct api_ws_server_req *wreq = CONTAINER_OF_XREQ(struct api_ws_server_req, xreq);
- if (api_ws_write_char(&wb, 'U')
+ if (api_ws_write_char(&wb, CHAR_FOR_EVT_UNSUBSCRIBE)
&& api_ws_write_uint32(&wb, wreq->msgid)
&& api_ws_write_uint32(&wb, (uint32_t)afb_evt_event_id(event))
&& api_ws_write_string(&wb, afb_evt_event_name(event))) {
return rc;
}
-static void api_ws_server_req_subcall_cb(void *closure, const char *api, const char *verb, struct json_object *args, void (*callback)(void*, int, struct json_object*), void *cb_closure)
-{
- struct api_ws_server_req *wreq = closure;
- afb_subcall(&wreq->context, api, verb, args, callback, cb_closure, (struct afb_req){ .itf = &afb_api_ws_req_itf, .closure = wreq });
-}
-
/******************* server part **********************************/
static int api_ws_server_connect(struct api_ws *api);
}
/* create the service */
-int afb_api_ws_add_server(const char *path)
+int afb_api_ws_add_server(const char *path, struct afb_apiset *apiset)
{
int rc;
struct api_ws *api;
if (rc < 0)
goto error2;
+ api->server.apiset = afb_apiset_addref(apiset);
return 0;
error2: