X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=plugins%2Fafm-main-plugin%2Futils-jbus.c;h=9d6c1d5dbad9b0d4bfa187b6d98ac00ab16cb149;hb=3a10185ae380956bd356c73f34138f97255bf7e6;hp=d58def90ca1cecdf057a476146bb8e905e6322d1;hpb=5d293eac235f2d59eccc9b23de803821b821c12e;p=src%2Fapp-framework-binder.git diff --git a/plugins/afm-main-plugin/utils-jbus.c b/plugins/afm-main-plugin/utils-jbus.c index d58def90..9d6c1d5d 100644 --- a/plugins/afm-main-plugin/utils-jbus.c +++ b/plugins/afm-main-plugin/utils-jbus.c @@ -21,12 +21,16 @@ #include #include #include +#include +#include #include #include #include "utils-jbus.h" +#define MAX_JSON_DEPTH 5 + struct jreq; struct jservice; struct jbus; @@ -41,16 +45,18 @@ struct jreq { struct jservice { struct jservice *next; char *method; - void (*oncall_s)(struct jreq *, const char *); - void (*oncall_j)(struct jreq *, struct json_object *); + void (*oncall_s)(struct jreq *, const char *, void *); + void (*oncall_j)(struct jreq *, struct json_object *, void *); + void *data; }; /* structure for signal handlers */ struct jsignal { struct jsignal *next; char *name; - void (*onsignal_s)(const char *); - void (*onsignal_j)(struct json_object *); + void (*onsignal_s)(const char *, void *); + void (*onsignal_j)(struct json_object *, void *); + void *data; }; /* structure for recording asynchronous requests */ @@ -71,12 +77,16 @@ struct respsync { /* structure for handling either client or server jbus on dbus */ struct jbus { int refcount; + struct json_tokener *tokener; struct jservice *services; DBusConnection *connection; struct jsignal *signals; struct jrespw *waiters; char *path; char *name; + int watchnr; + int watchfd; + short watchflags; }; /*********************** STATIC COMMON METHODS *****************/ @@ -112,8 +122,9 @@ static int matchitf(struct jbus *jbus, DBusMessage *message) static int add_service( struct jbus *jbus, const char *method, - void (*oncall_s)(struct jreq*, const char*), - void (*oncall_j)(struct jreq*, struct json_object*) + void (*oncall_s)(struct jreq*, const char*, void*), + void (*oncall_j)(struct jreq*, struct json_object*, void*), + void *data ) { struct jservice *srv; @@ -133,6 +144,7 @@ static int add_service( /* record the service */ srv->oncall_s = oncall_s; srv->oncall_j = oncall_j; + srv->data = data; srv->next = jbus->services; jbus->services = srv; @@ -147,8 +159,9 @@ error: static int add_signal( struct jbus *jbus, const char *name, - void (*onsignal_s)(const char*), - void (*onsignal_j)(struct json_object*) + void (*onsignal_s)(const char*, void*), + void (*onsignal_j)(struct json_object*, void*), + void *data ) { char *rule; @@ -156,7 +169,11 @@ static int add_signal( /* record the signal */ if (jbus->signals == NULL) { +#if 0 + if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path)) +#else if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path)) +#endif return -1; dbus_bus_add_match(jbus->connection, rule, NULL); free(rule); @@ -173,6 +190,7 @@ static int add_signal( /* record the signal */ sig->onsignal_s = onsignal_s; sig->onsignal_j = onsignal_j; + sig->data = data; sig->next = jbus->signals; jbus->signals = sig; @@ -241,6 +259,17 @@ static void sync_of_replies(int status, const char *value, void *data) s->replied = 1; } +static int parse(struct jbus *jbus, const char *msg, struct json_object **obj) +{ + json_tokener_reset(jbus->tokener); + *obj = json_tokener_parse_ex(jbus->tokener, msg, -1); + if (json_tokener_get_error(jbus->tokener) == json_tokener_success) + return 1; + json_object_put(*obj); + *obj = NULL; + return 0; +} + static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror) { int status; @@ -259,7 +288,9 @@ static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage * *prv = jrw->next; /* retrieve the string value */ - if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) { + if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) + status = 0; + else { status = -1; str = NULL; reply = NULL; @@ -269,8 +300,7 @@ static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage * if (jrw->onresp_s) jrw->onresp_s(iserror ? -1 : status, str, jrw->data); else { - reply = json_tokener_parse(str); - status = reply ? 0 : -1; + status = parse(jbus, str, &reply) - 1; jrw->onresp_j(iserror ? -1 : status, reply, jrw->data); json_object_put(reply); } @@ -311,14 +341,13 @@ static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage * return reply_invalid_request(jreq); if (srv->oncall_s) { /* handling strings only */ - srv->oncall_s(jreq, str); + srv->oncall_s(jreq, str, srv->data); } else { /* handling json only */ - query = json_tokener_parse(str); - if (query == NULL) + if (!parse(jbus, str, &query)) return reply_invalid_request(jreq); - srv->oncall_j(jreq, query); + srv->oncall_j(jreq, query, srv->data); json_object_put(query); } return DBUS_HANDLER_RESULT_HANDLED; @@ -347,13 +376,12 @@ static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) { if (sig->onsignal_s) { /* handling strings only */ - sig->onsignal_s(str); + sig->onsignal_s(str, sig->data); } else { /* handling json only */ - obj = json_tokener_parse(str); - if (obj != NULL) { - sig->onsignal_j(obj); + if (parse(jbus, str, &obj)) { + sig->onsignal_j(obj, sig->data); json_object_put(obj); } } @@ -376,9 +404,73 @@ static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *messa return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; } +static void watchset(DBusWatch *watch, struct jbus *jbus) +{ + unsigned int flags; + short wf; + + flags = dbus_watch_get_flags(watch); + wf = jbus->watchflags; + if (dbus_watch_get_enabled(watch)) { + if (flags & DBUS_WATCH_READABLE) + wf |= POLLIN; + if (flags & DBUS_WATCH_WRITABLE) + wf |= POLLOUT; + } + else { + if (flags & DBUS_WATCH_READABLE) + wf &= ~POLLIN; + if (flags & DBUS_WATCH_WRITABLE) + wf &= ~POLLOUT; + } + jbus->watchflags = wf; +} + +static void watchdel(DBusWatch *watch, void *data) +{ + struct jbus *jbus = data; + + assert(jbus->watchnr > 0); + assert(jbus->watchfd == dbus_watch_get_unix_fd(watch)); + jbus->watchnr--; +} + +static void watchtoggle(DBusWatch *watch, void *data) +{ + struct jbus *jbus = data; + + assert(jbus->watchnr > 0); + assert(jbus->watchfd == dbus_watch_get_unix_fd(watch)); + watchset(watch, jbus); +} + +static dbus_bool_t watchadd(DBusWatch *watch, void *data) +{ + struct jbus *jbus = data; + if (jbus->watchnr == 0) { + jbus->watchfd = dbus_watch_get_unix_fd(watch); + jbus->watchflags = 0; + } + else if (jbus->watchfd != dbus_watch_get_unix_fd(watch)) + return FALSE; + jbus->watchnr++; + watchset(watch, jbus); + return TRUE; +} + /************************** MAIN FUNCTIONS *****************************************/ -struct jbus *create_jbus(int session, const char *path) +struct jbus *create_jbus_system(const char *path) +{ + return create_jbus(path, 0); +} + +struct jbus *create_jbus_session(const char *path) +{ + return create_jbus(path, 1); +} + +struct jbus *create_jbus(const char *path, int session) { struct jbus *jbus; char *name; @@ -390,8 +482,12 @@ struct jbus *create_jbus(int session, const char *path) goto error; } jbus->refcount = 1; + jbus->tokener = json_tokener_new_ex(MAX_JSON_DEPTH); + if (jbus->tokener == NULL) { + errno = ENOMEM; + goto error2; + } jbus->path = strdup(path); - jbus->name = NULL; if (jbus->path == NULL) { errno = ENOMEM; goto error2; @@ -417,12 +513,10 @@ struct jbus *create_jbus(int session, const char *path) /* connect */ jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL); - if (jbus->connection == NULL) { + if (jbus->connection == NULL + || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL) + || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL)) goto error2; - } - if (!dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)) { - goto error2; - } return jbus; @@ -441,12 +535,15 @@ void jbus_unref(struct jbus *jbus) { struct jservice *srv; if (!--jbus->refcount) { - dbus_connection_unref(jbus->connection); + if (jbus->connection != NULL) + dbus_connection_unref(jbus->connection); while((srv = jbus->services) != NULL) { jbus->services = srv->next; free(srv->method); free(srv); } + if (jbus->tokener != NULL) + json_tokener_free(jbus->tokener); free(jbus->name); free(jbus->path); free(jbus); @@ -512,7 +609,8 @@ int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content) if (message == NULL) goto error; - if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) { + if (!dbus_message_set_sender(message, jbus->name) + || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) { dbus_message_unref(message); goto error; } @@ -537,14 +635,14 @@ int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object * return jbus_send_signal_s(jbus, name, str); } -int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *)) +int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *, void *), void *data) { - return add_service(jbus, method, oncall, NULL); + return add_service(jbus, method, oncall, NULL, data); } -int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *)) +int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *, void *), void *data) { - return add_service(jbus, method, NULL, oncall); + return add_service(jbus, method, NULL, oncall, data); } int jbus_start_serving(struct jbus *jbus) @@ -562,12 +660,89 @@ int jbus_start_serving(struct jbus *jbus) } } +int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds) +{ + int i, r; + + for (r = i = 0 ; i < njbuses ; i++) { + if (jbuses[i]->watchnr) { + fds[r].fd = jbuses[i]->watchfd; + fds[r].events = jbuses[i]->watchflags; + r++; + } + } + return r; +} + +int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount) +{ + int i, r, n; + DBusDispatchStatus sts; + + for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) { + if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) { + if (fds[r].revents) { + dbus_connection_read_write(jbuses[i]->connection, 0); + sts = dbus_connection_get_dispatch_status(jbuses[i]->connection); + while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) { + sts = dbus_connection_dispatch(jbuses[i]->connection); + n++; + } + } + r++; + } + } + return n; +} + +int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount) +{ + int i, r; + DBusDispatchStatus sts; + + for (i = r = 0 ; i < njbuses && r < maxcount ; i++) { + dbus_connection_read_write(jbuses[i]->connection, 0); + sts = dbus_connection_get_dispatch_status(jbuses[i]->connection); + while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) { + sts = dbus_connection_dispatch(jbuses[i]->connection); + r++; + } + } + return r; +} + +int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount) +{ + int n, r, s; + struct pollfd *fds; + + if (njbuses < 0 || njbuses > 100) { + errno = EINVAL; + return -1; + } + fds = alloca((unsigned)njbuses * sizeof * fds); + assert(fds != NULL); + + r = jbus_dispatch_multiple(jbuses, njbuses, maxcount); + if (r) + return r; + n = jbus_fill_pollfds(jbuses, njbuses, fds); + for(;;) { + s = poll(fds, (nfds_t)n, toms); + if (s >= 0) + break; + if (errno != EINTR) + return r ? r : s; + toms = 0; + } + n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r); + return n >= 0 ? r + n : r ? r : n; +} + int jbus_read_write_dispatch(struct jbus *jbus, int toms) { - if (dbus_connection_read_write_dispatch(jbus->connection, toms)); - return 0; - errno = EPIPE; - return -1; + int r = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000); + return r < 0 ? r : 0; } int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data) @@ -616,7 +791,7 @@ struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, con if (str == NULL) obj = NULL; else { - obj = json_tokener_parse(str); + parse(jbus, str, &obj); free(str); } return obj; @@ -642,14 +817,14 @@ struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, str return jbus_call_sj_sync(jbus, method, str); } -int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *)) +int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *, void *), void *data) { - return add_signal(jbus, name, onsig, NULL); + return add_signal(jbus, name, onsig, NULL, data); } -int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *)) +int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *, void *), void *data) { - return add_signal(jbus, name, NULL, onsig); + return add_signal(jbus, name, NULL, onsig, data); } /************************** FEW LITTLE TESTS *****************************************/ @@ -658,13 +833,13 @@ int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct j #include #include struct jbus *jbus; -void ping(struct jreq *jreq, struct json_object *request) +void ping(struct jreq *jreq, struct json_object *request, void *unused) { printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request)); jbus_reply_j(jreq, request); json_object_put(request); } -void incr(struct jreq *jreq, struct json_object *request) +void incr(struct jreq *jreq, struct json_object *request, void *unused) { static int counter = 0; struct json_object *res = json_object_new_int(++counter); @@ -678,12 +853,11 @@ int main() { int s1, s2, s3; jbus = create_jbus(1, "/bzh/iot/jdbus"); - s1 = jbus_add_service_j(jbus, "ping", ping); - s2 = jbus_add_service_j(jbus, "incr", incr); + s1 = jbus_add_service_j(jbus, "ping", ping, NULL); + s2 = jbus_add_service_j(jbus, "incr", incr, NULL); s3 = jbus_start_serving(jbus); printf("started %d %d %d\n", s1, s2, s3); - while (!jbus_read_write_dispatch (jbus, -1)) - ; + while (!jbus_read_write_dispatch (jbus, -1)); } #endif #ifdef CLIENT @@ -710,8 +884,7 @@ int main() jbus_read_write_dispatch (jbus, 1); } printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\"")); - while (!jbus_read_write_dispatch (jbus, -1)) - ; + while (!jbus_read_write_dispatch (jbus, -1)); } #endif