2 * Copyright (C) 2015-2018 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <json-c/json.h>
27 #include <afb/afb-event-x2-itf.h>
28 #include <afb/afb-event-x1.h>
38 * Structure for event listeners
40 struct afb_evt_listener {
42 /* chaining listeners */
43 struct afb_evt_listener *next;
45 /* interface for callbacks */
46 const struct afb_evt_itf *itf;
48 /* closure for the callback */
51 /* head of the list of events listened */
52 struct afb_evt_watch *watchs;
54 /* rwlock of the listener */
55 pthread_rwlock_t rwlock;
57 /* count of reference to the listener */
62 * Structure for describing events
67 struct afb_event_x2 eventid;
70 struct afb_evtid *next;
72 /* head of the list of listeners watching the event */
73 struct afb_evt_watch *watchs;
75 /* rwlock of the event */
76 pthread_rwlock_t rwlock;
90 /* fullname of the event */
95 * Structure for associating events and listeners
97 struct afb_evt_watch {
100 struct afb_evtid *evtid;
102 /* link to the next watcher for the same evtid */
103 struct afb_evt_watch *next_by_evtid;
106 struct afb_evt_listener *listener;
108 /* link to the next watcher for the same listener */
109 struct afb_evt_watch *next_by_listener;
116 * structure for job of broadcasting string events
120 /** object atached to the event */
121 struct json_object *object;
123 /** name of the event to broadcast */
128 * structure for job of broadcasting or pushing events
132 /** the event to broadcast */
133 struct afb_evtid *evtid;
135 /** object atached to the event */
136 struct json_object *object;
139 /* the interface for events */
140 static struct afb_event_x2_itf afb_evt_event_x2_itf = {
141 .broadcast = (void*)afb_evt_evtid_broadcast,
142 .push = (void*)afb_evt_evtid_push,
143 .unref = (void*)afb_evt_evtid_unref,
144 .name = (void*)afb_evt_evtid_name,
145 .addref = (void*)afb_evt_evtid_addref
148 /* the interface for events */
149 static struct afb_event_x2_itf afb_evt_hooked_eventid_itf = {
150 .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
151 .push = (void*)afb_evt_evtid_hooked_push,
152 .unref = (void*)afb_evt_evtid_hooked_unref,
153 .name = (void*)afb_evt_evtid_hooked_name,
154 .addref = (void*)afb_evt_evtid_hooked_addref
157 /* job groups for events push/broadcast */
158 #define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
159 #define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
161 /* head of the list of listeners */
162 static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
163 static struct afb_evt_listener *listeners = NULL;
165 /* handling id of events */
166 static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER;
167 static struct afb_evtid *evtids = NULL;
168 static int event_id_counter = 0;
169 static int event_id_wrapped = 0;
172 * Create structure for job of broadcasting string 'event' with 'object'
173 * Returns the created structure or NULL if out of memory
175 static struct job_string *make_job_string(const char *event, struct json_object *object)
177 size_t sz = 1 + strlen(event);
178 struct job_string *js = malloc(sz + sizeof *js);
181 memcpy(js->event, event, sz);
187 * Destroy structure 'js' for job of broadcasting string events
189 static void destroy_job_string(struct job_string *js)
191 json_object_put(js->object);
196 * Create structure for job of broadcasting or pushing 'evtid' with 'object'
197 * Returns the created structure or NULL if out of memory
199 static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
201 struct job_evtid *je = malloc(sizeof *je);
203 je->evtid = afb_evt_evtid_addref(evtid);
210 * Destroy structure for job of broadcasting or pushing evtid
212 static void destroy_job_evtid(struct job_evtid *je)
214 afb_evt_evtid_unref(je->evtid);
215 json_object_put(je->object);
220 * Broadcasts the 'event' of 'id' with its 'object'
222 static void broadcast(const char *event, struct json_object *object, int id)
224 struct afb_evt_listener *listener;
226 pthread_rwlock_rdlock(&listeners_rwlock);
227 listener = listeners;
229 if (listener->itf->broadcast != NULL)
230 listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
231 listener = listener->next;
233 pthread_rwlock_unlock(&listeners_rwlock);
237 * Jobs callback for broadcasting string asynchronously
239 static void broadcast_job_string(int signum, void *closure)
241 struct job_string *js = closure;
244 broadcast(js->event, js->object, 0);
245 destroy_job_string(js);
249 * Jobs callback for broadcasting evtid asynchronously
251 static void broadcast_job_evtid(int signum, void *closure)
253 struct job_evtid *je = closure;
256 broadcast(je->evtid->fullname, je->object, je->evtid->id);
257 destroy_job_evtid(je);
261 * Broadcasts the string 'event' with its 'object'
263 static int broadcast_string(const char *event, struct json_object *object)
265 struct job_string *js;
268 js = make_job_string(event, object);
270 ERROR("Cant't create broadcast string job item for %s(%s)",
271 event, json_object_to_json_string(object));
272 json_object_put(object);
276 rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
278 ERROR("cant't queue broadcast string job item for %s(%s)",
279 event, json_object_to_json_string(object));
280 destroy_job_string(js);
286 * Broadcasts the 'evtid' with its 'object'
288 static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
290 struct job_evtid *je;
293 je = make_job_evtid(evtid, object);
295 ERROR("Cant't create broadcast evtid job item for %s(%s)",
296 evtid->fullname, json_object_to_json_string(object));
297 json_object_put(object);
301 rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
303 ERROR("cant't queue broadcast evtid job item for %s(%s)",
304 evtid->fullname, json_object_to_json_string(object));
305 destroy_job_evtid(je);
311 * Broadcasts the event 'evtid' with its 'object'
312 * 'object' is released (like json_object_put)
313 * Returns the count of listener that received the event.
315 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
317 return broadcast_evtid(evtid, object);
321 * Broadcasts the event 'evtid' with its 'object'
322 * 'object' is released (like json_object_put)
323 * Returns the count of listener that received the event.
325 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
329 json_object_get(object);
331 if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
332 afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
334 result = broadcast_evtid(evtid, object);
336 if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
337 afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
339 json_object_put(object);
345 * Broadcasts the 'event' with its 'object'
346 * 'object' is released (like json_object_put)
347 * Returns the count of listener having receive the event.
349 int afb_evt_broadcast(const char *event, struct json_object *object)
353 json_object_get(object);
355 afb_hook_evt_broadcast_before(event, 0, object);
356 result = broadcast_string(event, object);
357 afb_hook_evt_broadcast_after(event, 0, object, result);
359 json_object_put(object);
365 * Pushes the event 'evtid' with 'obj' to its listeners
366 * Returns the count of listener that received the event.
368 static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
371 struct afb_evt_watch *watch;
372 struct afb_evt_listener *listener;
375 pthread_rwlock_rdlock(&evtid->rwlock);
376 watch = evtid->watchs;
378 listener = watch->listener;
379 assert(listener->itf->push != NULL);
380 if (watch->activity != 0) {
381 listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
384 watch = watch->next_by_evtid;
386 evtid->has_client = has_client;
387 pthread_rwlock_unlock(&evtid->rwlock);
391 * Jobs callback for pushing evtid asynchronously
393 static void push_job_evtid(int signum, void *closure)
395 struct job_evtid *je = closure;
398 push_evtid(je->evtid, je->object);
399 destroy_job_evtid(je);
403 * Pushes the event 'evtid' with 'obj' to its listeners
404 * 'obj' is released (like json_object_put)
405 * Returns 1 if at least one listener exists or 0 if no listener exists or
406 * -1 in case of error and the event can't be delivered
408 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
410 struct job_evtid *je;
413 je = make_job_evtid(evtid, object);
415 ERROR("Cant't create push evtid job item for %s(%s)",
416 evtid->fullname, json_object_to_json_string(object));
417 json_object_put(object);
421 rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
423 rc = evtid->has_client;
425 ERROR("cant't queue push evtid job item for %s(%s)",
426 evtid->fullname, json_object_to_json_string(object));
427 destroy_job_evtid(je);
434 * Pushes the event 'evtid' with 'obj' to its listeners
435 * 'obj' is released (like json_object_put)
436 * Emits calls to hooks.
437 * Returns the count of listener taht received the event.
439 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
444 /* lease the object */
445 json_object_get(obj);
447 /* hook before push */
448 if (evtid->hookflags & afb_hook_flag_evt_push_before)
449 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
452 result = afb_evt_evtid_push(evtid, obj);
454 /* hook after push */
455 if (evtid->hookflags & afb_hook_flag_evt_push_after)
456 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
458 /* release the object */
459 json_object_put(obj);
466 static void remove_watch(struct afb_evt_watch *watch)
468 struct afb_evt_watch **prv;
469 struct afb_evtid *evtid;
470 struct afb_evt_listener *listener;
472 /* notify listener if needed */
473 evtid = watch->evtid;
474 listener = watch->listener;
475 if (watch->activity != 0 && listener->itf->remove != NULL)
476 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
478 /* unlink the watch for its event */
479 prv = &evtid->watchs;
481 prv = &(*prv)->next_by_evtid;
482 *prv = watch->next_by_evtid;
484 /* unlink the watch for its listener */
485 prv = &listener->watchs;
487 prv = &(*prv)->next_by_listener;
488 *prv = watch->next_by_listener;
495 * Creates an event of name 'fullname' and returns it or NULL on error.
497 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
500 struct afb_evtid *evtid, *oevt;
502 /* allocates the event */
503 len = strlen(fullname);
504 evtid = malloc(len + 1 + sizeof * evtid);
508 /* allocates the id */
509 pthread_rwlock_wrlock(&events_rwlock);
511 if (++event_id_counter < 0) {
512 event_id_wrapped = 1;
513 event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
515 if (!event_id_wrapped)
518 while(oevt != NULL && oevt->id != event_id_counter)
520 } while (oevt != NULL);
522 /* initialize the event */
523 memcpy(evtid->fullname, fullname, len + 1);
524 evtid->next = evtids;
526 evtid->watchs = NULL;
527 evtid->id = event_id_counter;
528 evtid->has_client = 0;
529 pthread_rwlock_init(&evtid->rwlock, NULL);
531 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
532 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_event_x2_itf;
533 if (evtid->hookflags & afb_hook_flag_evt_create)
534 afb_hook_evt_create(evtid->fullname, evtid->id);
535 pthread_rwlock_unlock(&events_rwlock);
537 /* returns the event */
544 * Creates an event of name 'prefix'/'name' and returns it or NULL on error.
546 struct afb_evtid *afb_evt_evtid_create2(const char *prefix, const char *name)
548 size_t prelen, postlen;
551 /* makes the event fullname */
552 prelen = strlen(prefix);
553 postlen = strlen(name);
554 fullname = alloca(prelen + postlen + 2);
555 memcpy(fullname, prefix, prelen);
556 fullname[prelen] = '/';
557 memcpy(fullname + prelen + 1, name, postlen + 1);
559 /* create the event */
560 return afb_evt_evtid_create(fullname);
564 * increment the reference count of the event 'evtid'
566 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
568 __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
573 * increment the reference count of the event 'evtid'
575 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
577 if (evtid->hookflags & afb_hook_flag_evt_addref)
578 afb_hook_evt_addref(evtid->fullname, evtid->id);
579 return afb_evt_evtid_addref(evtid);
583 * decrement the reference count of the event 'evtid'
584 * and destroy it when the count reachs zero
586 void afb_evt_evtid_unref(struct afb_evtid *evtid)
589 struct afb_evtid **prv;
590 struct afb_evt_listener *listener;
592 if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
593 /* unlinks the event if valid! */
594 pthread_rwlock_wrlock(&events_rwlock);
597 while (*prv && !(found = (*prv == evtid)))
601 pthread_rwlock_unlock(&events_rwlock);
603 /* destroys the event */
605 ERROR("event not found");
607 /* removes all watchers */
608 while(evtid->watchs != NULL) {
609 listener = evtid->watchs->listener;
610 pthread_rwlock_wrlock(&listener->rwlock);
611 pthread_rwlock_wrlock(&evtid->rwlock);
612 remove_watch(evtid->watchs);
613 pthread_rwlock_unlock(&evtid->rwlock);
614 pthread_rwlock_unlock(&listener->rwlock);
618 pthread_rwlock_destroy(&evtid->rwlock);
625 * decrement the reference count of the event 'evtid'
626 * and destroy it when the count reachs zero
628 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
630 if (evtid->hookflags & afb_hook_flag_evt_unref)
631 afb_hook_evt_unref(evtid->fullname, evtid->id);
632 afb_evt_evtid_unref(evtid);
636 * Returns the true name of the 'event'
638 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
640 return evtid->fullname;
644 * Returns the name of the 'event'
646 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
648 const char *name = strchr(evtid->fullname, '/');
649 return name ? name + 1 : evtid->fullname;
653 * Returns the name associated to the event 'evtid'.
655 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
657 const char *result = afb_evt_evtid_name(evtid);
658 if (evtid->hookflags & afb_hook_flag_evt_name)
659 afb_hook_evt_name(evtid->fullname, evtid->id, result);
664 * Returns the id of the 'event'
666 int afb_evt_evtid_id(struct afb_evtid *evtid)
672 * Returns an instance of the listener defined by the 'send' callback
674 * Returns NULL in case of memory depletion.
676 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
678 struct afb_evt_listener *listener;
680 /* search if an instance already exists */
681 pthread_rwlock_wrlock(&listeners_rwlock);
682 listener = listeners;
683 while (listener != NULL) {
684 if (listener->itf == itf && listener->closure == closure) {
685 listener = afb_evt_listener_addref(listener);
688 listener = listener->next;
692 listener = calloc(1, sizeof *listener);
693 if (listener != NULL) {
696 listener->closure = closure;
697 listener->watchs = NULL;
698 listener->refcount = 1;
699 pthread_rwlock_init(&listener->rwlock, NULL);
700 listener->next = listeners;
701 listeners = listener;
704 pthread_rwlock_unlock(&listeners_rwlock);
709 * Increases the reference count of 'listener' and returns it
711 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
713 __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
718 * Decreases the reference count of the 'listener' and destroys it
721 void afb_evt_listener_unref(struct afb_evt_listener *listener)
723 struct afb_evt_listener **prv;
724 struct afb_evtid *evtid;
726 if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
728 /* unlink the listener */
729 pthread_rwlock_wrlock(&listeners_rwlock);
731 while (*prv != listener)
733 *prv = listener->next;
734 pthread_rwlock_unlock(&listeners_rwlock);
736 /* remove the watchers */
737 pthread_rwlock_wrlock(&listener->rwlock);
738 while (listener->watchs != NULL) {
739 evtid = listener->watchs->evtid;
740 pthread_rwlock_wrlock(&evtid->rwlock);
741 remove_watch(listener->watchs);
742 pthread_rwlock_unlock(&evtid->rwlock);
744 pthread_rwlock_unlock(&listener->rwlock);
746 /* free the listener */
747 pthread_rwlock_destroy(&listener->rwlock);
753 * Makes the 'listener' watching 'evtid'
754 * Returns 0 in case of success or else -1.
756 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
758 struct afb_evt_watch *watch;
760 /* check parameter */
761 if (listener->itf->push == NULL) {
766 /* search the existing watch for the listener */
767 pthread_rwlock_wrlock(&listener->rwlock);
768 watch = listener->watchs;
769 while(watch != NULL) {
770 if (watch->evtid == evtid)
772 watch = watch->next_by_listener;
775 /* not found, allocate a new */
776 watch = malloc(sizeof *watch);
778 pthread_rwlock_unlock(&listener->rwlock);
783 /* initialise and link */
784 watch->evtid = evtid;
786 watch->listener = listener;
787 watch->next_by_listener = listener->watchs;
788 listener->watchs = watch;
789 pthread_rwlock_wrlock(&evtid->rwlock);
790 watch->next_by_evtid = evtid->watchs;
791 evtid->watchs = watch;
792 pthread_rwlock_unlock(&evtid->rwlock);
795 if (watch->activity == 0 && listener->itf->add != NULL)
796 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
798 evtid->has_client = 1;
799 pthread_rwlock_unlock(&listener->rwlock);
805 * Avoids the 'listener' to watch 'evtid'
806 * Returns 0 in case of success or else -1.
808 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
810 struct afb_evt_watch *watch;
812 /* search the existing watch */
813 pthread_rwlock_wrlock(&listener->rwlock);
814 watch = listener->watchs;
815 while(watch != NULL) {
816 if (watch->evtid == evtid) {
817 if (watch->activity != 0) {
819 if (watch->activity == 0 && listener->itf->remove != NULL)
820 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
822 pthread_rwlock_unlock(&listener->rwlock);
825 watch = watch->next_by_listener;
827 pthread_rwlock_unlock(&listener->rwlock);
833 * update the hooks for events
835 void afb_evt_update_hooks()
837 struct afb_evtid *evtid;
839 pthread_rwlock_rdlock(&events_rwlock);
840 for (evtid = evtids ; evtid ; evtid = evtid->next) {
841 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
842 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_eventid_itf : &afb_evt_event_x2_itf;
844 pthread_rwlock_unlock(&events_rwlock);
847 inline struct afb_evtid *afb_evt_event_x2_to_evtid(struct afb_event_x2 *eventid)
849 return (struct afb_evtid*)eventid;
852 inline struct afb_event_x2 *afb_evt_event_x2_from_evtid(struct afb_evtid *evtid)
854 return &evtid->eventid;
858 * Creates an event of 'fullname' and returns it.
859 * Returns an event with closure==NULL in case of error.
861 struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname)
863 return afb_evt_event_x2_from_evtid(afb_evt_evtid_create(fullname));
867 * Creates an event of name 'prefix'/'name' and returns it.
868 * Returns an event with closure==NULL in case of error.
870 struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name)
872 return afb_evt_event_x2_from_evtid(afb_evt_evtid_create2(prefix, name));
876 * Returns the fullname of the 'eventid'
878 const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid)
880 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
881 return evtid ? evtid->fullname : NULL;
885 * Returns the id of the 'eventid'
887 int afb_evt_event_x2_id(struct afb_event_x2 *eventid)
889 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
890 return evtid ? evtid->id : 0;
894 * Makes the 'listener' watching 'eventid'
895 * Returns 0 in case of success or else -1.
897 int afb_evt_event_x2_add_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
899 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
901 /* check parameter */
907 /* search the existing watch for the listener */
908 return afb_evt_watch_add_evtid(listener, evtid);
912 * Avoids the 'listener' to watch 'eventid'
913 * Returns 0 in case of success or else -1.
915 int afb_evt_event_x2_remove_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
917 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
919 /* check parameter */
925 /* search the existing watch */
926 return afb_evt_watch_sub_evtid(listener, evtid);
929 int afb_evt_event_x2_push(struct afb_event_x2 *eventid, struct json_object *object)
931 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
933 return afb_evt_evtid_hooked_push(evtid, object);
934 json_object_put(object);
938 int afb_evt_event_x2_unhooked_push(struct afb_event_x2 *eventid, struct json_object *object)
940 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
942 return afb_evt_evtid_push(evtid, object);
943 json_object_put(object);
947 struct afb_event_x1 afb_evt_event_from_evtid(struct afb_evtid *evtid)
950 ? (struct afb_event_x1){ .itf = &afb_evt_hooked_eventid_itf, .closure = &evtid->eventid }
951 : (struct afb_event_x1){ .itf = NULL, .closure = NULL };
954 void afb_evt_event_x2_unref(struct afb_event_x2 *eventid)
956 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
958 afb_evt_evtid_unref(evtid);
961 struct afb_event_x2 *afb_evt_event_x2_addref(struct afb_event_x2 *eventid)
963 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
965 afb_evt_evtid_addref(evtid);