#include <json-c/json.h>
#include <systemd/sd-event.h>
-#include <afb/afb-req-itf.h>
+#include <afb/afb-event-itf.h>
#include "afb-common.h"
#include "afb-evt.h"
#include "afb-xreq.h"
#include "verbose.h"
+#include "jobs.h"
-struct client_call;
-struct client_event;
struct afb_stub_ws;
/************** constants for protocol definition *************************/
#define CHAR_FOR_EVT_UNSUBSCRIBE 'U'
#define CHAR_FOR_SUBCALL_CALL 'B'
#define CHAR_FOR_SUBCALL_REPLY 'R'
+#define CHAR_FOR_DESCRIBE 'D'
+#define CHAR_FOR_DESCRIPTION 'd'
/******************* handling subcalls *****************************/
int refcount;
};
+/*
+ * structure for recording describe requests
+ */
+struct client_describe
+{
+ struct client_describe *next;
+ struct afb_stub_ws *stubws;
+ struct jobloop *jobloop;
+ struct json_object *result;
+ uint32_t descid;
+};
+
+/*
+ * structure for jobs of describing
+ */
+struct server_describe
+{
+ struct afb_stub_ws *stubws;
+ uint32_t descid;
+};
+
/******************* client description part for server *****************************/
struct afb_stub_ws
/* pending subcalls (server side) */
struct server_subcall *subcalls;
+ /* pending description (client side) */
+ struct client_describe *describes;
+
/* apiset */
struct afb_apiset *apiset;
sc = malloc(sizeof *sc);
if (!sc) {
-
+ callback(cb_closure, 1, afb_msg_json_internal_error());
} else {
sc->callback = callback;
sc->closure = cb_closure;
}
/* send a subcall reply */
-static void client_send_subcall_reply(struct client_subcall *subcall, int iserror, json_object *object)
+static void client_send_subcall_reply(struct client_subcall *subcall, int status, json_object *object)
{
int rc;
struct writebuf wb = { .count = 0 };
- char ie = (char)!!iserror;
+ char ie = status < 0;
if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
|| !writebuf_uint32(&wb, subcall->subcallid)
}
/* callback for subcall reply */
-static void client_subcall_reply_cb(void *closure, int iserror, json_object *object)
+static void client_subcall_reply_cb(void *closure, int status, json_object *object)
{
- client_send_subcall_reply(closure, iserror, object);
+ client_send_subcall_reply(closure, status, object);
free(closure);
}
}
}
+/* pushs an event */
+static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+ uint32_t descid;
+ struct client_describe *desc;
+ struct json_object *object;
+
+ if (!readbuf_uint32(rb, &descid))
+ ERROR("unreadable description");
+ else {
+ desc = stubws->describes;
+ while (desc && desc->descid != descid)
+ desc = desc->next;
+ if (desc == NULL)
+ ERROR("unexpected description");
+ else {
+ if (readbuf_object(rb, &object))
+ desc->result = object;
+ else
+ ERROR("bad description");
+ jobs_leave(desc->jobloop);
+ }
+ }
+}
+
/* callback when receiving binary data */
static void client_on_binary(void *closure, char *data, size_t size)
{
case CHAR_FOR_SUBCALL_CALL: /* subcall */
client_subcall(stubws, &rb);
break;
+ case CHAR_FOR_DESCRIPTION: /* description */
+ client_on_description(stubws, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
pthread_mutex_unlock(&stubws->mutex);
}
+static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
+{
+ struct client_describe *desc = closure;
+ struct writebuf wb = { .count = 0 };
+
+ if (!signum) {
+ /* record the jobloop */
+ desc->jobloop = jobloop;
+
+ /* send */
+ if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
+ && writebuf_uint32(&wb, desc->descid)
+ && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0)
+ return;
+ }
+ jobs_leave(jobloop);
+}
+
+/* get the description */
+static struct json_object *client_describe_cb(void * closure)
+{
+ struct client_describe desc, *d;
+ struct afb_stub_ws *stubws = closure;
+
+ /* fill in stack the description of the task */
+ pthread_mutex_lock(&stubws->mutex);
+ desc.result = NULL;
+ desc.descid = ptr2id(&desc);
+ d = stubws->describes;
+ while (d) {
+ if (d->descid != desc.descid)
+ d = d->next;
+ else {
+ desc.descid++;
+ d = stubws->describes;
+ }
+ }
+ desc.stubws = stubws;
+ desc.next = stubws->describes;
+ stubws->describes = &desc;
+ pthread_mutex_unlock(&stubws->mutex);
+
+ /* synchronous job: send the request and wait its result */
+ jobs_enter(NULL, 0, client_send_describe_cb, &desc);
+
+ /* unlink and send the result */
+ pthread_mutex_lock(&stubws->mutex);
+ d = stubws->describes;
+ if (d == &desc)
+ stubws->describes = desc.next;
+ else {
+ while (d) {
+ if (d->next != &desc)
+ d = d->next;
+ else {
+ d->next = desc.next;
+ d = NULL;
+ }
+ }
+ }
+ pthread_mutex_unlock(&stubws->mutex);
+ return desc.result;
+}
+
/******************* client description part for server *****************************/
/* on call, propagate it to the ws service */
/* on subcall reply */
static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb)
{
- char iserror;
+ char ie;
uint32_t subcallid;
struct json_object *object;
struct server_subcall *sc, **psc;
/* reads the call message data */
if (!readbuf_uint32(rb, &subcallid)
- || !readbuf_char(rb, &iserror)
+ || !readbuf_char(rb, &ie)
|| !readbuf_object(rb, &object)) {
/* TODO bad protocol */
return;
} else {
*psc = sc->next;
pthread_mutex_unlock(&stubws->mutex);
- sc->callback(sc->closure, (int)iserror, object);
+ sc->callback(sc->closure, -(int)ie, object);
free(sc);
}
json_object_put(object);
}
+static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj)
+{
+ struct writebuf wb = { .count = 0 };
+
+ if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+ || !writebuf_uint32(&wb, descid)
+ || !writebuf_object(&wb, descobj)
+ || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0)
+ ERROR("can't send description");
+}
+
+static void server_describe_job(int signum, void *closure)
+{
+ struct afb_api api;
+ struct json_object *obj;
+ struct server_describe *desc = closure;
+
+ /* get the description if possible */
+ obj = NULL;
+ if (!signum
+ && !afb_apiset_get(desc->stubws->apiset, desc->stubws->apiname, &api)
+ && api.itf->describe) {
+ obj = api.itf->describe(api.closure);
+ }
+
+ /* send it */
+ server_send_description(desc->stubws, desc->descid, obj);
+ json_object_put(obj);
+ afb_stub_ws_unref(desc->stubws);
+ free(desc);
+}
+
+/* on describe, propagate it to the ws service */
+static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+
+ uint32_t descid;
+ struct server_describe *desc;
+
+ /* reads the descid */
+ if (readbuf_uint32(rb, &descid)) {
+ /* create asynchronous job */
+ desc = malloc(sizeof *desc);
+ if (desc) {
+ desc->descid = descid;
+ desc->stubws = stubws;
+ afb_stub_ws_addref(stubws);
+ if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
+ server_describe_job(0, desc);
+ return;
+ }
+ server_send_description(stubws, descid, NULL);
+ }
+ ERROR("can't provide description");
+}
+
/* callback when receiving binary data */
static void server_on_binary(void *closure, char *data, size_t size)
{
case CHAR_FOR_SUBCALL_REPLY:
server_on_subcall_reply(closure, &rb);
break;
+ case CHAR_FOR_DESCRIBE:
+ server_on_describe(closure, &rb);
+ break;
default: /* unexpected message */
/* TODO: close the connection */
break;
};
static struct afb_api_itf ws_api_itf = {
- .call = client_call_cb
+ .call = client_call_cb,
+ .describe = client_describe_cb
};
/*****************************************************/
__atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
}
-/*
-static void client_disconnect(struct afb_stub_ws *stubws)
-{
- if (stubws->fd >= 0) {
- afb_ws_destroy(stubws->ws);
- stubws->ws = NULL;
- close(stubws->fd);
- stubws->fd = -1;
- }
-}
-*/
-