afb-proto-ws: Add message for unexpected event
[src/app-framework-binder.git] / src / afb-evt.c
index 74ed71a..1c8798f 100644 (file)
@@ -31,6 +31,7 @@
 #include "afb-hook.h"
 #include "verbose.h"
 #include "jobs.h"
+#include "uuid.h"
 
 struct afb_evt_watch;
 
@@ -55,7 +56,7 @@ struct afb_evt_listener {
        pthread_rwlock_t rwlock;
 
        /* count of reference to the listener */
-       int refcount;
+       uint16_t refcount;
 };
 
 /*
@@ -81,10 +82,10 @@ struct afb_evtid {
 #endif
 
        /* refcount */
-       int refcount;
+       uint16_t refcount;
 
        /* id of the event */
-       int id;
+       uint16_t id;
 
        /* has client? */
        int has_client;
@@ -115,13 +116,19 @@ struct afb_evt_watch {
 };
 
 /*
- * structure for job of broadcasting string events
+ * structure for job of broadcasting events
  */
-struct job_string
+struct job_broadcast
 {
        /** object atached to the event */
        struct json_object *object;
 
+       /** the uuid of the event */
+       uuid_binary_t  uuid;
+
+       /** remaining hop */
+       uint8_t hop;
+
        /** name of the event to broadcast */
        char event[];
 };
@@ -169,31 +176,54 @@ static struct afb_evt_listener *listeners = NULL;
 /* handling id of events */
 static pthread_rwlock_t events_rwlock = PTHREAD_RWLOCK_INITIALIZER;
 static struct afb_evtid *evtids = NULL;
-static int event_id_counter = 0;
-static int event_id_wrapped = 0;
+static uint16_t event_genid = 0;
+static uint16_t event_count = 0;
+
+/* head of uniqueness of events */
+#if !defined(EVENT_BROADCAST_HOP_MAX)
+#  define EVENT_BROADCAST_HOP_MAX  10
+#endif
+#if !defined(EVENT_BROADCAST_MEMORY_COUNT)
+#  define EVENT_BROADCAST_MEMORY_COUNT  8
+#endif
+
+#if EVENT_BROADCAST_MEMORY_COUNT
+static struct {
+       pthread_mutex_t mutex;
+       uint8_t base;
+       uint8_t count;
+       uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT];
+} uniqueness = {
+       .mutex = PTHREAD_MUTEX_INITIALIZER,
+       .base = 0,
+       .count = 0
+};
+#endif
 
 /*
  * Create structure for job of broadcasting string 'event' with 'object'
  * Returns the created structure or NULL if out of memory
  */
-static struct job_string *make_job_string(const char *event, struct json_object *object)
+static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
        size_t sz = 1 + strlen(event);
-       struct job_string *js = malloc(sz + sizeof *js);
-       if (js) {
-               js->object = object;
-               memcpy(js->event, event, sz);
+       struct job_broadcast *jb = malloc(sz + sizeof *jb);
+       if (jb) {
+               jb->object = object;
+               memcpy(jb->uuid, uuid, sizeof jb->uuid);
+               jb->hop = hop;
+               memcpy(jb->event, event, sz);
        }
-       return js;
+       return jb;
 }
 
 /*
- * Destroy structure 'js' for job of broadcasting string events
+ * Destroy structure 'jb' for job of broadcasting string events
  */
-static void destroy_job_string(struct job_string *js)
+static void destroy_job_broadcast(struct job_broadcast *jb)
 {
-       json_object_put(js->object);
-       free(js);
+       json_object_put(jb->object);
+       free(jb);
 }
 
 /*
@@ -223,7 +253,7 @@ static void destroy_job_evtid(struct job_evtid *je)
 /*
  * Broadcasts the 'event' of 'id' with its 'object'
  */
-static void broadcast(const char *event, struct json_object *object, int id)
+static void broadcast(struct job_broadcast *jb)
 {
        struct afb_evt_listener *listener;
 
@@ -231,7 +261,7 @@ static void broadcast(const char *event, struct json_object *object, int id)
        listener = listeners;
        while(listener) {
                if (listener->itf->broadcast != NULL)
-                       listener->itf->broadcast(listener->closure, event, id, json_object_get(object));
+                       listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop);
                listener = listener->next;
        }
        pthread_rwlock_unlock(&listeners_rwlock);
@@ -240,73 +270,74 @@ static void broadcast(const char *event, struct json_object *object, int id)
 /*
  * Jobs callback for broadcasting string asynchronously
  */
