afb-evt: Ensure unsubscribe works
[src/app-framework-binder.git] / src / afb-evt.c
index 400f8fb..7122a33 100644 (file)
@@ -87,9 +87,6 @@ struct afb_evtid {
        /* id of the event */
        uint16_t id;
 
-       /* has client? */
-       int has_client;
-
        /* fullname of the event */
        char fullname[];
 };
@@ -110,9 +107,6 @@ struct afb_evt_watch {
 
        /* link to the next watcher for the same listener */
        struct afb_evt_watch *next_by_listener;
-
-       /* activity */
-       unsigned activity;
 };
 
 /*
@@ -412,23 +406,17 @@ int afb_evt_broadcast(const char *event, struct json_object *object)
  */
 static void push_evtid(struct afb_evtid *evtid, struct json_object *object)
 {
-       int has_client;
        struct afb_evt_watch *watch;
        struct afb_evt_listener *listener;
 
-       has_client = 0;
        pthread_rwlock_rdlock(&evtid->rwlock);
        watch = evtid->watchs;
        while(watch) {
                listener = watch->listener;
                assert(listener->itf->push != NULL);
-               if (watch->activity != 0) {
-                       listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
-                       has_client = 1;
-               }
+               listener->itf->push(listener->closure, evtid->fullname, evtid->id, json_object_get(object));
                watch = watch->next_by_evtid;
        }
-       evtid->has_client = has_client;
        pthread_rwlock_unlock(&evtid->rwlock);
 }
 
@@ -455,6 +443,9 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
        struct job_evtid *je;
        int rc;
 
