Tag broadcasted events with UUID and hop
[src/app-framework-binder.git] / src / afb-evt.c
index ddad575..24ac8be 100644 (file)
@@ -31,6 +31,7 @@
 #include "afb-hook.h"
 #include "verbose.h"
 #include "jobs.h"
+#include "uuid.h"
 
 struct afb_evt_watch;
 
@@ -120,6 +121,12 @@ struct job_broadcast
        /** object atached to the event */
        struct json_object *object;
 
+       /** the uuid of the event */
+       uuid_binary_t  uuid;
+
+       /** remaining hop */
+       uint8_t hop;
+
        /** name of the event to broadcast */
        char event[];
 };
@@ -168,16 +175,39 @@ static struct afb_evtid *evtids = NULL;
 static int event_id_counter = 0;
 static int event_id_wrapped = 0;
 
+/* head of uniqueness of events */
+#if !defined(EVENT_BROADCAST_HOP_MAX)
+#  define EVENT_BROADCAST_HOP_MAX  10
+#endif
+#if !defined(EVENT_BROADCAST_MEMORY_COUNT)
+#  define EVENT_BROADCAST_MEMORY_COUNT  8
+#endif
+
+#if EVENT_BROADCAST_MEMORY_COUNT
+static struct {
+       pthread_mutex_t mutex;
+       uint8_t base;
+       uint8_t count;
+       uuid_binary_t uuids[EVENT_BROADCAST_MEMORY_COUNT];
+} uniqueness = {
+       .mutex = PTHREAD_MUTEX_INITIALIZER,
+       .base = 0,
+       .count = 0
+};
+#endif
+
 /*
  * Create structure for job of broadcasting string 'event' with 'object'
  * Returns the created structure or NULL if out of memory
  */
-static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object)
+static struct job_broadcast *make_job_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
        size_t sz = 1 + strlen(event);
        struct job_broadcast *jb = malloc(sz + sizeof *jb);
        if (jb) {
                jb->object = object;
+               memcpy(jb->uuid, uuid, sizeof jb->uuid);
+               jb->hop = hop;
                memcpy(jb->event, event, sz);
        }
        return jb;
@@ -219,7 +249,7 @@ static void destroy_job_evtid(struct job_evtid *je)
 /*
  * Broadcasts the 'event' of 'id' with its 'object'
  */
-static void broadcast(const char *event, struct json_object *object)
+static void broadcast(struct job_broadcast *jb)
 {
        struct afb_evt_listener *listener;
 
@@ -227,7 +257,7 @@ static void broadcast(const char *event, struct json_object *object)
        listener = listeners;
        while(listener) {
                if (listener->itf->broadcast != NULL)
-                       listener->itf->broadcast(listener->closure, event, json_object_get(object));
+                       listener->itf->broadcast(listener->closure, jb->event, json_object_get(jb->object), jb->uuid, jb->hop);
                listener = listener->next;
        }
        pthread_rwlock_unlock(&listeners_rwlock);
@@ -241,19 +271,56 @@ static void broadcast_job(int signum, void *closure)
        struct job_broadcast *jb = closure;
 
        if (signum == 0)
-               broadcast(jb->event, jb->object);
+               broadcast(jb);
        destroy_job_broadcast(jb);
 }
 
 /*
  * Broadcasts the string 'event' with its 'object'
  */
-static int unhooked_broadcast(const char *event, struct json_object *object)
+static int unhooked_broadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
+       uuid_binary_t local_uuid;
        struct job_broadcast *jb;
        int rc;
+#if EVENT_BROADCAST_MEMORY_COUNT
+       int iter, count;
+#endif
+
+       /* check if lately sent */
+       if (!uuid) {
+               uuid_new_binary(local_uuid);
+               uuid = local_uuid;
+               hop = EVENT_BROADCAST_HOP_MAX;
+#if EVENT_BROADCAST_MEMORY_COUNT
+               pthread_mutex_lock(&uniqueness.mutex);
+       } else {
+               pthread_mutex_lock(&uniqueness.mutex);
+               iter = (int)uniqueness.base;
+               count = (int)uniqueness.count;
+               while (count) {
+                       if (0 == memcmp(uuid, uniqueness.uuids[iter], sizeof(uuid_binary_t))) {
+                               pthread_mutex_unlock(&uniqueness.mutex);
+                               return 0;
+                       }
+                       if (++iter == EVENT_BROADCAST_MEMORY_COUNT)
+                               iter = 0;
+                       count--;
+               }
+       }
+       iter = (int)uniqueness.base;
+       if (uniqueness.count < EVENT_BROADCAST_MEMORY_COUNT)
+               iter += (int)(uniqueness.count++);
+       else if (++uniqueness.base == EVENT_BROADCAST_MEMORY_COUNT)
+               uniqueness.base = 0;
+       memcpy(uniqueness.uuids[iter], uuid, sizeof(uuid_binary_t));
+       pthread_mutex_unlock(&uniqueness.mutex);
+#else
+       }
+#endif
 
-       jb = make_job_broadcast(event, object);
+       /* create the structure for the job */
+       jb = make_job_broadcast(event, object, uuid, hop);
        if (jb == NULL) {
                ERROR("Cant't create broadcast string job item for %s(%s)",
                        event, json_object_to_json_string(object));
@@ -261,6 +328,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object)
                return -1;
        }
 
+       /* queue the job */
        rc = jobs_queue(BROADCAST_JOB_GROUP, 0, broadcast_job, jb);
        if (rc) {
                ERROR("cant't queue broadcast string job item for %s(%s)",
@@ -277,7 +345,7 @@ static int unhooked_broadcast(const char *event, struct json_object *object)
  */
 int afb_evt_evtid_broadcast(struct afb_evtid *evtid, struct json_object *object)
 {
-       return unhooked_broadcast(evtid->fullname, object);
+       return unhooked_broadcast(evtid->fullname, object, NULL, 0);
 }
 
 /*
@@ -304,12 +372,7 @@ int afb_evt_evtid_hooked_broadcast(struct afb_evtid *evtid, struct json_object *
        return result;
 }
 
-/*
- * Broadcasts the 'event' with its 'object'
- * 'object' is released (like json_object_put)
- * Returns the count of listener having receive the event.
- */
-int afb_evt_broadcast(const char *event, struct json_object *object)
+int afb_evt_rebroadcast(const char *event, struct json_object *object, const uuid_binary_t uuid, uint8_t hop)
 {
        int result;
 
@@ -318,7 +381,7 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
        afb_hook_evt_broadcast_before(event, 0, object);
 #endif
 
-       result = unhooked_broadcast(event, object);
+       result = unhooked_broadcast(event, object, uuid, hop);
 
 #if WITH_AFB_HOOK
        afb_hook_evt_broadcast_after(event, 0, object, result);
@@ -327,6 +390,16 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
        return result;
 }
 
+/*
+ * Broadcasts the 'event' with its 'object'
+ * 'object' is released (like json_object_put)
+ * Returns the count of listener having receive the event.
+ */
+int afb_evt_broadcast(const char *event, struct json_object *object)
+{
+       return afb_evt_rebroadcast(event, object, NULL, 0);
+}
+
 /*
  * Pushes the event 'evtid' with 'obj' to its listeners
  * Returns the count of listener that received the event.