4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
36 /* structure for handled requests */
38 DBusConnection *connection;
42 /* structure for recorded services */
44 struct jservice *next;
46 void (*oncall_s)(struct jreq *, const char *);
47 void (*oncall_j)(struct jreq *, struct json_object *);
50 /* structure for signal handlers */
54 void (*onsignal_s)(const char *);
55 void (*onsignal_j)(struct json_object *);
58 /* structure for recording asynchronous requests */
63 void (*onresp_s)(int, const char*, void *);
64 void (*onresp_j)(int, struct json_object*, void *);
67 /* structure for synchronous requests */
73 /* structure for handling either client or server jbus on dbus */
76 struct jservice *services;
77 DBusConnection *connection;
78 struct jsignal *signals;
79 struct jrespw *waiters;
87 /*********************** STATIC COMMON METHODS *****************/
/*
 * Drops the references held by a request: one on the D-Bus message
 * (jreq->request) and one on the connection (jreq->connection).
 * NOTE(review): the remaining lines of this function are not visible in
 * this view; presumably the jreq itself is released there -- confirm.
 */
89 static inline void free_jreq(struct jreq *jreq)
91 dbus_message_unref(jreq->request);
92 dbus_connection_unref(jreq->connection);
/*
 * Answers the request 'jreq' with the fixed error text "out of memory",
 * using jbus_reply_error_s. The result of that call is not used here.
 * NOTE(review): the value returned by this helper is not visible in this
 * view -- confirm before relying on it.
 */
96 static inline int reply_out_of_memory(struct jreq *jreq)
98 static const char out_of_memory[] = "out of memory";
99 jbus_reply_error_s(jreq, out_of_memory);
104 static inline int reply_invalid_request(struct jreq *jreq)
106 static const char invalid_request[] = "invalid request";
107 jbus_reply_error_s(jreq, invalid_request);
108 return DBUS_HANDLER_RESULT_HANDLED;
111 static int matchitf(struct jbus *jbus, DBusMessage *message)
113 const char *itf = dbus_message_get_interface(message);
114 return itf != NULL && !strcmp(itf, jbus->name);
/*
 * Records a handler for a method on 'jbus'. The handler comes in two
 * flavours: 'oncall_s' receives the raw string, 'oncall_j' the parsed JSON.
 * The public wrappers visible in this file pass NULL for one of the two.
 * NOTE(review): the checks of the malloc/strdup results and the returned
 * values are not visible in this view -- confirm before relying on them.
 */
