4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
/* Depth value handed to json_tokener_new_ex() when the jbus tokener is created. */
32 #define MAX_JSON_DEPTH 5
/*
 * NOTE(review): only some member lines of the structures below are
 * visible in this view; the comments describe the visible members only.
 */
38 /* structure for handled requests */
/* connection used by the reply functions to send the answer */
40 DBusConnection *connection;
44 /* structure for recorded services */
/* next service of the list headed by jbus->services */
46 struct jservice *next;
/* callback receiving the call argument as a string */
48 void (*oncall_s)(struct jreq *, const char *, void *);
/* callback receiving the call argument parsed as a JSON object */
49 void (*oncall_j)(struct jreq *, struct json_object *, void *);
53 /* structure for signal handlers */
/* callback receiving the signal content as a string */
57 void (*onsignal_s)(const char *, void *);
/* callback receiving the signal content parsed as a JSON object */
58 void (*onsignal_j)(struct json_object *, void *);
62 /* structure for recording asynchronous requests */
/* callback receiving a status and the reply as a string */
67 void (*onresp_s)(int, const char*, void *);
/* callback receiving a status and the reply parsed as a JSON object */
68 void (*onresp_j)(int, struct json_object*, void *);
71 /* structure for synchronous requests */
77 /* structure for handling either client or server jbus on dbus */
/* JSON tokener reused by parse() for every incoming text */
80 struct json_tokener *tokener;
/* head of the list of recorded services */
81 struct jservice *services;
/* D-Bus connection used for sending and dispatching */
82 DBusConnection *connection;
/* head of the list of signal handlers */
83 struct jsignal *signals;
/* head of the list of pending asynchronous requests */
84 struct jrespw *waiters;
92 /*********************** STATIC COMMON METHODS *****************/
/*
 * Releases what a request record holds: drops the reference on the
 * stored D-Bus request message and on the connection.
 * NOTE(review): the rest of the body is not visible in this view, so
 * whether the record itself is freed here cannot be confirmed.
 */
94 static inline void free_jreq(struct jreq *jreq)
96 dbus_message_unref(jreq->request);
97 dbus_connection_unref(jreq->connection);
/*
 * Answers the request with the fixed error text "out of memory" by
 * calling jbus_reply_error_s().
 * NOTE(review): the returned value is not visible in this view.
 */
101 static inline int reply_out_of_memory(struct jreq *jreq)
103 static const char out_of_memory[] = "out of memory";
104 jbus_reply_error_s(jreq, out_of_memory);
/*
 * Answers the request with the fixed error text "invalid request" by
 * calling jbus_reply_error_s(), then reports the message as handled
 * (DBUS_HANDLER_RESULT_HANDLED).
 */
109 static inline int reply_invalid_request(struct jreq *jreq)
111 static const char invalid_request[] = "invalid request";
112 jbus_reply_error_s(jreq, invalid_request);
113 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Tells whether the message targets this jbus: returns non-zero when
 * the message carries an interface and that interface equals
 * jbus->name, zero otherwise (including when no interface is set).
 */
116 static int matchitf(struct jbus *jbus, DBusMessage *message)
118 const char *itf = dbus_message_get_interface(message);
119 return itf != NULL && !strcmp(itf, jbus->name);
/*
 * Records a service for the given method: allocates a jservice,
 * duplicates the method name, stores both callbacks and pushes the
 * record at the head of jbus->services.
 * The public wrappers pass exactly one of oncall_s / oncall_j and NULL
 * for the other.
 * NOTE(review): allocation-failure handling and the return value are
 * not visible in this view.
 */
