4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
32 #define MAX_JSON_DEPTH 5
38 /* structure for handled requests */
40 DBusConnection *connection;
44 /* structure for recorded services */
46 struct jservice *next;
48 void (*oncall_s)(struct jreq *, const char *);
49 void (*oncall_j)(struct jreq *, struct json_object *);
52 /* structure for signal handlers */
56 void (*onsignal_s)(const char *);
57 void (*onsignal_j)(struct json_object *);
60 /* structure for recording asynchronous requests */
65 void (*onresp_s)(int, const char*, void *);
66 void (*onresp_j)(int, struct json_object*, void *);
69 /* structure for synchronous requests */
75 /* structure for handling either client or server jbus on dbus */
78 struct json_tokener *tokener;
79 struct jservice *services;
80 DBusConnection *connection;
81 struct jsignal *signals;
82 struct jrespw *waiters;
90 /*********************** STATIC COMMON METHODS *****************/
92 static inline void free_jreq(struct jreq *jreq)
94 dbus_message_unref(jreq->request);
95 dbus_connection_unref(jreq->connection);
99 static inline int reply_out_of_memory(struct jreq *jreq)
101 static const char out_of_memory[] = "out of memory";
102 jbus_reply_error_s(jreq, out_of_memory);
107 static inline int reply_invalid_request(struct jreq *jreq)
109 static const char invalid_request[] = "invalid request";
110 jbus_reply_error_s(jreq, invalid_request);
111 return DBUS_HANDLER_RESULT_HANDLED;
114 static int matchitf(struct jbus *jbus, DBusMessage *message)
116 const char *itf = dbus_message_get_interface(message);
117 return itf != NULL && !strcmp(itf, jbus->name);
120 static int add_service(
123 void (*oncall_s)(struct jreq*, const char*),
124 void (*oncall_j)(struct jreq*, struct json_object*)
127 struct jservice *srv;
130 srv = malloc(sizeof * srv);
135 srv->method = strdup(method);
141 /* record the service */
142 srv->oncall_s = oncall_s;
143 srv->oncall_j = oncall_j;
144 srv->next = jbus->services;
145 jbus->services = srv;
155 static int add_signal(
158 void (*onsignal_s)(const char*),
159 void (*onsignal_j)(struct json_object*)
165 /* record the signal */
166 if (jbus->signals == NULL) {
168 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
170 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
173 dbus_bus_add_match(jbus->connection, rule, NULL);
178 sig = malloc(sizeof * sig);
181 sig->name = strdup(name);
185 /* record the signal */
186 sig->onsignal_s = onsignal_s;
187 sig->onsignal_j = onsignal_j;
188 sig->next = jbus->signals;
204 void (*onresp_s)(int status, const char *response, void *data),
205 void (*onresp_j)(int status, struct json_object *response, void *data),
212 resp = malloc(sizeof * resp);
218 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
224 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
229 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
233 dbus_message_unref(msg);
235 resp->onresp_s = onresp_s;
236 resp->onresp_j = onresp_j;
237 resp->next = jbus->waiters;
238 jbus->waiters = resp;
242 dbus_message_unref(msg);
249 static void sync_of_replies(int status, const char *value, void *data)
251 struct respsync *s = data;
252 s->value = status ? NULL : strdup(value ? value : "");
256 static int parse(struct jbus *jbus, const char *msg, struct json_object **obj)
258 json_tokener_reset(jbus->tokener);
259 *obj = json_tokener_parse_ex(jbus->tokener, msg, -1);
260 if (json_tokener_get_error(jbus->tokener) == json_tokener_success)
262 json_object_put(*obj);
267 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
271 struct jrespw *jrw, **prv;
272 struct json_object *reply;
273 dbus_uint32_t serial;
275 /* search for the waiter */
276 serial = dbus_message_get_reply_serial(message);
277 prv = &jbus->waiters;
278 while ((jrw = *prv) != NULL && jrw->serial != serial)
281 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
284 /* retrieve the string value */
285 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
295 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
297 status = parse(jbus, str, &reply) - 1;
298 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
299 json_object_put(reply);
303 return DBUS_HANDLER_RESULT_HANDLED;
306 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
308 struct jservice *srv;
312 struct json_object *query;
314 /* search for the service */
315 if (!matchitf(jbus, message))
316 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
317 method = dbus_message_get_member(message);
319 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
320 srv = jbus->services;
321 while(srv != NULL && strcmp(method, srv->method))
324 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
326 /* handle the message */
327 jreq = malloc(sizeof * jreq);
329 return DBUS_HANDLER_RESULT_NEED_MEMORY;
330 jreq->request = dbus_message_ref(message);
331 jreq->connection = dbus_connection_ref(jbus->connection);
333 /* retrieve the string value */
334 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
335 return reply_invalid_request(jreq);
337 /* handling strings only */
338 srv->oncall_s(jreq, str);
341 /* handling json only */
342 if (!parse(jbus, str, &query))
343 return reply_invalid_request(jreq);
344 srv->oncall_j(jreq, query);
345 json_object_put(query);
347 return DBUS_HANDLER_RESULT_HANDLED;
350 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
355 struct json_object *obj;
357 /* search for the service */
358 if (!matchitf(jbus, message))
359 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
360 name = dbus_message_get_member(message);
362 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
364 while(sig != NULL && strcmp(name, sig->name))
367 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
369 /* retrieve the string value */
370 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
371 if (sig->onsignal_s) {
372 /* handling strings only */
373 sig->onsignal_s(str);
376 /* handling json only */
377 if (parse(jbus, str, &obj)) {
378 sig->onsignal_j(obj);
379 json_object_put(obj);
383 return DBUS_HANDLER_RESULT_HANDLED;
386 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
388 switch(dbus_message_get_type(message)) {
389 case DBUS_MESSAGE_TYPE_METHOD_CALL:
390 return incoming_call(connection, message, (struct jbus*)data);
391 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
392 return incoming_resp(connection, message, (struct jbus*)data, 0);
393 case DBUS_MESSAGE_TYPE_ERROR:
394 return incoming_resp(connection, message, (struct jbus*)data, 1);
395 case DBUS_MESSAGE_TYPE_SIGNAL:
396 return incoming_signal(connection, message, (struct jbus*)data);
398 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
401 static void watchset(DBusWatch *watch, struct jbus *jbus)
406 flags = dbus_watch_get_flags(watch);
407 e = dbus_watch_get_enabled(watch);
408 wf = jbus->watchflags;
410 if (flags & DBUS_WATCH_READABLE)
412 if (flags & DBUS_WATCH_WRITABLE)
416 if (flags & DBUS_WATCH_READABLE)
418 if (flags & DBUS_WATCH_WRITABLE)
421 jbus->watchflags = wf;
424 static void watchdel(DBusWatch *watch, void *data)
426 struct jbus *jbus = data;
428 assert(jbus->watchnr > 0);
429 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
433 static void watchtoggle(DBusWatch *watch, void *data)
435 struct jbus *jbus = data;
437 assert(jbus->watchnr > 0);
438 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
439 watchset(watch, jbus);
442 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
444 struct jbus *jbus = data;
445 if (jbus->watchnr == 0) {
446 jbus->watchfd = dbus_watch_get_unix_fd(watch);
447 jbus->watchflags = 0;
449 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
452 watchset(watch, jbus);
456 /************************** MAIN FUNCTIONS *****************************************/
458 struct jbus *create_jbus_system(const char *path)
460 return create_jbus(path, 0);
463 struct jbus *create_jbus_session(const char *path)
465 return create_jbus(path, 1);
468 struct jbus *create_jbus(const char *path, int session)
473 /* create the context and connect */
474 jbus = calloc(1, sizeof * jbus);
480 jbus->tokener = json_tokener_new_ex(MAX_JSON_DEPTH);
481 if (jbus->tokener == NULL) {
485 jbus->path = strdup(path);
486 if (jbus->path == NULL) {
490 while(*path == '/') path++;
491 jbus->name = name = strdup(path);
502 while (name >= jbus->name && *name == '.')
510 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
511 if (jbus->connection == NULL
512 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
513 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
524 void jbus_addref(struct jbus *jbus)
529 void jbus_unref(struct jbus *jbus)
531 struct jservice *srv;
532 if (!--jbus->refcount) {
533 if (jbus->connection != NULL)
534 dbus_connection_unref(jbus->connection);
535 while((srv = jbus->services) != NULL) {
536 jbus->services = srv->next;
540 if (jbus->tokener != NULL)
541 json_tokener_free(jbus->tokener);
548 int jbus_reply_error_s(struct jreq *jreq, const char *error)
551 DBusMessage *message;
553 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
557 if (dbus_connection_send(jreq->connection, message, NULL))
559 dbus_message_unref(message);
565 int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
567 const char *str = json_object_to_json_string(reply);
568 return str ? jbus_reply_error_s(jreq, str) : reply_out_of_memory(jreq);
571 int jbus_reply_s(struct jreq *jreq, const char *reply)
574 DBusMessage *message;
576 message = dbus_message_new_method_return(jreq->request);
578 return reply_out_of_memory(jreq);
580 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
581 dbus_message_unref(message);
582 return reply_out_of_memory(jreq);
585 if (dbus_connection_send(jreq->connection, message, NULL))
587 dbus_message_unref(message);
592 int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
594 const char *str = json_object_to_json_string(reply);
595 return str ? jbus_reply_s(jreq, str) : reply_out_of_memory(jreq);
598 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
601 DBusMessage *message;
603 message = dbus_message_new_signal(jbus->path, jbus->name, name);
607 if (!dbus_message_set_sender(message, jbus->name)
608 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
609 dbus_message_unref(message);
613 if (dbus_connection_send(jbus->connection, message, NULL))
615 dbus_message_unref(message);
623 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
625 const char *str = json_object_to_json_string(content);
630 return jbus_send_signal_s(jbus, name, str);
633 int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *))
635 return add_service(jbus, method, oncall, NULL);
638 int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *))
640 return add_service(jbus, method, NULL, oncall);
643 int jbus_start_serving(struct jbus *jbus)
645 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
647 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
648 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
650 case DBUS_REQUEST_NAME_REPLY_EXISTS:
651 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
658 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
662 for (r = i = 0 ; i < njbuses ; i++) {
663 if (jbuses[i]->watchnr) {
664 fds[r].fd = jbuses[i]->watchfd;
665 fds[r].events = jbuses[i]->watchflags;
672 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
675 DBusDispatchStatus sts;
677 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
678 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
679 if (fds[r].revents) {
680 dbus_connection_read_write(jbuses[i]->connection, 0);
681 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
682 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
683 sts = dbus_connection_dispatch(jbuses[i]->connection);
693 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
696 DBusDispatchStatus sts;
698 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
699 dbus_connection_read_write(jbuses[i]->connection, 0);
700 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
701 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
702 sts = dbus_connection_dispatch(jbuses[i]->connection);
709 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
714 if (njbuses < 0 || njbuses > 100) {
718 fds = alloca(njbuses * sizeof * fds);
721 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
724 n = jbus_fill_pollfds(jbuses, njbuses, fds);
726 s = poll(fds, n, toms);
733 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
734 return n >= 0 ? r + n : r ? r : n;
737 int jbus_read_write_dispatch(struct jbus *jbus, int toms)
739 int r = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);
740 return r < 0 ? r : 0;
743 int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
745 return call(jbus, method, query, onresp, NULL, data);
748 int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
750 return call(jbus, method, query, NULL, onresp, data);
753 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
755 const char *str = json_object_to_json_string(query);
760 return call(jbus, method, str, onresp, NULL, data);
763 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
765 const char *str = json_object_to_json_string(query);
770 return call(jbus, method, str, NULL, onresp, data);
773 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
775 struct respsync synchro;
776 synchro.value = NULL;
777 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
778 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
779 return synchro.value;
782 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
784 struct json_object *obj;
785 char *str = jbus_call_ss_sync(jbus, method, query);
789 parse(jbus, str, &obj);
795 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
797 const char *str = json_object_to_json_string(query);
802 return jbus_call_ss_sync(jbus, method, str);
805 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
807 const char *str = json_object_to_json_string(query);
812 return jbus_call_sj_sync(jbus, method, str);
815 int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *))
817 return add_signal(jbus, name, onsig, NULL);
820 int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *))
822 return add_signal(jbus, name, NULL, onsig);
825 /************************** FEW LITTLE TESTS *****************************************/
831 void ping(struct jreq *jreq, struct json_object *request)
833 printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request));
834 jbus_reply_j(jreq, request);
835 json_object_put(request);
837 void incr(struct jreq *jreq, struct json_object *request)
839 static int counter = 0;
840 struct json_object *res = json_object_new_int(++counter);
841 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
842 jbus_reply_j(jreq, res);
843 jbus_send_signal_j(jbus, "incremented", res);
844 json_object_put(res);
845 json_object_put(request);
850 jbus = create_jbus(1, "/bzh/iot/jdbus");
851 s1 = jbus_add_service_j(jbus, "ping", ping);
852 s2 = jbus_add_service_j(jbus, "incr", incr);
853 s3 = jbus_start_serving(jbus);
854 printf("started %d %d %d\n", s1, s2, s3);
855 while (!jbus_read_write_dispatch (jbus, -1));
862 void onresp(int status, struct json_object *response, void *data)
864 printf("resp: %d, %s, %s\n",status,(char*)data,json_object_to_json_string(response));
865 json_object_put(response);
867 void signaled(const char *data)
869 printf("signaled with {%s}\n", data);
874 jbus = create_jbus(1, "/bzh/iot/jdbus");
875 jbus_on_signal_s(jbus, "incremented", signaled);
877 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
878 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
879 jbus_read_write_dispatch (jbus, 1);
881 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
882 while (!jbus_read_write_dispatch (jbus, -1));