4 author: José Bollo <jose.bollo@iot.bzh>
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
28 #include <dbus/dbus.h>
30 #include "utils-jbus.h"
36 /* structure for handled requests */
38 DBusConnection *connection;
42 /* structure for recorded services */
44 struct jservice *next;
46 void (*oncall_s)(struct jreq *, const char *);
47 void (*oncall_j)(struct jreq *, struct json_object *);
50 /* structure for signal handlers */
54 void (*onsignal_s)(const char *);
55 void (*onsignal_j)(struct json_object *);
58 /* structure for recording asynchronous requests */
63 void (*onresp_s)(int, const char*, void *);
64 void (*onresp_j)(int, struct json_object*, void *);
67 /* structure for synchronous requests */
73 /* structure for handling either client or server jbus on dbus */
76 struct jservice *services;
77 DBusConnection *connection;
78 struct jsignal *signals;
79 struct jrespw *waiters;
87 /*********************** STATIC COMMON METHODS *****************/
89 static inline void free_jreq(struct jreq *jreq)
91 dbus_message_unref(jreq->request);
92 dbus_connection_unref(jreq->connection);
96 static inline int reply_out_of_memory(struct jreq *jreq)
98 static const char out_of_memory[] = "out of memory";
99 jbus_reply_error_s(jreq, out_of_memory);
104 static inline int reply_invalid_request(struct jreq *jreq)
106 static const char invalid_request[] = "invalid request";
107 jbus_reply_error_s(jreq, invalid_request);
108 return DBUS_HANDLER_RESULT_HANDLED;
111 static int matchitf(struct jbus *jbus, DBusMessage *message)
113 const char *itf = dbus_message_get_interface(message);
114 return itf != NULL && !strcmp(itf, jbus->name);
117 static int add_service(
120 void (*oncall_s)(struct jreq*, const char*),
121 void (*oncall_j)(struct jreq*, struct json_object*)
124 struct jservice *srv;
127 srv = malloc(sizeof * srv);
132 srv->method = strdup(method);
138 /* record the service */
139 srv->oncall_s = oncall_s;
140 srv->oncall_j = oncall_j;
141 srv->next = jbus->services;
142 jbus->services = srv;
152 static int add_signal(
155 void (*onsignal_s)(const char*),
156 void (*onsignal_j)(struct json_object*)
162 /* record the signal */
163 if (jbus->signals == NULL) {
165 if (0 >= asprintf(&rule, "type='signal',interface='%s',path='%s'", jbus->name, jbus->path))
167 if (0 >= asprintf(&rule, "type='signal',sender='%s',interface='%s',path='%s'", jbus->name, jbus->name, jbus->path))
170 dbus_bus_add_match(jbus->connection, rule, NULL);
175 sig = malloc(sizeof * sig);
178 sig->name = strdup(name);
182 /* record the signal */
183 sig->onsignal_s = onsignal_s;
184 sig->onsignal_j = onsignal_j;
185 sig->next = jbus->signals;
201 void (*onresp_s)(int status, const char *response, void *data),
202 void (*onresp_j)(int status, struct json_object *response, void *data),
209 resp = malloc(sizeof * resp);
215 msg = dbus_message_new_method_call(jbus->name, jbus->path, jbus->name, method);
221 if (!dbus_message_append_args(msg, DBUS_TYPE_STRING, &query, DBUS_TYPE_INVALID)) {
226 if (!dbus_connection_send(jbus->connection, msg, &resp->serial)) {
230 dbus_message_unref(msg);
232 resp->onresp_s = onresp_s;
233 resp->onresp_j = onresp_j;
234 resp->next = jbus->waiters;
235 jbus->waiters = resp;
239 dbus_message_unref(msg);
246 static void sync_of_replies(int status, const char *value, void *data)
248 struct respsync *s = data;
249 s->value = status ? NULL : strdup(value ? value : "");
253 static DBusHandlerResult incoming_resp(DBusConnection *connection, DBusMessage *message, struct jbus *jbus, int iserror)
257 struct jrespw *jrw, **prv;
258 struct json_object *reply;
259 dbus_uint32_t serial;
261 /* search for the waiter */
262 serial = dbus_message_get_reply_serial(message);
263 prv = &jbus->waiters;
264 while ((jrw = *prv) != NULL && jrw->serial != serial)
267 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
270 /* retrieve the string value */
271 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
281 jrw->onresp_s(iserror ? -1 : status, str, jrw->data);
283 reply = json_tokener_parse(str);
284 status = reply ? 0 : -1;
285 jrw->onresp_j(iserror ? -1 : status, reply, jrw->data);
286 json_object_put(reply);
290 return DBUS_HANDLER_RESULT_HANDLED;
293 static DBusHandlerResult incoming_call(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
295 struct jservice *srv;
299 struct json_object *query;
301 /* search for the service */
302 if (!matchitf(jbus, message))
303 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
304 method = dbus_message_get_member(message);
306 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
307 srv = jbus->services;
308 while(srv != NULL && strcmp(method, srv->method))
311 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
313 /* handle the message */
314 jreq = malloc(sizeof * jreq);
316 return DBUS_HANDLER_RESULT_NEED_MEMORY;
317 jreq->request = dbus_message_ref(message);
318 jreq->connection = dbus_connection_ref(jbus->connection);
320 /* retrieve the string value */
321 if (!dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID))
322 return reply_invalid_request(jreq);
324 /* handling strings only */
325 srv->oncall_s(jreq, str);
328 /* handling json only */
329 query = json_tokener_parse(str);
331 return reply_invalid_request(jreq);
332 srv->oncall_j(jreq, query);
333 json_object_put(query);
335 return DBUS_HANDLER_RESULT_HANDLED;
338 static DBusHandlerResult incoming_signal(DBusConnection *connection, DBusMessage *message, struct jbus *jbus)
343 struct json_object *obj;
345 /* search for the service */
346 if (!matchitf(jbus, message))
347 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
348 name = dbus_message_get_member(message);
350 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
352 while(sig != NULL && strcmp(name, sig->name))
355 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
357 /* retrieve the string value */
358 if (dbus_message_get_args(message, NULL, DBUS_TYPE_STRING, &str, DBUS_TYPE_INVALID)) {
359 if (sig->onsignal_s) {
360 /* handling strings only */
361 sig->onsignal_s(str);
364 /* handling json only */
365 obj = json_tokener_parse(str);
367 sig->onsignal_j(obj);
368 json_object_put(obj);
372 return DBUS_HANDLER_RESULT_HANDLED;
375 static DBusHandlerResult incoming(DBusConnection *connection, DBusMessage *message, void *data)
377 switch(dbus_message_get_type(message)) {
378 case DBUS_MESSAGE_TYPE_METHOD_CALL:
379 return incoming_call(connection, message, (struct jbus*)data);
380 case DBUS_MESSAGE_TYPE_METHOD_RETURN:
381 return incoming_resp(connection, message, (struct jbus*)data, 0);
382 case DBUS_MESSAGE_TYPE_ERROR:
383 return incoming_resp(connection, message, (struct jbus*)data, 1);
384 case DBUS_MESSAGE_TYPE_SIGNAL:
385 return incoming_signal(connection, message, (struct jbus*)data);
387 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
390 static void watchset(DBusWatch *watch, struct jbus *jbus)
395 flags = dbus_watch_get_flags(watch);
396 e = dbus_watch_get_enabled(watch);
397 wf = jbus->watchflags;
399 if (flags & DBUS_WATCH_READABLE)
401 if (flags & DBUS_WATCH_WRITABLE)
405 if (flags & DBUS_WATCH_READABLE)
407 if (flags & DBUS_WATCH_WRITABLE)
410 jbus->watchflags = wf;
413 static void watchdel(DBusWatch *watch, void *data)
415 struct jbus *jbus = data;
417 assert(jbus->watchnr > 0);
418 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
422 static void watchtoggle(DBusWatch *watch, void *data)
424 struct jbus *jbus = data;
426 assert(jbus->watchnr > 0);
427 assert(jbus->watchfd == dbus_watch_get_unix_fd(watch));
428 watchset(watch, jbus);
431 static dbus_bool_t watchadd(DBusWatch *watch, void *data)
433 struct jbus *jbus = data;
434 if (jbus->watchnr == 0) {
435 jbus->watchfd = dbus_watch_get_unix_fd(watch);
436 jbus->watchflags = 0;
438 else if (jbus->watchfd != dbus_watch_get_unix_fd(watch))
441 watchset(watch, jbus);
445 /************************** MAIN FUNCTIONS *****************************************/
447 struct jbus *create_jbus(int session, const char *path)
452 /* create the context and connect */
453 jbus = calloc(1, sizeof * jbus);
459 jbus->path = strdup(path);
460 if (jbus->path == NULL) {
464 while(*path == '/') path++;
465 jbus->name = name = strdup(path);
476 while (name >= jbus->name && *name == '.')
484 jbus->connection = dbus_bus_get(session ? DBUS_BUS_SESSION : DBUS_BUS_SYSTEM, NULL);
485 if (jbus->connection == NULL
486 || !dbus_connection_add_filter(jbus->connection, incoming, jbus, NULL)
487 || !dbus_connection_set_watch_functions(jbus->connection, watchadd, watchdel, watchtoggle, jbus, NULL))
498 void jbus_addref(struct jbus *jbus)
503 void jbus_unref(struct jbus *jbus)
505 struct jservice *srv;
506 if (!--jbus->refcount) {
507 if (jbus->connection != NULL)
508 dbus_connection_unref(jbus->connection);
509 while((srv = jbus->services) != NULL) {
510 jbus->services = srv->next;
520 int jbus_reply_error_s(struct jreq *jreq, const char *error)
523 DBusMessage *message;
525 message = dbus_message_new_error(jreq->request, DBUS_ERROR_FAILED, error);
529 if (dbus_connection_send(jreq->connection, message, NULL))
531 dbus_message_unref(message);
537 int jbus_reply_error_j(struct jreq *jreq, struct json_object *reply)
539 const char *str = json_object_to_json_string(reply);
540 return str ? jbus_reply_error_s(jreq, str) : reply_out_of_memory(jreq);
543 int jbus_reply_s(struct jreq *jreq, const char *reply)
546 DBusMessage *message;
548 message = dbus_message_new_method_return(jreq->request);
550 return reply_out_of_memory(jreq);
552 if (!dbus_message_append_args(message, DBUS_TYPE_STRING, &reply, DBUS_TYPE_INVALID)) {
553 dbus_message_unref(message);
554 return reply_out_of_memory(jreq);
557 if (dbus_connection_send(jreq->connection, message, NULL))
559 dbus_message_unref(message);
564 int jbus_reply_j(struct jreq *jreq, struct json_object *reply)
566 const char *str = json_object_to_json_string(reply);
567 return str ? jbus_reply_s(jreq, str) : reply_out_of_memory(jreq);
570 int jbus_send_signal_s(struct jbus *jbus, const char *name, const char *content)
573 DBusMessage *message;
575 message = dbus_message_new_signal(jbus->path, jbus->name, name);
579 if (!dbus_message_set_sender(message, jbus->name)
580 || !dbus_message_append_args(message, DBUS_TYPE_STRING, &content, DBUS_TYPE_INVALID)) {
581 dbus_message_unref(message);
585 if (dbus_connection_send(jbus->connection, message, NULL))
587 dbus_message_unref(message);
595 int jbus_send_signal_j(struct jbus *jbus, const char *name, struct json_object *content)
597 const char *str = json_object_to_json_string(content);
602 return jbus_send_signal_s(jbus, name, str);
605 int jbus_add_service_s(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, const char *))
607 return add_service(jbus, method, oncall, NULL);
610 int jbus_add_service_j(struct jbus *jbus, const char *method, void (*oncall)(struct jreq *, struct json_object *))
612 return add_service(jbus, method, NULL, oncall);
615 int jbus_start_serving(struct jbus *jbus)
617 int status = dbus_bus_request_name(jbus->connection, jbus->name, DBUS_NAME_FLAG_DO_NOT_QUEUE, NULL);
619 case DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER:
620 case DBUS_REQUEST_NAME_REPLY_ALREADY_OWNER:
622 case DBUS_REQUEST_NAME_REPLY_EXISTS:
623 case DBUS_REQUEST_NAME_REPLY_IN_QUEUE:
630 int jbus_fill_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds)
634 for (r = i = 0 ; i < njbuses ; i++) {
635 if (jbuses[i]->watchnr) {
636 fds[r].fd = jbuses[i]->watchfd;
637 fds[r].events = jbuses[i]->watchflags;
644 int jbus_dispatch_pollfds(struct jbus **jbuses, int njbuses, struct pollfd *fds, int maxcount)
647 DBusDispatchStatus sts;
649 for (r = n = i = 0 ; i < njbuses && n < maxcount ; i++) {
650 if (jbuses[i]->watchnr && fds[r].fd == jbuses[i]->watchfd) {
651 if (fds[r].revents) {
652 dbus_connection_read_write(jbuses[i]->connection, 0);
653 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
654 while(sts == DBUS_DISPATCH_DATA_REMAINS && n < maxcount) {
655 sts = dbus_connection_dispatch(jbuses[i]->connection);
665 int jbus_dispatch_multiple(struct jbus **jbuses, int njbuses, int maxcount)
668 DBusDispatchStatus sts;
670 for (i = r = 0 ; i < njbuses && r < maxcount ; i++) {
671 dbus_connection_read_write(jbuses[i]->connection, 0);
672 sts = dbus_connection_get_dispatch_status(jbuses[i]->connection);
673 while(sts == DBUS_DISPATCH_DATA_REMAINS && r < maxcount) {
674 sts = dbus_connection_dispatch(jbuses[i]->connection);
681 int jbus_read_write_dispatch_multiple(struct jbus **jbuses, int njbuses, int toms, int maxcount)
686 if (njbuses < 0 || njbuses > 100) {
690 fds = alloca(njbuses * sizeof * fds);
693 r = jbus_dispatch_multiple(jbuses, njbuses, maxcount);
696 n = jbus_fill_pollfds(jbuses, njbuses, fds);
698 s = poll(fds, n, toms);
705 n = jbus_dispatch_pollfds(jbuses, njbuses, fds, maxcount - r);
706 return n >= 0 ? r + n : r ? r : n;
709 int jbus_read_write_dispatch(struct jbus *jbus, int toms)
711 int r = jbus_read_write_dispatch_multiple(&jbus, 1, toms, 1000);
712 return r < 0 ? r : 0;
715 int jbus_call_ss(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, const char*, void*), void *data)
717 return call(jbus, method, query, onresp, NULL, data);
720 int jbus_call_sj(struct jbus *jbus, const char *method, const char *query, void (*onresp)(int, struct json_object*, void*), void *data)
722 return call(jbus, method, query, NULL, onresp, data);
725 int jbus_call_js(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, const char*, void*), void *data)
727 const char *str = json_object_to_json_string(query);
732 return call(jbus, method, str, onresp, NULL, data);
735 int jbus_call_jj(struct jbus *jbus, const char *method, struct json_object *query, void (*onresp)(int, struct json_object*, void*), void *data)
737 const char *str = json_object_to_json_string(query);
742 return call(jbus, method, str, NULL, onresp, data);
745 char *jbus_call_ss_sync(struct jbus *jbus, const char *method, const char *query)
747 struct respsync synchro;
748 synchro.value = NULL;
749 synchro.replied = jbus_call_ss(jbus, method, query, sync_of_replies, &synchro);
750 while (!synchro.replied && !jbus_read_write_dispatch(jbus, -1));
751 return synchro.value;
754 struct json_object *jbus_call_sj_sync(struct jbus *jbus, const char *method, const char *query)
756 struct json_object *obj;
757 char *str = jbus_call_ss_sync(jbus, method, query);
761 obj = json_tokener_parse(str);
767 char *jbus_call_js_sync(struct jbus *jbus, const char *method, struct json_object *query)
769 const char *str = json_object_to_json_string(query);
774 return jbus_call_ss_sync(jbus, method, str);
777 struct json_object *jbus_call_jj_sync(struct jbus *jbus, const char *method, struct json_object *query)
779 const char *str = json_object_to_json_string(query);
784 return jbus_call_sj_sync(jbus, method, str);
787 int jbus_on_signal_s(struct jbus *jbus, const char *name, void (*onsig)(const char *))
789 return add_signal(jbus, name, onsig, NULL);
792 int jbus_on_signal_j(struct jbus *jbus, const char *name, void (*onsig)(struct json_object *))
794 return add_signal(jbus, name, NULL, onsig);
797 /************************** FEW LITTLE TESTS *****************************************/
803 void ping(struct jreq *jreq, struct json_object *request)
805 printf("ping(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(request));
806 jbus_reply_j(jreq, request);
807 json_object_put(request);
809 void incr(struct jreq *jreq, struct json_object *request)
811 static int counter = 0;
812 struct json_object *res = json_object_new_int(++counter);
813 printf("incr(%s) -> %s\n",json_object_to_json_string(request),json_object_to_json_string(res));
814 jbus_reply_j(jreq, res);
815 jbus_send_signal_j(jbus, "incremented", res);
816 json_object_put(res);
817 json_object_put(request);
822 jbus = create_jbus(1, "/bzh/iot/jdbus");
823 s1 = jbus_add_service_j(jbus, "ping", ping);
824 s2 = jbus_add_service_j(jbus, "incr", incr);
825 s3 = jbus_start_serving(jbus);
826 printf("started %d %d %d\n", s1, s2, s3);
827 while (!jbus_read_write_dispatch (jbus, -1));
834 void onresp(int status, struct json_object *response, void *data)
836 printf("resp: %d, %s, %s\n",status,(char*)data,json_object_to_json_string(response));
837 json_object_put(response);
839 void signaled(const char *data)
841 printf("signaled with {%s}\n", data);
846 jbus = create_jbus(1, "/bzh/iot/jdbus");
847 jbus_on_signal_s(jbus, "incremented", signaled);
849 jbus_call_sj(jbus, "ping", "{\"toto\":[1,2,3,4,true,\"toto\"]}", onresp, "ping");
850 jbus_call_sj(jbus, "incr", "{\"doit\":\"for-me\"}", onresp, "incr");
851 jbus_read_write_dispatch (jbus, 1);
853 printf("[[[%s]]]\n", jbus_call_ss_sync(jbus, "ping", "\"formidable!\""));
854 while (!jbus_read_write_dispatch (jbus, -1));