4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
/* Maximum nesting depth given to json_tokener_new_ex() when the tokener of a jbus is created. */
32 #define MAX_JSON_DEPTH 5
38 /* structure for handled requests */
/* connection used to send the reply; a reference is taken in incoming_call() and dropped in free_jreq() */
40 DBusConnection *connection;
44 /* structure for recorded services */
/* next service of the singly linked list headed by jbus->services */
46 struct jservice *next;
/* handler receiving the request payload as a string (NULL when the service was recorded with a JSON handler) */
48 void (*oncall_s)(struct jreq *, const char *, void *);
/* handler receiving the request payload as a parsed JSON object (NULL when recorded with a string handler) */
49 void (*oncall_j)(struct jreq *, struct json_object *, void *);
53 /* structure for signal handlers */
/* handler receiving the signal payload as a string (NULL when recorded with a JSON handler) */
57 void (*onsignal_s)(const char *, void *);
/* handler receiving the signal payload as a parsed JSON object (NULL when recorded with a string handler) */
58 void (*onsignal_j)(struct json_object *, void *);
62 /* structure for recording asynchronous requests */
/* callback receiving (status, reply as string, user data); NULL when the JSON callback is used */
67 void (*onresp_s)(int, const char*, void *);
/* callback receiving (status, reply as JSON object, user data); NULL when the string callback is used */
68 void (*onresp_j)(int, struct json_object*, void *);
71 /* structure for synchronous requests */
77 /* structure for handling either client or server jbus on dbus */
/* JSON tokener created in create_jbus() and reused by parse() for every payload */
80 struct json_tokener *tokener;
/* head of the list of recorded services (see add_service()) */
81 struct jservice *services;
/* D-Bus connection obtained with dbus_bus_get() in create_jbus() */
82 DBusConnection *connection;
/* head of the list of recorded signal handlers (see add_signal()) */
83 struct jsignal *signals;
/* head of the list of asynchronous requests waiting for their reply */
84 struct jrespw *waiters;
92 /*********************** STATIC COMMON METHODS *****************/
/*
 * Drops the references held by 'jreq': the request message and the
 * connection, both of which are referenced in incoming_call().
 * NOTE(review): the release of the 'jreq' memory itself is not visible
 * in this listing — confirm it follows the two unref calls.
 */
94 static inline void free_jreq(struct jreq *jreq)
96 dbus_message_unref(jreq->request);
97 dbus_connection_unref(jreq->connection);
/*
 * Replies to 'jreq' with the D-Bus error text "out of memory".
 * NOTE(review): the value returned (and any errno update) is not visible
 * in this listing; callers return its result directly as their own status.
 */
101 static inline int reply_out_of_memory(struct jreq *jreq)
103 static const char out_of_memory[] = "out of memory";
104 jbus_reply_error_s(jreq, out_of_memory);
109 static inline int reply_invalid_request(struct jreq *jreq)
111 static const char invalid_request[] = "invalid request";
112 jbus_reply_error_s(jreq, invalid_request);
113 return DBUS_HANDLER_RESULT_HANDLED;
116 static int matchitf(struct jbus *jbus, DBusMessage *message)
118 const char *itf = dbus_message_get_interface(message);
119 return itf != NULL && !strcmp(itf, jbus->name);
/*
 * Records a service (a handler for the D-Bus method 'method') at the head
 * of the list jbus->services.
 * The public wrappers jbus_add_service_s()/jbus_add_service_j() pass exactly
 * one of the two handlers and NULL for the other.
 * NOTE(review): the error handling and the returned values are not visible
 * in this listing.
 */
