/*
- * Copyright (C) 2015, 2016 "IoT.bzh"
+ * Copyright (C) 2015, 2016, 2017 "IoT.bzh"
* Author "Fulup Ar Foll"
* Author José Bollo <jose.bollo@iot.bzh>
*
#include <string.h>
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <afb/afb-event-itf.h>
#include "afb-evt.h"
+#include "afb-hook.h"
+#include "verbose.h"
struct afb_evt_watch;
/* chaining listeners */
struct afb_evt_listener *next;
- /* callback on event */
- void (*send)(void *closure, const char *event, struct json_object *object);
+ /* interface for callbacks */
+ const struct afb_evt_itf *itf;
/* closure for the callback */
void *closure;
/* head of the list of events listened */
struct afb_evt_watch *watchs;
+ /* mutex of the listener */
+ pthread_mutex_t mutex;
+
/* count of reference to the listener */
int refcount;
};
*/
struct afb_evt_event {
+ /* next event */
+ struct afb_evt_event *next;
+
/* head of the list of listeners watching the event */
struct afb_evt_watch *watchs;
+ /* id of the event */
+ int id;
+
+ /* hooking */
+ int hookflags;
+
+ /* mutex of the event */
+ pthread_mutex_t mutex;
+
/* name of the event */
char name[1];
};
/* link to the next event for the same listener */
struct afb_evt_watch *next_by_listener;
+
+ /* activity */
+ unsigned activity;
};
/* declare functions */
static int evt_broadcast(struct afb_evt_event *evt, struct json_object *obj);
static int evt_push(struct afb_evt_event *evt, struct json_object *obj);
static void evt_destroy(struct afb_evt_event *evt);
+static const char *evt_name(struct afb_evt_event *evt);
/* the interface for events */
static struct afb_event_itf afb_evt_event_itf = {
.broadcast = (void*)evt_broadcast,
.push = (void*)evt_push,
- .drop = (void*)evt_destroy
+ .drop = (void*)evt_destroy,
+ .name = (void*)evt_name
};
/* head of the list of listeners */
+static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
-/*
- * Broadcasts the event 'evt' with its 'object'
- * 'object' is released (like json_object_put)
- * Returns the count of listener that received the event.
- */
-static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
-{
- return afb_evt_broadcast(evt->name, object);
-}
+/* handling id of events */
+static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct afb_evt_event *events = NULL;
+static int event_id_counter = 0;
+static int event_id_wrapped = 0;
/*
- * Broadcasts the 'event' with its 'object'
- * 'object' is released (like json_object_put)
+ * Broadcasts the 'event' of 'id' with its 'obj'
+ * 'obj' is released (like json_object_put)
+ * calls hooks if hookflags isn't 0
* Returns the count of listener having receive the event.
*/
-int afb_evt_broadcast(const char *event, struct json_object *object)
+static int broadcast(const char *event, struct json_object *obj, int id, int hookflags)
{
int result;
struct afb_evt_listener *listener;
+ if (hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(event, id, obj);
result = 0;
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while(listener) {
- listener->send(listener->closure, event, json_object_get(object));
+ if (listener->itf->broadcast != NULL) {
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
+ result++;
+ }
listener = listener->next;
- result++;
}
- json_object_put(object);
+ pthread_mutex_unlock(&listeners_mutex);
+ if (hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(event, id, obj, result);
+ json_object_put(obj);
return result;
}
/*
* Broadcasts the event 'evt' with its 'object'
* 'object' is released (like json_object_put)
+ * Returns the count of listener that received the event.
+ */
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
+{
+ return broadcast(evt->name, object, evt->id, evt->hookflags);
+}
+
+/*
+ * Broadcasts the 'event' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener having receive the event.
+ */
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+ return broadcast(event, object, 0, -1);
+}
+
+/*
+ * Pushes the event 'evt' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * calls hooks if hookflags isn't 0
* Returns the count of listener taht received the event.
*/
-static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+static int push(struct afb_evt_event *evt, struct json_object *obj, int hookflags)
{
int result;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&evt->mutex);
+ if (hookflags & afb_hook_flag_evt_push_before)
+ afb_hook_evt_push_before(evt->name, evt->id, obj);
watch = evt->watchs;
- while(listener) {
+ while(watch) {
listener = watch->listener;
- listener->send(listener->closure, evt->name, json_object_get(obj));
+ assert(listener->itf->push != NULL);
+ if (watch->activity != 0) {
+ listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj));
+ result++;
+ }
watch = watch->next_by_event;
- result++;
}
+ if (hookflags & afb_hook_flag_evt_push_after)
+ afb_hook_evt_push_after(evt->name, evt->id, obj, result);
+ pthread_mutex_unlock(&evt->mutex);
json_object_put(obj);
return result;
}
+/*
+ * Pushes the event 'evt' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns the count of listener taht received the event.
+ */
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+{
+ return push(evt, obj, evt->hookflags);
+}
+
+/*
+ * Returns the name associated to the event 'evt'.
+ */
+static const char *evt_name(struct afb_evt_event *evt)
+{
+ const char *name = strchr(evt->name, '/');
+ name = name ? name + 1 : evt->name;
+ if (evt->hookflags & afb_hook_flag_evt_name)
+ afb_hook_evt_name(evt->name, evt->id);
+ return name;
+}
+
/*
* remove the 'watch'
*/
static void remove_watch(struct afb_evt_watch *watch)
{
struct afb_evt_watch **prv;
+ struct afb_evt_event *evt;
+ struct afb_evt_listener *listener;
+
+ /* notify listener if needed */
+ evt = watch->event;
+ listener = watch->listener;
+ if (watch->activity != 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
/* unlink the watch for its event */
- prv = &watch->event->watchs;
+ prv = &evt->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_event;
*prv = watch->next_by_event;
/* unlink the watch for its listener */
- prv = &watch->listener->watchs;
+ prv = &listener->watchs;
while(*prv != watch)
prv = &(*prv)->next_by_listener;
*prv = watch->next_by_listener;
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ int found;
+ struct afb_evt_event **prv;
+ struct afb_evt_listener *listener;
+
if (evt != NULL) {
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
- free(evt);
+ /* unlinks the event if valid! */
+ pthread_mutex_lock(&events_mutex);
+ found = 0;
+ prv = &events;
+ while (*prv && !(found = (*prv == evt)))
+ prv = &(*prv)->next;
+ if (found)
+ *prv = evt->next;
+ pthread_mutex_unlock(&events_mutex);
+
+ /* destroys the event */
+ if (found) {
+ /* removes all watchers */
+ while(evt->watchs != NULL) {
+ listener = evt->watchs->listener;
+ pthread_mutex_lock(&listener->mutex);
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(evt->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ pthread_mutex_unlock(&listener->mutex);
+ }
+
+ /* hook */
+ if (evt->hookflags & afb_hook_flag_evt_drop)
+ afb_hook_evt_drop(evt->name, evt->id);
+
+ /* free */
+ pthread_mutex_destroy(&evt->mutex);
+ free(evt);
+ }
}
}
struct afb_event afb_evt_create_event(const char *name)
{
size_t len;
- struct afb_evt_event *evt;
+ struct afb_evt_event *evt, *oevt;
+ /* allocates the event */
len = strlen(name);
evt = malloc(len + sizeof * evt);
- if (evt != NULL) {
- evt->watchs = NULL;
- memcpy(evt->name, name, len + 1);
- }
+ if (evt == NULL)
+ goto error;
+
+ /* allocates the id */
+ pthread_mutex_lock(&events_mutex);
+ do {
+ if (++event_id_counter < 0) {
+ event_id_wrapped = 1;
+ event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
+ }
+ if (!event_id_wrapped)
+ break;
+ oevt = events;
+ while(oevt != NULL && oevt->id != event_id_counter)
+ oevt = oevt->next;
+ } while (oevt != NULL);
+
+ /* initialize the event */
+ memcpy(evt->name, name, len + 1);
+ evt->next = events;
+ evt->watchs = NULL;
+ evt->id = event_id_counter;
+ pthread_mutex_init(&evt->mutex, NULL);
+ events = evt;
+ evt->hookflags = afb_hook_flags_evt(evt->name);
+ if (evt->hookflags & afb_hook_flag_evt_create)
+ afb_hook_evt_create(evt->name, evt->id);
+ pthread_mutex_unlock(&events_mutex);
+
+ /* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
+error:
+ return (struct afb_event){ .itf = NULL, .closure = NULL };
+}
+
+/*
+ * Returns the name of the 'event'
+ */
+const char *afb_evt_event_name(struct afb_event event)
+{
+ return (event.itf != &afb_evt_event_itf) ? NULL : ((struct afb_evt_event *)event.closure)->name;
+}
+
+/*
+ * Returns the id of the 'event'
+ */
+int afb_evt_event_id(struct afb_event event)
+{
+ return (event.itf != &afb_evt_event_itf) ? 0 : ((struct afb_evt_event *)event.closure)->id;
}
/*
* and the 'closure'.
* Returns NULL in case of memory depletion.
*/
-struct afb_evt_listener *afb_evt_listener_create(void (*send)(void *closure, const char *event, struct json_object *object), void *closure)
+struct afb_evt_listener *afb_evt_listener_create(const struct afb_evt_itf *itf, void *closure)
{
struct afb_evt_listener *listener;
/* search if an instance already exists */
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while (listener != NULL) {
- if (listener->send == send && listener->closure == closure)
- return afb_evt_listener_addref(listener);
+ if (listener->itf == itf && listener->closure == closure) {
+ listener = afb_evt_listener_addref(listener);
+ goto found;
+ }
listener = listener->next;
}
listener = calloc(1, sizeof *listener);
if (listener != NULL) {
/* init */
- listener->next = listeners;
- listener->send = send;
+ listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
+ pthread_mutex_init(&listener->mutex, NULL);
+ listener->next = listeners;
listeners = listener;
}
+ found:
+ pthread_mutex_unlock(&listeners_mutex);
return listener;
}
*/
struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
{
- listener->refcount++;
+ __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
return listener;
}
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- if (0 == --listener->refcount) {
- struct afb_evt_listener **prv;
+ struct afb_evt_listener **prv;
+ struct afb_evt_event *evt;
- /* remove the watchers */
- while (listener->watchs != NULL)
- remove_watch(listener->watchs);
+ if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
+ pthread_mutex_lock(&listeners_mutex);
prv = &listeners;
while (*prv != listener)
prv = &(*prv)->next;
*prv = listener->next;
+ pthread_mutex_unlock(&listeners_mutex);
+
+ /* remove the watchers */
+ pthread_mutex_lock(&listener->mutex);
+ while (listener->watchs != NULL) {
+ evt = listener->watchs->event;
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(listener->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ }
+ pthread_mutex_unlock(&listener->mutex);
/* free the listener */
+ pthread_mutex_destroy(&listener->mutex);
free(listener);
}
}
struct afb_evt_event *evt;
/* check parameter */
- if (event.itf != &afb_evt_event_itf) {
+ if (event.itf != &afb_evt_event_itf || listener->itf->push == NULL) {
errno = EINVAL;
return -1;
}
- /* search the existing watch */
+ /* search the existing watch for the listener */
+ evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure)
- return 0;
+ if (watch->event == evt)
+ goto found;
watch = watch->next_by_listener;
}
/* not found, allocate a new */
watch = malloc(sizeof *watch);
if (watch == NULL) {
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOMEM;
return -1;
}
/* initialise and link */
- evt = event.closure;
watch->event = evt;
- watch->next_by_event = evt->watchs;
+ watch->activity = 0;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
- evt->watchs = watch;
listener->watchs = watch;
-
+ pthread_mutex_lock(&evt->mutex);
+ watch->next_by_event = evt->watchs;
+ evt->watchs = watch;
+ pthread_mutex_unlock(&evt->mutex);
+
+found:
+ if (watch->activity == 0 && listener->itf->add != NULL)
+ listener->itf->add(listener->closure, evt->name, evt->id);
+ watch->activity++;
+ pthread_mutex_unlock(&listener->mutex);
+
return 0;
}
int afb_evt_remove_watch(struct afb_evt_listener *listener, struct afb_event event)
{
struct afb_evt_watch *watch;
+ struct afb_evt_event *evt;
/* check parameter */
if (event.itf != &afb_evt_event_itf) {
}
/* search the existing watch */
+ evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
- if (watch->event == event.closure) {
- /* found: remove it */
- remove_watch(watch);
- break;
+ if (watch->event == evt) {
+ if (watch->activity != 0) {
+ watch->activity--;
+ if (watch->activity == 0 && listener->itf->remove != NULL)
+ listener->itf->remove(listener->closure, evt->name, evt->id);
+ }
+ pthread_mutex_unlock(&listener->mutex);
+ return 0;
}
watch = watch->next_by_listener;
}
+ pthread_mutex_unlock(&listener->mutex);
+ errno = ENOENT;
+ return -1;
+}
+
+/*
+ * update the hooks for events
+ */
+void afb_evt_update_hooks()
+{
+ struct afb_evt_event *evt;
+
+ pthread_mutex_lock(&events_mutex);
+ for (evt = events ; evt ; evt = evt->next)
+ evt->hookflags = afb_hook_flags_evt(evt->name);
+ pthread_mutex_unlock(&events_mutex);
+}
+
+int afb_evt_push(struct afb_event event, struct json_object *object)
+{
+ if (event.itf == &afb_evt_event_itf)
+ return evt_push((struct afb_evt_event *)event.closure, object);
+ json_object_put(object);
return 0;
}
+int afb_evt_unhooked_push(struct afb_event event, struct json_object *object)
+{
+ if (event.itf == &afb_evt_event_itf)
+ return push((struct afb_evt_event *)event.closure, object, 0);
+ json_object_put(object);
+ return 0;
+}