utils-jbus: adds closure
[src/app-framework-main.git] / src / utils-jbus.c
index 201b0a6..6d01b1e 100644 (file)
 #include <stdio.h>
 #include <errno.h>
 #include <string.h>
+#include <poll.h>
+#include <assert.h>
 
 #include <json.h>
 #include <dbus/dbus.h>
 
 #include "utils-jbus.h"
 
+#define MAX_JSON_DEPTH 5
+
 struct jreq;
 struct jservice;
 struct jbus;
@@ -41,16 +45,18 @@ struct jreq {
 struct jservice {
        struct jservice *next;
        char *method;
-       void (*oncall_s)(struct jreq *, const char *);
-       void (*oncall_j)(struct jreq *, struct json_object *);
+       void (*oncall_s)(struct jreq *, const char *, void *);
+       void (*oncall_j)(struct jreq *, struct json_object *, void *);
+       void *data;
 };
 
 /* structure for signal handlers */
 struct jsignal {
        struct jsignal *next;
        char *name;
-       void (*onsignal_s)(const char *);
-       void (*onsignal_j)(struct json_object *);
+       void (*onsignal_s)(const char *, void *);
+       void (*onsignal_j)(struct json_object *, void *);
+       void *data;
 };
 
 /* structure for recording asynchronous requests */
@@ -71,12 +77,16 @@ struct respsync {
 /* structure for handling either client or server jbus on dbus */
 struct jbus {
        int refcount;
+       struct json_tokener *tokener;
        struct jservice *services;
        DBusConnection *connection;
        struct jsignal *signals;
        struct jrespw *waiters;
        char *path;
        char *name;
+       int watchnr;
+       int watchfd;
+       int watchflags;
 };
 
 /*********************** STATIC COMMON METHODS *****************/
@@ -112,8 +122,9 @@ static int matchitf(struct jbus *jbus, DBusMessage *message)
 static int add_service(
                struct jbus *jbus,
                const char *method,
-               void (*oncall_s)(struct jreq*, const char*),
-               void (*oncall_j)(struct jreq*, struct json_object*)
+               void (*oncall_s)(struct jreq*, const char*, void*),
+               void (*oncall_j)(struct jreq*, struct json_object*, void*),
+               void *data
 )
 {
        struct jservice *srv;
@@ -133,6 +144,7 @@ static int add_service(
        /* record the service */
        srv->oncall_s = oncall_s;
        srv->oncall_j = oncall_j;
+       srv->data = data;
        srv->next = jbus->services;
        jbus->services = srv;
 
@@ -147,8 +159,9 @@ error:
 static int add_signal(
        struct jbus *jbus,
        const char *name,
-       void (*onsignal_s)(const char*),
-       void (*onsignal_j)(struct json_object*)
+       void (*onsignal_s)(const char*, void*),
+       void (*onsignal_j)(struct json_object*, void*),
+       void *data
 )
 {
        char *rule;
@@ -156,7 +169,11 @@ static int add_signal(
 
        /* record the signal */
        if (jbus->signals == NULL) {
+#if 0
+               if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
+#else
                if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
+#endif
                        return -1;
                dbus_bus_add_match(jbus->connection, rule, NULL);
                free(rule);
@@ -173,6 +190,7 @@ static int add_signal(
        /* record the signal */
        sig->onsignal_s = onsignal_s;
        sig->onsignal_j = onsignal_j;
+       sig->data = data;
        sig->next = jbus->signals;
        jbus->signals = sig;
 
@@ -241,6 +259,17 @@ static void sync_of_replies(int status, const char *value, void *data)
        s->replied = 1;
 }
 
+static int parse(struct jbus *jbus, const char *msg, struct json_object **obj)
+{
+       json_tokener_reset(jbus->tokener);
+       *obj = json_tokener_parse_ex(jbus->tokener, msg, -1);
+       if (json_tokener_get_error(jbus->tokener) == json_tokener_success)
+               return 1;
+       json_object_put(*obj);
+       *obj = NULL;
+       return 0;
+}
+
 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
 {
        int status;
@@ -259,7 +288,9 @@ static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *
        *prv = jrw->next;
 
        /* retrieve the string value */
-       if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
+       if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
+               status = 0;
+       else {
                status = -1;
                str = NULL;
                reply = NULL;
@@ -269,8 +300,7 @@ static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *
        if (jrw->onresp_s)
                jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
        else {
-               reply = json_tokener_parse(str);
-               status = reply ? 0 : -1;
+               status = parse(jbus, str, &reply) - 1;
                jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
                json_object_put(reply);
        }
@@ -311,14 +341,13 @@ static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *
                return reply_invalid_request(jreq);
        if (srv->oncall_s) {
                /* handling strings only */
-               srv->oncall_s(jreq, str);
+               srv->oncall_s(jreq, str, srv->data);
        }
        else {
                /* handling json only */
-               query = json_tokener_parse(str);
-               if (query == NULL)
+               if (!parse(jbus, str, &query))
                        return reply_invalid_request(jreq);
-               srv->oncall_j(jreq, query);
+               srv->oncall_j(jreq, query, srv->data);
                json_object_put(query);
        }
        return DBUS_HANDLER_RESULT_HANDLED;
@@ -347,13 +376,12 @@ static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage
        if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
                if (sig->onsignal_s) {
                        /* handling strings only */
-                       sig->onsignal_s(str);
+                       sig->onsignal_s(str, sig->data);
                }
                else {
                        /* handling json only */
-                       obj = json_tokener_parse(str);
-                       if (obj != NULL) {
-                               sig->onsignal_j(obj);
+                       if (parse(jbus, str, &obj)) {
+                               sig->onsignal_j(obj, sig->data);
                                json_object_put(obj);
                        }
                }
@@ -376,9 +404,74 @@ static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *messa
        return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
 }
 
+static void watchset(DBusWatch *watch, struct jbus *jbus)
+{
+       unsigned int flags;
+       int wf, e;
+
+       flags = dbus_watch_get_flags(watch);
+       e = dbus_watch_get_enabled(watch);
+       wf = jbus->watchflags;
+       if (e) {
+               if (flags & DBUS_WATCH_READABLE)
+                       wf |= POLLIN;
+               if (flags & DBUS_WATCH_WRITABLE)
+                       wf |= POLLOUT;
+       }
+       else {
+               if (flags & DBUS_WATCH_READABLE)
+                       wf &= ~POLLIN;
+               if (flags & DBUS_WATCH_WRITABLE)
+                       wf &= ~POLLOUT;
+       }
+       jbus->watchflags = wf;
+}
+
+static void watchdel(DBusWatch *watch, void *data)
+{
+       struct jbus *jbus = data;
+
+       assert(jbus->watchnr > 0);
+       assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
+       jbus->watchnr--;
+}
+
+static void watchtoggle(DBusWatch *watch, void *data)
+{
+       struct jbus *jbus = data;
+
+       assert(jbus->watchnr > 0);
+       assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
+       watchset(watch, jbus);
+}
+
+static dbus_bool_t watchadd(DBusWatch *watch, void *data)
+{
+       struct jbus *jbus = data;
+       if (jbus->watchnr == 0) {
+               jbus->watchfd = dbus_watch_get_unix_fd(watch);
+               jbus->watchflags = 0;
+       }
+       else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
+               return FALSE;
+       jbus->watchnr++;
+       watchset(watch, jbus);
+       return TRUE;
+}
+
 /************************** MAIN FUNCTIONS *****************************************/
 
-struct jbus *create_jbus(int session, const char *path)
+struct jbus *create_jbus_system(const char *path)
+{
+       return create_jbus(path, 0);
+}
+
+struct jbus *create_jbus_session(const char *path)
+{
+       return create_jbus(path, 1);
+}
+
+struct jbus *create_jbus(const char *path, int session)
 {
        struct jbus *jbus;
        char *name;
@@ -390,6 +483,11 @@ struct jbus *create_jbus(int session, const char *path)
                goto error;
        }
        jbus->refcount = 1;
+       jbus->tokener = json_tokener_new_ex(MAX_JSON_DEPTH);
+       if (jbus->tokener == NULL) {
+               errno = ENOMEM;
+               goto error2;
+       }
        jbus->path = strdup(path);
        if (jbus->path == NULL) {
                errno = ENOMEM;
@@ -416,12 +514,10 @@ struct jbus *create_jbus(int session, const char *path)
 
        /* connect */
        jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
-       if (jbus->connection == NULL) {
+       if (jbus->connection == NULL
+       || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
+        || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
                goto error2;
-       }
-       if (!dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)) {
-               goto error2;
-       }
 
        return jbus;
 
@@ -447,6 +543,8 @@ void jbus_unref(struct jbus *jbus)
                        free(srv->method);
                        free(srv);
                }
+               if (jbus->tokener != NULL)
+                       json_tokener_free(jbus->tokener);
                free(jbus->name);
                free(jbus->path);
                free(jbus);
@@ -512,7 +610,8 @@ int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
        if (message == NULL)
                goto error;
 
-       if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
+       if (!dbus_message_set_sender(message, jbus->name)
+       ||  !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
                dbus_message_unref(message);
                goto error;
        }
@@ -537,14 +636,14 @@ int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *
        return jbus_send_signal_s(jbus, name, str);
 }
 
-int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *))
+int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *, void *), void *data)
 {
-       return add_service(jbus, method, oncall, NULL);
+       return add_service(jbus, method, oncall, NULL, data);
 }
 
-int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *))
+int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *, void *), void *data)
 {
-       return add_service(jbus, method, NULL, oncall);
+       return add_service(jbus, method, NULL, oncall, data);
 }
 
 int jbus_start_serving(struct jbus *jbus)
