afb-stub-ws: improvements
[src/app-framework-binder.git] / src / afb-stub-ws.c
index 8298fc7..ce68b44 100644 (file)
@@ -35,7 +35,7 @@
 #include <json-c/json.h>
 #include <systemd/sd-event.h>
 
-#include <afb/afb-req-itf.h>
+#include <afb/afb-event-itf.h>
 
 #include "afb-common.h"
 
@@ -50,9 +50,8 @@
 #include "afb-evt.h"
 #include "afb-xreq.h"
 #include "verbose.h"
+#include "jobs.h"
 
-struct client_call;
-struct client_event;
 struct afb_stub_ws;
 
 /************** constants for protocol definition *************************/
@@ -68,6 +67,8 @@ struct afb_stub_ws;
 #define CHAR_FOR_EVT_UNSUBSCRIBE  'U'
 #define CHAR_FOR_SUBCALL_CALL     'B'
 #define CHAR_FOR_SUBCALL_REPLY    'R'
+#define CHAR_FOR_DESCRIBE         'D'
+#define CHAR_FOR_DESCRIPTION      'd'
 
 /******************* handling subcalls *****************************/
 
@@ -122,7 +123,28 @@ struct client_event
        int refcount;
 };
 
-/******************* client description part for server *****************************/
+/*
+ * structure for recording describe requests
+ */
+struct client_describe
+{
+       struct client_describe *next;
+       struct afb_stub_ws *stubws;
+       struct jobloop *jobloop;
+       struct json_object *result;
+       uint32_t descid;
+};
+
+/*
+ * structure for jobs of describing
+ */
+struct server_describe
+{
+       struct afb_stub_ws *stubws;
+       uint32_t descid;
+};
+
+/******************* stub description for client or servers ******************/
 
 struct afb_stub_ws
 {
@@ -153,9 +175,15 @@ struct afb_stub_ws
        /* pending subcalls (server side) */
        struct server_subcall *subcalls;
 
+       /* pending description (client side) */
+       struct client_describe *describes;
+
        /* apiset */
        struct afb_apiset *apiset;
 
+       /* on hangup callback */
+       void (*on_hangup)(struct afb_stub_ws *);
+
        /* the api name */
        char apiname[1];
 };
