#include <string.h>
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <afb/afb-event-itf.h>
#include "afb-evt.h"
+#include "afb-hook.h"
+#include "verbose.h"
struct afb_evt_watch;
/* head of the list of events listened */
struct afb_evt_watch *watchs;
+ /* mutex of the listener */
+ pthread_mutex_t mutex;
+
/* count of reference to the listener */
int refcount;
};
/* id of the event */
int id;
+ /* hooking */
+ int hookflags;
+
+ /* mutex of the event */
+ pthread_mutex_t mutex;
+
/* name of the event */
char name[1];
};
};
/* head of the list of listeners */
+static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
/* handling id of events */
+static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_event *events = NULL;
static int event_id_counter = 0;
static int event_id_wrapped = 0;
/*
- * Broadcasts the event 'evt' with its 'object'
- * 'object' is released (like json_object_put)
- * Returns the count of listener that received the event.
- */
-static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
-{
- return afb_evt_broadcast(evt->name, object);
-}
-
-/*
- * Broadcasts the 'event' with its 'object'
- * 'object' is released (like json_object_put)
+ * Broadcasts the 'event' of 'id' with its 'obj'
+ * 'obj' is released (like json_object_put)
+ * calls hooks if hookflags isn't 0
* Returns the count of listener having receive the event.
*/
-int afb_evt_broadcast(const char *event, struct json_object *object)
+static int broadcast(const char *event, struct json_object *obj, int id, int hookflags)
{
int result;
struct afb_evt_listener *listener;
+ if (hookflags & afb_hook_flag_evt_broadcast_before)
+ afb_hook_evt_broadcast_before(event, id, obj);
result = 0;
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while(listener) {
if (listener->itf->broadcast != NULL) {
- listener->itf->broadcast(listener->closure, event, 0, json_object_get(object));
+ listener->itf->broadcast(listener->closure, event, id, json_object_get(obj));
result++;
}
listener = listener->next;
}
- json_object_put(object);
+ pthread_mutex_unlock(&listeners_mutex);
+ if (hookflags & afb_hook_flag_evt_broadcast_after)
+ afb_hook_evt_broadcast_after(event, id, obj, result);
+ json_object_put(obj);
return result;
}
+/*
+ * Broadcasts the event 'evt' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener that received the event.
+ */
+static int evt_broadcast(struct afb_evt_event *evt, struct json_object *object)
+{
+ return broadcast(evt->name, object, evt->id, evt->hookflags);
+}
+
+/*
+ * Broadcasts the 'event' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener having receive the event.
+ */
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+ return broadcast(event, object, 0, -1);
+}
+
/*
* Pushes the event 'evt' with 'obj' to its listeners
* 'obj' is released (like json_object_put)
+ * calls hooks if hookflags isn't 0
* Returns the count of listener taht received the event.
*/
-static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+static int push(struct afb_evt_event *evt, struct json_object *obj, int hookflags)
{
int result;
struct afb_evt_watch *watch;
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&evt->mutex);
+ if (hookflags & afb_hook_flag_evt_push_before)
+ afb_hook_evt_push_before(evt->name, evt->id, obj);
watch = evt->watchs;
while(watch) {
listener = watch->listener;
assert(listener->itf->push != NULL);
- if (watch->activity != 0)
+ if (watch->activity != 0) {
listener->itf->push(listener->closure, evt->name, evt->id, json_object_get(obj));
+ result++;
+ }
watch = watch->next_by_event;
- result++;
}
+ if (hookflags & afb_hook_flag_evt_push_after)
+ afb_hook_evt_push_after(evt->name, evt->id, obj, result);
+ pthread_mutex_unlock(&evt->mutex);
json_object_put(obj);
return result;
}
+/*
+ * Pushes the event 'evt' with 'obj' to its listeners
+ * 'obj' is released (like json_object_put)
+ * Returns the count of listener taht received the event.
+ */
+static int evt_push(struct afb_evt_event *evt, struct json_object *obj)
+{
+ return push(evt, obj, evt->hookflags);
+}
+
/*
* Returns the name associated to the event 'evt'.
*/
static const char *evt_name(struct afb_evt_event *evt)
{
- return evt->name;
+ const char *name = strchr(evt->name, '/');
+ name = name ? name + 1 : evt->name;
+ if (evt->hookflags & afb_hook_flag_evt_name)
+ afb_hook_evt_name(evt->name, evt->id);
+ return name;
}
/*
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ int found;
struct afb_evt_event **prv;
+ struct afb_evt_listener *listener;
+
if (evt != NULL) {
- /* removes the event if valid! */
+ /* unlinks the event if valid! */
+ pthread_mutex_lock(&events_mutex);
+ found = 0;
prv = &events;
- while (*prv != NULL) {
- if (*prv != evt)
- prv = &(*prv)->next;
- else {
- /* valid, unlink */
- *prv = evt->next;
-
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
-
- /* free */
- free(evt);
- break;
+ while (*prv && !(found = (*prv == evt)))
+ prv = &(*prv)->next;
+ if (found)
+ *prv = evt->next;
+ pthread_mutex_unlock(&events_mutex);
+
+ /* destroys the event */
+ if (found) {
+ /* removes all watchers */
+ while(evt->watchs != NULL) {
+ listener = evt->watchs->listener;
+ pthread_mutex_lock(&listener->mutex);
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(evt->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ pthread_mutex_unlock(&listener->mutex);
}
+
+ /* hook */
+ if (evt->hookflags & afb_hook_flag_evt_drop)
+ afb_hook_evt_drop(evt->name, evt->id);
+
+ /* free */
+ pthread_mutex_destroy(&evt->mutex);
+ free(evt);
}
}
}
struct afb_event afb_evt_create_event(const char *name)
{
size_t len;
- struct afb_evt_event *evt;
+ struct afb_evt_event *evt, *oevt;
+
+ /* allocates the event */
+ len = strlen(name);
+ evt = malloc(len + sizeof * evt);
+ if (evt == NULL)
+ goto error;
/* allocates the id */
+ pthread_mutex_lock(&events_mutex);
do {
if (++event_id_counter < 0) {
event_id_wrapped = 1;
}
if (!event_id_wrapped)
break;
- evt = events;
- while(evt != NULL && evt->id != event_id_counter)
- evt = evt->next;
- } while (evt != NULL);
-
- /* allocates the event */
- len = strlen(name);
- evt = malloc(len + sizeof * evt);
- if (evt == NULL)
- goto error;
+ oevt = events;
+ while(oevt != NULL && oevt->id != event_id_counter)
+ oevt = oevt->next;
+ } while (oevt != NULL);
/* initialize the event */
+ memcpy(evt->name, name, len + 1);
evt->next = events;
evt->watchs = NULL;
evt->id = event_id_counter;
- assert(evt->id > 0);
- memcpy(evt->name, name, len + 1);
+ pthread_mutex_init(&evt->mutex, NULL);
events = evt;
+ evt->hookflags = afb_hook_flags_evt(evt->name);
+ if (evt->hookflags & afb_hook_flag_evt_create)
+ afb_hook_evt_create(evt->name, evt->id);
+ pthread_mutex_unlock(&events_mutex);
/* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
struct afb_evt_listener *listener;
/* search if an instance already exists */
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while (listener != NULL) {
- if (listener->itf == itf && listener->closure == closure)
- return afb_evt_listener_addref(listener);
+ if (listener->itf == itf && listener->closure == closure) {
+ listener = afb_evt_listener_addref(listener);
+ goto found;
+ }
listener = listener->next;
}
listener = calloc(1, sizeof *listener);
if (listener != NULL) {
/* init */
- listener->next = listeners;
listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
+ pthread_mutex_init(&listener->mutex, NULL);
+ listener->next = listeners;
listeners = listener;
}
+ found:
+ pthread_mutex_unlock(&listeners_mutex);
return listener;
}
*/
struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
{
- listener->refcount++;
+ __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
return listener;
}
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- if (0 == --listener->refcount) {
- struct afb_evt_listener **prv;
+ struct afb_evt_listener **prv;
+ struct afb_evt_event *evt;
- /* remove the watchers */
- while (listener->watchs != NULL)
- remove_watch(listener->watchs);
+ if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
+ pthread_mutex_lock(&listeners_mutex);
prv = &listeners;
while (*prv != listener)
prv = &(*prv)->next;
*prv = listener->next;
+ pthread_mutex_unlock(&listeners_mutex);
+
+ /* remove the watchers */
+ pthread_mutex_lock(&listener->mutex);
+ while (listener->watchs != NULL) {
+ evt = listener->watchs->event;
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(listener->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ }
+ pthread_mutex_unlock(&listener->mutex);
/* free the listener */
+ pthread_mutex_destroy(&listener->mutex);
free(listener);
}
}
/* search the existing watch for the listener */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt)
/* not found, allocate a new */
watch = malloc(sizeof *watch);
if (watch == NULL) {
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOMEM;
return -1;
}
/* initialise and link */
watch->event = evt;
- watch->next_by_event = evt->watchs;
+ watch->activity = 0;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
- watch->activity = 0;
- evt->watchs = watch;
listener->watchs = watch;
+ pthread_mutex_lock(&evt->mutex);
+ watch->next_by_event = evt->watchs;
+ evt->watchs = watch;
+ pthread_mutex_unlock(&evt->mutex);
found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evt->name, evt->id);
watch->activity++;
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
/* search the existing watch */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt) {
- /* found: remove it */
if (watch->activity != 0) {
watch->activity--;
if (watch->activity == 0 && listener->itf->remove != NULL)
listener->itf->remove(listener->closure, evt->name, evt->id);
}
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
watch = watch->next_by_listener;
}
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOENT;
return -1;
}
+/*
+ * update the hooks for events
+ */
+void afb_evt_update_hooks()
+{
+ struct afb_evt_event *evt;
+
+ pthread_mutex_lock(&events_mutex);
+ for (evt = events ; evt ; evt = evt->next)
+ evt->hookflags = afb_hook_flags_evt(evt->name);
+ pthread_mutex_unlock(&events_mutex);
+}
+
+int afb_evt_push(struct afb_event event, struct json_object *object)
+{
+ if (event.itf == &afb_evt_event_itf)
+ return evt_push((struct afb_evt_event *)event.closure, object);
+ json_object_put(object);
+ return 0;
+}
+
+int afb_evt_unhooked_push(struct afb_event event, struct json_object *object)
+{
+ if (event.itf == &afb_evt_event_itf)
+ return push((struct afb_evt_event *)event.closure, object, 0);
+ json_object_put(object);
+ return 0;
+}
+