afb-hook: Improve reentrancy of hooking 83/18583/1
authorJosé Bollo <jose.bollo@iot.bzh>
Tue, 4 Dec 2018 11:05:01 +0000 (12:05 +0100)
committerJose Bollo <jose.bollo@iot.bzh>
Wed, 5 Dec 2018 09:08:10 +0000 (10:08 +0100)
Work on testing showed that a deadlock
was occuring in management of hooks.

Change-Id: Ib51eb4f0b9ffc5d9dfe2770f2c3f8f47f262b60f
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/afb-hook.c

index 3d57ca5..3fcd36d 100644 (file)
@@ -116,8 +116,9 @@ struct afb_hook_global {
        void *closure; /**< closure for callbacks */
 };
 
-/* synchronisation across threads */
+/* synchronization across threads */
 static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
 /* list of hooks for xreq */
 static struct afb_hook_xreq *list_of_xreq_hooks = NULL;
@@ -171,24 +172,6 @@ static char *_pbuf_(const char *fmt, va_list args, char **palloc, char *sbuf, si
        return sbuf;
 }
 
-#if 0 /* old behaviour: use NOTICE */
-static void _hook_(const char *fmt1, const char *fmt2, va_list arg2, ...)
-{
-       char *tag, *data, *mem1, *mem2, buf1[256], buf2[2000];
-       va_list arg1;
-
-       data = _pbuf_(fmt2, arg2, &mem2, buf2, sizeof buf2, NULL);
-
-       va_start(arg1, arg2);
-       tag = _pbuf_(fmt1, arg1, &mem1, buf1, sizeof buf1, NULL);
-       va_end(arg1);
-
-       NOTICE("[HOOK %s] %s", tag, data);
-
-       free(mem1);
-       free(mem2);
-}
-#else /* new behaviour: emits directly to stderr */
 static void _hook_(const char *fmt1, const char *fmt2, va_list arg2, ...)
 {
        static const char chars[] = "HOOK: [] \n";
@@ -196,18 +179,23 @@ static void _hook_(const char *fmt1, const char *fmt2, va_list arg2, ...)
        struct iovec iov[5];
        va_list arg1;
 
+       /* "HOOK: [" */
        iov[0].iov_base = (void*)&chars[0];
        iov[0].iov_len = 7;
 
+       /* fmt1 ... */
        va_start(arg1, arg2);
        iov[1].iov_base = _pbuf_(fmt1, arg1, &mem1, buf1, sizeof buf1, &iov[1].iov_len);
        va_end(arg1);
 
+       /* "] " */
        iov[2].iov_base = (void*)&chars[7];
        iov[2].iov_len = 2;
 
+       /* fmt2 arg2 */
        iov[3].iov_base = _pbuf_(fmt2, arg2, &mem2, buf2, sizeof buf2, &iov[3].iov_len);
 
+       /* "\n" */
        iov[4].iov_base = (void*)&chars[9];
        iov[4].iov_len = 1;
 
@@ -216,7 +204,6 @@ static void _hook_(const char *fmt1, const char *fmt2, va_list arg2, ...)
        free(mem1);
        free(mem2);
 }
