#include <string.h>
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#include <json-c/json.h>
#include <afb/afb-event-itf.h>
/* head of the list of events listened */
struct afb_evt_watch *watchs;
+ /* mutex of the listener */
+ pthread_mutex_t mutex;
+
/* count of reference to the listener */
int refcount;
};
/* id of the event */
int id;
+ /* mutex of the event */
+ pthread_mutex_t mutex;
+
/* name of the event */
char name[1];
};
};
/* head of the list of listeners */
+static pthread_mutex_t listeners_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_listener *listeners = NULL;
/* handling id of events */
+static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct afb_evt_event *events = NULL;
static int event_id_counter = 0;
static int event_id_wrapped = 0;
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while(listener) {
if (listener->itf->broadcast != NULL) {
}
listener = listener->next;
}
+ pthread_mutex_unlock(&listeners_mutex);
json_object_put(object);
return result;
}
struct afb_evt_listener *listener;
result = 0;
+ pthread_mutex_lock(&evt->mutex);
watch = evt->watchs;
while(watch) {
listener = watch->listener;
watch = watch->next_by_event;
result++;
}
+ pthread_mutex_unlock(&evt->mutex);
json_object_put(obj);
return result;
}
*/
static void evt_destroy(struct afb_evt_event *evt)
{
+ int found;
struct afb_evt_event **prv;
+ struct afb_evt_listener *listener;
+
if (evt != NULL) {
- /* removes the event if valid! */
+ /* unlinks the event if valid! */
+ pthread_mutex_lock(&events_mutex);
prv = &events;
- while (*prv != NULL) {
- if (*prv != evt)
- prv = &(*prv)->next;
- else {
- /* valid, unlink */
- *prv = evt->next;
-
- /* removes all watchers */
- while(evt->watchs != NULL)
- remove_watch(evt->watchs);
-
- /* free */
- free(evt);
- break;
+ while (*prv && !(found = (*prv == evt)))
+ prv = &(*prv)->next;
+ if (found)
+ *prv = evt->next;
+ pthread_mutex_unlock(&events_mutex);
+
+ /* destroys the event */
+ if (found) {
+ /* removes all watchers */
+ while(evt->watchs != NULL) {
+ listener = evt->watchs->listener;
+ pthread_mutex_lock(&listener->mutex);
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(evt->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ pthread_mutex_unlock(&listener->mutex);
}
+
+ /* free */
+ pthread_mutex_destroy(&evt->mutex);
+ free(evt);
}
}
}
size_t len;
struct afb_evt_event *evt;
+ /* allocates the event */
+ len = strlen(name);
+ evt = malloc(len + sizeof * evt);
+ if (evt == NULL)
+ goto error;
+
+ /* initialize the event */
+ evt->watchs = NULL;
+ memcpy(evt->name, name, len + 1);
+
/* allocates the id */
+ pthread_mutex_lock(&events_mutex);
do {
if (++event_id_counter < 0) {
event_id_wrapped = 1;
evt = evt->next;
} while (evt != NULL);
- /* allocates the event */
- len = strlen(name);
- evt = malloc(len + sizeof * evt);
- if (evt == NULL)
- goto error;
-
/* initialize the event */
+ memcpy(evt->name, name, len + 1);
evt->next = events;
evt->watchs = NULL;
evt->id = event_id_counter;
- assert(evt->id > 0);
- memcpy(evt->name, name, len + 1);
+ pthread_mutex_init(&evt->mutex, NULL);
events = evt;
+ pthread_mutex_unlock(&events_mutex);
/* returns the event */
return (struct afb_event){ .itf = &afb_evt_event_itf, .closure = evt };
struct afb_evt_listener *listener;
/* search if an instance already exists */
+ pthread_mutex_lock(&listeners_mutex);
listener = listeners;
while (listener != NULL) {
- if (listener->itf == itf && listener->closure == closure)
- return afb_evt_listener_addref(listener);
+ if (listener->itf == itf && listener->closure == closure) {
+ listener = afb_evt_listener_addref(listener);
+ goto found;
+ }
listener = listener->next;
}
listener = calloc(1, sizeof *listener);
if (listener != NULL) {
/* init */
- listener->next = listeners;
listener->itf = itf;
listener->closure = closure;
listener->watchs = NULL;
listener->refcount = 1;
+ pthread_mutex_init(&listener->mutex, NULL);
+ listener->next = listeners;
listeners = listener;
}
+ found:
+ pthread_mutex_unlock(&listeners_mutex);
return listener;
}
*/
struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listener)
{
- listener->refcount++;
+ __atomic_add_fetch(&listener->refcount, 1, __ATOMIC_RELAXED);
return listener;
}
*/
void afb_evt_listener_unref(struct afb_evt_listener *listener)
{
- if (0 == --listener->refcount) {
- struct afb_evt_listener **prv;
+ struct afb_evt_listener **prv;
+ struct afb_evt_event *evt;
- /* remove the watchers */
- while (listener->watchs != NULL)
- remove_watch(listener->watchs);
+ if (!__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
/* unlink the listener */
+ pthread_mutex_lock(&listeners_mutex);
prv = &listeners;
while (*prv != listener)
prv = &(*prv)->next;
*prv = listener->next;
+ pthread_mutex_unlock(&listeners_mutex);
+
+ /* remove the watchers */
+ pthread_mutex_lock(&listener->mutex);
+ while (listener->watchs != NULL) {
+ evt = listener->watchs->event;
+ pthread_mutex_lock(&evt->mutex);
+ remove_watch(listener->watchs);
+ pthread_mutex_unlock(&evt->mutex);
+ }
+ pthread_mutex_unlock(&listener->mutex);
/* free the listener */
+ pthread_mutex_destroy(&listener->mutex);
free(listener);
}
}
/* search the existing watch for the listener */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt)
/* not found, allocate a new */
watch = malloc(sizeof *watch);
if (watch == NULL) {
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOMEM;
return -1;
}
/* initialise and link */
watch->event = evt;
- watch->next_by_event = evt->watchs;
+ watch->activity = 0;
watch->listener = listener;
watch->next_by_listener = listener->watchs;
- watch->activity = 0;
- evt->watchs = watch;
listener->watchs = watch;
+ pthread_mutex_lock(&evt->mutex);
+ watch->next_by_event = evt->watchs;
+ evt->watchs = watch;
+ pthread_mutex_unlock(&evt->mutex);
found:
if (watch->activity == 0 && listener->itf->add != NULL)
listener->itf->add(listener->closure, evt->name, evt->id);
watch->activity++;
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
/* search the existing watch */
evt = event.closure;
+ pthread_mutex_lock(&listener->mutex);
watch = listener->watchs;
while(watch != NULL) {
if (watch->event == evt) {
if (watch->activity == 0 && listener->itf->remove != NULL)
listener->itf->remove(listener->closure, evt->name, evt->id);
}
+ pthread_mutex_unlock(&listener->mutex);
return 0;
}
watch = watch->next_by_listener;
}
+ pthread_mutex_unlock(&listener->mutex);
errno = ENOENT;
return -1;
}
{
json_object *msg, *request;
const char *token, *uuid;
- static json_object *type_reply = NULL;
+ json_object *type_reply = NULL;
msg = json_object_new_object();
if (resp != NULL)
json_object_object_add(msg, "response", resp);
- if (type_reply == NULL)
- type_reply = json_object_new_string("afb-reply");
- json_object_object_add(msg, "jtype", json_object_get(type_reply));
+ type_reply = json_object_new_string("afb-reply");
+ json_object_object_add(msg, "jtype", type_reply);
request = json_object_new_object();
json_object_object_add(msg, "request", request);
struct json_object *afb_msg_json_event(const char *event, struct json_object *object)
{
json_object *msg;
- static json_object *type_event = NULL;
+ json_object *type_event = NULL;
msg = json_object_new_object();
if (object != NULL)
json_object_object_add(msg, "data", object);
- if (type_event == NULL)
- type_event = json_object_new_string("afb-event");
- json_object_object_add(msg, "jtype", json_object_get(type_event));
+ type_event = json_object_new_string("afb-event");
+ json_object_object_add(msg, "jtype", type_event);
return msg;
}