@@ -354,7 +382,7 @@ static void server_req_subcall_cb(struct afb_xreq *xreq, const char *api, const
 
        sc = malloc(sizeof *sc);
        if (!sc) {
-
+               callback(cb_closure, 1, afb_msg_json_internal_error());
        } else {
                sc->callback = callback;
                sc->closure = cb_closure;
@@ -715,11 +743,11 @@ static void client_reply_fail(struct afb_stub_ws *stubws, struct readbuf *rb)
 }
 
 /* send a subcall reply */
-static void client_send_subcall_reply(struct client_subcall *subcall, int iserror, json_object *object)
+static void client_send_subcall_reply(struct client_subcall *subcall, int status, json_object *object)
 {
        int rc;
        struct writebuf wb = { .count = 0 };
-       char ie = (char)!!iserror;
+       char ie = status < 0;
 
        if (!writebuf_char(&wb, CHAR_FOR_SUBCALL_REPLY)
         || !writebuf_uint32(&wb, subcall->subcallid)
@@ -736,9 +764,9 @@ static void client_send_subcall_reply(struct client_subcall *subcall, int iserro
 }
 
 /* callback for subcall reply */
-static void client_subcall_reply_cb(void *closure, int iserror, json_object *object)
+static void client_subcall_reply_cb(void *closure, int status, json_object *object)
 {
-       client_send_subcall_reply(closure, iserror, object);
+       client_send_subcall_reply(closure, status, object);
        free(closure);
 }
 
@@ -769,6 +797,31 @@ static void client_subcall(struct afb_stub_ws *stubws, struct readbuf *rb)
        }
 }
 
+/* pushs an event */
+static void client_on_description(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+       uint32_t descid;
+       struct client_describe *desc;
+       struct json_object *object;
+
+       if (!readbuf_uint32(rb, &descid))
+               ERROR("unreadable description");
+       else {
+               desc = stubws->describes;
+               while (desc && desc->descid != descid)
+                       desc = desc->next;
+               if (desc == NULL)
+                       ERROR("unexpected description");
+               else {
+                       if (readbuf_object(rb, &object))
+                               desc->result = object;
+                       else
+                               ERROR("bad description");
+                       jobs_leave(desc->jobloop);
+               }
+       }
+}
+
 /* callback when receiving binary data */
 static void client_on_binary(void *closure, char *data, size_t size)
 {
@@ -805,6 +858,9 @@ static void client_on_binary(void *closure, char *data, size_t size)
                case CHAR_FOR_SUBCALL_CALL: /* subcall */
                        client_subcall(stubws, &rb);
                        break;
+               case CHAR_FOR_DESCRIPTION: /* description */
+                       client_on_description(stubws, &rb);
+                       break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
@@ -866,6 +922,70 @@ end:
        pthread_mutex_unlock(&stubws->mutex);
 }
 
+static void client_send_describe_cb(int signum, void *closure, struct jobloop *jobloop)
+{
+       struct client_describe *desc = closure;
+       struct writebuf wb = { .count = 0 };
+
+       if (!signum) {
+               /* record the jobloop */
+               desc->jobloop = jobloop;
+
+               /* send */
+               if (writebuf_char(&wb, CHAR_FOR_DESCRIBE)
+                && writebuf_uint32(&wb, desc->descid)
+                && afb_ws_binary_v(desc->stubws->ws, wb.iovec, wb.count) >= 0)
+                       return;
+       }
+       jobs_leave(jobloop);
+}
+
+/* get the description */
+static struct json_object *client_describe_cb(void * closure)
+{
+       struct client_describe desc, *d;
+       struct afb_stub_ws *stubws = closure;
+
+       /* fill in stack the description of the task */
+       pthread_mutex_lock(&stubws->mutex);
+       desc.result = NULL;
+       desc.descid = ptr2id(&desc);
+       d = stubws->describes;
+       while (d) {
+               if (d->descid != desc.descid)
+                       d = d->next;
+               else {
+                       desc.descid++;
+                       d = stubws->describes;
+               }
+       }
+       desc.stubws = stubws;
+       desc.next = stubws->describes;
+       stubws->describes = &desc;
+       pthread_mutex_unlock(&stubws->mutex);
+
+       /* synchronous job: send the request and wait its result */
+       jobs_enter(NULL, 0, client_send_describe_cb, &desc);
+
+       /* unlink and send the result */
+       pthread_mutex_lock(&stubws->mutex);
+       d = stubws->describes;
+       if (d == &desc)
+               stubws->describes = desc.next;
+       else {
+               while (d) {
+                       if (d->next != &desc)
+                               d = d->next;
+                       else {
+                               d->next = desc.next;
+                               d = NULL;
+                       }
+               }
+       }
+       pthread_mutex_unlock(&stubws->mutex);
+       return desc.result;
+}
+
 /******************* client description part for server *****************************/
 
 /* on call, propagate it to the ws service */
@@ -923,14 +1043,14 @@ overflow:
 /* on subcall reply */
 static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *rb)
 {
-       char iserror;
+       char ie;
        uint32_t subcallid;
        struct json_object *object;
        struct server_subcall *sc, **psc;
 
        /* reads the call message data */
        if (!readbuf_uint32(rb, &subcallid)
-        || !readbuf_char(rb, &iserror)
+        || !readbuf_char(rb, &ie)
         || !readbuf_object(rb, &object)) {
                /* TODO bad protocol */
                return;
@@ -947,12 +1067,62 @@ static void server_on_subcall_reply(struct afb_stub_ws *stubws, struct readbuf *
        } else {
                *psc = sc->next;
                pthread_mutex_unlock(&stubws->mutex);
-               sc->callback(sc->closure, (int)iserror, object);
+               sc->callback(sc->closure, -(int)ie, object);
                free(sc);
        }
        json_object_put(object);
 }
 
+static void server_send_description(struct afb_stub_ws *stubws, uint32_t descid, struct json_object *descobj)
+{
+       struct writebuf wb = { .count = 0 };
+
+       if (!writebuf_char(&wb, CHAR_FOR_DESCRIPTION)
+        || !writebuf_uint32(&wb, descid)
+        || !writebuf_object(&wb, descobj)
+         || afb_ws_binary_v(stubws->ws, wb.iovec, wb.count) < 0)
+               ERROR("can't send description");
+}
+
+static void server_describe_job(int signum, void *closure)
+{
+       struct json_object *obj;
+       struct server_describe *desc = closure;
+
+       /* get the description if possible */
+       obj = !signum ? afb_apiset_describe(desc->stubws->apiset, desc->stubws->apiname) : NULL;
+
+       /* send it */
+       server_send_description(desc->stubws, desc->descid, obj);
+       json_object_put(obj);
+       afb_stub_ws_unref(desc->stubws);
+       free(desc);
+}
+
+/* on describe, propagate it to the ws service */
+static void server_on_describe(struct afb_stub_ws *stubws, struct readbuf *rb)
+{
+
+       uint32_t descid;
+       struct server_describe *desc;
+
+       /* reads the descid */
+       if (readbuf_uint32(rb, &descid)) {
+               /* create asynchronous job */
+               desc = malloc(sizeof *desc);
+               if (desc) {
+                       desc->descid = descid;
+                       desc->stubws = stubws;
+                       afb_stub_ws_addref(stubws);
+                       if (jobs_queue(NULL, 0, server_describe_job, desc) < 0)
+                               server_describe_job(0, desc);
+                       return;
+               }
+               server_send_description(stubws, descid, NULL);
+       }
+       ERROR("can't provide description");
+}
+
 /* callback when receiving binary data */
 static void server_on_binary(void *closure, char *data, size_t size)
 {
@@ -965,6 +1135,9 @@ static void server_on_binary(void *closure, char *data, size_t size)
                case CHAR_FOR_SUBCALL_REPLY:
                        server_on_subcall_reply(closure, &rb);
                        break;
+               case CHAR_FOR_DESCRIBE:
+                       server_on_describe(closure, &rb);
+                       break;
                default: /* unexpected message */
                        /* TODO: close the connection */
                        break;
@@ -1035,6 +1208,8 @@ static void server_on_hangup(void *closure)
        if (stubws->fd >= 0) {
                close(stubws->fd);
                stubws->fd = -1;
+               if (stubws->on_hangup)
+                       stubws->on_hangup(stubws);
        }
 
        /* release the client */
@@ -1070,7 +1245,8 @@ static const struct afb_ws_itf server_ws_itf =
 };
 
 static struct afb_api_itf ws_api_itf = {
-       .call = client_call_cb
+       .call = client_call_cb,
+       .describe = client_describe_cb
 };
 
 /*****************************************************/
@@ -1101,19 +1277,7 @@ static struct afb_stub_ws *afb_stub_ws_create(int fd, const char *apiname, struc
 
 struct afb_stub_ws *afb_stub_ws_create_client(int fd, const char *apiname, struct afb_apiset *apiset)
 {
-       struct afb_api afb_api;
-       struct afb_stub_ws *stubws;
-
-       stubws = afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf);
-       if (stubws) {
-               afb_api.closure = stubws;
-               afb_api.itf = &ws_api_itf;
-               if (afb_apiset_add(apiset, stubws->apiname, afb_api) >= 0)
-                       return stubws;
-               afb_stub_ws_unref(stubws);
-       }
-       return NULL;
-
+       return afb_stub_ws_create(fd, apiname, apiset, &stub_ws_client_ws_itf);
 }
 
 struct afb_stub_ws *afb_stub_ws_create_server(int fd, const char *apiname, struct afb_apiset *apiset)
@@ -1156,3 +1320,28 @@ void afb_stub_ws_addref(struct afb_stub_ws *stubws)
        __atomic_add_fetch(&stubws->refcount, 1, __ATOMIC_RELAXED);
 }
 
+void afb_stub_ws_on_hangup(struct afb_stub_ws *stubws, void (*on_hangup)(struct afb_stub_ws*))
+{
+       stubws->on_hangup = on_hangup;
+}
+
+const char *afb_stub_ws_name(struct afb_stub_ws *stubws)
+{
+       return stubws->apiname;
+}
+
+struct afb_api afb_stub_ws_client_api(struct afb_stub_ws *stubws)
+{
+       struct afb_api api;
+
+       assert(!stubws->listener); /* check client */
+       api.closure = stubws;
+       api.itf = &ws_api_itf;
+       return api;
+}
+
+int afb_stub_ws_client_add(struct afb_stub_ws *stubws, struct afb_apiset *apiset)
+{
+       return afb_apiset_add(apiset, stubws->apiname, afb_stub_ws_client_api(stubws));
+}
+