@@ -562,12 +661,89 @@ int jbus_start_serving(struct jbus *jbus)
        }
 }
 
+int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
+{
+       int i, r;
+
+       for (r = i = 0 ; i < njbuses ; i++) {
+               if (jbuses[i]->watchnr) {
+                       fds[r].fd = jbuses[i]->watchfd;
+                       fds[r].events = jbuses[i]->watchflags;
+                       r++;
+               }
+       }
+       return r;
+}
+
+int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
+{
+       int i, r, n;
+       DBusDispatchStatus sts;
+
+       for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
+               if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
+                       if (fds[r].revents) {
+                               dbus_connection_read_write(jbuses[i]->connection, 0);
+                               sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
+                               while(sts == DBUS_DISPATCH_DATA_REMAINS &&  n < maxcount) {
+                                       sts = dbus_connection_dispatch(jbuses[i]->connection);
+                                       n++;
+                               }
+                       }
+                       r++;
+               }
+       }
+       return n;
+}
+
+int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
+{
+       int i, r;
+       DBusDispatchStatus sts;
+
+       for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
+               dbus_connection_read_write(jbuses[i]->connection, 0);
+               sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
+               while(sts == DBUS_DISPATCH_DATA_REMAINS &&  r < maxcount) {
+                       sts = dbus_connection_dispatch(jbuses[i]->connection);
+                       r++;
+               }
+       }
+       return r;
+}
+
+int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
+{
+       int n, r, s;
+       struct pollfd *fds;
+
+       if (njbuses < 0 || njbuses > 100) {
+               errno = EINVAL;
+               return -1;
+       }
+       fds = alloca(njbuses * sizeof * fds);
+       assert(fds != NULL);
+
+       r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
+       if (r)
+               return r;
+       n = jbus_fill_pollfds(jbuses, njbuses, fds);
+       for(;;) {
+               s = poll(fds, n, toms);
+               if (s >= 0)
+                       break;
+               if (errno != EINTR)
+                       return r ? r : s;
+               toms = 0;
+       }
+       n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
+       return n >= 0 ? r + n : r ? r : n;
+}
+
 int jbus_read_write_dispatch(struct jbus *jbus, int toms)
 {
-       if (dbus_connection_read_write_dispatch(jbus->connection, toms));
-               return 0;
-       errno = EPIPE;
-       return -1;
+       int r = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);
+       return r < 0 ? r : 0;
 }
 
 int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