122 static int add_service(
125 void (*oncall_s)(struct jreq*, const char*, void*),
126 void (*oncall_j)(struct jreq*, struct json_object*, void*),
130 struct jservice *srv;
133 srv = malloc(sizeof * srv);
/* the record keeps its own copy of the method name */
138 srv->method = strdup(method);
144 /* record the service */
145 srv->oncall_s = oncall_s;
146 srv->oncall_j = oncall_j;
/* link at the head of the list */
148 srv->next = jbus->services;
149 jbus->services = srv;
/*
 * Records a handler for the signal of the given name: allocates a
 * jsignal, duplicates the name, stores both callbacks and links the
 * record in front of jbus->signals.
 * When no handler was recorded before, a D-Bus match rule for the
 * signals of this jbus is added to the connection first.
 * NOTE(review): the condition selecting between the two rule formats,
 * the error handling and the return value are not visible in this view.
 */
159 static int add_signal(
162 void (*onsignal_s)(const char*, void*),
163 void (*onsignal_j)(struct json_object*, void*),
170 /* on first handler: install the match rule for the signals */
171 if (jbus->signals == NULL) {
/* rule without sender restriction */
173 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
/* rule restricted to the sender named jbus->name */
175 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
/* NULL error argument: no error is reported back by this call */
178 dbus_bus_add_match(jbus->connection, rule, NULL);
183 sig = malloc(sizeof * sig);
/* the record keeps its own copy of the signal name */
186 sig->name = strdup(name);
190 /* record the signal */
191 sig->onsignal_s = onsignal_s;
192 sig->onsignal_j = onsignal_j;
194 sig->next = jbus->signals;
/*
 * NOTE(review): the head of this definition is not visible in this
 * view; presumably it is the static helper `call` used by the
 * jbus_call_* functions -- confirm.
 * Visible behavior: allocates a waiter record, builds a method call
 * addressed to jbus->name / jbus->path / interface jbus->name with the
 * query as its single string argument, sends it while capturing the
 * serial in the waiter, then stores the callbacks and links the waiter
 * at the head of jbus->waiters.
 */
210 void (*onresp_s)(int status, const char *response, void *data),
211 void (*onresp_j)(int status, struct json_object *response, void *data),
218 resp = malloc(sizeof * resp);
224 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
230 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
/* the serial of the sent message is what incoming_resp matches on */
235 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
239 dbus_message_unref(msg);
241 resp->onresp_s = onresp_s;
242 resp->onresp_j = onresp_j;
243 resp->next = jbus->waiters;
244 jbus->waiters = resp;
248 dbus_message_unref(msg);
/*
 * Response callback used by the synchronous call: `data` points to the
 * struct respsync of the caller. When status is zero the value is
 * duplicated with strdup() (an empty string replaces a NULL value);
 * otherwise the stored value is NULL.
 * NOTE(review): presumably the `replied` flag is also set here; that
 * line is not visible in this view.
 */
255 static void sync_of_replies(int status, const char *value, void *data)
257 struct respsync *s = data;
258 s->value = status ? NULL : strdup(value ? value : "");
/*
 * Parses the text `msg` as JSON with the tokener of the jbus and stores
 * the result in *obj. The tokener is reset before each use. When the
 * tokener reports an error, the produced object is released.
 * NOTE(review): the return statements are not visible in this view;
 * callers treat a non-zero result as success.
 */
262 static int parse(struct jbus *jbus, const char *msg, struct json_object **obj)
264 json_tokener_reset(jbus->tokener);
/* length -1 as given to json_tokener_parse_ex */
265 *obj = json_tokener_parse_ex(jbus->tokener, msg, -1);
266 if (json_tokener_get_error(jbus->tokener) == json_tokener_success)
/* error path: drop whatever the tokener produced */
268 json_object_put(*obj);
/*
 * Handles a method return or an error message: looks up the waiter
 * whose serial equals the reply serial of the message, extracts the
 * string argument and invokes the string callback or, after parsing,
 * the JSON callback. `iserror` forces the status given to the callback
 * to -1.
 * Returns DBUS_HANDLER_RESULT_NOT_YET_HANDLED when no waiter matches,
 * DBUS_HANDLER_RESULT_HANDLED otherwise.
 * NOTE(review): unlinking/freeing of the waiter and the computation of
 * `status` for the string case are not visible in this view.
 */
273 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
277 struct jrespw *jrw, **prv;
278 struct json_object *reply;
279 dbus_uint32_t serial;
281 /* search for the waiter */
282 serial = dbus_message_get_reply_serial(message);
283 prv = &jbus->waiters;
284 while ((jrw = *prv) != NULL && jrw->serial != serial)
287 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
290 /* retrieve the string value */
291 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
301 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
/* status is the result of parse() minus one */
303 status = parse(jbus, str, &reply) - 1;
304 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
/* the reply object is released here, after the callback returned */
305 json_object_put(reply);
309 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming method call: the message must target the
 * interface of this jbus and name a recorded service. A request record
 * holding references on the message and on the connection is created,
 * the string argument is extracted and the service callback is invoked
 * (string form, or JSON form after parsing).
 * Returns DBUS_HANDLER_RESULT_NOT_YET_HANDLED when the message is not
 * for a recorded service, DBUS_HANDLER_RESULT_NEED_MEMORY when the
 * record cannot be allocated, and otherwise the result of
 * reply_invalid_request() or DBUS_HANDLER_RESULT_HANDLED.
 */
312 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
314 struct jservice *srv;
318 struct json_object *query;
320 /* search for the service */
321 if (!matchitf(jbus, message))
322 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
323 method = dbus_message_get_member(message);
325 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
326 srv = jbus->services;
327 while(srv != NULL && strcmp(method, srv->method))
330 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
332 /* handle the message */
333 jreq = malloc(sizeof * jreq);
335 return DBUS_HANDLER_RESULT_NEED_MEMORY;
/* the record keeps its own references on message and connection */
336 jreq->request = dbus_message_ref(message);
337 jreq->connection = dbus_connection_ref(jbus->connection);
339 /* retrieve the string value */
340 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
341 return reply_invalid_request(jreq);
343 /* handling strings only */
344 srv->oncall_s(jreq, str, srv->data);
347 /* handling json only */
348 if (!parse(jbus, str, &query))
349 return reply_invalid_request(jreq);
350 srv->oncall_j(jreq, query, srv->data);
/* the query object is released here, after the callback returned */
351 json_object_put(query);
353 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming signal: the message must target the interface of
 * this jbus and name a recorded signal handler. When the string
 * argument can be extracted, the string callback is invoked if set;
 * otherwise the text is parsed and, on success, the JSON callback is
 * invoked and the object released.
 * Returns DBUS_HANDLER_RESULT_NOT_YET_HANDLED when no handler matches,
 * DBUS_HANDLER_RESULT_HANDLED otherwise.
 */
356 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
361 struct json_object *obj;
363 /* search for the signal handler */
364 if (!matchitf(jbus, message))
365 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
366 name = dbus_message_get_member(message);
368 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
370 while(sig != NULL && strcmp(name, sig->name))
373 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
375 /* retrieve the string value */
376 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
377 if (sig->onsignal_s) {
378 /* handling strings only */
379 sig->onsignal_s(str, sig->data);
382 /* handling json only */
383 if (parse(jbus, str, &obj)) {
384 sig->onsignal_j(obj, sig->data);
385 json_object_put(obj);
389 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Message filter installed on the connection (see create_jbus): `data`
 * is the jbus. Dispatches on the message type to incoming_call,
 * incoming_resp (flag 0 for a method return, 1 for an error) or
 * incoming_signal; any other type is reported as not yet handled.
 */
392 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
394 switch(dbus_message_get_type(message)) {
395 case DBUS_MESSAGE_TYPE_METHOD_CALL:
396 return incoming_call(connection, message, (struct jbus*)data);
397 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
398 return incoming_resp(connection, message, (struct jbus*)data, 0);
399 case DBUS_MESSAGE_TYPE_ERROR:
400 return incoming_resp(connection, message, (struct jbus*)data, 1);
401 case DBUS_MESSAGE_TYPE_SIGNAL:
402 return incoming_signal(connection, message, (struct jbus*)data);
404 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
/*
 * Recomputes jbus->watchflags from the state of the watch: starts from
 * the current flags, then adjusts them according to whether the watch
 * is enabled and to its READABLE / WRITABLE flags.
 * NOTE(review): the statements that set or clear the individual bits
 * are not visible in this view.
 */
407 static void watchset(DBusWatch *watch, struct jbus *jbus)
412 flags = dbus_watch_get_flags(watch);
413 wf = jbus->watchflags;
/* enabled watch */
414 if (dbus_watch_get_enabled(watch)) {
415 if (flags & DBUS_WATCH_READABLE)
417 if (flags & DBUS_WATCH_WRITABLE)
/* second pair of tests: presumably the disabled case -- confirm */
421 if (flags & DBUS_WATCH_READABLE)
423 if (flags & DBUS_WATCH_WRITABLE)
426 jbus->watchflags = wf;
/*
 * Watch-removal callback given to dbus_connection_set_watch_functions;
 * `data` is the jbus. Asserts that at least one watch is recorded and
 * that the removed watch uses the recorded file descriptor.
 * NOTE(review): the update of the watch count is not visible here.
 */
429 static void watchdel(DBusWatch *watch, void *data)
431 struct jbus *jbus = data;
433 assert(jbus->watchnr > 0);
434 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
/*
 * Watch-toggle callback given to dbus_connection_set_watch_functions;
 * `data` is the jbus. After asserting that the watch belongs to the
 * recorded file descriptor, refreshes the poll flags with watchset().
 */
438 static void watchtoggle(DBusWatch *watch, void *data)
440 struct jbus *jbus = data;
442 assert(jbus->watchnr > 0);
443 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
444 watchset(watch, jbus);
/*
 * Watch-addition callback given to dbus_connection_set_watch_functions;
 * `data` is the jbus. The first watch records its file descriptor and
 * clears the flags; a later watch is compared against the recorded
 * descriptor. The poll flags are then refreshed with watchset().
 * NOTE(review): what happens when the descriptors differ, the update of
 * the watch count and the return value are not visible in this view.
 */
447 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
449 struct jbus *jbus = data;
450 if (jbus->watchnr == 0) {
451 jbus->watchfd = dbus_watch_get_unix_fd(watch);
452 jbus->watchflags = 0;
454 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
457 watchset(watch, jbus);
461 /************************** MAIN FUNCTIONS *****************************************/
/* Creates a jbus for `path` on the system bus: create_jbus(path, 0). */
463 struct jbus *create_jbus_system(const char *path)
465 return create_jbus(path, 0);
/* Creates a jbus for `path` on the session bus: create_jbus(path, 1). */
468 struct jbus *create_jbus_session(const char *path)
470 return create_jbus(path, 1);
/*
 * Creates a jbus for the object `path` on the session bus when
 * `session` is non-zero, on the system bus otherwise.
 * The context is zero-allocated, a JSON tokener is created, the path is
 * duplicated, and the bus name is derived from a copy of the path with
 * its leading '/' characters skipped. The connection is then obtained
 * and the message filter and watch callbacks are installed on it.
 * NOTE(review): the transformation applied to the name, the failure
 * paths and the return statement are not visible in this view.
 */
473 struct jbus *create_jbus(const char *path, int session)
478 /* create the context and connect */
479 jbus = calloc(1, sizeof * jbus);
485 jbus->tokener = json_tokener_new_ex(MAX_JSON_DEPTH);
486 if (jbus->tokener == NULL) {
490 jbus->path = strdup(path);
491 if (jbus->path == NULL) {
/* skip the leading slashes before copying the name */
495 while(*path == '/') path++;
496 jbus->name = name = strdup(path);
/* walks backwards over '.' characters of the name */
507 while (name >= jbus->name && *name == '.')
515 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
516 if (jbus->connection == NULL
517 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
518 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
/*
 * Takes a reference on the jbus.
 * NOTE(review): the body is not visible in this view; presumably it
 * increments jbus->refcount, the counter decremented by jbus_unref --
 * confirm.
 */
529 void jbus_addref(struct jbus *jbus)
/*
 * Drops a reference on the jbus. When the counter reaches zero the
 * connection reference is released (if any), the list of services is
 * emptied and the tokener is freed (if any).
 * NOTE(review): the release of each service record and of the other
 * members is not visible in this view.
 */
534 void jbus_unref(struct jbus *jbus)
536 struct jservice *srv;
537 if (!--jbus->refcount) {
538 if (jbus->connection != NULL)
539 dbus_connection_unref(jbus->connection);
/* unlink the services one by one */
540 while((srv = jbus->services) != NULL) {
541 jbus->services = srv->next;
545 if (jbus->tokener != NULL)
546 json_tokener_free(jbus->tokener);
/*
 * Replies to the request with a D-Bus error of name DBUS_ERROR_FAILED
 * carrying the text `error`: the error message is built from the
 * original request, sent on the connection of the request, then
 * unreferenced.
 * NOTE(review): the return value and the release of the request record
 * are not visible in this view.
 */
553 int jbus_reply_error_s(struct jreq *jreq, const char *error)
556 DBusMessage *message;
558 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
562 if (dbus_connection_send(jreq->connection, message, NULL))
564 dbus_message_unref(message);
/*
 * Replies to the request with an error whose text is the JSON
 * serialization of `reply`. When the serialization yields NULL, an
 * "out of memory" error is replied instead.
 */
570 int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
572 const char *str = json_object_to_json_string(reply);
573 return str ? jbus_reply_error_s(jreq, str) : reply_out_of_memory(jreq);
/*
 * Replies to the request with the string `reply`: builds the method
 * return for the original request, appends the string as its single
 * argument, sends it on the connection of the request and unreferences
 * the message. Failures to build the message reply "out of memory".
 * NOTE(review): the return value on the normal path and the release of
 * the request record are not visible in this view.
 */
576 int jbus_reply_s(struct jreq *jreq, const char *reply)
579 DBusMessage *message;
581 message = dbus_message_new_method_return(jreq->request);
583 return reply_out_of_memory(jreq);
585 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
586 dbus_message_unref(message);
587 return reply_out_of_memory(jreq);
590 if (dbus_connection_send(jreq->connection, message, NULL))
592 dbus_message_unref(message);
/*
 * Replies to the request with the JSON serialization of `reply`. When
 * the serialization yields NULL, an "out of memory" error is replied
 * instead.
 */
597 int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
599 const char *str = json_object_to_json_string(reply);
600 return str ? jbus_reply_s(jreq, str) : reply_out_of_memory(jreq);
/*
 * Emits the signal `name` with the string `content`: the signal message
 * is created for the path and interface of the jbus, its sender is set
 * to jbus->name, the content is appended as its single string argument
 * and the message is sent on the connection, then unreferenced.
 * NOTE(review): the return values are not visible in this view.
 */
603 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
606 DBusMessage *message;
608 message = dbus_message_new_signal(jbus->path, jbus->name, name);
612 if (!dbus_message_set_sender(message, jbus->name)
613 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
614 dbus_message_unref(message);
618 if (dbus_connection_send(jbus->connection, message, NULL))
620 dbus_message_unref(message);
/*
 * Emits the signal `name` with the JSON serialization of `content`, by
 * delegating to jbus_send_signal_s().
 * NOTE(review): the handling of a NULL serialization is not visible in
 * this view.
 */
628 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
630 const char *str = json_object_to_json_string(content);
635 return jbus_send_signal_s(jbus, name, str);
/*
 * Records `oncall` as the string-flavoured handler of `method`; `data`
 * is passed back to the handler. Returns the result of add_service().
 */
638 int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *, void *), void *data)
640 return add_service(jbus, method, oncall, NULL, data);
/*
 * Records `oncall` as the JSON-flavoured handler of `method`; `data`
 * is passed back to the handler. Returns the result of add_service().
 */
643 int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *, void *), void *data)
645 return add_service(jbus, method, NULL, oncall, data);
/*
 * Requests ownership of the bus name jbus->name without queuing
 * (DBUS_NAME_FLAG_DO_NOT_QUEUE) and maps the reply to a result: the
 * PRIMARY_OWNER / ALREADY_OWNER replies share one outcome, the
 * EXISTS / IN_QUEUE replies share another.
 * NOTE(review): the values returned for each group are not visible in
 * this view.
 */
648 int jbus_start_serving(struct jbus *jbus)
650 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
652 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
653 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
655 case DBUS_REQUEST_NAME_REPLY_EXISTS:
656 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
/*
 * Fills `fds` for poll(): each of the `njbuses` jbus that has at least
 * one watch contributes one entry with its watched descriptor and its
 * current event flags.
 * NOTE(review): presumably returns the number of entries filled (the
 * caller passes the result to poll as the entry count); the return
 * statement is not visible in this view.
 */
663 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
667 for (r = i = 0 ; i < njbuses ; i++) {
668 if (jbuses[i]->watchnr) {
669 fds[r].fd = jbuses[i]->watchfd;
670 fds[r].events = jbuses[i]->watchflags;
/*
 * Processes the result of poll(): walks the jbuses in the same order
 * as jbus_fill_pollfds, and for each entry with returned events runs a
 * non-blocking read/write on the connection, then dispatches messages
 * while data remains and the budget `maxcount` is not exhausted.
 * NOTE(review): the counter updates and the return value are not
 * visible in this view.
 */
677 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
680 DBusDispatchStatus sts;
682 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
/* the entry belongs to this jbus only if the descriptors agree */
683 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
684 if (fds[r].revents) {
/* timeout 0 */
685 dbus_connection_read_write(jbuses[i]->connection, 0);
686 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
687 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
688 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * For each jbus, runs a non-blocking read/write on its connection and
 * dispatches messages while data remains, within the overall budget
 * `maxcount`.
 * NOTE(review): the counter update and the return value are not visible
 * in this view.
 */
698 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
701 DBusDispatchStatus sts;
703 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
/* timeout 0 */
704 dbus_connection_read_write(jbuses[i]->connection, 0);
705 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
706 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
707 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * Dispatches what is already pending on the jbuses, then polls their
 * descriptors with the timeout `toms` (passed unchanged to poll) and
 * dispatches what arrived, within the budget `maxcount`.
 * The count of jbuses must lie in 0..100; the poll array is taken from
 * the stack with alloca.
 * NOTE(review): the handling of an out-of-range count, of an exhausted
 * budget and of a poll failure is not visible in this view.
 */
714 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
719 if (njbuses < 0 || njbuses > 100) {
723 fds = alloca((unsigned)njbuses * sizeof * fds);
726 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
729 n = jbus_fill_pollfds(jbuses, njbuses, fds);
731 s = poll(fds, (nfds_t)n, toms);
/* only the remaining budget is given to the second dispatch */
738 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
/* sum of both steps; if the second failed, the first count when non-zero, else the failure */
739 return n >= 0 ? r + n : r ? r : n;
/*
 * Single-jbus form of jbus_read_write_dispatch_multiple with a budget
 * of 1000. Returns the negative result on failure and 0 otherwise (the
 * count of dispatched messages is not reported).
 */
742 int jbus_read_write_dispatch(struct jbus *jbus, int toms)
744 int r = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);
745 return r < 0 ? r : 0;
/*
 * Asynchronous call of `method` with a string query; the reply is
 * delivered as a string to `onresp` together with `data`.
 */
748 int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
750 return call(jbus, method, query, onresp, NULL, data);
/*
 * Asynchronous call of `method` with a string query; the reply is
 * delivered as a JSON object to `onresp` together with `data`.
 */
753 int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
755 return call(jbus, method, query, NULL, onresp, data);
/*
 * Asynchronous call of `method` with the JSON serialization of `query`;
 * the reply is delivered as a string to `onresp` together with `data`.
 * NOTE(review): the handling of a NULL serialization is not visible in
 * this view.
 */
758 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
760 const char *str = json_object_to_json_string(query);
765 return call(jbus, method, str, onresp, NULL, data);
/*
 * Asynchronous call of `method` with the JSON serialization of `query`;
 * the reply is delivered as a JSON object to `onresp` together with
 * `data`.
 * NOTE(review): the handling of a NULL serialization is not visible in
 * this view.
 */
768 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
770 const char *str = json_object_to_json_string(query);
775 return call(jbus, method, str, NULL, onresp, data);
/*
 * Synchronous call of `method` with a string query: issues the call
 * with sync_of_replies as callback, then runs the dispatch loop (with
 * an infinite timeout) until the reply is flagged or dispatching
 * fails. Returns the value stored by the callback, a strdup()ed string,
 * or NULL when nothing was stored.
 * The result of jbus_call_ss initializes the `replied` flag, so a
 * non-zero result skips the wait loop.
 */
778 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
780 struct respsync synchro;
781 synchro.value = NULL;
782 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
783 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
784 return synchro.value;
/*
 * Synchronous call of `method` with a string query, returning the reply
 * as a JSON object: obtains the textual reply with jbus_call_ss_sync()
 * and parses it.
 * NOTE(review): the handling of a NULL reply, the release of the text
 * and the return statement are not visible in this view.
 */
787 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
789 struct json_object *obj;
790 char *str = jbus_call_ss_sync(jbus, method, query);
794 parse(jbus, str, &obj);
/*
 * Synchronous call of `method` with the JSON serialization of `query`,
 * returning the textual reply of jbus_call_ss_sync().
 * NOTE(review): the handling of a NULL serialization is not visible in
 * this view.
 */
800 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
802 const char *str = json_object_to_json_string(query);
807 return jbus_call_ss_sync(jbus, method, str);
/*
 * Synchronous call of `method` with the JSON serialization of `query`,
 * returning the reply as a JSON object via jbus_call_sj_sync().
 * NOTE(review): the handling of a NULL serialization is not visible in
 * this view.
 */
810 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
812 const char *str = json_object_to_json_string(query);
817 return jbus_call_sj_sync(jbus, method, str);
/*
 * Records `onsig` as the string-flavoured handler of the signal `name`;
 * `data` is passed back to the handler. Returns the result of
 * add_signal().
 */
820 int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *, void *), void *data)
822 return add_signal(jbus, name, onsig, NULL, data);
/*
 * Records `onsig` as the JSON-flavoured handler of the signal `name`;
 * `data` is passed back to the handler. Returns the result of
 * add_signal().
 */
825 int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *, void *), void *data)
827 return add_signal(jbus, name, NULL, onsig, data);
830 /************************** FEW LITTLE TESTS *****************************************/
/*
 * Test service: prints the request and replies with the request itself.
 * NOTE(review): incoming_call also calls json_object_put() on the query
 * after the JSON callback returns, so the put below looks like a second
 * release of the same object -- confirm ownership.
 */
836 void ping(struct jreq *jreq, struct json_object *request, void *unused)
838 printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request));
839 jbus_reply_j(jreq, request);
840 json_object_put(request);
/*
 * Test service: increments a static counter, replies with the new
 * value and emits the signal "incremented" carrying the same value.
 * `jbus` is not a parameter here; it must come from an enclosing scope
 * that is not visible in this view.
 * NOTE(review): incoming_call also calls json_object_put() on the query
 * after the JSON callback returns, so the put on `request` below looks
 * like a second release of the same object -- confirm ownership.
 */
842 void incr(struct jreq *jreq, struct json_object *request, void *unused)
844 static int counter = 0;
845 struct json_object *res = json_object_new_int(++counter);
846 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
847 jbus_reply_j(jreq, res);
848 jbus_send_signal_j(jbus, "incremented", res);
/* release the locally created result */
849 json_object_put(res);
850 json_object_put(request);
/*
 * Body of the server test (its head is not visible in this view):
 * creates the jbus, records the "ping" and "incr" services, starts
 * serving, prints the three results and dispatches until an error.
 * NOTE(review): create_jbus is declared as (const char *path, int
 * session); the arguments below appear to be in the reverse order --
 * confirm.
 */
855 jbus = create_jbus(1, "/bzh/iot/jdbus");
856 s1 = jbus_add_service_j(jbus, "ping", ping, NULL);
857 s2 = jbus_add_service_j(jbus, "incr", incr, NULL);
858 s3 = jbus_start_serving(jbus);
859 printf("started %d %d %d\n", s1, s2, s3);
/* loop until the dispatch reports a failure */
860 while (!jbus_read_write_dispatch (jbus, -1));
/*
 * Test response handler: prints the status, the label passed as `data`
 * and the JSON reply.
 * NOTE(review): incoming_resp also calls json_object_put() on the reply
 * after the JSON callback returns, so the put below looks like a second
 * release of the same object -- confirm ownership.
 */
867 void onresp(int status, struct json_object *response, void *data)
869 printf("resp: %d, %s, %s\n",status,(char*)data,json_object_to_json_string(response));
870 json_object_put(response);
/*
 * Test signal handler: prints the received content.
 * NOTE(review): jbus_on_signal_s expects a handler of type
 * void (*)(const char *, void *); this function takes a single
 * parameter -- confirm.
 */
872 void signaled(const char *data)
874 printf("signaled with {%s}\n", data);
/*
 * Body of the client test (its head is not visible in this view):
 * creates the jbus, subscribes to the signal "incremented", issues
 * asynchronous "ping" and "incr" calls, dispatches, prints the result
 * of a synchronous "ping" and then dispatches until an error.
 * NOTE(review): create_jbus is declared as (const char *path, int
 * session), so the arguments below appear to be in the reverse order;
 * jbus_on_signal_s takes four parameters but receives three here --
 * confirm.
 */
879 jbus = create_jbus(1, "/bzh/iot/jdbus");
880 jbus_on_signal_s(jbus, "incremented", signaled);
882 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
883 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
884 jbus_read_write_dispatch (jbus, 1);
/* the string returned by the synchronous call is not released here */
886 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
887 while (!jbus_read_write_dispatch (jbus, -1));