jobs: Improve event loop integration
authorJosé Bollo <jose.bollo@iot.bzh>
Fri, 3 Nov 2017 12:04:30 +0000 (13:04 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Fri, 3 Nov 2017 12:04:30 +0000 (13:04 +0100)
The previous implmentation was buggy.

This changes make the event loop a thread global
variable. A thread now refuses to run an event loop
if it is in dispatching state.

Change-Id: Ic29792b87c1cae201958feb96d93678f6d37ac8d
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/jobs.c

index 93e864f..b7d1611 100644 (file)
@@ -64,8 +64,11 @@ struct events
        struct events *next;
        struct sd_event *event;
        uint64_t timeout;
-       unsigned used: 1;
-       unsigned runs: 1;
+       enum {
+               Available,
+               Modifiable,
+               Locked
+       } state;
 };
 
 /** Description of threads */
@@ -74,10 +77,8 @@ struct thread
        struct thread *next;   /**< next thread of the list */
        struct thread *upper;  /**< upper same thread */
        struct job *job;       /**< currently processed job */
-       struct events *events; /**< currently processed job */
        pthread_t tid;         /**< the thread id */
        unsigned stop: 1;      /**< stop requested */
-       unsigned lowered: 1;   /**< has a lower same thread */
        unsigned waits: 1;     /**< is waiting? */
 };
 
@@ -109,7 +110,8 @@ static int nevents = 0; /** count of events */
 
 /* list of threads */
 static struct thread *threads;
-static _Thread_local struct thread *current;
+static _Thread_local struct thread *current_thread;
+static _Thread_local struct events *current_events;
 
 /* queue of pending jobs */
 static struct job *first_job;
@@ -204,7 +206,7 @@ static inline struct job *job_get()
 static inline struct events *events_get()
 {
        struct events *events = first_events;
-       while (events && events->used)
+       while (events && events->state != Available)
                events = events->next;
        return events;
 }
@@ -272,9 +274,34 @@ static void job_cancel(int signum, void *arg)
  */
 static void events_call(int signum, void *arg)
 {
+       int rc;
+       struct sd_event *se;
        struct events *events = arg;
-       if (!signum)
-               sd_event_run(events->event, events->timeout);
+
+       if (!signum) {
+               se = events->event;
+               rc = sd_event_prepare(se);
+               if (rc < 0) {
+                       errno = -rc;
+                       ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(events->event));
+               } else {
+                       if (rc == 0) {
+                               rc = sd_event_wait(se, events->timeout);
+                               if (rc < 0) {
+                                       errno = -rc;
+                                       ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(events->event));
+                               }
+                       }
+
+                       if (rc > 0) {
+                               rc = sd_event_dispatch(se);
+                               if (rc < 0) {
+                                       errno = -rc;
+                                       ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(events->event));
+                               }
+                       }
+               }
+       }
 }
 
 /**
@@ -286,7 +313,7 @@ static void events_call(int signum, void *arg)
  */
 static void thread_run(volatile struct thread *me)
 {
-       struct thread **prv, *thr;
+       struct thread **prv;
        struct job *job;
        struct events *events;
        uint64_t evto;
@@ -294,22 +321,18 @@ static void thread_run(volatile struct thread *me)
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
        me->stop = 0;
-       me->lowered = 0;
        me->waits = 0;
-       me->upper = current;
-       if (current) {
-               current->lowered = 1;
+       me->upper = current_thread;
+       if (current_thread) {
                evto = EVENT_TIMEOUT_CHILD;
-               me->events = current->events;
        } else {
                started++;
                sig_monitor_init_timeouts();
                evto = EVENT_TIMEOUT_TOP;
-               me->events = NULL;
        }
        me->next = threads;
        threads = (struct thread*)me;
-       current = (struct thread*)me;
+       current_thread = (struct thread*)me;
 
        /* loop until stopped */
        while (!me->stop) {
@@ -326,37 +349,32 @@ static void thread_run(volatile struct thread *me)
                        sig_monitor(job->timeout, job->callback, job->arg);
                        pthread_mutex_lock(&mutex);
 
-                       /* release the run job */
-                       job_release(job);
-
                        /* release event if any */
-                       events = me->events;
-                       if (events) {
-                               events->used = 0;
-                               me->events = NULL;
+                       events = current_events;
+                       if (events && events->state == Modifiable) {
+                               current_events = NULL;
+                               events->state = Available;
                        }
+
+                       /* release the run job */
+                       job_release(job);
                } else {
                        /* no job, check events */
-                       events = me->events;
-                       if (!events || events->runs)
+                       events = current_events;
+                       if (!events)
                                events = events_get();
+                       else if (events->state == Locked)
+                               events = 0;
                        if (events) {
                                /* run the events */
-                               events->used = 1;
-                               events->runs = 1;
+                               events->state = Locked;
                                events->timeout = evto;
-                               me->events = events;
+                               current_events = events;
                                pthread_mutex_unlock(&mutex);
                                sig_monitor(0, events_call, events);
                                pthread_mutex_lock(&mutex);
-                               events->used = 0;
-                               events->runs = 0;
-                               me->events = NULL;
-                               thr = me->upper;
-                               while (thr && thr->events == events) {
-                                       thr->events = NULL;
-                                       thr = thr->upper;
-                               }
+                               current_events = NULL;
+                               events->state = Available;
                        } else {
                                /* no job and not events */
                                waiting++;
@@ -373,10 +391,8 @@ static void thread_run(volatile struct thread *me)
        while (*prv != me)
                prv = &(*prv)->next;
        *prv = me->next;
-       current = me->upper;
-       if (current) {
-               current->lowered = 0;
-       } else {
+       current_thread = me->upper;
+       if (!current_thread) {
                sig_monitor_clean_timeouts();
                started--;
        }
@@ -631,19 +647,13 @@ int jobs_call(
 struct sd_event *jobs_get_sd_event()
 {
        struct events *events;
-       struct thread *me;
        int rc;
 
        pthread_mutex_lock(&mutex);
 
        /* search events on stack */
-       me = current;
-       while (me && !me->events)
-               me = me->upper;
-       if (me)
-               /* return the stacked events */
-               events = me->events;
-       else {
+       events = current_events;
+       if (!events) {
                /* search an available events */
                events = events_get();
                if (!events) {
@@ -655,8 +665,7 @@ struct sd_event *jobs_get_sd_event()
                                events = malloc(sizeof *events);
                                if (events && (rc = sd_event_new(&events->event)) >= 0) {
                                        if (nevents < started || start_one_thread() >= 0) {
-                                               events->used = 0;
-                                               events->runs = 0;
+                                               events->state = Available;
                                                events->next = first_events;
                                                first_events = events;
                                        } else {
@@ -679,13 +688,10 @@ struct sd_event *jobs_get_sd_event()
                        }
                }
                if (events) {
-                       me = current;
-                       if (me) {
-                               events->used = 1;
-                               me->events = events;
-                       } else {
+                       events->state = Modifiable;
+                       if (!current_thread)
                                WARNING("event returned for unknown thread!");
-                       }
+                       current_events = events;
                }
        }
        pthread_mutex_unlock(&mutex);
@@ -715,7 +721,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
        pthread_mutex_lock(&mutex);
 
        /* check whether already running */
-       if (current || allowed) {
+       if (current_thread || allowed) {
                ERROR("thread already started");
                errno = EINVAL;
                goto error;
@@ -822,7 +828,7 @@ void jobs_terminate()
                head = job->next;
 
                /* search if job is stacked for current */
-               t = current;
+               t = current_thread;
                while (t && t->job != job)
                        t = t->upper;
                if (t) {