@@ -616,7 +792,7 @@ struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, con
        if (str == NULL)
                obj = NULL;
        else {
-               obj = json_tokener_parse(str);
+               parse(jbus, str, &obj);
                free(str);
        }
        return obj;
@@ -642,14 +818,14 @@ struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, str
        return jbus_call_sj_sync(jbus, method, str);
 }
 
-int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *))
+int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *, void *), void *data)
 {
-       return add_signal(jbus, name, onsig, NULL);
+       return add_signal(jbus, name, onsig, NULL, data);
 }
 
-int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *))
+int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *, void *), void *data)
 {
-       return add_signal(jbus, name, NULL, onsig);
+       return add_signal(jbus, name, NULL, onsig, data);
 }
 
 /************************** FEW LITTLE TESTS *****************************************/
@@ -658,13 +834,13 @@ int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct j
 #include <stdio.h>
 #include <unistd.h>
 struct jbus *jbus;
-void ping(struct jreq *jreq, struct json_object *request)
+void ping(struct jreq *jreq, struct json_object *request, void *unused)
 {
 printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request));
        jbus_reply_j(jreq, request);
        json_object_put(request);       
 }
-void incr(struct jreq *jreq, struct json_object *request)
+void incr(struct jreq *jreq, struct json_object *request, void *unused)
 {
        static int counter = 0;
        struct json_object *res = json_object_new_int(++counter);
@@ -678,12 +854,11 @@ int main()
 {
        int s1, s2, s3;
        jbus = create_jbus(1, "/bzh/iot/jdbus");
-       s1 = jbus_add_service_j(jbus, "ping", ping);
-       s2 = jbus_add_service_j(jbus, "incr", incr);
+       s1 = jbus_add_service_j(jbus, "ping", ping, NULL);
+       s2 = jbus_add_service_j(jbus, "incr", incr, NULL);
        s3 = jbus_start_serving(jbus);
        printf("started %d %d %d\n", s1, s2, s3);
-       while (!jbus_read_write_dispatch (jbus, -1))
-               ;
+       while (!jbus_read_write_dispatch (jbus, -1));
 }
 #endif
 #ifdef CLIENT
@@ -710,8 +885,7 @@ int main()
                jbus_read_write_dispatch (jbus, 1);
        }
        printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
-       while (!jbus_read_write_dispatch (jbus, -1))
-               ;
+       while (!jbus_read_write_dispatch (jbus, -1));
 }
 #endif