jobs: Refactor event loop handling 27/19527/2
authorJosé Bollo <jose.bollo@iot.bzh>
Sun, 6 Jan 2019 09:34:12 +0000 (10:34 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Wed, 16 Jan 2019 21:32:25 +0000 (22:32 +0100)
This improves the arbitration of the single event
loop across threads.

Before introduction of using 'evenfd' there was
several event loop. At the current time, there
is only one. At the end, there will probably
remain only one.

Bug-AGL: SPEC-2089

Change-Id: Iac9db7cbe15b4c9c76e6e9a8f6e641ed2a9039e0
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/jobs.c
src/sig-monitor.c
src/sig-monitor.h

index f2c9d52..417f7ea 100644 (file)
@@ -75,20 +75,20 @@ struct evloop
        unsigned state;        /**< encoded state */
        int efd;               /**< event notification */
        struct sd_event *sdev; /**< the systemd event loop */
-       pthread_cond_t  cond;  /**< condition */
        struct fdev *fdev;     /**< handling of events */
        struct thread *holder; /**< holder of the evloop */
 };
 
 #define EVLOOP_STATE_WAIT           1U
 #define EVLOOP_STATE_RUN            2U
-#define EVLOOP_STATE_LOCK           4U
 
 /** Description of threads */
 struct thread
 {
        struct thread *next;   /**< next thread of the list */
        struct thread *upper;  /**< upper same thread */
+       struct thread *nholder;/**< next holder for evloop */
+       pthread_cond_t *cwhold;/**< condition wait for holding */
        struct job *job;       /**< currently processed job */
        pthread_t tid;         /**< the thread id */
        volatile unsigned stop: 1;      /**< stop requested */
@@ -96,7 +96,7 @@ struct thread
 };
 
 /**
- * Description of synchonous callback
+ * Description of synchronous callback
  */
 struct sync
 {
@@ -123,14 +123,13 @@ static int remains = 0; /** allowed count of waiting jobs */
 /* list of threads */
 static struct thread *threads;
 static _Thread_local struct thread *current_thread;
-static _Thread_local struct evloop *current_evloop;
 
 /* queue of pending jobs */
 static struct job *first_job;
 static struct job *free_jobs;
 
 /* event loop */
-static struct evloop evloop[1];
+static struct evloop evloop;
 
 #if defined(REMOVE_SYSTEMD_EVENT)
 static struct fdev_epoll *fdevepoll;
@@ -300,13 +299,9 @@ static void evloop_run(int signum, void *arg)
 {
        int rc;
        struct sd_event *se;
-       struct evloop *el = arg;
 
        if (!signum) {
-               current_evloop = el;
-               __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
-               __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
-               se = el->sdev;
+               se = evloop.sdev;
                rc = sd_event_prepare(se);
                if (rc < 0) {
                        errno = -rc;
@@ -320,8 +315,7 @@ static void evloop_run(int signum, void *arg)
                                        ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
                                }
                        }
-                       __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
-
+                       evloop.state = EVLOOP_STATE_RUN;
                        if (rc > 0) {
                                rc = sd_event_dispatch(se);
                                if (rc < 0) {
@@ -331,9 +325,93 @@ static void evloop_run(int signum, void *arg)
                        }
                }
        }
-       __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
 }
 
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any.
+ */
+static void evloop_on_efd_event()
+{
+       uint64_t x;
+       read(evloop.efd, &x, sizeof x);
+}
+
+/**
+ * wakeup the event loop if needed by sending
+ * an event.
+ */
+static void evloop_wakeup()
+{
+       uint64_t x;
+
+       if (evloop.state & EVLOOP_STATE_WAIT) {
+               x = 1;
+               write(evloop.efd, &x, sizeof x);
+       }
+}
+
+/**
+ * Release the currently held event loop
+ */
+static void evloop_release()
+{
+       struct thread *nh, *ct = current_thread;
+
+       if (evloop.holder == ct) {
+               nh = ct->nholder;
+               evloop.holder = nh;
+               if (nh)
+                       pthread_cond_signal(nh->cwhold);
+       }
+}
+
+/**
+ * get the eventloop for the current thread
+ */
+static int evloop_get()
+{
+       struct thread *ct = current_thread;
+
+       if (evloop.holder)
+               return evloop.holder == ct;
+
+       ct->nholder = NULL;
+       evloop.holder = ct;
+       return 1;
+}
+
+/**
+ * acquire the eventloop for the current thread
+ */
+static void evloop_acquire()
+{
+       struct thread **pwait, *ct;
+       pthread_cond_t cond;
+
+       /* try to get the evloop */
+       if (!evloop_get()) {
+               /* failed, init waiting state */
+               ct = current_thread;
+               ct->nholder = NULL;
+               ct->cwhold = &cond;
+               pthread_cond_init(&cond, NULL);
+
+               /* queue current thread in holder list */
+               pwait = &evloop.holder;
+               while (*pwait)
+                       pwait = &(*pwait)->nholder;
+               *pwait = ct;
+
+               /* wake up the evloop */
+               evloop_wakeup();
+
+               /* wait to acquire the evloop */
+               pthread_cond_wait(&cond, &mutex);
+               pthread_cond_destroy(&cond);
+       }
+}
 
 #if defined(REMOVE_SYSTEMD_EVENT)
 /**
@@ -363,9 +441,6 @@ static void thread_run(volatile struct thread *me)
 {
        struct thread **prv;
        struct job *job;
-#if !defined(REMOVE_SYSTEMD_EVENT)
-       struct evloop *el;
-#endif
 
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
@@ -382,12 +457,8 @@ static void thread_run(volatile struct thread *me)
 
        /* loop until stopped */
        while (!me->stop) {
-               /* release the event loop */
-               if (current_evloop) {
-                       __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
-                       __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
-                       current_evloop = NULL;
-               }
+               /* release the current event loop */
+               evloop_release();
 
                /* get a job */
                job = job_get();
@@ -405,27 +476,31 @@ static void thread_run(volatile struct thread *me)
                        /* release the run job */
                        job_release(job);
 #if !defined(REMOVE_SYSTEMD_EVENT)
-               } else {
-                       /* no job, check events */
-                       el = &evloop[0];
-                       if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
-                               /* run the events */
-                               __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
-                               __atomic_store_n(&el->holder, me, __ATOMIC_RELAXED);
-                               current_evloop = el;
-                               pthread_mutex_unlock(&mutex);
-                               sig_monitor(0, evloop_run, el);
-                               pthread_mutex_lock(&mutex);
-                       } else {
-                               /* no job and not events */
-                               running--;
-                               if (!running)
-                                       ERROR("Entering job deep sleep! Check your bindings.");
-                               me->waits = 1;
-                               pthread_cond_wait(&cond, &mutex);
-                               me->waits = 0;
-                               running++;
+
+
+
+               /* no job, check event loop wait */
+               } else if (evloop_get()) {
+                       if (evloop.state != 0) {
+                               /* busy ? */
+                               CRITICAL("Can't enter dispatch while in dispatch!");
+                               abort();
                        }
+                       /* run the events */
+                       evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
+                       pthread_mutex_unlock(&mutex);
+                       sig_monitor(0, evloop_run, NULL);
+                       pthread_mutex_lock(&mutex);
+                       evloop.state = 0;
+               } else {
+                       /* no job and no event loop */
+                       running--;
+                       if (!running)
+                               ERROR("Entering job deep sleep! Check your bindings.");
+                       me->waits = 1;
+                       pthread_cond_wait(&cond, &mutex);
+                       me->waits = 0;
+                       running++;
 #else
                } else if (waitevt) {
                        /* no job and not events */
@@ -448,11 +523,7 @@ static void thread_run(volatile struct thread *me)
        }
 
        /* release the event loop */