-#endif
 
 static void _hook_xreq_(const struct afb_xreq *xreq, const char *format, ...)
 {
@@ -413,7 +400,8 @@ static struct afb_hook_xreq_itf hook_xreq_default_itf = {
        init_hookid(&hookid); \
        hook = list_of_xreq_hooks; \
        while (hook) { \
-               if (hook->itf->hook_xreq_##func \
+               if (hook->refcount \
+                && hook->itf->hook_xreq_##func \
                 && (hook->flags & afb_hook_flag_req_##flag) != 0 \
                 && (!hook->session || hook->session == xreq->context.session) \
                 && MATCH_API(hook->api, xreq->request.called_api) \
@@ -569,10 +557,10 @@ struct json_object *afb_hook_xreq_get_client_info(const struct afb_xreq *xreq, s
 
 void afb_hook_init_xreq(struct afb_xreq *xreq)
 {
-       static int reqindex;
+       static int reqindex = 0;
 
        int f, flags;
-       int add;
+       int add, x;
        struct afb_hook_xreq *hook;
 
        /* scan hook list to get the expected flags */
@@ -594,11 +582,10 @@ void afb_hook_init_xreq(struct afb_xreq *xreq)
        /* store the hooking data */
        xreq->hookflags = flags;
        if (flags) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (++reqindex < 0)
-                       reqindex = 1;
-               xreq->hookindex = reqindex;
-               pthread_rwlock_unlock(&rwlock);
+               do {
+                       x = __atomic_load_n(&reqindex, __ATOMIC_RELAXED);
+                       xreq->hookindex = (x + 1) % 1000000 ?: 1;
+               } while (x != __atomic_exchange_n(&reqindex, xreq->hookindex, __ATOMIC_RELAXED));
        }
 }
 
@@ -631,10 +618,10 @@ struct afb_hook_xreq *afb_hook_create_xreq(const char *api, const char *verb, st
        hook->closure = closure;
 
        /* record the hook */
-       pthread_rwlock_wrlock(&rwlock);
+       pthread_mutex_lock(&mutex);
        hook->next = list_of_xreq_hooks;
        list_of_xreq_hooks = hook;
-       pthread_rwlock_unlock(&rwlock);
+       pthread_mutex_unlock(&mutex);
 
        /* returns it */
        return hook;
@@ -642,31 +629,34 @@ struct afb_hook_xreq *afb_hook_create_xreq(const char *api, const char *verb, st
 
 struct afb_hook_xreq *afb_hook_addref_xreq(struct afb_hook_xreq *hook)
 {
-       pthread_rwlock_wrlock(&rwlock);
-       hook->refcount++;
-       pthread_rwlock_unlock(&rwlock);
+       __atomic_add_fetch(&hook->refcount, 1, __ATOMIC_RELAXED);
        return hook;
 }
 
-void afb_hook_unref_xreq(struct afb_hook_xreq *hook)
+static void hook_clean_xreq()
 {
-       struct afb_hook_xreq **prv;
+       struct afb_hook_xreq **prv, *hook, *head;
 
-       if (hook) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (--hook->refcount)
-                       hook = NULL;
-               else {
-                       /* unlink */
-                       prv = &list_of_xreq_hooks;
-                       while (*prv && *prv != hook)
+       if (pthread_rwlock_trywrlock(&rwlock) == 0) {
+               /* unlink under mutex */
+               head = NULL;
+               pthread_mutex_lock(&mutex);
+               prv = &list_of_xreq_hooks;
+               while ((hook = *prv)) {
+                       if (hook->refcount)
                                prv = &(*prv)->next;
-                       if(*prv)
+                       else {
                                *prv = hook->next;
+                               hook->next = head;
+                               head = hook;
+                       }
                }
+               pthread_mutex_unlock(&mutex);
                pthread_rwlock_unlock(&rwlock);
-               if (hook) {
-                       /* free */
+
+               /* free found hooks */
+               while((hook = head)) {
+                       head = hook->next;
                        free(hook->api);
                        free(hook->verb);
                        if (hook->session)
@@ -676,6 +666,12 @@ void afb_hook_unref_xreq(struct afb_hook_xreq *hook)
        }
 }
 
+void afb_hook_unref_xreq(struct afb_hook_xreq *hook)
+{
+       if (hook && !__atomic_sub_fetch(&hook->refcount, 1, __ATOMIC_RELAXED))
+               hook_clean_xreq();
+}
+
 /******************************************************************************
  * section: default callbacks for tracing daemon interface
  *****************************************************************************/
@@ -972,7 +968,8 @@ static struct afb_hook_api_itf hook_api_default_itf = {
        init_hookid(&hookid); \
        hook = list_of_api_hooks; \
        while (hook) { \
-               if (hook->itf->hook_api_##func \
+               if (hook->refcount \
+                && hook->itf->hook_api_##func \
                 && (hook->flags & afb_hook_flag_api_##flag) != 0 \
                 && MATCH_API(hook->api, apiname)) { \
                        hook->itf->hook_api_##func(hook->closure, &hookid, __VA_ARGS__); \
@@ -1245,10 +1242,10 @@ struct afb_hook_api *afb_hook_create_api(const char *api, int flags, struct afb_
        hook->closure = closure;
 
        /* record the hook */
-       pthread_rwlock_wrlock(&rwlock);
+       pthread_mutex_lock(&mutex);
        hook->next = list_of_api_hooks;
        list_of_api_hooks = hook;
-       pthread_rwlock_unlock(&rwlock);
+       pthread_mutex_unlock(&mutex);
 
        /* returns it */
        return hook;
@@ -1256,37 +1253,46 @@ struct afb_hook_api *afb_hook_create_api(const char *api, int flags, struct afb_
 
 struct afb_hook_api *afb_hook_addref_api(struct afb_hook_api *hook)
 {
-       pthread_rwlock_wrlock(&rwlock);
-       hook->refcount++;
-       pthread_rwlock_unlock(&rwlock);
+       __atomic_add_fetch(&hook->refcount, 1, __ATOMIC_RELAXED);
        return hook;
 }
 
-void afb_hook_unref_api(struct afb_hook_api *hook)
+static void hook_clean_api()
 {
-       struct afb_hook_api **prv;
+       struct afb_hook_api **prv, *hook, *head;
 
-       if (hook) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (--hook->refcount)
-                       hook = NULL;
-               else {
-                       /* unlink */
-                       prv = &list_of_api_hooks;
-                       while (*prv && *prv != hook)
+       if (pthread_rwlock_trywrlock(&rwlock) == 0) {
+               /* unlink under mutex */
+               head = NULL;
+               pthread_mutex_lock(&mutex);
+               prv = &list_of_api_hooks;
+               while ((hook = *prv)) {
+                       if (hook->refcount)
                                prv = &(*prv)->next;
-                       if(*prv)
+                       else {
                                *prv = hook->next;
+                               hook->next = head;
+                               head = hook;
+                       }
                }
+               pthread_mutex_unlock(&mutex);
                pthread_rwlock_unlock(&rwlock);
-               if (hook) {
-                       /* free */
+
+               /* free found hooks */
+               while((hook = head)) {
+                       head = hook->next;
                        free(hook->api);
                        free(hook);
                }
        }
 }
 
+void afb_hook_unref_api(struct afb_hook_api *hook)
+{
+       if (hook && !__atomic_sub_fetch(&hook->refcount, 1, __ATOMIC_RELAXED))
+               hook_clean_api();
+}
+
 /******************************************************************************
  * section: default callbacks for tracing service interface (evt)
  *****************************************************************************/
@@ -1362,7 +1368,8 @@ static struct afb_hook_evt_itf hook_evt_default_itf = {
        init_hookid(&hookid); \
        hook = list_of_evt_hooks; \
        while (hook) { \
-               if (hook->itf->hook_evt_##what \
+               if (hook->refcount \
+                && hook->itf->hook_evt_##what \
                 && (hook->flags & afb_hook_flag_evt_##what) != 0 \
                 && MATCH_EVENT(hook->pattern, evt)) { \
                        hook->itf->hook_evt_##what(hook->closure, &hookid, __VA_ARGS__); \
@@ -1457,10 +1464,10 @@ struct afb_hook_evt *afb_hook_create_evt(const char *pattern, int flags, struct
        hook->closure = closure;
 
        /* record the hook */
-       pthread_rwlock_wrlock(&rwlock);
+       pthread_mutex_lock(&mutex);
        hook->next = list_of_evt_hooks;
        list_of_evt_hooks = hook;
-       pthread_rwlock_unlock(&rwlock);
+       pthread_mutex_unlock(&mutex);
 
        /* returns it */
        return hook;
@@ -1468,37 +1475,46 @@ struct afb_hook_evt *afb_hook_create_evt(const char *pattern, int flags, struct
 
 struct afb_hook_evt *afb_hook_addref_evt(struct afb_hook_evt *hook)
 {
-       pthread_rwlock_wrlock(&rwlock);
-       hook->refcount++;
-       pthread_rwlock_unlock(&rwlock);
+       __atomic_add_fetch(&hook->refcount, 1, __ATOMIC_RELAXED);
        return hook;
 }
 
-void afb_hook_unref_evt(struct afb_hook_evt *hook)
+static void hook_clean_evt()
 {
-       struct afb_hook_evt **prv;
+       struct afb_hook_evt **prv, *hook, *head;
 
-       if (hook) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (--hook->refcount)
-                       hook = NULL;
-               else {
-                       /* unlink */
-                       prv = &list_of_evt_hooks;
-                       while (*prv && *prv != hook)
+       if (pthread_rwlock_trywrlock(&rwlock) == 0) {
+               /* unlink under mutex */
+               head = NULL;
+               pthread_mutex_lock(&mutex);
+               prv = &list_of_evt_hooks;
+               while ((hook = *prv)) {
+                       if (hook->refcount)
                                prv = &(*prv)->next;
-                       if(*prv)
+                       else {
                                *prv = hook->next;
+                               hook->next = head;
+                               head = hook;
+                       }
                }
+               pthread_mutex_unlock(&mutex);
                pthread_rwlock_unlock(&rwlock);
-               if (hook) {
-                       /* free */
+
+               /* free found hooks */
+               while((hook = head)) {
+                       head = hook->next;
                        free(hook->pattern);
                        free(hook);
                }
        }
 }
 
+void afb_hook_unref_evt(struct afb_hook_evt *hook)
+{
+       if (hook && !__atomic_sub_fetch(&hook->refcount, 1, __ATOMIC_RELAXED))
+               hook_clean_evt();
+}
+
 /******************************************************************************
  * section: default callbacks for sessions (session)
  *****************************************************************************/
@@ -1562,7 +1578,8 @@ static struct afb_hook_session_itf hook_session_default_itf = {
        init_hookid(&hookid); \
        hook = list_of_session_hooks; \
        while (hook) { \
-               if (hook->itf->hook_session_##what \
+               if (hook->refcount \
+                && hook->itf->hook_session_##what \
                 && (hook->flags & afb_hook_flag_session_##what) != 0 \
                 && MATCH_SESSION(hook->pattern, (sessid?:(sessid=afb_session_uuid(session))))) { \
                        hook->itf->hook_session_##what(hook->closure, &hookid, __VA_ARGS__); \
@@ -1629,10 +1646,10 @@ struct afb_hook_session *afb_hook_create_session(const char *pattern, int flags,
        hook->closure = closure;
 
        /* record the hook */
-       pthread_rwlock_wrlock(&rwlock);
+       pthread_mutex_lock(&mutex);
        hook->next = list_of_session_hooks;
        list_of_session_hooks = hook;
-       pthread_rwlock_unlock(&rwlock);
+       pthread_mutex_unlock(&mutex);
 
        /* returns it */
        return hook;
@@ -1640,37 +1657,46 @@ struct afb_hook_session *afb_hook_create_session(const char *pattern, int flags,
 
 struct afb_hook_session *afb_hook_addref_session(struct afb_hook_session *hook)
 {
-       pthread_rwlock_wrlock(&rwlock);
-       hook->refcount++;
-       pthread_rwlock_unlock(&rwlock);
+       __atomic_add_fetch(&hook->refcount, 1, __ATOMIC_RELAXED);
        return hook;
 }
 
-void afb_hook_unref_session(struct afb_hook_session *hook)
+static void hook_clean_session()
 {
-       struct afb_hook_session **prv;
+       struct afb_hook_session **prv, *hook, *head;
 
-       if (hook) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (--hook->refcount)
-                       hook = NULL;
-               else {
-                       /* unlink */
-                       prv = &list_of_session_hooks;
-                       while (*prv && *prv != hook)
+       if (pthread_rwlock_trywrlock(&rwlock) == 0) {
+               /* unlink under mutex */
+               head = NULL;
+               pthread_mutex_lock(&mutex);
+               prv = &list_of_session_hooks;
+               while ((hook = *prv)) {
+                       if (hook->refcount)
                                prv = &(*prv)->next;
-                       if(*prv)
+                       else {
                                *prv = hook->next;
+                               hook->next = head;
+                               head = hook;
+                       }
                }
+               pthread_mutex_unlock(&mutex);
                pthread_rwlock_unlock(&rwlock);
-               if (hook) {
-                       /* free */
+
+               /* free found hooks */
+               while((hook = head)) {
+                       head = hook->next;
                        free(hook->pattern);
                        free(hook);
                }
        }
 }
 
+void afb_hook_unref_session(struct afb_hook_session *hook)
+{
+       if (hook && !__atomic_sub_fetch(&hook->refcount, 1, __ATOMIC_RELAXED))
+               hook_clean_session();
+}
+
 /******************************************************************************
  * section: default callbacks for globals (global)
  *****************************************************************************/
@@ -1716,7 +1742,8 @@ static struct afb_hook_global_itf hook_global_default_itf = {
        init_hookid(&hookid); \
        hook = list_of_global_hooks; \
        while (hook) { \
-               if (hook->itf->hook_global_##what \
+               if (hook->refcount \
+                && hook->itf->hook_global_##what \
                 && (hook->flags & afb_hook_flag_global_##what) != 0) { \
                        hook->itf->hook_global_##what(hook->closure, &hookid, __VA_ARGS__); \
                } \
@@ -1741,7 +1768,8 @@ static void update_global()
        pthread_rwlock_rdlock(&rwlock);
        hook = list_of_global_hooks;
        while (hook) {
-               flags = hook->flags;
+               if (hook->refcount)
+                       flags = hook->flags;
                hook = hook->next;
        }
        verbose_observer = (flags & afb_hook_flag_global_vverbose) ? afb_hook_global_vverbose : NULL;
@@ -1764,10 +1792,10 @@ struct afb_hook_global *afb_hook_create_global(int flags, struct afb_hook_global
        hook->closure = closure;
 
        /* record the hook */
-       pthread_rwlock_wrlock(&rwlock);
+       pthread_mutex_lock(&mutex);
        hook->next = list_of_global_hooks;
        list_of_global_hooks = hook;
-       pthread_rwlock_unlock(&rwlock);
+       pthread_mutex_unlock(&mutex);
 
        /* update hooking */
        update_global();
@@ -1778,36 +1806,45 @@ struct afb_hook_global *afb_hook_create_global(int flags, struct afb_hook_global
 
 struct afb_hook_global *afb_hook_addref_global(struct afb_hook_global *hook)
 {
-       pthread_rwlock_wrlock(&rwlock);
-       hook->refcount++;
-       pthread_rwlock_unlock(&rwlock);
+       __atomic_add_fetch(&hook->refcount, 1, __ATOMIC_RELAXED);
        return hook;
 }
 
-void afb_hook_unref_global(struct afb_hook_global *hook)
+static void hook_clean_global()
 {
-       struct afb_hook_global **prv;
+       struct afb_hook_global **prv, *hook, *head;
 
-       if (hook) {
-               pthread_rwlock_wrlock(&rwlock);
-               if (--hook->refcount)
-                       hook = NULL;
-               else {
-                       /* unlink */
-                       prv = &list_of_global_hooks;
-                       while (*prv && *prv != hook)
+       if (pthread_rwlock_trywrlock(&rwlock) == 0) {
+               /* unlink under mutex */
+               head = NULL;
+               pthread_mutex_lock(&mutex);
+               prv = &list_of_global_hooks;
+               while ((hook = *prv)) {
+                       if (hook->refcount)
                                prv = &(*prv)->next;
-                       if(*prv)
+                       else {
                                *prv = hook->next;
+                               hook->next = head;
+                               head = hook;
+                       }
                }
+               pthread_mutex_unlock(&mutex);
                pthread_rwlock_unlock(&rwlock);
-               if (hook) {
-                       /* free */
-                       free(hook);
 
-                       /* update hooking */
-                       update_global();
+               /* free found hooks */
+               while((hook = head)) {
+                       head = hook->next;
+                       free(hook);
                }
        }
 }
 
+void afb_hook_unref_global(struct afb_hook_global *hook)
+{
+       if (hook && !__atomic_sub_fetch(&hook->refcount, 1, __ATOMIC_RELAXED)) {
+               /* update hooking */
+               update_global();
+               hook_clean_global();
+       }
+}
+