-static void broadcast_job_string(int signum, void *closure)
-{
-       struct job_string *js = closure;
-
-       if (signum == 0)
-               broadcast(js->event, js->object, 0);
-       destroy_job_string(js);
-}
-
-/*
- * Jobs callback for broadcasting evtid asynchronously
- */
-static void broadcast_job_evtid(int signum, void *closure)
+static void broadcast_job(int signum, void *closure)
 {
-       struct job_evtid *je = closure;
+       struct job_broadcast *jb = closure;
 
        if (signum == 0)
-               broadcast(je->evtid->fullname, je->object, je->evtid->id);
-       destroy_job_evtid(je);
+               broadcast(jb);
+       destroy_job_broadcast(jb);
 }
 
 /*
  * Broadcasts the string 'event' with its 'object'
  */
-static int broadcast_string(const char *event, struct json_object *object)
+static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
-       struct job_string *js;
+       uuid_binary_t local_uuid;
+       struct job_broadcast *jb;
        int rc;
+#if EVENT_BROADCAST_MEMORY_COUNT
+       int iter, count;
+#endif
 
-       js = make_job_string(event, object);
-       if (js == NULL) {
+       /* check if lately sent */
+       if (!uuid) {
+               uuid_new_binary(local_uuid);
+               uuid = local_uuid;
+               hop = EVENT_BROADCAST_HOP_MAX;
+#if EVENT_BROADCAST_MEMORY_COUNT
+               pthread_mutex_lock(&uniqueness.mutex);
+       } else {
+               pthread_mutex_lock(&uniqueness.mutex);
+               iter = (int)uniqueness.base;
+               count = (int)uniqueness.count;
+               while (count) {
+                       if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) {
+                               pthread_mutex_unlock(&uniqueness.mutex);
+                               return 0;
+                       }
+                       if (++iter == EVENT_BROADCAST_MEMORY_COUNT)
+                               iter = 0;
+                       count--;
+               }
+       }
+       iter = (int)uniqueness.base;
+       if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT)
+               iter += (int)(uniqueness.count++);
+       else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT)
+               uniqueness.base = 0;
+       memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t));
+       pthread_mutex_unlock(&uniqueness.mutex);
+#else
+       }
+#endif
+
+       /* create the structure for the job */
+       jb = make_job_broadcast(event, object, uuid, hop);
+       if (jb == NULL) {
                ERROR("Cant't create broadcast string job item for %s(%s)",
                        event, json_object_to_json_string(object));
                json_object_put(object);
                return -1;
        }
 
-       rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_string, js);
+       /* queue the job */
+       rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
        if (rc) {
                ERROR("cant't queue broadcast string job item for %s(%s)",
                        event, json_object_to_json_string(object));
-               destroy_job_string(js);
-       }
-       return rc;
-}
-
-/*
- * Broadcasts the 'evtid' with its 'object'
- */
-static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
-{
-       struct job_evtid *je;
-       int rc;
-
-       je = make_job_evtid(evtid, object);
-       if (je == NULL) {
-               ERROR("Cant't create broadcast evtid job item for %s(%s)",
-                       evtid->fullname, json_object_to_json_string(object));
-               json_object_put(object);
-               return -1;
-       }
-
-       rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job_evtid, je);
-       if (rc) {
-               ERROR("cant't queue broadcast evtid job item for %s(%s)",
-                       evtid->fullname, json_object_to_json_string(object));
-               destroy_job_evtid(je);
+               destroy_job_broadcast(jb);
        }
        return rc;
 }
@@ -318,7 +349,7 @@ static int broadcast_evtid(struct afb_evtid *evtid, struct json_object *object)
  */
 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
 {
-       return broadcast_evtid(evtid, object);
+       return unhooked_broadcast(evtid->fullname, object, NULL, 0);
 }
 
 #if WITH_AFB_HOOK
@@ -336,7 +367,7 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
        if (evtid->hookflags & afb_hook_flag_evt_broadcast_before)
                afb_hook_evt_broadcast_before(evtid->fullname, evtid->id, object);
 
-       result = broadcast_evtid(evtid, object);
+       result = afb_evt_evtid_broadcast(evtid, object);
 
        if (evtid->hookflags & afb_hook_flag_evt_broadcast_after)
                afb_hook_evt_broadcast_after(evtid->fullname, evtid->id, object, result);
@@ -347,28 +378,32 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
 }
 #endif
 
-/*
- * Broadcasts the 'event' with its 'object'
- * 'object' is released (like json_object_put)
- * Returns the count of listener having receive the event.
- */
-int afb_evt_broadcast(const char *event, struct json_object *object)
+int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
-#if WITH_AFB_HOOK
        int result;
 
+#if WITH_AFB_HOOK
        json_object_get(object);
-
        afb_hook_evt_broadcast_before(event, 0, object);
