2 * Copyright (C) 2015-2019 "IoT.bzh"
3 * Author José Bollo <jose.bollo@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 #include <json-c/json.h>
27 #include <afb/afb-event-x2-itf.h>
28 #include <afb/afb-event-x1.h>
38 * Structure for event listeners
40 struct afb_evt_listener {
42 /* chaining listeners */
43 struct afb_evt_listener *next;
45 /* interface for callbacks */
46 const struct afb_evt_itf *itf;
48 /* closure for the callback */
51 /* head of the list of events listened */
52 struct afb_evt_watch *watchs;
54 /* rwlock of the listener */
55 pthread_rwlock_t rwlock;
57 /* count of reference to the listener */
62 * Structure for describing events
67 struct afb_event_x2 eventid;
70 struct afb_evtid *next;
72 /* head of the list of listeners watching the event */
73 struct afb_evt_watch *watchs;
75 /* rwlock of the event */
76 pthread_rwlock_t rwlock;
92 /* fullname of the event */
97 * Structure for associating events and listeners
99 struct afb_evt_watch {
102 struct afb_evtid *evtid;
104 /* link to the next watcher for the same evtid */
105 struct afb_evt_watch *next_by_evtid;
108 struct afb_evt_listener *listener;
110 /* link to the next watcher for the same listener */
111 struct afb_evt_watch *next_by_listener;
118 * structure for job of broadcasting events
122 /** object atached to the event */
123 struct json_object *object;
125 /** name of the event to broadcast */
130 * structure for job of broadcasting or pushing events
134 /** the event to broadcast */
135 struct afb_evtid *evtid;
137 /** object atached to the event */
138 struct json_object *object;
141 /* the interface for events */
142 static struct afb_event_x2_itf afb_evt_event_x2_itf = {
143 .broadcast = (void*)afb_evt_evtid_broadcast,
144 .push = (void*)afb_evt_evtid_push,
145 .unref = (void*)afb_evt_evtid_unref,
146 .name = (void*)afb_evt_evtid_name,
147 .addref = (void*)afb_evt_evtid_addref
151 /* the interface for events */
152 static struct afb_event_x2_itf afb_evt_hooked_event_x2_itf = {
153 .broadcast = (void*)afb_evt_evtid_hooked_broadcast,
154 .push = (void*)afb_evt_evtid_hooked_push,
155 .unref = (void*)afb_evt_evtid_hooked_unref,
156 .name = (void*)afb_evt_evtid_hooked_name,
157 .addref = (void*)afb_evt_evtid_hooked_addref
161 /* job groups for events push/broadcast */
162 #define BROADCAST_JOB_GROUP (&afb_evt_event_x2_itf)
163 #define PUSH_JOB_GROUP (&afb_evt_event_x2_itf)
165 /* head of the list of listeners */
166 static pthread_rwlock_t listeners_rwlock = PTHREAD_RWLOCK_INITIALIZER;
167 static struct afb_evt_listener *listeners = NULL;
169 /* handling id of events */
170 static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER;
171 static struct afb_evtid *evtids = NULL;
172 static int event_id_counter = 0;
173 static int event_id_wrapped = 0;
176 * Create structure for job of broadcasting string 'event' with 'object'
177 * Returns the created structure or NULL if out of memory
179 static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object)
181 size_t sz = 1 + strlen(event);
182 struct job_broadcast *jb = malloc(sz + sizeof *jb);
185 memcpy(jb->event, event, sz);
191 * Destroy structure 'jb' for job of broadcasting string events
193 static void destroy_job_broadcast(struct job_broadcast *jb)
195 json_object_put(jb->object);
200 * Create structure for job of broadcasting or pushing 'evtid' with 'object'
201 * Returns the created structure or NULL if out of memory
203 static struct job_evtid *make_job_evtid(struct afb_evtid *evtid, struct json_object *object)
205 struct job_evtid *je = malloc(sizeof *je);
207 je->evtid = afb_evt_evtid_addref(evtid);
214 * Destroy structure for job of broadcasting or pushing evtid
216 static void destroy_job_evtid(struct job_evtid *je)
218 afb_evt_evtid_unref(je->evtid);
219 json_object_put(je->object);
224 * Broadcasts the 'event' of 'id' with its 'object'
226 static void broadcast(const char *event, struct json_object *object)
228 struct afb_evt_listener *listener;
230 pthread_rwlock_rdlock(&listeners_rwlock);
231 listener = listeners;
233 if (listener->itf->broadcast != NULL)
234 listener->itf->broadcast(listener->closure, event, json_object_get(object));
235 listener = listener->next;
237 pthread_rwlock_unlock(&listeners_rwlock);
241 * Jobs callback for broadcasting string asynchronously
243 static void broadcast_job(int signum, void *closure)
245 struct job_broadcast *jb = closure;
248 broadcast(jb->event, jb->object);
249 destroy_job_broadcast(jb);
253 * Broadcasts the string 'event' with its 'object'
255 static int unhooked_broadcast(const char *event, struct json_object *object)
257 struct job_broadcast *jb;
260 jb = make_job_broadcast(event, object);
262 ERROR("Cant't create broadcast string job item for %s(%s)",
263 event, json_object_to_json_string(object));
264 json_object_put(object);
268 rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
270 ERROR("cant't queue broadcast string job item for %s(%s)",
271 event, json_object_to_json_string(object));
272 destroy_job_broadcast(jb);
278 * Broadcasts the event 'evtid' with its 'object'
279 * 'object' is released (like json_object_put)
280 * Returns the count of listener that received the event.
282 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
284 return unhooked_broadcast(evtid->fullname, object);
289 * Broadcasts the event 'evtid' with its 'object'
290 * 'object' is released (like json_object_put)
291 * Returns the count of listener that received the event.
293 int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *object)
297 json_object_get(object);
299 if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
300 afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
302 result = afb_evt_evtid_broadcast(evtid, object);
304 if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
305 afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
307 json_object_put(object);
314 * Broadcasts the 'event' with its 'object'
315 * 'object' is released (like json_object_put)
316 * Returns the count of listener having receive the event.
318 int afb_evt_broadcast(const char *event, struct json_object *object)
323 json_object_get(object);
324 afb_hook_evt_broadcast_before(event, 0, object);
327 result = unhooked_broadcast(event, object);
330 afb_hook_evt_broadcast_after(event, 0, object, result);
331 json_object_put(object);
337 * Pushes the event 'evtid' with 'obj' to its listeners
338 * Returns the count of listener that received the event.
340 static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
343 struct afb_evt_watch *watch;
344 struct afb_evt_listener *listener;
347 pthread_rwlock_rdlock(&evtid->rwlock);
348 watch = evtid->watchs;
350 listener = watch->listener;
351 assert(listener->itf->push != NULL);
352 if (watch->activity != 0) {
353 listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
356 watch = watch->next_by_evtid;
358 evtid->has_client = has_client;
359 pthread_rwlock_unlock(&evtid->rwlock);
363 * Jobs callback for pushing evtid asynchronously
365 static void push_job_evtid(int signum, void *closure)
367 struct job_evtid *je = closure;
370 push_evtid(je->evtid, je->object);
371 destroy_job_evtid(je);
375 * Pushes the event 'evtid' with 'obj' to its listeners
376 * 'obj' is released (like json_object_put)
377 * Returns 1 if at least one listener exists or 0 if no listener exists or
378 * -1 in case of error and the event can't be delivered
380 int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
382 struct job_evtid *je;
385 je = make_job_evtid(evtid, object);
387 ERROR("Cant't create push evtid job item for %s(%s)",
388 evtid->fullname, json_object_to_json_string(object));
389 json_object_put(object);
393 rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
395 rc = evtid->has_client;
397 ERROR("cant't queue push evtid job item for %s(%s)",
398 evtid->fullname, json_object_to_json_string(object));
399 destroy_job_evtid(je);
407 * Pushes the event 'evtid' with 'obj' to its listeners
408 * 'obj' is released (like json_object_put)
409 * Emits calls to hooks.
410 * Returns the count of listener taht received the event.
412 int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
417 /* lease the object */
418 json_object_get(obj);
420 /* hook before push */
421 if (evtid->hookflags & afb_hook_flag_evt_push_before)
422 afb_hook_evt_push_before(evtid->fullname, evtid->id, obj);
425 result = afb_evt_evtid_push(evtid, obj);
427 /* hook after push */
428 if (evtid->hookflags & afb_hook_flag_evt_push_after)
429 afb_hook_evt_push_after(evtid->fullname, evtid->id, obj, result);
431 /* release the object */
432 json_object_put(obj);
440 static void remove_watch(struct afb_evt_watch *watch)
442 struct afb_evt_watch **prv;
443 struct afb_evtid *evtid;
444 struct afb_evt_listener *listener;
446 /* notify listener if needed */
447 evtid = watch->evtid;
448 listener = watch->listener;
449 if (watch->activity != 0 && listener->itf->remove != NULL)
450 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
452 /* unlink the watch for its event */
453 prv = &evtid->watchs;
455 prv = &(*prv)->next_by_evtid;
456 *prv = watch->next_by_evtid;
458 /* unlink the watch for its listener */
459 prv = &listener->watchs;
461 prv = &(*prv)->next_by_listener;
462 *prv = watch->next_by_listener;
469 * Creates an event of name 'fullname' and returns it or NULL on error.
471 struct afb_evtid *afb_evt_evtid_create(const char *fullname)
474 struct afb_evtid *evtid, *oevt;
476 /* allocates the event */
477 len = strlen(fullname);
478 evtid = malloc(len + 1 + sizeof * evtid);
482 /* allocates the id */
483 pthread_rwlock_wrlock(&events_rwlock);
485 if (++event_id_counter < 0) {
486 event_id_wrapped = 1;
487 event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
489 if (!event_id_wrapped)
492 while(oevt != NULL && oevt->id != event_id_counter)
494 } while (oevt != NULL);
496 /* initialize the event */
497 memcpy(evtid->fullname, fullname, len + 1);
498 evtid->next = evtids;
500 evtid->watchs = NULL;
501 evtid->id = event_id_counter;
502 evtid->has_client = 0;
503 pthread_rwlock_init(&evtid->rwlock, NULL);
506 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
507 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf;
508 if (evtid->hookflags & afb_hook_flag_evt_create)
509 afb_hook_evt_create(evtid->fullname, evtid->id);
511 evtid->eventid.itf = &afb_evt_event_x2_itf;
513 pthread_rwlock_unlock(&events_rwlock);
515 /* returns the event */
522 * Creates an event of name 'prefix'/'name' and returns it or NULL on error.
524 struct afb_evtid *afb_evt_evtid_create2(const char *prefix, const char *name)
526 size_t prelen, postlen;
529 /* makes the event fullname */
530 prelen = strlen(prefix);
531 postlen = strlen(name);
532 fullname = alloca(prelen + postlen + 2);
533 memcpy(fullname, prefix, prelen);
534 fullname[prelen] = '/';
535 memcpy(fullname + prelen + 1, name, postlen + 1);
537 /* create the event */
538 return afb_evt_evtid_create(fullname);
542 * increment the reference count of the event 'evtid'
544 struct afb_evtid *afb_evt_evtid_addref(struct afb_evtid *evtid)
546 __atomic_add_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED);
552 * increment the reference count of the event 'evtid'
554 struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
556 if (evtid->hookflags & afb_hook_flag_evt_addref)
557 afb_hook_evt_addref(evtid->fullname, evtid->id);
558 return afb_evt_evtid_addref(evtid);
563 * decrement the reference count of the event 'evtid'
564 * and destroy it when the count reachs zero
566 void afb_evt_evtid_unref(struct afb_evtid *evtid)
569 struct afb_evtid **prv;
570 struct afb_evt_listener *listener;
572 if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
573 /* unlinks the event if valid! */
574 pthread_rwlock_wrlock(&events_rwlock);
577 while (*prv && !(found = (*prv == evtid)))
581 pthread_rwlock_unlock(&events_rwlock);
583 /* destroys the event */
585 ERROR("event not found");
587 /* removes all watchers */
588 while(evtid->watchs != NULL) {
589 listener = evtid->watchs->listener;
590 pthread_rwlock_wrlock(&listener->rwlock);
591 pthread_rwlock_wrlock(&evtid->rwlock);
592 remove_watch(evtid->watchs);
593 pthread_rwlock_unlock(&evtid->rwlock);
594 pthread_rwlock_unlock(&listener->rwlock);
598 pthread_rwlock_destroy(&evtid->rwlock);
606 * decrement the reference count of the event 'evtid'
607 * and destroy it when the count reachs zero
609 void afb_evt_evtid_hooked_unref(struct afb_evtid *evtid)
611 if (evtid->hookflags & afb_hook_flag_evt_unref)
612 afb_hook_evt_unref(evtid->fullname, evtid->id);
613 afb_evt_evtid_unref(evtid);
618 * Returns the true name of the 'event'
620 const char *afb_evt_evtid_fullname(struct afb_evtid *evtid)
622 return evtid->fullname;
626 * Returns the name of the 'event'
628 const char *afb_evt_evtid_name(struct afb_evtid *evtid)
630 const char *name = strchr(evtid->fullname, '/');
631 return name ? name + 1 : evtid->fullname;
636 * Returns the name associated to the event 'evtid'.
638 const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
640 const char *result = afb_evt_evtid_name(evtid);
641 if (evtid->hookflags & afb_hook_flag_evt_name)
642 afb_hook_evt_name(evtid->fullname, evtid->id, result);
648 * Returns the id of the 'event'
650 int afb_evt_evtid_id(struct afb_evtid *evtid)
656 * Returns an instance of the listener defined by the 'send' callback
658 * Returns NULL in case of memory depletion.
660 struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
662 struct afb_evt_listener *listener;
664 /* search if an instance already exists */
665 pthread_rwlock_wrlock(&listeners_rwlock);
666 listener = listeners;
667 while (listener != NULL) {
668 if (listener->itf == itf && listener->closure == closure) {
669 listener = afb_evt_listener_addref(listener);
672 listener = listener->next;
676 listener = calloc(1, sizeof *listener);
677 if (listener != NULL) {
680 listener->closure = closure;
681 listener->watchs = NULL;
682 listener->refcount = 1;
683 pthread_rwlock_init(&listener->rwlock, NULL);
684 listener->next = listeners;
685 listeners = listener;
688 pthread_rwlock_unlock(&listeners_rwlock);
693 * Increases the reference count of 'listener' and returns it
695 struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
697 __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
702 * Decreases the reference count of the 'listener' and destroys it
705 void afb_evt_listener_unref(struct afb_evt_listener *listener)
707 struct afb_evt_listener **prv;
708 struct afb_evtid *evtid;
710 if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
712 /* unlink the listener */
713 pthread_rwlock_wrlock(&listeners_rwlock);
715 while (*prv != listener)
717 *prv = listener->next;
718 pthread_rwlock_unlock(&listeners_rwlock);
720 /* remove the watchers */
721 pthread_rwlock_wrlock(&listener->rwlock);
722 while (listener->watchs != NULL) {
723 evtid = listener->watchs->evtid;
724 pthread_rwlock_wrlock(&evtid->rwlock);
725 remove_watch(listener->watchs);
726 pthread_rwlock_unlock(&evtid->rwlock);
728 pthread_rwlock_unlock(&listener->rwlock);
730 /* free the listener */
731 pthread_rwlock_destroy(&listener->rwlock);
737 * Makes the 'listener' watching 'evtid'
738 * Returns 0 in case of success or else -1.
740 int afb_evt_watch_add_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
742 struct afb_evt_watch *watch;
744 /* check parameter */
745 if (listener->itf->push == NULL) {
750 /* search the existing watch for the listener */
751 pthread_rwlock_wrlock(&listener->rwlock);
752 watch = listener->watchs;
753 while(watch != NULL) {
754 if (watch->evtid == evtid)
756 watch = watch->next_by_listener;
759 /* not found, allocate a new */
760 watch = malloc(sizeof *watch);
762 pthread_rwlock_unlock(&listener->rwlock);
767 /* initialise and link */
768 watch->evtid = evtid;
770 watch->listener = listener;
771 watch->next_by_listener = listener->watchs;
772 listener->watchs = watch;
773 pthread_rwlock_wrlock(&evtid->rwlock);
774 watch->next_by_evtid = evtid->watchs;
775 evtid->watchs = watch;
776 pthread_rwlock_unlock(&evtid->rwlock);
779 if (watch->activity == 0 && listener->itf->add != NULL)
780 listener->itf->add(listener->closure, evtid->fullname, evtid->id);
782 evtid->has_client = 1;
783 pthread_rwlock_unlock(&listener->rwlock);
789 * Avoids the 'listener' to watch 'evtid'
790 * Returns 0 in case of success or else -1.
792 int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid *evtid)
794 struct afb_evt_watch *watch;
796 /* search the existing watch */
797 pthread_rwlock_wrlock(&listener->rwlock);
798 watch = listener->watchs;
799 while(watch != NULL) {
800 if (watch->evtid == evtid) {
801 if (watch->activity != 0) {
803 if (watch->activity == 0 && listener->itf->remove != NULL)
804 listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
806 pthread_rwlock_unlock(&listener->rwlock);
809 watch = watch->next_by_listener;
811 pthread_rwlock_unlock(&listener->rwlock);
818 * update the hooks for events
820 void afb_evt_update_hooks()
822 struct afb_evtid *evtid;
824 pthread_rwlock_rdlock(&events_rwlock);
825 for (evtid = evtids ; evtid ; evtid = evtid->next) {
826 evtid->hookflags = afb_hook_flags_evt(evtid->fullname);
827 evtid->eventid.itf = evtid->hookflags ? &afb_evt_hooked_event_x2_itf : &afb_evt_event_x2_itf;
829 pthread_rwlock_unlock(&events_rwlock);
833 inline struct afb_evtid *afb_evt_event_x2_to_evtid(struct afb_event_x2 *eventid)
835 return (struct afb_evtid*)eventid;
838 inline struct afb_event_x2 *afb_evt_event_x2_from_evtid(struct afb_evtid *evtid)
840 return &evtid->eventid;
844 * Creates an event of 'fullname' and returns it.
845 * Returns an event with closure==NULL in case of error.
847 struct afb_event_x2 *afb_evt_event_x2_create(const char *fullname)
849 return afb_evt_event_x2_from_evtid(afb_evt_evtid_create(fullname));
853 * Creates an event of name 'prefix'/'name' and returns it.
854 * Returns an event with closure==NULL in case of error.
856 struct afb_event_x2 *afb_evt_event_x2_create2(const char *prefix, const char *name)
858 return afb_evt_event_x2_from_evtid(afb_evt_evtid_create2(prefix, name));
862 * Returns the fullname of the 'eventid'
864 const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid)
866 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
867 return evtid ? evtid->fullname : NULL;
871 * Returns the id of the 'eventid'
873 int afb_evt_event_x2_id(struct afb_event_x2 *eventid)
875 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
876 return evtid ? evtid->id : 0;
880 * Makes the 'listener' watching 'eventid'
881 * Returns 0 in case of success or else -1.
883 int afb_evt_event_x2_add_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
885 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
887 /* check parameter */
893 /* search the existing watch for the listener */
894 return afb_evt_watch_add_evtid(listener, evtid);
898 * Avoids the 'listener' to watch 'eventid'
899 * Returns 0 in case of success or else -1.
901 int afb_evt_event_x2_remove_watch(struct afb_evt_listener *listener, struct afb_event_x2 *eventid)
903 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
905 /* check parameter */
911 /* search the existing watch */
912 return afb_evt_watch_sub_evtid(listener, evtid);
915 int afb_evt_event_x2_push(struct afb_event_x2 *eventid, struct json_object *object)
918 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
920 return afb_evt_evtid_hooked_push(evtid, object);
921 json_object_put(object);
925 __attribute__((alias("afb_evt_event_x2_unhooked_push")));
928 int afb_evt_event_x2_unhooked_push(struct afb_event_x2 *eventid, struct json_object *object)
930 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
932 return afb_evt_evtid_push(evtid, object);
933 json_object_put(object);
937 #if WITH_LEGACY_BINDING_V1 || WITH_LEGACY_BINDING_V2
938 struct afb_event_x1 afb_evt_event_from_evtid(struct afb_evtid *evtid)
942 ? (struct afb_event_x1){ .itf = &afb_evt_hooked_event_x2_itf, .closure = &evtid->eventid }
944 ? (struct afb_event_x1){ .itf = &afb_evt_event_x2_itf, .closure = &evtid->eventid }
946 : (struct afb_event_x1){ .itf = NULL, .closure = NULL };
950 void afb_evt_event_x2_unref(struct afb_event_x2 *eventid)
952 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
954 afb_evt_evtid_unref(evtid);
957 struct afb_event_x2 *afb_evt_event_x2_addref(struct afb_event_x2 *eventid)
959 struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
961 afb_evt_evtid_addref(evtid);