+       if (!evtid->watchs)
+               return 0;
+
        je = make_job_evtid(evtid, object);
        if (je == NULL) {
                ERROR("Cant't create push evtid job item for %s(%s)",
@@ -465,7 +456,7 @@ int afb_evt_evtid_push(struct afb_evtid *evtid, struct json_object *object)
 
        rc = jobs_queue(PUSH_JOB_GROUP, 0, push_job_evtid, je);
        if (rc == 0)
-               rc = evtid->has_client;
+               rc = 1;
        else {
                ERROR("cant't queue push evtid job item for %s(%s)",
                        evtid->fullname, json_object_to_json_string(object));
@@ -507,32 +498,54 @@ int afb_evt_evtid_hooked_push(struct afb_evtid *evtid, struct json_object *obj)
 }
 #endif
 
-/*
- * remove the 'watch'
- */
-static void remove_watch(struct afb_evt_watch *watch)
+static void unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, int remove)
+{
+       /* notify listener if needed */
+       if (remove && listener->itf->remove != NULL)
+               listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+}
+
+static void evtid_unwatch(struct afb_evtid *evtid, struct afb_evt_listener *listener, struct afb_evt_watch *watch, int remove)
 {
        struct afb_evt_watch **prv;
-       struct afb_evtid *evtid;
-       struct afb_evt_listener *listener;
 
        /* notify listener if needed */
-       evtid = watch->evtid;
-       listener = watch->listener;
-       if (watch->activity != 0 && listener->itf->remove != NULL)
-               listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
+       unwatch(listener, evtid, remove);
 
        /* unlink the watch for its event */
-       prv = &evtid->watchs;
-       while(*prv != watch)
-               prv = &(*prv)->next_by_evtid;
-       *prv = watch->next_by_evtid;
-
-       /* unlink the watch for its listener */
+       pthread_rwlock_wrlock(&listener->rwlock);
        prv = &listener->watchs;
-       while(*prv != watch)
+       while(*prv) {
+               if (*prv == watch) {
+                       *prv = watch->next_by_listener;
+                       break;
+               }
                prv = &(*prv)->next_by_listener;
-       *prv = watch->next_by_listener;
+       }
+       pthread_rwlock_unlock(&listener->rwlock);
+
+       /* recycle memory */
+       free(watch);
+}
+
+static void listener_unwatch(struct afb_evt_listener *listener, struct afb_evtid *evtid, struct afb_evt_watch *watch, int remove)
+{
+       struct afb_evt_watch **prv;
+
+       /* notify listener if needed */
+       unwatch(listener, evtid, remove);
+
+       /* unlink the watch for its event */
+       pthread_rwlock_wrlock(&evtid->rwlock);
+       prv = &evtid->watchs;
+       while(*prv) {
+               if (*prv == watch) {
+                       *prv = watch->next_by_evtid;
+                       break;
+               }
+               prv = &(*prv)->next_by_evtid;
+       }
+       pthread_rwlock_unlock(&evtid->rwlock);
 
        /* recycle memory */
        free(watch);
@@ -578,7 +591,6 @@ struct afb_evtid *afb_evt_evtid_create(const char *fullname)
        evtid->refcount = 1;
        evtid->watchs = NULL;
        evtid->id = id;
-       evtid->has_client = 0;
        pthread_rwlock_init(&evtid->rwlock, NULL);
        evtids = evtid;
 #if WITH_AFB_HOOK
@@ -644,41 +656,42 @@ struct afb_evtid *afb_evt_evtid_hooked_addref(struct afb_evtid *evtid)
  */
 void afb_evt_evtid_unref(struct afb_evtid *evtid)
 {
-       int found;
-       struct afb_evtid **prv;
-       struct afb_evt_listener *listener;
+       struct afb_evtid **prv, *oev;
+       struct afb_evt_watch *watch, *nwatch;
 
        if (!__atomic_sub_fetch(&evtid->refcount, 1, __ATOMIC_RELAXED)) {
                /* unlinks the event if valid! */
                pthread_rwlock_wrlock(&events_rwlock);
-               found = 0;
                prv = &evtids;
-               while (*prv && !(found = (*prv == evtid)))
-                       prv = &(*prv)->next;
-               if (found) {
-                       *prv = evtid->next;
-                       event_count--;
+               for(;;) {
+                       oev = *prv;
+                       if (oev == evtid)
+                               break;
+                       if (!oev) {
+                               ERROR("unexpected event");
+                               pthread_rwlock_unlock(&events_rwlock);
+                               return;
+                       }
+                       prv = &oev->next;
                }
+               event_count--;
+               *prv = evtid->next;
                pthread_rwlock_unlock(&events_rwlock);
 
-               /* destroys the event */
-               if (!found)
-                       ERROR("event not found");
-               else {
-                       /* removes all watchers */
-                       while(evtid->watchs != NULL) {
-                               listener = evtid->watchs->listener;
-                               pthread_rwlock_wrlock(&listener->rwlock);
-                               pthread_rwlock_wrlock(&evtid->rwlock);
-                               remove_watch(evtid->watchs);
-                               pthread_rwlock_unlock(&evtid->rwlock);
-                               pthread_rwlock_unlock(&listener->rwlock);
-                       }
-
-                       /* free */
-                       pthread_rwlock_destroy(&evtid->rwlock);
-                       free(evtid);
+               /* removes all watchers */
+               pthread_rwlock_wrlock(&evtid->rwlock);
+               watch = evtid->watchs;
+               evtid->watchs = NULL;
+               pthread_rwlock_unlock(&evtid->rwlock);
+               while(watch) {
+                       nwatch = watch->next_by_evtid;
+                       evtid_unwatch(evtid, watch->listener, watch, 1);
+                       watch = nwatch;
                }
+
+               /* free */
+               pthread_rwlock_destroy(&evtid->rwlock);
+               free(evtid);
        }
 }
 
@@ -785,28 +798,29 @@ struct afb_evt_listener *afb_evt_listener_addref(struct afb_evt_listener *listen
  */
 void afb_evt_listener_unref(struct afb_evt_listener *listener)
 {
-       struct afb_evt_listener **prv;
-       struct afb_evtid *evtid;
+       struct afb_evt_listener **prv, *olis;
 
        if (listener && !__atomic_sub_fetch(&listener->refcount, 1, __ATOMIC_RELAXED)) {
 
                /* unlink the listener */
                pthread_rwlock_wrlock(&listeners_rwlock);
                prv = &listeners;
-               while (*prv != listener)
-                       prv = &(*prv)->next;
+               for(;;) {
+                       olis = *prv;
+                       if (olis == listener)
+                               break;
+                       if (!olis) {
+                               ERROR("unexpected listener");
+                               pthread_rwlock_unlock(&listeners_rwlock);
+                               return;
+                       }
+                       prv = &olis->next;
+               }
                *prv = listener->next;
                pthread_rwlock_unlock(&listeners_rwlock);
 
                /* remove the watchers */
-               pthread_rwlock_wrlock(&listener->rwlock);
-               while (listener->watchs != NULL) {
-                       evtid = listener->watchs->evtid;
-                       pthread_rwlock_wrlock(&evtid->rwlock);
-                       remove_watch(listener->watchs);
-                       pthread_rwlock_unlock(&evtid->rwlock);
-               }
-               pthread_rwlock_unlock(&listener->rwlock);
+               afb_evt_listener_unwatch_all(listener, 0);
 
                /* free the listener */
                pthread_rwlock_destroy(&listener->rwlock);
@@ -833,7 +847,7 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
        watch = listener->watchs;
        while(watch != NULL) {
                if (watch->evtid == evtid)
-                       goto found;
+                       goto end;
                watch = watch->next_by_listener;
        }
 
@@ -847,7 +861,6 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
 
        /* initialise and link */
        watch->evtid = evtid;
-       watch->activity = 0;
        watch->listener = listener;
        watch->next_by_listener = listener->watchs;
        listener->watchs = watch;
@@ -856,13 +869,10 @@ int afb_evt_listener_watch_evt(struct afb_evt_listener *listener, struct afb_evt
        evtid->watchs = watch;
        pthread_rwlock_unlock(&evtid->rwlock);
 
-found:
-       if (watch->activity == 0 && listener->itf->add != NULL)
+       if (listener->itf->add != NULL)
                listener->itf->add(listener->closure, evtid->fullname, evtid->id);
-       watch->activity++;
-       evtid->has_client = 1;
+end:
        pthread_rwlock_unlock(&listener->rwlock);
-
        return 0;
 }
 
@@ -872,26 +882,26 @@ found:
  */
 int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_evtid *evtid)
 {
-       struct afb_evt_watch *watch;
+       struct afb_evt_watch *watch, **pwatch;
 
        /* search the existing watch */
        pthread_rwlock_wrlock(&listener->rwlock);
-       watch = listener->watchs;
-       while(watch != NULL) {
-               if (watch->evtid == evtid) {
-                       if (watch->activity != 0) {
-                               watch->activity--;
-                               if (watch->activity == 0 && listener->itf->remove != NULL)
-                                       listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
-                       }
+       pwatch = &listener->watchs;
+       for (;;) {
+               watch = *pwatch;
+               if (!watch) {
+                       pthread_rwlock_unlock(&listener->rwlock);
+                       errno = ENOENT;
+                       return -1;
+               }
+               if (evtid == watch->evtid) {
+                       *pwatch = watch->next_by_listener;
                        pthread_rwlock_unlock(&listener->rwlock);
+                       listener_unwatch(listener, evtid, watch, 1);
                        return 0;
                }
-               watch = watch->next_by_listener;
+               pwatch = &watch->next_by_listener;
        }
-       pthread_rwlock_unlock(&listener->rwlock);
-       errno = ENOENT;
-       return -1;
 }
 
 /*
@@ -900,28 +910,48 @@ int afb_evt_listener_unwatch_evt(struct afb_evt_listener *listener, struct afb_e
  */
 int afb_evt_listener_unwatch_id(struct afb_evt_listener *listener, uint16_t eventid)
 {
-       struct afb_evt_watch *watch;
+       struct afb_evt_watch *watch, **pwatch;
        struct afb_evtid *evtid;
 
        /* search the existing watch */
        pthread_rwlock_wrlock(&listener->rwlock);
-       watch = listener->watchs;
-       while(watch != NULL) {
+       pwatch = &listener->watchs;
+       for (;;) {
+               watch = *pwatch;
+               if (!watch) {
+                       pthread_rwlock_unlock(&listener->rwlock);
+                       errno = ENOENT;
+                       return -1;
+               }
                evtid = watch->evtid;
                if (evtid->id == eventid) {
-                       if (watch->activity != 0) {
-                               watch->activity--;
-                               if (watch->activity == 0 && listener->itf->remove != NULL)
-                                       listener->itf->remove(listener->closure, evtid->fullname, evtid->id);
-                       }
+                       *pwatch = watch->next_by_listener;
                        pthread_rwlock_unlock(&listener->rwlock);
+                       listener_unwatch(listener, evtid, watch, 1);
                        return 0;
                }
-               watch = watch->next_by_listener;
+               pwatch = &watch->next_by_listener;
        }
+}
+
+/*
+ * Avoids the 'listener' to watch any event, calling the callback
+ * 'remove' of the interface if 'remoe' is not zero.
+ */
+void afb_evt_listener_unwatch_all(struct afb_evt_listener *listener, int remove)
+{
+       struct afb_evt_watch *watch, *nwatch;
+
+       /* search the existing watch */
+       pthread_rwlock_wrlock(&listener->rwlock);
+       watch = listener->watchs;
+       listener->watchs = NULL;
        pthread_rwlock_unlock(&listener->rwlock);
-       errno = ENOENT;
-       return -1;
+       while(watch) {
+               nwatch = watch->next_by_listener;
+               listener_unwatch(listener, watch->evtid, watch, remove);
+               watch = nwatch;
+       }
 }
 
 #if WITH_AFB_HOOK