-       if (current_evloop) {
-               __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
-               __atomic_store_n(&el->holder, NULL, __ATOMIC_RELAXED);
-               current_evloop = NULL;
-       }
+       evloop_release();
 
        /* unlink the current thread and cleanup */
        prv = &threads;
@@ -658,41 +729,6 @@ int jobs_enter(
        return do_sync(group, timeout, enter_cb, &sync);
 }
 
-/**
- * Internal callback for evloop management.
- * The effect of this function is hidden: it exits
- * the waiting poll if any. Then it wakes up a thread
- * awaiting the evloop using signal.
- */
-static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
-{
-       uint64_t x;
-       struct evloop *evloop = userdata;
-       read(evloop->efd, &x, sizeof x);
-       pthread_mutex_lock(&mutex);
-       pthread_cond_broadcast(&evloop->cond);
-       pthread_mutex_unlock(&mutex);
-       return 1;
-}
-
-/**
- * unlock the event loop if needed by sending
- * an event.
- * @param el the event loop to unlock
- * @param wait wait the unlocked state of the event loop
- */
-static void unlock_evloop(struct evloop *el, int wait)
-{
-       /* wait for a modifiable event loop */
-       while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
-               uint64_t x = 1;
-               write(el->efd, &x, sizeof x);
-               if (!wait)
-                       break;
-               pthread_cond_wait(&el->cond, &mutex);
-       }
-}
-
 /**
  * Unlocks the execution flow designed by 'jobloop'.
  * @param jobloop indication of the flow to unlock
@@ -701,7 +737,6 @@ static void unlock_evloop(struct evloop *el, int wait)
 int jobs_leave(struct jobloop *jobloop)
 {
        struct thread *t;
-       int i;
 
        pthread_mutex_lock(&mutex);
        t = threads;
@@ -713,15 +748,8 @@ int jobs_leave(struct jobloop *jobloop)
                t->stop = 1;
                if (t->waits)
                        pthread_cond_broadcast(&cond);
-               else {
-                       i = (int)(sizeof evloop / sizeof *evloop);
-                       while(i) {
-                               if (evloop[--i].holder == t) {
-                                       unlock_evloop(&evloop[i], 0);
-                                       break;
-                               }
-                       }
-               }
+               else
+                       evloop_wakeup();
        }
        pthread_mutex_unlock(&mutex);
        return -!t;
@@ -755,6 +783,18 @@ int jobs_call(
        return do_sync(group, timeout, call_cb, &sync);
 }
 
+/**
+ * Internal callback for evloop management.
+ * The effect of this function is hidden: it exits
+ * the waiting poll if any. Then it wakes up a thread
+ * awaiting the evloop using signal.
+ */
+static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+       evloop_on_efd_event();
+       return 1;
+}
+
 /* temporary hack */
 #if !defined(REMOVE_SYSTEMD_EVENT)
 __attribute__((unused))