117 static int add_service(
120 void (*oncall_s)(struct jreq*, const char*),
121 void (*oncall_j)(struct jreq*, struct json_object*)
124 struct jservice *srv;
/* allocate the service record */
127 srv = malloc(sizeof * srv);
/* keep a private copy of the method name */
132 srv->method = strdup(method);
138 /* record the service */
139 srv->oncall_s = oncall_s;
140 srv->oncall_j = oncall_j;
/* link the record at the head of the list of services */
141 srv->next = jbus->services;
142 jbus->services = srv;
/*
 * Records a handler for a signal on 'jbus', in string flavour
 * ('onsignal_s') or JSON flavour ('onsignal_j').
 * When no handler is recorded yet, a match rule is first installed on the
 * connection so that the bus delivers the signals.
 * NOTE(review): the condition choosing between the two rule formats, the
 * release of 'rule', the error paths and the returned values are not
 * visible in this view.
 */
152 static int add_signal(
155 void (*onsignal_s)(const char*),
156 void (*onsignal_j)(struct json_object*)
162 /* on the first handler, install the match rule on the bus */
163 if (jbus->signals == NULL) {
/* rule without sender filter; a result <= 0 of asprintf is treated as failure */
165 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
/* rule restricted to the sender named jbus->name */
167 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
/* no DBusError is given (NULL): a failure to add the rule is not reported here */
170 dbus_bus_add_match(jbus->connection, rule, NULL);
/* allocate the handler record and keep a private copy of the signal name */
175 sig = malloc(sizeof * sig);
178 sig->name = strdup(name);
182 /* record the signal */
183 sig->onsignal_s = onsignal_s;
184 sig->onsignal_j = onsignal_j;
/* link the record in front of the current list of handlers */
185 sig->next = jbus->signals;
201 void (*onresp_s)(int status, const char *response, void *data),
202 void (*onresp_j)(int status, struct json_object *response, void *data),
/*
 * NOTE(review): the head of this definition is not visible in this view;
 * the visible callers name it 'call' and pass (jbus, method, query,
 * onresp_s, onresp_j, data).
 */
/* allocate the record that will wait for the reply */
209 resp = malloc(sizeof * resp);
/* method call sent to destination jbus->name, object jbus->path, interface jbus->name */
215 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
/* the query travels as a single string argument */
221 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
/* send and remember the serial, used later to match the reply */
226 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
/* the message is released here; NOTE(review): which path this is (success or failure) is not visible */
230 dbus_message_unref(msg);
/* record the callbacks and link the waiter at the head of the list */
232 resp->onresp_s = onresp_s;
233 resp->onresp_j = onresp_j;
234 resp->next = jbus->waiters;
235 jbus->waiters = resp;
/* second release point of the message, presumably the other path -- confirm */
239 dbus_message_unref(msg);
/*
 * Reply callback used by the synchronous calls: 'data' points to the
 * struct respsync of the waiting caller.
 * Stores NULL on error (status != 0), otherwise a heap copy of 'value'
 * (an empty string when 'value' is NULL).
 * NOTE(review): the strdup result is not checked, so an allocation failure
 * is seen by the caller as a NULL value, like an error. The rest of the
 * function is not visible in this view.
 */
246 static void sync_of_replies(int status, const char *value, void *data)
248 struct respsync *s = data;
249 s->value = status ? NULL : strdup(value ? value : "");
/*
 * Handles a method return or an error reply ('iserror' non-zero): looks in
 * jbus->waiters for the waiter whose serial equals the reply serial of
 * 'message' and hands it the string payload, either raw (onresp_s) or
 * parsed as JSON (onresp_j).
 * The parsed JSON object is released here right after onresp_j returns, so
 * the callback must not release it itself.
 * NOTE(review): the unlinking/freeing of the waiter, the choice between the
 * two callbacks and the handling of a non-string payload are not visible in
 * this view.
 */
253 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
257 struct jrespw *jrw, **prv;
258 struct json_object *reply;
259 dbus_uint32_t serial;
261 /* search for the waiter */
262 serial = dbus_message_get_reply_serial(message);
263 prv = &jbus->waiters;
264 while ((jrw = *prv) != NULL && jrw->serial != serial)
/* presumably reached when no waiter matches -- confirm */
267 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
270 /* retrieve the string value */
271 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
/* string flavour: an error reply always reports status -1 */
279 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
/* JSON flavour: a payload that does not parse yields status -1 */
281 reply = json_tokener_parse(str);
282 status = reply ? 0 : -1;
283 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
284 json_object_put(reply);
288 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming method call: the message must target the interface
 * of 'jbus' and name a recorded service, otherwise it is left to other
 * handlers. A request object is created holding a reference on the message
 * and on the connection, then the string argument is given to the service,
 * raw (oncall_s) or parsed as JSON (oncall_j).
 * The parsed JSON query is released here right after oncall_j returns, so
 * the service must not release it itself.
 * NOTE(review): the tests guarding several of the returns below are not
 * visible in this view.
 */
291 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
293 struct jservice *srv;
297 struct json_object *query;
299 /* search for the service */
300 if (!matchitf(jbus, message))
301 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
302 method = dbus_message_get_member(message);
304 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
305 srv = jbus->services;
306 while(srv != NULL && strcmp(method, srv->method))
309 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
311 /* handle the message */
312 jreq = malloc(sizeof * jreq);
314 return DBUS_HANDLER_RESULT_NEED_MEMORY;
/* the request keeps its own references on the message and the connection */
315 jreq->request = dbus_message_ref(message);
316 jreq->connection = dbus_connection_ref(jbus->connection);
318 /* retrieve the string value */
319 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
320 return reply_invalid_request(jreq);
322 /* handling strings only */
323 srv->oncall_s(jreq, str);
326 /* handling json only */
327 query = json_tokener_parse(str);
329 return reply_invalid_request(jreq);
330 srv->oncall_j(jreq, query);
331 json_object_put(query);
333 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles an incoming signal: the message must target the interface of
 * 'jbus' and name a recorded signal handler, otherwise it is left to other
 * handlers. When the message carries a single string argument, it is given
 * to the handler, raw (onsignal_s) or parsed as JSON (onsignal_j).
 * The parsed JSON object is released here right after onsignal_j returns.
 * NOTE(review): the tests guarding several of the returns below and the
 * handling of a JSON payload that does not parse are not visible in this
 * view.
 */
336 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
341 struct json_object *obj;
343 /* search for the signal handler */
344 if (!matchitf(jbus, message))
345 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
346 name = dbus_message_get_member(message);
348 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
350 while(sig != NULL && strcmp(name, sig->name))
353 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
355 /* retrieve the string value */
356 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
357 if (sig->onsignal_s) {
358 /* handling strings only */
359 sig->onsignal_s(str);
362 /* handling json only */
363 obj = json_tokener_parse(str);
365 sig->onsignal_j(obj);
366 json_object_put(obj);
370 return DBUS_HANDLER_RESULT_HANDLED;
373 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
375 switch(dbus_message_get_type(message)) {
376 case DBUS_MESSAGE_TYPE_METHOD_CALL:
377 return incoming_call(connection, message, (struct jbus*)data);
378 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
379 return incoming_resp(connection, message, (struct jbus*)data, 0);
380 case DBUS_MESSAGE_TYPE_ERROR:
381 return incoming_resp(connection, message, (struct jbus*)data, 1);
382 case DBUS_MESSAGE_TYPE_SIGNAL:
383 return incoming_signal(connection, message, (struct jbus*)data);
385 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
/*
 * Updates jbus->watchflags from the state of 'watch': reads the flags of
 * the watch and whether it is enabled, adjusts a local copy of the current
 * flags for DBUS_WATCH_READABLE and DBUS_WATCH_WRITABLE, then stores the
 * result back.
 * NOTE(review): the statements executed under each test are not visible in
 * this view; the two pairs of tests presumably cover the enabled and the
 * disabled cases -- confirm.
 */
388 static void watchset(DBusWatch *watch, struct jbus *jbus)
393 flags = dbus_watch_get_flags(watch);
394 e = dbus_watch_get_enabled(watch);
395 wf = jbus->watchflags;
397 if (flags & DBUS_WATCH_READABLE)
399 if (flags & DBUS_WATCH_WRITABLE)
403 if (flags & DBUS_WATCH_READABLE)
405 if (flags & DBUS_WATCH_WRITABLE)
408 jbus->watchflags = wf;
/*
 * Watch removal callback given to libdbus; 'data' is the struct jbus.
 * Checks (assert only) that at least one watch is recorded and that the
 * removed watch uses the descriptor recorded in jbus->watchfd.
 * NOTE(review): the rest of the function is not visible in this view;
 * presumably the watch counter is decremented there -- confirm.
 */
411 static void watchdel(DBusWatch *watch, void *data)
413 struct jbus *jbus = data;
415 assert(jbus->watchnr > 0);
416 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
/*
 * Watch toggle callback given to libdbus; 'data' is the struct jbus.
 * Checks (assert only) that at least one watch is recorded and that the
 * toggled watch uses the descriptor recorded in jbus->watchfd, then
 * recomputes the polling flags with watchset.
 */
420 static void watchtoggle(DBusWatch *watch, void *data)
422 struct jbus *jbus = data;
424 assert(jbus->watchnr > 0);
425 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
426 watchset(watch, jbus);
/*
 * Watch addition callback given to libdbus; 'data' is the struct jbus.
 * The first watch fixes the descriptor recorded in jbus->watchfd and
 * resets the polling flags; a later watch is compared against that
 * descriptor. The polling flags are then recomputed with watchset.
 * NOTE(review): what happens when the descriptor differs, the update of
 * the watch counter and the returned value are not visible in this view.
 */
429 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
431 struct jbus *jbus = data;
432 if (jbus->watchnr == 0) {
433 jbus->watchfd = dbus_watch_get_unix_fd(watch);
434 jbus->watchflags = 0;
436 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
439 watchset(watch, jbus);
443 /************************** MAIN FUNCTIONS *****************************************/
/*
 * Creates a jbus bound to the object path 'path', on the session bus when
 * 'session' is non-zero, on the system bus otherwise.
 * The bus name is derived from a copy of 'path' without its leading '/'.
 * The connection gets 'incoming' as message filter and the watch callbacks
 * watchadd/watchdel/watchtoggle, all receiving the jbus as user data.
 * NOTE(review): the transformation applied to the name between the copy
 * and the loop on '.', the error paths and the returned value are not
 * visible in this view.
 */
445 struct jbus *create_jbus(int session, const char *path)
450 /* create the context and connect */
/* zero-filled allocation: every field starts at 0/NULL */
451 jbus = calloc(1, sizeof * jbus);
457 jbus->path = strdup(path);
458 if (jbus->path == NULL) {
/* skip the leading slashes before copying the path as name */
462 while(*path == '/') path++;
463 jbus->name = name = strdup(path);
/* walks back over '.' characters, stopping at the start of the name */
474 while (name >= jbus->name && *name == '.')
/* no DBusError is given (NULL): the cause of a connection failure is not available */
482 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
483 if (jbus->connection == NULL
484 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
485 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
/*
 * NOTE(review): the body is not visible in this view. Presumably takes one
 * more reference on 'jbus', matching the refcount that jbus_unref
 * decrements -- confirm.
 */
496 void jbus_addref(struct jbus *jbus)
/*
 * Drops one reference on 'jbus'. When the count reaches zero, the
 * connection (if any) is unreferenced and the recorded services are
 * removed one by one from the list.
 * NOTE(review): the release of each service, of the other lists and of the
 * jbus itself is not visible in this view.
 */
501 void jbus_unref(struct jbus *jbus)
503 struct jservice *srv;
504 if (!--jbus->refcount) {
505 if (jbus->connection != NULL)
506 dbus_connection_unref(jbus->connection);
/* detach the head of the list until the list is empty */
507 while((srv = jbus->services) != NULL) {
508 jbus->services = srv->next;
/*
 * Replies to the request 'jreq' with a D-Bus error of name
 * DBUS_ERROR_FAILED whose text is 'error', sent on the connection of the
 * request. The error message is unreferenced after the send attempt.
 * NOTE(review): the handling of a failed message creation, the release of
 * 'jreq' and the returned values are not visible in this view.
 */
518 int jbus_reply_error_s(struct jreq *jreq, const char *error)
521 DBusMessage *message;
523 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
527 if (dbus_connection_send(jreq->connection, message, NULL))
529 dbus_message_unref(message);
/*
 * Replies to the request 'jreq' with an error whose text is the JSON
 * serialization of 'reply'.
 * When the serialization yields no string, the request is answered with
 * the out-of-memory error instead. Returns the result of the reply helper
 * that was used.
 */
int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
{
	const char *text = json_object_to_json_string(reply);

	if (text == NULL)
		return reply_out_of_memory(jreq);
	return jbus_reply_error_s(jreq, text);
}
/*
 * Replies to the request 'jreq' with a method return carrying 'reply' as
 * its single string argument, sent on the connection of the request.
 * A failure to build the message is answered with the out-of-memory error.
 * The reply message is unreferenced after the send attempt.
 * NOTE(review): the test guarding the first out-of-memory return, the
 * release of 'jreq' and the values returned after the send are not visible
 * in this view.
 */
541 int jbus_reply_s(struct jreq *jreq, const char *reply)
544 DBusMessage *message;
546 message = dbus_message_new_method_return(jreq->request);
548 return reply_out_of_memory(jreq);
550 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
/* the partially built message is dropped before reporting the failure */
551 dbus_message_unref(message);
552 return reply_out_of_memory(jreq);
555 if (dbus_connection_send(jreq->connection, message, NULL))
557 dbus_message_unref(message);
/*
 * Replies to the request 'jreq' with the JSON serialization of 'reply'.
 * When the serialization yields no string, the request is answered with
 * the out-of-memory error instead. Returns the result of the reply helper
 * that was used. 'reply' is only read: the caller keeps its reference.
 */
int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
{
	const char *text = json_object_to_json_string(reply);

	if (text == NULL)
		return reply_out_of_memory(jreq);
	return jbus_reply_s(jreq, text);
}
/*
 * Emits the signal 'name' on the object path and interface of 'jbus', with
 * 'content' as its single string argument. The sender of the message is
 * set to jbus->name. The message is unreferenced when it cannot be
 * completed and after the send attempt.
 * NOTE(review): the handling of a failed message creation and the returned
 * values are not visible in this view.
 */
568 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
571 DBusMessage *message;
573 message = dbus_message_new_signal(jbus->path, jbus->name, name);
577 if (!dbus_message_set_sender(message, jbus->name)
578 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
579 dbus_message_unref(message);
583 if (dbus_connection_send(jbus->connection, message, NULL))
585 dbus_message_unref(message);
/*
 * Emits the signal 'name' with the JSON serialization of 'content' as
 * payload, through jbus_send_signal_s.
 * NOTE(review): the lines between the serialization and the send are not
 * visible in this view; presumably they handle a NULL string -- confirm.
 */
593 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
595 const char *str = json_object_to_json_string(content);
600 return jbus_send_signal_s(jbus, name, str);
/*
 * Records 'oncall' as the handler of 'method' on 'jbus'. The handler
 * receives the query as a raw string; no JSON handler is recorded.
 * Returns the result of add_service.
 */
int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *))
{
	int rc;

	rc = add_service(jbus, method, oncall, NULL);
	return rc;
}
/*
 * Records 'oncall' as the handler of 'method' on 'jbus'. The handler
 * receives the query parsed as JSON; no string handler is recorded.
 * Returns the result of add_service.
 */
int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *))
{
	int rc;

	rc = add_service(jbus, method, NULL, oncall);
	return rc;
}
/*
 * Requests ownership of the bus name jbus->name without queueing
 * (DBUS_NAME_FLAG_DO_NOT_QUEUE) and maps the reply code to the result.
 * No DBusError is given (NULL), so the cause of a failure is not
 * available to the caller.
 * NOTE(review): the values returned for each group of reply codes are not
 * visible in this view.
 */
613 int jbus_start_serving(struct jbus *jbus)
615 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
/* the name is owned by this connection */
617 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
618 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
/* the name is owned by another connection */
620 case DBUS_REQUEST_NAME_REPLY_EXISTS:
621 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
/*
 * Fills 'fds' with one entry for each jbus of 'jbuses' that has at least
 * one watch (watchnr != 0): the watched descriptor and the events to poll.
 * 'i' walks the jbuses, 'r' indexes the entry of 'fds' being filled.
 * NOTE(review): the advance of 'r' and the returned value are not visible
 * in this view; jbus_read_write_dispatch_multiple passes the result to
 * poll as the number of entries.
 */
628 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
632 for (r = i = 0 ; i < njbuses ; i++) {
633 if (jbuses[i]->watchnr) {
634 fds[r].fd = jbuses[i]->watchfd;
635 fds[r].events = jbuses[i]->watchflags;
/*
 * Dispatches the messages of the jbuses whose poll entry reported events.
 * 'fds' is expected to be laid out as jbus_fill_pollfds filled it: the
 * entry 'r' is matched to a jbus only when that jbus has a watch and the
 * descriptors are equal. For a matched entry with non-zero revents, the
 * connection is read/written without blocking (timeout 0), then messages
 * are dispatched while data remains and while 'n' is below 'maxcount'.
 * NOTE(review): the updates of 'n' and 'r' and the returned value are not
 * visible in this view.
 */
642 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
645 DBusDispatchStatus sts;
647 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
648 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
649 if (fds[r].revents) {
650 dbus_connection_read_write(jbuses[i]->connection, 0);
651 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
652 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
653 sts = dbus_connection_dispatch(jbuses[i]->connection);
663 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
666 DBusDispatchStatus sts;
668 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
669 dbus_connection_read_write(jbuses[i]->connection, 0);
670 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
671 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * Processes several jbuses in one step: dispatches what is already
 * available, polls the watched descriptors for at most 'toms'
 * milliseconds (the value is given as is to poll), then dispatches what
 * arrived, within the remaining budget of 'maxcount' dispatches.
 * NOTE(review): the result for a rejected 'njbuses' and the handling of
 * the poll result 's' are not visible in this view.
 */
678 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
/* the bound also limits the size of the stack allocation below */
683 if (njbuses < 0 || njbuses > 100) {
687 fds = alloca(njbuses * sizeof * fds);
690 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
691 n = jbus_fill_pollfds(jbuses, njbuses, fds);
692 s = poll(fds, n, toms);
695 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
/* sum of both dispatch counts; when the second step failed, the first count if not zero, else the failure */
696 return n >= 0 ? r + n : r ? r : n;
/*
 * Processes the single 'jbus' for at most 'toms' milliseconds of polling,
 * with a budget of 1000 dispatches.
 * Returns 0 on success, whatever the count of dispatches, or the negative
 * value reported by jbus_read_write_dispatch_multiple.
 */
int jbus_read_write_dispatch(struct jbus *jbus, int toms)
{
	int count = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);

	if (count < 0)
		return count;
	return 0;
}
/*
 * Calls 'method' with the string 'query'; the reply is given to 'onresp'
 * as a raw string, together with 'data'. Returns the result of call.
 */
int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
{
	int rc;

	rc = call(jbus, method, query, onresp, NULL, data);
	return rc;
}
/*
 * Calls 'method' with the string 'query'; the reply is given to 'onresp'
 * parsed as JSON, together with 'data'. Returns the result of call.
 */
int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
{
	int rc;

	rc = call(jbus, method, query, NULL, onresp, data);
	return rc;
}
/*
 * Calls 'method' with the JSON serialization of 'query'; the reply is
 * given to 'onresp' as a raw string, together with 'data'.
 * NOTE(review): the lines between the serialization and the call are not
 * visible in this view; presumably they handle a NULL string -- confirm.
 */
715 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
717 const char *str = json_object_to_json_string(query);
722 return call(jbus, method, str, onresp, NULL, data);
/*
 * Calls 'method' with the JSON serialization of 'query'; the reply is
 * given to 'onresp' parsed as JSON, together with 'data'.
 * NOTE(review): the lines between the serialization and the call are not
 * visible in this view; presumably they handle a NULL string -- confirm.
 */
725 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
727 const char *str = json_object_to_json_string(query);
732 return call(jbus, method, str, NULL, onresp, data);
735 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
737 struct respsync synchro;
738 synchro.value = NULL;
739 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
740 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
741 return synchro.value;
/*
 * Calls 'method' with the string 'query', waits for the reply with
 * jbus_call_ss_sync and parses the returned string as JSON.
 * NOTE(review): the handling of a NULL string, the release of 'str' and
 * the returned value are not visible in this view.
 */
744 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
746 struct json_object *obj;
747 char *str = jbus_call_ss_sync(jbus, method, query);
751 obj = json_tokener_parse(str);
/*
 * Calls 'method' with the JSON serialization of 'query' and waits for the
 * reply; returns the result of jbus_call_ss_sync.
 * NOTE(review): the lines between the serialization and the call are not
 * visible in this view; presumably they handle a NULL string -- confirm.
 */
757 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
759 const char *str = json_object_to_json_string(query);
764 return jbus_call_ss_sync(jbus, method, str);
/*
 * Calls 'method' with the JSON serialization of 'query' and waits for the
 * reply; returns the result of jbus_call_sj_sync.
 * NOTE(review): the lines between the serialization and the call are not
 * visible in this view; presumably they handle a NULL string -- confirm.
 */
767 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
769 const char *str = json_object_to_json_string(query);
774 return jbus_call_sj_sync(jbus, method, str);
/*
 * Records 'onsig' as the handler of the signal 'name' on 'jbus'. The
 * handler receives the payload as a raw string; no JSON handler is
 * recorded. Returns the result of add_signal.
 */
int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *))
{
	int rc;

	rc = add_signal(jbus, name, onsig, NULL);
	return rc;
}
/*
 * Records 'onsig' as the handler of the signal 'name' on 'jbus'. The
 * handler receives the payload parsed as JSON; no string handler is
 * recorded. Returns the result of add_signal.
 */
int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *))
{
	int rc;

	rc = add_signal(jbus, name, NULL, onsig);
	return rc;
}
787 /************************** FEW LITTLE TESTS *****************************************/
/*
 * Test service "ping": prints the request and replies with the request
 * itself.
 *
 * jreq     the request to reply to
 * request  the query parsed as JSON
 *
 * 'request' is not released here: incoming_call releases the query right
 * after the service callback returns, so a release in the callback would
 * drop a reference that the callback does not own.
 */
void ping(struct jreq *jreq, struct json_object *request)
{
	const char *text = json_object_to_json_string(request);

	printf("ping(%s) -> %s\n", text, text);
	jbus_reply_j(jreq, request);
}
799 void incr(struct jreq *jreq, struct json_object *request)
801 static int counter = 0;
802 struct json_object *res = json_object_new_int(++counter);
803 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
804 jbus_reply_j(jreq, res);
805 jbus_send_signal_j(jbus, "incremented", res);
806 json_object_put(res);
807 json_object_put(request);
/*
 * Server side of the test (NOTE(review): the head of this definition is
 * not visible in this view). Connects to the session bus (first argument
 * 1), records the services "ping" and "incr", starts serving and prints
 * the three results.
 * NOTE(review): the result of create_jbus is used without a visible
 * check -- confirm whether it can be NULL.
 */
812 jbus = create_jbus(1, "/bzh/iot/jdbus");
813 s1 = jbus_add_service_j(jbus, "ping", ping);
814 s2 = jbus_add_service_j(jbus, "incr", incr);
815 s3 = jbus_start_serving(jbus);
816 printf("started %d %d %d\n", s1, s2, s3);
/* processes the bus, blocking without time limit, until a failure is reported */
817 while (!jbus_read_write_dispatch (jbus, -1));
/*
 * Test reply callback: prints the status, the label given as 'data' and
 * the reply.
 *
 * status    0 for a valid reply, -1 otherwise
 * response  the reply parsed as JSON
 * data      the label given at call time, a C string
 *
 * 'response' is not released here: incoming_resp releases the reply right
 * after the callback returns, so a release in the callback would drop a
 * reference that the callback does not own.
 */
void onresp(int status, struct json_object *response, void *data)
{
	printf("resp: %d, %s, %s\n", status, (char *)data, json_object_to_json_string(response));
}
/*
 * Test signal handler: prints the payload of the received signal on the
 * standard output.
 */
void signaled(const char *data)
{
	fprintf(stdout, "signaled with {%s}\n", data);
}
/*
 * Client side of the test (NOTE(review): the head of this definition is
 * not visible in this view). Connects to the session bus (first argument
 * 1), listens to the signal "incremented", then calls "ping" and "incr"
 * asynchronously; the last argument of each call is the label printed by
 * onresp.
 * NOTE(review): the result of create_jbus is used without a visible
 * check -- confirm whether it can be NULL.
 */
836 jbus = create_jbus(1, "/bzh/iot/jdbus");
837 jbus_on_signal_s(jbus, "incremented", signaled);
839 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
840 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
/* one processing step with a polling time limit of 1 */
841 jbus_read_write_dispatch (jbus, 1);
/*
 * NOTE(review): the string returned by jbus_call_ss_sync is neither kept
 * nor released, and it is NULL when the call failed.
 */
843 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
/* processes the bus, blocking without time limit, until a failure is reported */
844 while (!jbus_read_write_dispatch (jbus, -1));