4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
36 /* structure for handled requests */
/* connection: D-Bus connection used to send the reply; a reference is taken
 * in incoming_call and dropped in free_jreq */
38 DBusConnection *connection;
42 /* structure for recorded services */
/* next: link to the following record of the list headed by jbus->services */
44 struct jservice *next;
/* oncall_s / oncall_j: handlers receiving the query either as a raw string
 * or as a parsed JSON object; the public wrappers register one and pass NULL
 * for the other */
46 void (*oncall_s)(struct jreq *, const char *);
47 void (*oncall_j)(struct jreq *, struct json_object *);
50 /* structure for signal handlers */
/* onsignal_s / onsignal_j: handlers receiving the signal payload either as a
 * raw string or as a parsed JSON object; the public wrappers register one and
 * pass NULL for the other */
54 void (*onsignal_s)(const char *);
55 void (*onsignal_j)(struct json_object *);
58 /* structure for recording asynchronous requests */
/* onresp_s / onresp_j: reply handlers called with (status, reply, user data);
 * the reply is passed either as a raw string or as a parsed JSON object */
63 void (*onresp_s)(int, const char*, void *);
64 void (*onresp_j)(int, struct json_object*, void *);
67 /* structure for synchronous requests */
73 /* structure for handling either client or server jbus on dbus */
/* services: head of the list of recorded services (filled by add_service) */
76 struct jservice *services;
/* connection: the D-Bus connection obtained in create_jbus */
77 DBusConnection *connection;
/* signals: head of the list of recorded signal handlers (filled by add_signal) */
78 struct jsignal *signals;
/* waiters: head of the list of pending asynchronous calls awaiting a reply */
79 struct jrespw *waiters;
87 /*********************** STATIC COMMON METHODS *****************/
/*
 * Releases the D-Bus resources attached to a handled request: drops the
 * reference on the request message and the reference on the connection.
 * NOTE(review): the release of the jreq memory itself is not visible in this
 * excerpt - confirm against the full file.
 */
89 static inline void free_jreq(struct jreq *jreq)
91 dbus_message_unref(jreq->request);
92 dbus_connection_unref(jreq->connection);
/*
 * Answers the request 'jreq' with the error text "out of memory" through
 * jbus_reply_error_s.
 * NOTE(review): the value returned to the caller is not visible in this
 * excerpt - confirm against the full file.
 */
96 static inline int reply_out_of_memory(struct jreq *jreq)
98 static const char out_of_memory[] = "out of memory";
99 jbus_reply_error_s(jreq, out_of_memory);
104 static inline int reply_invalid_request(struct jreq *jreq)
106 static const char invalid_request[] = "invalid request";
107 jbus_reply_error_s(jreq, invalid_request);
108 return DBUS_HANDLER_RESULT_HANDLED;
111 static int matchitf(struct jbus *jbus, DBusMessage *message)
113 const char *itf = dbus_message_get_interface(message);
114 return itf != NULL && !strcmp(itf, jbus->name);
/*
 * Records in 'jbus' a service for the method named 'method'.
 * A new jservice record is allocated, receives its own copy of the method
 * name and both callbacks, and is pushed at the head of jbus->services.
 * NOTE(review): the allocation-failure paths and the returned value are not
 * visible in this excerpt.
 */