@@ -770,35 +810,33 @@ static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
  */
 static struct sd_event *get_sd_event_locked()
 {
-       struct evloop *el;
        int rc;
 
        /* creates the evloop on need */
-       el = &evloop[0];
-       if (!el->sdev) {
+       if (!evloop.sdev) {
                /* start the creation */
-               el->state = 0;
+               evloop.state = 0;
                /* creates the eventfd for waking up polls */
-               el->efd = eventfd(0, EFD_CLOEXEC);
-               if (el->efd < 0) {
+               evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
+               if (evloop.efd < 0) {
                        ERROR("can't make eventfd for events");
                        goto error1;
                }
                /* create the systemd event loop */
-               rc = sd_event_new(&el->sdev);
+               rc = sd_event_new(&evloop.sdev);
                if (rc < 0) {
                        ERROR("can't make new event loop");
                        goto error2;
                }
                /* put the eventfd in the event loop */
-               rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
+               rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
                if (rc < 0) {
                        ERROR("can't register eventfd");
 #if !defined(REMOVE_SYSTEMD_EVENT)
-                       sd_event_unref(el->sdev);
-                       el->sdev = NULL;
+                       sd_event_unref(evloop.sdev);
+                       evloop.sdev = NULL;
 error2:
-                       close(el->efd);
+                       close(evloop.efd);
 error1:
                        return NULL;
                }
@@ -806,38 +844,27 @@ error1:
                        goto error3;
                }
                /* handle the event loop */
-               el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
-               if (!el->fdev) {
+               evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
+               if (!evloop.fdev) {
                        ERROR("can't create fdev");
 error3:
-                       sd_event_unref(el->sdev);
+                       sd_event_unref(evloop.sdev);
 error2:
-                       close(el->efd);
+                       close(evloop.efd);
 error1:
-                       memset(el, 0, sizeof *el);
+                       memset(&evloop, 0, sizeof evloop);
                        return NULL;
                }
-               fdev_set_autoclose(el->fdev, 0);
-               fdev_set_events(el->fdev, EPOLLIN);
-               fdev_set_callback(el->fdev, evloop_callback, el);
+               fdev_set_autoclose(evloop.fdev, 0);
+               fdev_set_events(evloop.fdev, EPOLLIN);
+               fdev_set_callback(evloop.fdev, evloop_callback, NULL);
 #endif
        }
 
-       /* attach the event loop to the current thread */
-       if (current_evloop != el) {
-               if (current_evloop) {
-                       __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
-                       __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
-               }
-               current_evloop = el;
-               __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
-               __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
-       }
-
-       /* wait for a modifiable event loop */
-       unlock_evloop(el, 1);
+       /* acquire the event loop */
+       evloop_acquire();
 
-       return el->sdev;
+       return evloop.sdev;
 }
 
 /**
@@ -847,11 +874,36 @@ error1:
 struct sd_event *jobs_get_sd_event()
 {
        struct sd_event *result;
+       struct thread lt;
+
+       /* ensure an existing thread environment */
+       if (!current_thread) {
+               memset(&lt, 0, sizeof lt);
+               current_thread = &lt;
+       }
 
+       /* process */
        pthread_mutex_lock(&mutex);
        result = get_sd_event_locked();
        pthread_mutex_unlock(&mutex);
 
+       /* release the faked thread environment if needed */
+       if (current_thread == &lt) {
+               /*
+                * Releasing it is needed because there is no way to guess
+                * when it has to be released really. But here is where it is
+                * hazardous: if the caller modifies the eventloop when it
+                * is waiting, there is no way to make the change effective.
+                * A workaround to achieve that goal is for the caller to
+                * require the event loop a second time after having modified it.
+                */
+               NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
+               if (verbose_wants(Log_Level_Info))
+                       sig_monitor_dumpstack();
+               evloop_release();
+               current_thread = NULL;
+       }
+
        return result;
 }
 
index 15fe260..9e13fa1 100644 (file)
@@ -299,3 +299,8 @@ void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg)
        else
                function(0, arg);
 }
+
+void sig_monitor_dumpstack()
+{
+       return dumpstack(1, 0);
+}
index 5fdac16..03c8ec4 100644 (file)
@@ -25,3 +25,5 @@ extern int sig_monitor_enable();
 
 extern void sig_monitor(int timeout, void (*function)(int sig, void*), void *arg);
 
+extern void sig_monitor_dumpstack();
+