122 static int add_service(
/* handler for string payloads, or NULL */
125 void (*oncall_s)(struct jreq*, const char*, void*),
/* handler for JSON payloads, or NULL */
126 void (*oncall_j)(struct jreq*, struct json_object*, void*),
130 struct jservice *srv;
/* allocate the record */
133 srv = malloc(sizeof * srv);
/* keep a private copy of the method name */
138 srv->method = strdup(method);
144 /* record the service */
145 srv->oncall_s = oncall_s;
146 srv->oncall_j = oncall_j;
/* push the record at the head of the list */
148 srv->next = jbus->services;
149 jbus->services = srv;
/*
 * Records a handler for the signal 'name'.
 * When no signal is recorded yet, a D-Bus match rule for the signals of the
 * interface jbus->name on the path jbus->path is installed first.
 * The public wrappers jbus_on_signal_s()/jbus_on_signal_j() pass exactly one
 * of the two handlers and NULL for the other.
 * NOTE(review): two alternative rule formats appear below; how one is
 * selected (presumably preprocessor conditionals), the error handling and
 * the returned values are not visible in this listing.
 */
159 static int add_signal(
/* handler for string payloads, or NULL */
162 void (*onsignal_s)(const char*, void*),
/* handler for JSON payloads, or NULL */
163 void (*onsignal_j)(struct json_object*, void*),
170 /* on the first recorded signal, install the D-Bus match rule */
171 if (jbus->signals == NULL) {
/* rule variant without sender filtering */
173 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
/* rule variant that also requires the sender to be jbus->name */
175 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
/* no DBusError is given, so a failure to add the rule is not reported here */
178 dbus_bus_add_match(jbus->connection, rule, NULL);
/* allocate the record */
183 sig = malloc(sizeof * sig);
/* keep a private copy of the signal name */
186 sig->name = strdup(name);
190 /* record the signal */
191 sig->onsignal_s = onsignal_s;
192 sig->onsignal_j = onsignal_j;
/* link the record before the current head of the list */
194 sig->next = jbus->signals;
/* callback for a reply delivered as a string, or NULL */
210 void (*onresp_s)(int status, const char *response, void *data),
/* callback for a reply delivered as a JSON object, or NULL */
211 void (*onresp_j)(int status, struct json_object *response, void *data),
/* allocate the record that waits for the reply */
218 resp = malloc(sizeof * resp);
/* the method call targets destination jbus->name, object jbus->path, interface jbus->name */
224 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
/* the query travels as a single string argument */
230 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
/* the serial of the sent message is kept: incoming_resp() matches replies on it */
235 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
239 dbus_message_unref(msg);
/* record the waiter at the head of jbus->waiters */
241 resp->onresp_s = onresp_s;
242 resp->onresp_j = onresp_j;
243 resp->next = jbus->waiters;
244 jbus->waiters = resp;
/* NOTE(review): presumably the error path releasing the message — confirm */
248 dbus_message_unref(msg);
/*
 * Reply callback used by jbus_call_ss_sync(): 'data' is the struct respsync
 * of the pending synchronous call.
 * NOTE(review): the update of the 'replied' flag is not visible in this
 * listing — confirm.
 */
255 static void sync_of_replies(int status, const char *value, void *data)
257 struct respsync *s = data;
/* on error (status != 0) no value is stored; otherwise a heap copy of the reply, "" standing for a NULL reply */
258 s->value = status ? NULL : strdup(value ? value : "");
/*
 * Parses the JSON text 'msg' with the tokener of 'jbus' and stores the
 * resulting object in '*obj'.
 * Callers treat a non-zero result as success and zero as failure.
 * NOTE(review): the return statements themselves are not visible in this
 * listing.
 */
262 static int parse(struct jbus *jbus, const char *msg, struct json_object **obj)
/* the tokener is shared by all calls, so clear any state left by a previous parse */
264 json_tokener_reset(jbus->tokener);
/* a length of -1 makes the tokener scan up to the terminating NUL */
265 *obj = json_tokener_parse_ex(jbus->tokener, msg, -1);
266 if (json_tokener_get_error(jbus->tokener) == json_tokener_success)
/* failure path: drop whatever object was produced */
268 json_object_put(*obj);
/*
 * Handles an incoming method return (iserror == 0) or error (iserror != 0):
 * finds the waiter whose serial matches the reply serial of 'message' and
 * invokes its callback with the string payload or with the parsed JSON.
 * Messages matching no waiter are reported as not yet handled.
 * NOTE(review): the unlinking and release of the waiter and the
 * string/JSON branch selection are not visible in this listing.
 */
273 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
277 struct jrespw *jrw, **prv;
278 struct json_object *reply;
279 dbus_uint32_t serial;
281 /* search for the waiter */
282 serial = dbus_message_get_reply_serial(message);
/* 'prv' addresses the link pointing to the inspected waiter */
283 prv = &jbus->waiters;
284 while ((jrw = *prv) != NULL && jrw->serial != serial)
/* NOTE(review): presumably taken when no waiter matches the serial */
287 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
290 /* retrieve the string value */
291 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
/* string callback: an error message always yields the status -1 */
301 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
/* JSON callback: presumably status is 0 when the payload parses and -1 otherwise — confirm against parse() */
303 status = parse(jbus, str, &reply) - 1;
304 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
/* the reply object is released here, once the callback has returned */
305 json_object_put(reply);
309 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming method call: checks that it targets the interface of
 * 'jbus', finds the service recorded for the called member, wraps the
 * message in a freshly allocated jreq and invokes the service handler with
 * the string payload or with the parsed JSON.
 * NOTE(review): the tests guarding several of the early returns and the
 * string/JSON branch selection are not visible in this listing.
 */
312 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
314 struct jservice *srv;
318 struct json_object *query;
320 /* search for the service */
321 if (!matchitf(jbus, message))
322 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
323 method = dbus_message_get_member(message);
325 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
326 srv = jbus->services;
327 while(srv != NULL && strcmp(method, srv->method))
330 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
332 /* handle the message */
333 jreq = malloc(sizeof * jreq);
335 return DBUS_HANDLER_RESULT_NEED_MEMORY;
/* the request keeps its own references on the message and on the connection */
336 jreq->request = dbus_message_ref(message);
337 jreq->connection = dbus_connection_ref(jbus->connection);
339 /* retrieve the string value */
340 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
341 return reply_invalid_request(jreq);
343 /* handling strings only */
344 srv->oncall_s(jreq, str, srv->data);
347 /* handling json only */
348 if (!parse(jbus, str, &query))
349 return reply_invalid_request(jreq);
350 srv->oncall_j(jreq, query, srv->data);
/* the query object is released here, once the handler has returned */
351 json_object_put(query);
353 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming signal: checks that it comes from the interface of
 * 'jbus', finds the handler recorded for the signal name and invokes it
 * with the string payload or with the parsed JSON.
 * A signal whose payload is not a single string, or not valid JSON for a
 * JSON handler, is reported as handled without invoking the handler.
 * NOTE(review): the tests guarding two of the early returns are not
 * visible in this listing.
 */
356 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
361 struct json_object *obj;
363 /* search for the signal handler */
364 if (!matchitf(jbus, message))
365 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
366 name = dbus_message_get_member(message);
368 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
370 while(sig != NULL && strcmp(name, sig->name))
373 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
375 /* retrieve the string value */
376 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
377 if (sig->onsignal_s) {
378 /* handling strings only */
379 sig->onsignal_s(str, sig->data);
382 /* handling json only */
383 if (parse(jbus, str, &obj)) {
384 sig->onsignal_j(obj, sig->data);
/* the payload object is released here, once the handler has returned */
385 json_object_put(obj);
389 return DBUS_HANDLER_RESULT_HANDLED;
392 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
394 switch(dbus_message_get_type(message)) {
395 case DBUS_MESSAGE_TYPE_METHOD_CALL:
396 return incoming_call(connection, message, (struct jbus*)data);
397 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
398 return incoming_resp(connection, message, (struct jbus*)data, 0);
399 case DBUS_MESSAGE_TYPE_ERROR:
400 return incoming_resp(connection, message, (struct jbus*)data, 1);
401 case DBUS_MESSAGE_TYPE_SIGNAL:
402 return incoming_signal(connection, message, (struct jbus*)data);
404 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
/*
 * Recomputes jbus->watchflags from the flags and the enabled state of
 * 'watch'.
 * NOTE(review): the statements executed under each test below are not
 * visible in this listing; presumably the matching poll event bits are set
 * when the watch is enabled and cleared otherwise — confirm.
 */
407 static void watchset(DBusWatch *watch, struct jbus *jbus)
412 flags = dbus_watch_get_flags(watch);
413 e = dbus_watch_get_enabled(watch);
/* start from the flags currently recorded */
414 wf = jbus->watchflags;
416 if (flags & DBUS_WATCH_READABLE)
418 if (flags & DBUS_WATCH_WRITABLE)
422 if (flags & DBUS_WATCH_READABLE)
424 if (flags & DBUS_WATCH_WRITABLE)
/* stored flags are later copied into pollfd.events by jbus_fill_pollfds() */
427 jbus->watchflags = wf;
/*
 * Watch removal callback given to dbus_connection_set_watch_functions();
 * 'data' is the jbus.
 * NOTE(review): the update of jbus->watchnr is not visible in this listing.
 */
430 static void watchdel(DBusWatch *watch, void *data)
432 struct jbus *jbus = data;
/* at least one watch must be recorded, and it must be on the recorded file descriptor */
434 assert(jbus->watchnr > 0);
435 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
/*
 * Watch toggle callback given to dbus_connection_set_watch_functions();
 * 'data' is the jbus. Refreshes the recorded poll flags from 'watch'.
 */
439 static void watchtoggle(DBusWatch *watch, void *data)
441 struct jbus *jbus = data;
/* at least one watch must be recorded, and it must be on the recorded file descriptor */
443 assert(jbus->watchnr > 0);
444 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
445 watchset(watch, jbus);
/*
 * Watch addition callback given to dbus_connection_set_watch_functions();
 * 'data' is the jbus.
 * The first watch fixes the file descriptor recorded for the jbus; a later
 * watch on another descriptor takes a separate branch.
 * NOTE(review): that branch, the update of jbus->watchnr and the returned
 * values are not visible in this listing.
 */
448 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
450 struct jbus *jbus = data;
/* first watch: record its descriptor and start with no poll flag */
451 if (jbus->watchnr == 0) {
452 jbus->watchfd = dbus_watch_get_unix_fd(watch);
453 jbus->watchflags = 0;
455 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
458 watchset(watch, jbus);
462 /************************** MAIN FUNCTIONS *****************************************/
/*
 * Creates a jbus for 'path' on the system bus.
 *
 * Shorthand for create_jbus() with a zero 'session' argument; returns
 * whatever create_jbus() returns.
 */
struct jbus *create_jbus_system(const char *path)
{
	const int session = 0;

	return create_jbus(path, session);
}
/*
 * Creates a jbus for 'path' on the session bus.
 *
 * Shorthand for create_jbus() with a 'session' argument of 1; returns
 * whatever create_jbus() returns.
 */
struct jbus *create_jbus_session(const char *path)
{
	const int session = 1;

	return create_jbus(path, session);
}
/*
 * Creates a jbus for the object 'path', connected to the session bus when
 * 'session' is not zero and to the system bus otherwise.
 * NOTE(review): the derivation of the bus name from the path, the error
 * handling and the returned values are only partly visible in this listing.
 */
474 struct jbus *create_jbus(const char *path, int session)
479 /* create the context and connect */
/* calloc leaves every field zeroed */
480 jbus = calloc(1, sizeof * jbus);
486 jbus->tokener = json_tokener_new_ex(MAX_JSON_DEPTH);
487 if (jbus->tokener == NULL) {
/* private copy of the object path */
491 jbus->path = strdup(path);
492 if (jbus->path == NULL) {
/* the name is derived from the path stripped of its leading slashes */
496 while(*path == '/') path++;
497 jbus->name = name = strdup(path);
/* scans backward over trailing dots of the name, never going before its first character */
508 while (name >= jbus->name && *name == '.')
/* no DBusError is given, so the cause of a connection failure is not reported */
516 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
/* a missing connection, or a failure to install the message filter or the watch callbacks, is an error */
517 if (jbus->connection == NULL
518 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
519 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
/*
 * NOTE(review): the body is not visible in this listing; jbus_unref()
 * decrements jbus->refcount, so this presumably increments it — confirm.
 */
530 void jbus_addref(struct jbus *jbus)
/*
 * Drops one reference on 'jbus'; when the count reaches zero, releases the
 * connection, the recorded services and the tokener.
 * NOTE(review): the release of each service record, of the other lists,
 * of the strings and of the jbus itself is not visible in this listing.
 */
535 void jbus_unref(struct jbus *jbus)
537 struct jservice *srv;
538 if (!--jbus->refcount) {
539 if (jbus->connection != NULL)
540 dbus_connection_unref(jbus->connection);
/* unlink the services one by one */
541 while((srv = jbus->services) != NULL) {
542 jbus->services = srv->next;
546 if (jbus->tokener != NULL)
547 json_tokener_free(jbus->tokener);
/*
 * Replies to the request 'jreq' with a D-Bus error of name
 * DBUS_ERROR_FAILED carrying the text 'error'.
 * NOTE(review): the returned values and the release of 'jreq' are not
 * visible in this listing.
 */
554 int jbus_reply_error_s(struct jreq *jreq, const char *error)
557 DBusMessage *message;
559 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
/* no serial is requested for the sent message */
563 if (dbus_connection_send(jreq->connection, message, NULL))
565 dbus_message_unref(message);
/*
 * Replies to the request 'jreq' with an error whose text is the JSON
 * serialization of 'reply'.
 *
 * When the serialization yields no string, the request is answered through
 * reply_out_of_memory() instead. Returns the result of whichever reply
 * function was used.
 */
int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
{
	const char *text = json_object_to_json_string(reply);

	if (text == NULL)
		return reply_out_of_memory(jreq);
	return jbus_reply_error_s(jreq, text);
}
/*
 * Replies to the request 'jreq' with a method return carrying the string
 * 'reply' as its single argument.
 * Failures to build the reply are answered through reply_out_of_memory().
 * NOTE(review): the returned values on the send path and the release of
 * 'jreq' are not visible in this listing.
 */
577 int jbus_reply_s(struct jreq *jreq, const char *reply)
580 DBusMessage *message;
582 message = dbus_message_new_method_return(jreq->request);
584 return reply_out_of_memory(jreq);
586 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
/* the partially built reply is dropped before reporting the failure */
587 dbus_message_unref(message);
588 return reply_out_of_memory(jreq);
/* no serial is requested for the sent message */
591 if (dbus_connection_send(jreq->connection, message, NULL))
593 dbus_message_unref(message);
/*
 * Replies to the request 'jreq' with the JSON serialization of 'reply'.
 *
 * When the serialization yields no string, the request is answered through
 * reply_out_of_memory() instead. Returns the result of whichever reply
 * function was used.
 */
int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
{
	const char *text = json_object_to_json_string(reply);

	if (text == NULL)
		return reply_out_of_memory(jreq);
	return jbus_reply_s(jreq, text);
}
/*
 * Emits the signal 'name' on the object jbus->path and interface
 * jbus->name, carrying the string 'content' as its single argument.
 * NOTE(review): the returned values are not visible in this listing.
 */
604 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
607 DBusMessage *message;
609 message = dbus_message_new_signal(jbus->path, jbus->name, name);
/* the sender is set explicitly to the name of the jbus */
613 if (!dbus_message_set_sender(message, jbus->name)
614 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
615 dbus_message_unref(message);
/* no serial is requested for the sent message */
619 if (dbus_connection_send(jbus->connection, message, NULL))
621 dbus_message_unref(message);
/*
 * Emits the signal 'name' carrying the JSON serialization of 'content';
 * returns the result of jbus_send_signal_s().
 * NOTE(review): the handling of a failed serialization is not visible in
 * this listing.
 */
629 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
631 const char *str = json_object_to_json_string(content);
636 return jbus_send_signal_s(jbus, name, str);
/*
 * Records 'oncall' as the handler of the method 'method' of 'jbus'; the
 * handler receives the request payload as a string, together with 'data'.
 *
 * Returns the result of add_service().
 */
int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *, void *), void *data)
{
	int status = add_service(jbus, method, oncall, NULL, data);

	return status;
}
/*
 * Records 'oncall' as the handler of the method 'method' of 'jbus'; the
 * handler receives the request payload as a parsed JSON object, together
 * with 'data'.
 *
 * Returns the result of add_service().
 */
int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *, void *), void *data)
{
	int status = add_service(jbus, method, NULL, oncall, data);

	return status;
}
/*
 * Requests ownership of the bus name jbus->name, without queuing when the
 * name is already owned.
 * NOTE(review): the values returned for each group of replies are not
 * visible in this listing; presumably the first group is the success case
 * and the second the failure case — confirm.
 */
649 int jbus_start_serving(struct jbus *jbus)
/* no DBusError is given, so the cause of a failure is not reported */
651 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
/* the name is now owned, or was already owned, by this connection */
653 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
654 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
/* the name is owned by another connection */
656 case DBUS_REQUEST_NAME_REPLY_EXISTS:
657 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
/*
 * Fills 'fds' with one entry for each of the 'njbuses' items of 'jbuses'
 * that has at least one watch: its recorded file descriptor and poll flags.
 * NOTE(review): the increment of the output index and the returned value
 * are not visible in this listing; jbus_read_write_dispatch_multiple()
 * uses the result as the entry count given to poll().
 */
664 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
/* 'i' walks the jbuses, 'r' indexes the entries of 'fds' */
668 for (r = i = 0 ; i < njbuses ; i++) {
/* a jbus without any watch gets no entry */
669 if (jbuses[i]->watchnr) {
670 fds[r].fd = jbuses[i]->watchfd;
671 fds[r].events = jbuses[i]->watchflags;
/*
 * After a poll on 'fds' (as filled by jbus_fill_pollfds()), reads/writes
 * and dispatches the messages of the jbuses whose entry reports events,
 * within the budget 'maxcount'.
 * NOTE(review): the updates of the counters 'r' and 'n' and the returned
 * value are not visible in this listing.
 */
678 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
681 DBusDispatchStatus sts;
683 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
/* only a jbus with watches has an entry, recognized by its descriptor */
684 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
685 if (fds[r].revents) {
/* a timeout of 0 makes the read/write non blocking */
686 dbus_connection_read_write(jbuses[i]->connection, 0);
687 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
/* dispatch while messages remain and the budget allows */
688 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
689 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * Performs a non blocking read/write on each of the 'njbuses' items of
 * 'jbuses' and dispatches their pending messages, within the budget
 * 'maxcount'.
 * NOTE(review): the update of the counter 'r' and the returned value are
 * not visible in this listing.
 */
699 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
702 DBusDispatchStatus sts;
704 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
/* a timeout of 0 makes the read/write non blocking */
705 dbus_connection_read_write(jbuses[i]->connection, 0);
706 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
/* dispatch while messages remain and the budget allows */
707 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
708 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * Dispatches the pending messages of the 'njbuses' items of 'jbuses',
 * then polls their file descriptors with the timeout 'toms' (as given to
 * poll()) and dispatches what the poll made available, within the overall
 * budget 'maxcount'.
 * NOTE(review): the early returns between the steps below are not visible
 * in this listing.
 */
715 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
/* the count is bounded because the pollfd array is allocated on the stack */
720 if (njbuses < 0 || njbuses > 100) {
724 fds = alloca(njbuses * sizeof * fds);
/* first step: what is already pending */
727 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
/* second step: wait for events */
730 n = jbus_fill_pollfds(jbuses, njbuses, fds);
732 s = poll(fds, n, toms);
/* third step: dispatch with the budget left after the first step */
739 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
/* sum of both steps; when the last step failed, the count of the first step if any, else the failure */
740 return n >= 0 ? r + n : r ? r : n;
/*
 * Reads, writes and dispatches the messages of the single 'jbus', waiting
 * for events with the timeout 'toms', with a budget of 1000.
 *
 * Returns the negative value reported by
 * jbus_read_write_dispatch_multiple() on failure, 0 otherwise.
 */
int jbus_read_write_dispatch(struct jbus *jbus, int toms)
{
	int count = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);

	if (count < 0)
		return count;
	return 0;
}
/*
 * Asynchronously calls the method 'method' with the string 'query';
 * 'onresp' later receives the status, the reply as a string and 'data'.
 *
 * Returns the result of call().
 */
int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
{
	int status = call(jbus, method, query, onresp, NULL, data);

	return status;
}
/*
 * Asynchronously calls the method 'method' with the string 'query';
 * 'onresp' later receives the status, the reply as a JSON object and
 * 'data'.
 *
 * Returns the result of call().
 */
int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
{
	int status = call(jbus, method, query, NULL, onresp, data);

	return status;
}
/*
 * Asynchronously calls the method 'method' with the JSON serialization of
 * 'query'; 'onresp' receives the reply as a string.
 * NOTE(review): the handling of a failed serialization is not visible in
 * this listing.
 */
759 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
761 const char *str = json_object_to_json_string(query);
766 return call(jbus, method, str, onresp, NULL, data);
/*
 * Asynchronously calls the method 'method' with the JSON serialization of
 * 'query'; 'onresp' receives the reply as a JSON object.
 * NOTE(review): the handling of a failed serialization is not visible in
 * this listing.
 */
769 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
771 const char *str = json_object_to_json_string(query);
776 return call(jbus, method, str, NULL, onresp, data);
779 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
781 struct respsync synchro;
782 synchro.value = NULL;
783 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
784 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
785 return synchro.value;
/*
 * Synchronously calls the method 'method' with the string 'query' and
 * parses the reply as JSON.
 * NOTE(review): the handling of a missing reply, the release of 'str' and
 * the returned value are not visible in this listing.
 */
788 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
790 struct json_object *obj;
/* the reply string is obtained from the string variant of the synchronous call */
791 char *str = jbus_call_ss_sync(jbus, method, query);
795 parse(jbus, str, &obj);
/*
 * Synchronously calls the method 'method' with the JSON serialization of
 * 'query'; returns the result of jbus_call_ss_sync().
 * NOTE(review): the handling of a failed serialization is not visible in
 * this listing.
 */
801 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
803 const char *str = json_object_to_json_string(query);
808 return jbus_call_ss_sync(jbus, method, str);
/*
 * Synchronously calls the method 'method' with the JSON serialization of
 * 'query'; returns the result of jbus_call_sj_sync().
 * NOTE(review): the handling of a failed serialization is not visible in
 * this listing.
 */
811 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
813 const char *str = json_object_to_json_string(query);
818 return jbus_call_sj_sync(jbus, method, str);
/*
 * Records 'onsig' as the handler of the signal 'name' of 'jbus'; the
 * handler receives the signal payload as a string, together with 'data'.
 *
 * Returns the result of add_signal().
 */
int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *, void *), void *data)
{
	int status = add_signal(jbus, name, onsig, NULL, data);

	return status;
}
/*
 * Records 'onsig' as the handler of the signal 'name' of 'jbus'; the
 * handler receives the signal payload as a parsed JSON object, together
 * with 'data'.
 *
 * Returns the result of add_signal().
 */
int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *, void *), void *data)
{
	int status = add_signal(jbus, name, NULL, onsig, data);

	return status;
}
831 /************************** FEW LITTLE TESTS *****************************************/
/*
 * Test service "ping": prints the request and replies to the caller with
 * the request itself.
 *
 * 'request' is not released here: incoming_call() releases the query with
 * json_object_put() once the handler returns, so releasing it in the
 * handler as well would drop the same reference twice.
 */
void ping(struct jreq *jreq, struct json_object *request, void *unused)
{
	printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request));
	jbus_reply_j(jreq, request);
}
843 void incr(struct jreq *jreq, struct json_object *request, void *unused)
845 static int counter = 0;
846 struct json_object *res = json_object_new_int(++counter);
847 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
848 jbus_reply_j(jreq, res);
849 jbus_send_signal_j(jbus, "incremented", res);
850 json_object_put(res);
851 json_object_put(request);
856 jbus = create_jbus(1, "/bzh/iot/jdbus");
857 s1 = jbus_add_service_j(jbus, "ping", ping, NULL);
858 s2 = jbus_add_service_j(jbus, "incr", incr, NULL);
859 s3 = jbus_start_serving(jbus);
860 printf("started %d %d %d\n", s1, s2, s3);
861 while (!jbus_read_write_dispatch (jbus, -1));
/*
 * Test reply callback: prints the status, the tag given as 'data' (a C
 * string) and the JSON reply.
 *
 * 'response' is not released here: incoming_resp() releases the reply with
 * json_object_put() once the callback returns, so releasing it in the
 * callback as well would drop the same reference twice.
 */
void onresp(int status, struct json_object *response, void *data)
{
	printf("resp: %d, %s, %s\n",status,(char*)data,json_object_to_json_string(response));
}
/*
 * Test signal handler: prints the payload of the received signal.
 *
 * The signature matches the handler type expected by jbus_on_signal_s(),
 * void (*)(const char *, void *); the second argument (the user data given
 * at registration) is not used.
 */
void signaled(const char *data, void *unused)
{
	(void)unused;
	printf("signaled with {%s}\n", data);
}
880 jbus = create_jbus(1, "/bzh/iot/jdbus");
881 jbus_on_signal_s(jbus, "incremented", signaled);
883 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
884 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
885 jbus_read_write_dispatch (jbus, 1);
887 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
888 while (!jbus_read_write_dispatch (jbus, -1));