-       result = broadcast_string(event, object);
-       afb_hook_evt_broadcast_after(event, 0, object, result);
+#endif
 
-       json_object_put(object);
+       result = unhooked_broadcast(event, object, uuid, hop);
 
-       return result;
-#else
-       return broadcast_string(event, object);
+#if WITH_AFB_HOOK
+       afb_hook_evt_broadcast_after(event, 0, object, result);
+       json_object_put(object);
 #endif
+       return result;
+}
+
+/*
+ * Broadcasts the 'event' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener having receive the event.
+ */
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+       return afb_evt_rebroadcast(event, object, NULL, 0);
 }
 
 /*
@@ -510,6 +545,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
 {
        size_t len;
        struct afb_evtid *evtid, *oevt;
+       uint16_t id;
 
        /* allocates the event */
        len = strlen(fullname);
@@ -519,15 +555,20 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
 
        /* allocates the id */
        pthread_rwlock_wrlock(&events_rwlock);
+       if (event_count == UINT16_MAX) {
+               pthread_rwlock_unlock(&events_rwlock);
+               free(evtid);
+               ERROR("Can't create more events");
+               return NULL;
+       }
+       event_count++;
        do {
-               if (++event_id_counter < 0) {
-                       event_id_wrapped = 1;
-                       event_id_counter = 1024; /* heuristic: small numbers are not destroyed */
-               }
-               if (!event_id_wrapped)
-                       break;
+               /* TODO add a guard (counting number of event created) */
+               id = ++event_genid;
+               if (!id)
+                       id = event_genid = 1;
                oevt = evtids;
-               while(oevt != NULL && oevt->id != event_id_counter)
+               while(oevt != NULL && oevt->id != id)
                        oevt = oevt->next;
        } while (oevt != NULL);
 
@@ -536,7 +577,7 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
        evtid->next = evtids;
        evtid->refcount = 1;
        evtid->watchs = NULL;
-       evtid->id = event_id_counter;
+       evtid->id = id;
        evtid->has_client = 0;
        pthread_rwlock_init(&evtid->rwlock, NULL);
        evtids = evtid;
@@ -614,8 +655,10 @@ void afb_evt_evtid_unref(struct afb_evtid *evtid)
                prv = &evtids;
                while (*prv && !(found = (*prv == evtid)))
                        prv = &(*prv)->next;
-               if (found)
+               if (found) {
                        *prv = evtid->next;
+                       event_count--;
+               }
                pthread_rwlock_unlock(&events_rwlock);
 
                /* destroys the event */
@@ -685,7 +728,7 @@ const char *afb_evt_evtid_hooked_name(struct afb_evtid *evtid)
 /*
  * Returns the id of the 'event'
  */
-int afb_evt_evtid_id(struct afb_evtid *evtid)
+uint16_t afb_evt_evtid_id(struct afb_evtid *evtid)
 {
        return evtid->id;
 }
@@ -851,6 +894,36 @@ int afb_evt_watch_sub_evtid(struct afb_evt_listener *listener, struct afb_evtid
        return -1;
 }
 
+/*
+ * Avoids the 'listener' to watch 'eventid'
+ * Returns 0 in case of success or else -1.
+ */
+int afb_evt_watch_sub_eventid(struct afb_evt_listener *listener, uint16_t eventid)
+{
+       struct afb_evt_watch *watch;
+       struct afb_evtid *evtid;
+
+       /* search the existing watch */
+       pthread_rwlock_wrlock(&listener->rwlock);
+       watch = listener->watchs;
+       while(watch != NULL) {
+               evtid = watch->evtid;
+               if (evtid->id == eventid) {
+                       if (watch->activity != 0) {
+                               watch->activity--;
+                               if (watch->activity == 0 && listener->itf->remove != NULL)
+                                       listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+                       }
+                       pthread_rwlock_unlock(&listener->rwlock);
+                       return 0;
+               }
+               watch = watch->next_by_listener;
+       }
+       pthread_rwlock_unlock(&listener->rwlock);
+       errno = ENOENT;
+       return -1;
+}
+
 #if WITH_AFB_HOOK
 /*
  * update the hooks for events
@@ -908,7 +981,7 @@ const char *afb_evt_event_x2_fullname(struct afb_event_x2 *eventid)
 /*
  * Returns the id of the 'eventid'
  */
-int afb_evt_event_x2_id(struct afb_event_x2 *eventid)
+uint16_t afb_evt_event_x2_id(struct afb_event_x2 *eventid)
 {
        struct afb_evtid *evtid = afb_evt_event_x2_to_evtid(eventid);
        return evtid ? evtid->id : 0;