X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fafb-evt.c;h=1c81fd53bee206ff6ef9c9218325185d72668a47;hb=170aef20bc3a59d5139c2eff8794d9ba4c83a2e5;hp=a99dc89408b8630797f1021227875af5d6fbb737;hpb=31f85d741a3f35f853114e0bc299213f4456fcbc;p=src%2Fapp-framework-binder.git diff --git a/src/afb-evt.c b/src/afb-evt.c index a99dc894..1c81fd53 100644 --- a/src/afb-evt.c +++ b/src/afb-evt.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015, 2016 "IoT.bzh" + * Copyright (C) 2015, 2016, 2017 "IoT.bzh" * Author "Fulup Ar Foll" * Author José Bollo * @@ -22,11 +22,14 @@ #include #include #include +#include #include #include #include "afb-evt.h" +#include "afb-hook.h" +#include "verbose.h" struct afb_evt_watch; @@ -38,8 +41,8 @@ struct afb_evt_listener { /* chaining listeners */ struct afb_evt_listener *next; - /* callback on event */ - void (*send)(void *closure, const char *event, struct json_object *object); + /* interface for callbacks */ + const struct afb_evt_itf *itf; /* closure for the callback */ void *closure; @@ -47,6 +50,9 @@ struct afb_evt_listener { /* head of the list of events listened */ struct afb_evt_watch *watchs; + /* mutex of the listener */ + pthread_mutex_t mutex; + /* count of reference to the listener */ int refcount; }; @@ -56,9 +62,21 @@ struct afb_evt_listener { */ struct afb_evt_event { + /* next event */ + struct afb_evt_event *next; + /* head of the list of listeners watching the event */ struct afb_evt_watch *watchs; + /* id of the event */ + int id; + + /* hooking */ + int hookflags; + + /* mutex of the event */ + pthread_mutex_t mutex; + /* name of the event */ char name[1]; }; @@ -79,92 +97,163 @@ struct afb_evt_watch { /* link to the next event for the same listener */ struct afb_evt_watch *next_by_listener; + + /* activity */ + unsigned activity; }; /* declare functions */ static int evt_broadcast(struct afb_evt_event *evt, struct json_object *obj); static int evt_push(struct afb_evt_event *evt, struct json_object *obj); static void evt_destroy(struct afb_evt_event *evt); +static const char *evt_name(struct afb_evt_event *evt); /* the interface for events */ static struct afb_event_itf afb_evt_event_itf = { .broadcast = (void*)evt_broadcast, .push = (void*)evt_push, - .drop = (void*)evt_destroy + .drop = (void*)evt_destroy, + .name = (void*)evt_name }; /* head of the list of listeners */ +static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER; static struct afb_evt_listener *listeners = NULL; -/* - * Broadcasts the event 'evt' with its 'object' - * 'object' is released (like json_object_put) - * Returns the count of listener that received the event. - */ -static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object) -{ - return afb_evt_broadcast(evt->name, object); -} +/* handling id of events */ +static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER; +static struct afb_evt_event *events = NULL; +static int event_id_counter = 0; +static int event_id_wrapped = 0; /* - * Broadcasts the 'event' with its 'object' - * 'object' is released (like json_object_put) + * Broadcasts the 'event' of 'id' with its 'obj' + * 'obj' is released (like json_object_put) + * calls hooks if hookflags isn't 0 * Returns the count of listener having receive the event. */ -int afb_evt_broadcast(const char *event, struct json_object *object) +static int broadcast(const char *event, struct json_object *obj, int id, int hookflags) { int result; struct afb_evt_listener *listener; + if (hookflags & afb_hook_flag_evt_broadcast_before) + afb_hook_evt_broadcast_before(event, id, obj); result = 0; + pthread_mutex_lock(&listeners_mutex); listener = listeners; while(listener) { - listener->send(listener->closure, event, json_object_get(object)); + if (listener->itf->broadcast != NULL) { + listener->itf->broadcast(listener->closure, event, id, json_object_get(obj)); + result++; + } listener = listener->next; - result++; } - json_object_put(object); + pthread_mutex_unlock(&listeners_mutex); + if (hookflags & afb_hook_flag_evt_broadcast_after) + afb_hook_evt_broadcast_after(event, id, obj, result); + json_object_put(obj); return result; } /* * Broadcasts the event 'evt' with its 'object' * 'object' is released (like json_object_put) + * Returns the count of listener that received the event. + */ +static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object) +{ + return broadcast(evt->name, object, evt->id, evt->hookflags); +} + +/* + * Broadcasts the 'event' with its 'object' + * 'object' is released (like json_object_put) + * Returns the count of listener having receive the event. + */ +int afb_evt_broadcast(const char *event, struct json_object *object) +{ + return broadcast(event, object, 0, -1); +} + +/* + * Pushes the event 'evt' with 'obj' to its listeners + * 'obj' is released (like json_object_put) + * calls hooks if hookflags isn't 0 * Returns the count of listener taht received the event. */ -static int evt_push(struct afb_evt_event *evt, struct json_object *obj) +static int push(struct afb_evt_event *evt, struct json_object *obj, int hookflags) { int result; struct afb_evt_watch *watch; struct afb_evt_listener *listener; result = 0; + pthread_mutex_lock(&evt->mutex); + if (hookflags & afb_hook_flag_evt_push_before) + afb_hook_evt_push_before(evt->name, evt->id, obj); watch = evt->watchs; - while(listener) { + while(watch) { listener = watch->listener; - listener->send(listener->closure, evt->name, json_object_get(obj)); + assert(listener->itf->push != NULL); + if (watch->activity != 0) { + listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj)); + result++; + } watch = watch->next_by_event; - result++; } + if (hookflags & afb_hook_flag_evt_push_after) + afb_hook_evt_push_after(evt->name, evt->id, obj, result); + pthread_mutex_unlock(&evt->mutex); json_object_put(obj); return result; } +/* + * Pushes the event 'evt' with 'obj' to its listeners + * 'obj' is released (like json_object_put) + * Returns the count of listener taht received the event. + */ +static int evt_push(struct afb_evt_event *evt, struct json_object *obj) +{ + return push(evt, obj, evt->hookflags); +} + +/* + * Returns the name associated to the event 'evt'. + */ +static const char *evt_name(struct afb_evt_event *evt) +{ + const char *name = strchr(evt->name, '/'); + name = name ? name + 1 : evt->name; + if (evt->hookflags & afb_hook_flag_evt_name) + afb_hook_evt_name(evt->name, evt->id); + return name; +} + /* * remove the 'watch' */ static void remove_watch(struct afb_evt_watch *watch) { struct afb_evt_watch **prv; + struct afb_evt_event *evt; + struct afb_evt_listener *listener; + + /* notify listener if needed */ + evt = watch->event; + listener = watch->listener; + if (watch->activity != 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evt->name, evt->id); /* unlink the watch for its event */ - prv = &watch->event->watchs; + prv = &evt->watchs; while(*prv != watch) prv = &(*prv)->next_by_event; *prv = watch->next_by_event; /* unlink the watch for its listener */ - prv = &watch->listener->watchs; + prv = &listener->watchs; while(*prv != watch) prv = &(*prv)->next_by_listener; *prv = watch->next_by_listener; @@ -178,11 +267,41 @@ static void remove_watch(struct afb_evt_watch *watch) */ static void evt_destroy(struct afb_evt_event *evt) { + int found; + struct afb_evt_event **prv; + struct afb_evt_listener *listener; + if (evt != NULL) { - /* removes all watchers */ - while(evt->watchs != NULL) - remove_watch(evt->watchs); - free(evt); + /* unlinks the event if valid! */ + pthread_mutex_lock(&events_mutex); + found = 0; + prv = &events; + while (*prv && !(found = (*prv == evt))) + prv = &(*prv)->next; + if (found) + *prv = evt->next; + pthread_mutex_unlock(&events_mutex); + + /* destroys the event */ + if (found) { + /* removes all watchers */ + while(evt->watchs != NULL) { + listener = evt->watchs->listener; + pthread_mutex_lock(&listener->mutex); + pthread_mutex_lock(&evt->mutex); + remove_watch(evt->watchs); + pthread_mutex_unlock(&evt->mutex); + pthread_mutex_unlock(&listener->mutex); + } + + /* hook */ + if (evt->hookflags & afb_hook_flag_evt_drop) + afb_hook_evt_drop(evt->name, evt->id); + + /* free */ + pthread_mutex_destroy(&evt->mutex); + free(evt); + } } } @@ -193,15 +312,60 @@ static void evt_destroy(struct afb_evt_event *evt) struct afb_event afb_evt_create_event(const char *name) { size_t len; - struct afb_evt_event *evt; + struct afb_evt_event *evt, *oevt; + /* allocates the event */ len = strlen(name); evt = malloc(len + sizeof * evt); - if (evt != NULL) { - evt->watchs = NULL; - memcpy(evt->name, name, len + 1); - } + if (evt == NULL) + goto error; + + /* allocates the id */ + pthread_mutex_lock(&events_mutex); + do { + if (++event_id_counter < 0) { + event_id_wrapped = 1; + event_id_counter = 1024; /* heuristic: small numbers are not destroyed */ + } + if (!event_id_wrapped) + break; + oevt = events; + while(oevt != NULL && oevt->id != event_id_counter) + oevt = oevt->next; + } while (oevt != NULL); + + /* initialize the event */ + memcpy(evt->name, name, len + 1); + evt->next = events; + evt->watchs = NULL; + evt->id = event_id_counter; + pthread_mutex_init(&evt->mutex, NULL); + events = evt; + evt->hookflags = afb_hook_flags_evt(evt->name); + if (evt->hookflags & afb_hook_flag_evt_create) + afb_hook_evt_create(evt->name, evt->id); + pthread_mutex_unlock(&events_mutex); + + /* returns the event */ return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt }; +error: + return (struct afb_event){ .itf = NULL, .closure = NULL }; +} + +/* + * Returns the name of the 'event' + */ +const char *afb_evt_event_name(struct afb_event event) +{ + return (event.itf != &afb_evt_event_itf) ? NULL : ((struct afb_evt_event *)event.closure)->name; +} + +/* + * Returns the id of the 'event' + */ +int afb_evt_event_id(struct afb_event event) +{ + return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id; } /* @@ -209,15 +373,18 @@ struct afb_event afb_evt_create_event(const char *name) * and the 'closure'. * Returns NULL in case of memory depletion. */ -struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure) +struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure) { struct afb_evt_listener *listener; /* search if an instance already exists */ + pthread_mutex_lock(&listeners_mutex); listener = listeners; while (listener != NULL) { - if (listener->send == send && listener->closure == closure) - return afb_evt_listener_addref(listener); + if (listener->itf == itf && listener->closure == closure) { + listener = afb_evt_listener_addref(listener); + goto found; + } listener = listener->next; } @@ -225,13 +392,16 @@ struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, con listener = calloc(1, sizeof *listener); if (listener != NULL) { /* init */ - listener->next = listeners; - listener->send = send; + listener->itf = itf; listener->closure = closure; listener->watchs = NULL; listener->refcount = 1; + pthread_mutex_init(&listener->mutex, NULL); + listener->next = listeners; listeners = listener; } + found: + pthread_mutex_unlock(&listeners_mutex); return listener; } @@ -240,7 +410,7 @@ struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, con */ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener) { - listener->refcount++; + __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED); return listener; } @@ -250,20 +420,31 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen */ void afb_evt_listener_unref(struct afb_evt_listener *listener) { - if (0 == --listener->refcount) { - struct afb_evt_listener **prv; + struct afb_evt_listener **prv; + struct afb_evt_event *evt; - /* remove the watchers */ - while (listener->watchs != NULL) - remove_watch(listener->watchs); + if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) { /* unlink the listener */ + pthread_mutex_lock(&listeners_mutex); prv = &listeners; while (*prv != listener) prv = &(*prv)->next; *prv = listener->next; + pthread_mutex_unlock(&listeners_mutex); + + /* remove the watchers */ + pthread_mutex_lock(&listener->mutex); + while (listener->watchs != NULL) { + evt = listener->watchs->event; + pthread_mutex_lock(&evt->mutex); + remove_watch(listener->watchs); + pthread_mutex_unlock(&evt->mutex); + } + pthread_mutex_unlock(&listener->mutex); /* free the listener */ + pthread_mutex_destroy(&listener->mutex); free(listener); } } @@ -278,35 +459,46 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) struct afb_evt_event *evt; /* check parameter */ - if (event.itf != &afb_evt_event_itf) { + if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) { errno = EINVAL; return -1; } - /* search the existing watch */ + /* search the existing watch for the listener */ + evt = event.closure; + pthread_mutex_lock(&listener->mutex); watch = listener->watchs; while(watch != NULL) { - if (watch->event == event.closure) - return 0; + if (watch->event == evt) + goto found; watch = watch->next_by_listener; } /* not found, allocate a new */ watch = malloc(sizeof *watch); if (watch == NULL) { + pthread_mutex_unlock(&listener->mutex); errno = ENOMEM; return -1; } /* initialise and link */ - evt = event.closure; watch->event = evt; - watch->next_by_event = evt->watchs; + watch->activity = 0; watch->listener = listener; watch->next_by_listener = listener->watchs; - evt->watchs = watch; listener->watchs = watch; - + pthread_mutex_lock(&evt->mutex); + watch->next_by_event = evt->watchs; + evt->watchs = watch; + pthread_mutex_unlock(&evt->mutex); + +found: + if (watch->activity == 0 && listener->itf->add != NULL) + listener->itf->add(listener->closure, evt->name, evt->id); + watch->activity++; + pthread_mutex_unlock(&listener->mutex); + return 0; } @@ -317,6 +509,7 @@ int afb_evt_add_watch(struct afb_evt_listener *listener, struct afb_event event) int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event) { struct afb_evt_watch *watch; + struct afb_evt_event *evt; /* check parameter */ if (event.itf != &afb_evt_event_itf) { @@ -325,16 +518,52 @@ int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event eve } /* search the existing watch */ + evt = event.closure; + pthread_mutex_lock(&listener->mutex); watch = listener->watchs; while(watch != NULL) { - if (watch->event == event.closure) { - /* found: remove it */ - remove_watch(watch); - break; + if (watch->event == evt) { + if (watch->activity != 0) { + watch->activity--; + if (watch->activity == 0 && listener->itf->remove != NULL) + listener->itf->remove(listener->closure, evt->name, evt->id); + } + pthread_mutex_unlock(&listener->mutex); + return 0; } watch = watch->next_by_listener; } + pthread_mutex_unlock(&listener->mutex); + errno = ENOENT; + return -1; +} + +/* + * update the hooks for events + */ +void afb_evt_update_hooks() +{ + struct afb_evt_event *evt; + + pthread_mutex_lock(&events_mutex); + for (evt = events ; evt ; evt = evt->next) + evt->hookflags = afb_hook_flags_evt(evt->name); + pthread_mutex_unlock(&events_mutex); +} + +int afb_evt_push(struct afb_event event, struct json_object *object) +{ + if (event.itf == &afb_evt_event_itf) + return evt_push((struct afb_evt_event *)event.closure, object); + json_object_put(object); return 0; } +int afb_evt_unhooked_push(struct afb_event event, struct json_object *object) +{ + if (event.itf == &afb_evt_event_itf) + return push((struct afb_evt_event *)event.closure, object, 0); + json_object_put(object); + return 0; +}