117 static int add_service(
120 void (*oncall_s)(struct jreq*, const char*),
121 void (*oncall_j)(struct jreq*, struct json_object*)
124 struct jservice *srv;
127 srv = malloc(sizeof * srv);
/* the record keeps a private copy of the name */
132 srv->method = strdup(method);
138 /* record the service */
139 srv->oncall_s = oncall_s;
140 srv->oncall_j = oncall_j;
/* link at the head of the list */
141 srv->next = jbus->services;
142 jbus->services = srv;
/*
 * Records in 'jbus' a handler for the signal named 'name'.
 * When no handler is recorded yet, a D-Bus match rule selecting signals of
 * the jbus interface and path is built and installed first. Then a jsignal
 * record is allocated, receives its own copy of the name and both callbacks,
 * and is linked in front of the current jbus->signals.
 * NOTE(review): the condition choosing between the two rule formats (with or
 * without 'sender'), the error paths and the returned value are not visible
 * in this excerpt.
 */
152 static int add_signal(
155 void (*onsignal_s)(const char*),
156 void (*onsignal_j)(struct json_object*)
162 /* record the signal */
/* first handler: the match rule has to be installed on the bus */
163 if (jbus->signals == NULL) {
165 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
167 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
/* NULL error argument: failures of the match registration are not reported */
170 dbus_bus_add_match(jbus->connection, rule, NULL);
175 sig = malloc(sizeof * sig);
178 sig->name = strdup(name);
182 /* record the signal */
183 sig->onsignal_s = onsignal_s;
184 sig->onsignal_j = onsignal_j;
185 sig->next = jbus->signals;
/* reply handlers: string flavour and JSON flavour, called with (status, response, data) */
201 void (*onresp_s)(int status, const char *response, void *data),
202 void (*onresp_j)(int status, struct json_object *response, void *data),
/* record used to match the future reply with its handlers */
209 resp = malloc(sizeof * resp);
/* the method call targets destination and interface jbus->name at jbus->path */
215 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
/* the query travels as one single string argument */
221 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
/* the serial of the sent message is kept to recognise the reply */
226 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
230 dbus_message_unref(msg);
/* register the waiter at the head of jbus->waiters */
232 resp->onresp_s = onresp_s;
233 resp->onresp_j = onresp_j;
234 resp->next = jbus->waiters;
235 jbus->waiters = resp;
/* NOTE(review): presumably the error path - the surrounding lines are not visible */
239 dbus_message_unref(msg);
/*
 * Reply handler used by the synchronous calls; 'data' points to the
 * struct respsync of the caller.
 * On a non-zero status the stored value is NULL; otherwise it is a fresh
 * copy of 'value' (a copy of the empty string when 'value' is NULL).
 * NOTE(review): the statement flagging the reply as received is not visible
 * in this excerpt.
 */
246 static void sync_of_replies(int status, const char *value, void *data)
248 struct respsync *s = data;
249 s->value = status ? NULL : strdup(value ? value : "");
/*
 * Handles a method return or an error message ('iserror' non-zero) received
 * on the connection: finds the waiter whose serial matches the reply serial
 * of 'message' and hands the string payload to its handler.
 * A JSON handler receives the parsed payload, with status -1 when parsing
 * fails; the parsed object is released here after the handler returns.
 * In both flavours the status given to the handler is forced to -1 when
 * 'iserror' is set.
 */
253 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
257 struct jrespw *jrw, **prv;
258 struct json_object *reply;
259 dbus_uint32_t serial;
261 /* search for the waiter */
262 serial = dbus_message_get_reply_serial(message);
263 prv = &jbus->waiters;
264 while ((jrw = *prv) != NULL && jrw->serial != serial)
/* NOTE(review): presumably taken when no waiter matches - the test is not visible */
267 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
270 /* retrieve the string value */
271 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
281 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
283 reply = json_tokener_parse(str);
284 status = reply ? 0 : -1;
285 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
/* the parsed reply belongs to this function: handlers must not release it */
286 json_object_put(reply);
290 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles a method call received on the connection.
 * The call is ignored (DBUS_HANDLER_RESULT_NOT_YET_HANDLED) unless its
 * interface matches the jbus; further ignore paths depend on tests that are
 * not visible here (presumably a missing member or an unknown method).
 * Otherwise a jreq holding references on the message and on the connection
 * is created and the string argument is passed to the service handler,
 * either raw or parsed as JSON. A missing string argument or an unparsable
 * JSON text is answered with "invalid request".
 */
293 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
295 struct jservice *srv;
299 struct json_object *query;
301 /* search for the service */
302 if (!matchitf(jbus, message))
303 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
304 method = dbus_message_get_member(message);
306 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
307 srv = jbus->services;
308 while(srv != NULL && strcmp(method, srv->method))
311 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
313 /* handle the message */
314 jreq = malloc(sizeof * jreq);
316 return DBUS_HANDLER_RESULT_NEED_MEMORY;
/* the request keeps its own references on the message and the connection */
317 jreq->request = dbus_message_ref(message);
318 jreq->connection = dbus_connection_ref(jbus->connection);
320 /* retrieve the string value */
321 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
322 return reply_invalid_request(jreq);
324 /* handling strings only */
325 srv->oncall_s(jreq, str);
328 /* handling json only */
329 query = json_tokener_parse(str);
331 return reply_invalid_request(jreq);
332 srv->oncall_j(jreq, query);
/* the parsed query belongs to this function: handlers must not release it */
333 json_object_put(query);
335 return DBUS_HANDLER_RESULT_HANDLED;
/*
 * Handles a signal received on the connection.
 * The signal is ignored (DBUS_HANDLER_RESULT_NOT_YET_HANDLED) unless its
 * interface matches the jbus; further ignore paths depend on tests that are
 * not visible here (presumably a missing member or an unrecorded signal).
 * When the signal carries a string argument, it is passed to the recorded
 * handler either raw or parsed as JSON; the parsed object is released here
 * after the handler returns.
 */
338 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
343 struct json_object *obj;
345 /* search for the service */
346 if (!matchitf(jbus, message))
347 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
348 name = dbus_message_get_member(message);
350 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
352 while(sig != NULL && strcmp(name, sig->name))
355 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
357 /* retrieve the string value */
358 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
359 if (sig->onsignal_s) {
360 /* handling strings only */
361 sig->onsignal_s(str);
364 /* handling json only */
365 obj = json_tokener_parse(str);
367 sig->onsignal_j(obj);
368 json_object_put(obj);
372 return DBUS_HANDLER_RESULT_HANDLED;
375 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
377 switch(dbus_message_get_type(message)) {
378 case DBUS_MESSAGE_TYPE_METHOD_CALL:
379 return incoming_call(connection, message, (struct jbus*)data);
380 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
381 return incoming_resp(connection, message, (struct jbus*)data, 0);
382 case DBUS_MESSAGE_TYPE_ERROR:
383 return incoming_resp(connection, message, (struct jbus*)data, 1);
384 case DBUS_MESSAGE_TYPE_SIGNAL:
385 return incoming_signal(connection, message, (struct jbus*)data);
387 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
/*
 * Recomputes jbus->watchflags from the state of 'watch': the D-Bus flags of
 * the watch (readable / writable) and its enabled state are read, the
 * current flags are adjusted and stored back.
 * NOTE(review): the two groups of tests presumably correspond to the enabled
 * and disabled cases, setting and clearing poll events - the statements
 * themselves are not visible in this excerpt.
 */
390 static void watchset(DBusWatch *watch, struct jbus *jbus)
395 flags = dbus_watch_get_flags(watch);
396 e = dbus_watch_get_enabled(watch);
397 wf = jbus->watchflags;
399 if (flags & DBUS_WATCH_READABLE)
401 if (flags & DBUS_WATCH_WRITABLE)
405 if (flags & DBUS_WATCH_READABLE)
407 if (flags & DBUS_WATCH_WRITABLE)
410 jbus->watchflags = wf;
/*
 * Watch-removal callback given to libdbus; 'data' is the jbus.
 * Checks that at least one watch is recorded and that 'watch' uses the
 * file descriptor recorded in the jbus.
 * NOTE(review): the bookkeeping performed after the checks is not visible in
 * this excerpt.
 */
413 static void watchdel(DBusWatch *watch, void *data)
415 struct jbus *jbus = data;
417 assert(jbus->watchnr > 0);
418 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
422 static void watchtoggle(DBusWatch *watch, void *data)
424 struct jbus *jbus = data;
426 assert(jbus->watchnr > 0);
427 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
428 watchset(watch, jbus);
/*
 * Watch-addition callback given to libdbus; 'data' is the jbus.
 * The first watch records its file descriptor and resets the poll flags;
 * later watches are compared with the recorded descriptor. The poll flags
 * are then updated from the watch.
 * NOTE(review): the action taken for a watch on a different descriptor and
 * the returned value are not visible in this excerpt.
 */
431 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
433 struct jbus *jbus = data;
434 if (jbus->watchnr == 0) {
435 jbus->watchfd = dbus_watch_get_unix_fd(watch);
436 jbus->watchflags = 0;
438 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
441 watchset(watch, jbus);
445 /************************** MAIN FUNCTIONS *****************************************/
/*
 * Creates a jbus for the object 'path'.
 * 'session' selects the session bus when non-zero, the system bus otherwise.
 * The path is copied; the bus/interface name is derived from a second copy
 * of the path taken after its leading '/' characters. The connection is
 * then obtained and equipped with the 'incoming' message filter and the
 * watch callbacks.
 * NOTE(review): the transformation applied to the name (presumably '/' to
 * '.' with trailing dots trimmed), the failure paths and the returned value
 * are not visible in this excerpt.
 */
447 struct jbus *create_jbus(int session, const char *path)
452 /* create the context and connect */
/* calloc: every field starts zeroed */
453 jbus = calloc(1, sizeof * jbus);
459 jbus->path = strdup(path);
460 if (jbus->path == NULL) {
/* the name is built from the path without its leading slashes */
464 while(*path == '/') path++;
465 jbus->name = name = strdup(path);
476 while (name >= jbus->name && *name == '.')
484 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
485 if (jbus->connection == NULL
486 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
487 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
/* NOTE(review): presumably takes one more reference on 'jbus' (counterpart of
 * jbus_unref) - the body is not visible in this excerpt. */
498 void jbus_addref(struct jbus *jbus)
/*
 * Drops one reference on 'jbus'. When the count reaches zero the D-Bus
 * connection (if any) is unreferenced and the list of recorded services is
 * emptied record by record.
 * NOTE(review): how each record and the other members are released is not
 * visible in this excerpt.
 */
503 void jbus_unref(struct jbus *jbus)
505 struct jservice *srv;
506 if (!--jbus->refcount) {
507 if (jbus->connection != NULL)
508 dbus_connection_unref(jbus->connection);
/* unlink the services one after the other */
509 while((srv = jbus->services) != NULL) {
510 jbus->services = srv->next;
/*
 * Replies to the request 'jreq' with a D-Bus error of name
 * DBUS_ERROR_FAILED carrying the text 'error'. The error message is sent on
 * the connection of the request and then unreferenced.
 * NOTE(review): the handling of failures, the release of 'jreq' and the
 * returned value are not visible in this excerpt.
 */
520 int jbus_reply_error_s(struct jreq *jreq, const char *error)
523 DBusMessage *message;
525 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
529 if (dbus_connection_send(jreq->connection, message, NULL))
531 dbus_message_unref(message);
537 int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
539 const char *str = json_object_to_json_string(reply);
540 return str ? jbus_reply_error_s(jreq, str) : reply_out_of_memory(jreq);
/*
 * Replies to the request 'jreq' with the string 'reply'.
 * A method-return message is created for the request, receives 'reply' as
 * its single string argument and is sent on the connection of the request.
 * Failing to build the message is answered with the out-of-memory error.
 * NOTE(review): the test guarding the first out-of-memory return, the
 * release of 'jreq' and the final returned value are not visible in this
 * excerpt.
 */
543 int jbus_reply_s(struct jreq *jreq, const char *reply)
546 DBusMessage *message;
548 message = dbus_message_new_method_return(jreq->request);
550 return reply_out_of_memory(jreq);
552 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
/* the half-built message is dropped before reporting the error */
553 dbus_message_unref(message);
554 return reply_out_of_memory(jreq);
557 if (dbus_connection_send(jreq->connection, message, NULL))
559 dbus_message_unref(message);
564 int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
566 const char *str = json_object_to_json_string(reply);
567 return str ? jbus_reply_s(jreq, str) : reply_out_of_memory(jreq);
/*
 * Emits the signal 'name' with the string 'content' as single argument.
 * The signal is created for the path and interface of 'jbus', its sender is
 * set to the jbus name, and it is sent on the jbus connection.
 * NOTE(review): the failure paths and the returned value are not visible in
 * this excerpt.
 */
570 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
573 DBusMessage *message;
575 message = dbus_message_new_signal(jbus->path, jbus->name, name);
579 if (!dbus_message_set_sender(message, jbus->name)
580 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
/* the message could not be completed: drop it */
581 dbus_message_unref(message);
585 if (dbus_connection_send(jbus->connection, message, NULL))
587 dbus_message_unref(message);
/*
 * Emits the signal 'name' with the JSON serialization of 'content' as
 * payload, through jbus_send_signal_s.
 * NOTE(review): the handling of a NULL serialization is not visible in this
 * excerpt.
 */
595 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
597 const char *str = json_object_to_json_string(content);
602 return jbus_send_signal_s(jbus, name, str);
/*
 * Records 'oncall' as the handler of the method 'method' of 'jbus'.
 * The handler receives the query as a raw string.
 * Returns the status of add_service.
 */
int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *))
{
	int status;

	status = add_service(jbus, method, oncall, NULL);
	return status;
}
/*
 * Records 'oncall' as the handler of the method 'method' of 'jbus'.
 * The handler receives the query as a parsed JSON object.
 * Returns the status of add_service.
 */
int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *))
{
	int status;

	status = add_service(jbus, method, NULL, oncall);
	return status;
}
/*
 * Requests ownership of the bus name jbus->name, without queuing when the
 * name is already taken, and dispatches on the answer of the bus.
 * NOTE(review): presumably the owner cases succeed and the exists/in-queue
 * cases fail - the returned values are not visible in this excerpt.
 */
615 int jbus_start_serving(struct jbus *jbus)
/* NULL error argument: the reason of a failure is not retrieved */
617 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
619 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
620 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
622 case DBUS_REQUEST_NAME_REPLY_EXISTS:
623 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
/*
 * Fills 'fds' with one entry for each of the 'njbuses' items of 'jbuses'
 * that has at least one watch: the entry receives the watched descriptor
 * and the events to poll for.
 * NOTE(review): presumably returns the count of filled entries - the return
 * statement is not visible in this excerpt.
 */
630 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
634 for (r = i = 0 ; i < njbuses ; i++) {
/* jbuses without watch get no entry */
635 if (jbuses[i]->watchnr) {
636 fds[r].fd = jbuses[i]->watchfd;
637 fds[r].events = jbuses[i]->watchflags;
/*
 * Processes the result of a poll on entries prepared by jbus_fill_pollfds.
 * For each jbus whose entry reports events, the connection is read/written
 * without blocking and its messages are dispatched while data remains and
 * fewer than 'maxcount' dispatches were done.
 * NOTE(review): presumably returns the count of dispatches - the counter
 * updates and the return statement are not visible in this excerpt.
 */
644 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
647 DBusDispatchStatus sts;
649 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
/* the entry must be the one filled for this jbus */
650 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
651 if (fds[r].revents) {
/* timeout 0: never blocks */
652 dbus_connection_read_write(jbuses[i]->connection, 0);
653 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
654 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
655 sts = dbus_connection_dispatch(jbuses[i]->connection);
665 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
668 DBusDispatchStatus sts;
670 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
671 dbus_connection_read_write(jbuses[i]->connection, 0);
672 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
673 sts = dbus_connection_dispatch(jbuses[i]->connection);
/*
 * Dispatches pending messages of the 'njbuses' items of 'jbuses', then polls
 * their descriptors for at most 'toms' milliseconds and dispatches what
 * arrived, within a total budget of 'maxcount' dispatches.
 * Counts of items outside 0..100 are rejected, which bounds the size of the
 * stack allocation of the poll entries.
 * NOTE(review): the rejection value and the handling of the poll result are
 * not visible in this excerpt.
 */
680 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
685 if (njbuses < 0 || njbuses > 100) {
/* stack allocation, bounded by the check above */
689 fds = alloca(njbuses * sizeof * fds);
/* first serve what is already pending */
692 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
693 n = jbus_fill_pollfds(jbuses, njbuses, fds);
695 s = poll(fds, n, toms);
/* only the remaining budget is given to the second dispatch */
702 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
/* sum of both dispatches; if the second failed, the first count when
 * non-zero, else the failure */
703 return n >= 0 ? r + n : r ? r : n;
/*
 * Runs one read/write/dispatch round on the single 'jbus', waiting at most
 * 'toms' milliseconds and dispatching at most 1000 messages.
 * Returns 0 on success whatever the count of dispatched messages, or the
 * negative value reported by jbus_read_write_dispatch_multiple.
 */
int jbus_read_write_dispatch(struct jbus *jbus, int toms)
{
	int count = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);

	if (count < 0)
		return count;
	return 0;
}
/*
 * Calls asynchronously the method 'method' with the string 'query'.
 * The reply is delivered as a string to 'onresp' together with 'data'.
 * Returns the status of call.
 */
int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
{
	int status;

	status = call(jbus, method, query, onresp, NULL, data);
	return status;
}
/*
 * Calls asynchronously the method 'method' with the string 'query'.
 * The reply is delivered as a JSON object to 'onresp' together with 'data'.
 * Returns the status of call.
 */
int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
{
	int status;

	status = call(jbus, method, query, NULL, onresp, data);
	return status;
}
/*
 * Calls asynchronously the method 'method' with the JSON serialization of
 * 'query'; the reply is delivered as a string to 'onresp' with 'data'.
 * NOTE(review): the handling of a NULL serialization is not visible in this
 * excerpt.
 */
722 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
724 const char *str = json_object_to_json_string(query);
729 return call(jbus, method, str, onresp, NULL, data);
/*
 * Calls asynchronously the method 'method' with the JSON serialization of
 * 'query'; the reply is delivered as a JSON object to 'onresp' with 'data'.
 * NOTE(review): the handling of a NULL serialization is not visible in this
 * excerpt.
 */
732 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
734 const char *str = json_object_to_json_string(query);
739 return call(jbus, method, str, NULL, onresp, data);
742 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
744 struct respsync synchro;
745 synchro.value = NULL;
746 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
747 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
748 return synchro.value;
/*
 * Synchronous call of 'method' with the string 'query'; the string answer
 * obtained from jbus_call_ss_sync is parsed as JSON.
 * NOTE(review): the handling of a NULL answer, the release of the answer
 * string and the returned value are not visible in this excerpt.
 */
751 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
753 struct json_object *obj;
754 char *str = jbus_call_ss_sync(jbus, method, query);
758 obj = json_tokener_parse(str);
/*
 * Synchronous call of 'method' with the JSON serialization of 'query';
 * returns the string answer of jbus_call_ss_sync.
 * NOTE(review): the handling of a NULL serialization is not visible in this
 * excerpt.
 */
764 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
766 const char *str = json_object_to_json_string(query);
771 return jbus_call_ss_sync(jbus, method, str);
/*
 * Synchronous call of 'method' with the JSON serialization of 'query';
 * returns the JSON answer of jbus_call_sj_sync.
 * NOTE(review): the handling of a NULL serialization is not visible in this
 * excerpt.
 */
774 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
776 const char *str = json_object_to_json_string(query);
781 return jbus_call_sj_sync(jbus, method, str);
/*
 * Records 'onsig' as the handler of the signal 'name' of 'jbus'.
 * The handler receives the payload as a raw string.
 * Returns the status of add_signal.
 */
int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *))
{
	int status;

	status = add_signal(jbus, name, onsig, NULL);
	return status;
}
/*
 * Records 'onsig' as the handler of the signal 'name' of 'jbus'.
 * The handler receives the payload as a parsed JSON object.
 * Returns the status of add_signal.
 */
int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *))
{
	int status;

	status = add_signal(jbus, name, NULL, onsig);
	return status;
}
794 /************************** FEW LITTLE TESTS *****************************************/
/*
 * Test service "ping": logs the request and replies with the very same
 * JSON object.
 *
 * 'request' is NOT released here: incoming_call, which parsed it, calls
 * json_object_put on it right after this handler returns. Releasing it here
 * as well dropped the only reference twice.
 */
void ping(struct jreq *jreq, struct json_object *request)
{
	const char *text = json_object_to_json_string(request);

	printf("ping(%s) -> %s\n", text, text);
	jbus_reply_j(jreq, request);
}
806 void incr(struct jreq *jreq, struct json_object *request)
808 static int counter = 0;
809 struct json_object *res = json_object_new_int(++counter);
810 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
811 jbus_reply_j(jreq, res);
812 jbus_send_signal_j(jbus, "incremented", res);
813 json_object_put(res);
814 json_object_put(request);
/* server side of the test: session bus, object /bzh/iot/jdbus */
819 jbus = create_jbus(1, "/bzh/iot/jdbus");
/* both services receive their query as JSON */
820 s1 = jbus_add_service_j(jbus, "ping", ping);
821 s2 = jbus_add_service_j(jbus, "incr", incr);
822 s3 = jbus_start_serving(jbus);
823 printf("started %d %d %d\n", s1, s2, s3);
/* serve until a dispatch round reports a failure; -1: wait without timeout */
824 while (!jbus_read_write_dispatch (jbus, -1));
/*
 * Test reply handler: prints the status, the label given as user data and
 * the JSON reply.
 *
 * 'response' is NOT released here: incoming_resp, which parsed it, calls
 * json_object_put on it right after this handler returns, so releasing it
 * here dropped the reference twice.
 */
void onresp(int status, struct json_object *response, void *data)
{
	printf("resp: %d, %s, %s\n",status,(char*)data,json_object_to_json_string(response));
}
/*
 * Test signal handler: prints the raw payload of the received signal on the
 * standard output.
 */
void signaled(const char *data)
{
	fprintf(stdout, "signaled with {%s}\n", data);
}
/* client side of the test: session bus, object /bzh/iot/jdbus */
843 jbus = create_jbus(1, "/bzh/iot/jdbus");
/* the signal emitted by the "incr" service is received as a raw string */
844 jbus_on_signal_s(jbus, "incremented", signaled);
/* two asynchronous calls; the last argument labels the reply in onresp */
846 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
847 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
/* one dispatch round with a 1 ms timeout */
848 jbus_read_write_dispatch (jbus, 1);
/* synchronous call; the returned string is printed and not released */
850 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
/* keep dispatching until a round reports a failure */
851 while (!jbus_read_write_dispatch (jbus, -1));