Fix concurrency issues on event manager
author José Bollo <jose.bollo@iot.bzh>
Thu, 30 Mar 2017 11:33:11 +0000 (13:33 +0200)
committer José Bollo <jose.bollo@iot.bzh>
Thu, 30 Mar 2017 11:33:11 +0000 (13:33 +0200)
Having only one event manager is not possible
in multithreading due to the way systemd
manages timer events. We observed that
timers were not armed when set in one thread
while the event loop was polling in another thread.

This patch provides more than one event manager,
and at most as many as the number of threads
available to start.

Change-Id: Iaeab353b7bc79ce61361ab73c7b197a9e69a6109
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/afb-common.c
src/jobs.c
src/jobs.h
src/main.c

index 47ba40f..087a628 100644 (file)
@@ -27,6 +27,7 @@
 
 #include "afb-common.h"
 #include "locale-root.h"
+#include "jobs.h"
 
 static const char *default_locale = NULL;
 static struct locale_root *rootdir = NULL;
@@ -42,7 +43,6 @@ struct sd_event *afb_common_get_thread_event_loop()
        }
        return result;
 }
-*/
 
 static void *sdopen(void **p, int (*f)(void **))
 {
@@ -55,6 +55,7 @@ static void *sdopen(void **p, int (*f)(void **))
        }
        return *p;
 }
+*/
 
 static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
 {
@@ -77,8 +78,7 @@ static struct sd_bus *sdbusopen(struct sd_bus **p, int (*f)(struct sd_bus **))
 
 struct sd_event *afb_common_get_event_loop()
 {
-       static struct sd_event *result = NULL;
-       return sdopen((void*)&result, (void*)sd_event_new);
+       return jobs_get_sd_event();
 }
 
 struct sd_bus *afb_common_get_user_bus()
index 03fe425..8ffd6b6 100644 (file)
@@ -26,6 +26,8 @@
 #include <errno.h>
 #include <assert.h>
 
+#include <systemd/sd-event.h>
+
 #include "jobs.h"
 #include "sig-monitor.h"
 #include "verbose.h"
@@ -54,16 +56,25 @@ struct job
        unsigned dropped: 1; /**< is removed ? */
 };
 
+/** Description of handled event loops */
+struct events
+{
+       struct events *next;    /**< next events of the list */
+       struct sd_event *event; /**< the systemd event loop */
+       unsigned runs: 1;       /**< is held or run by a thread? */
+};
+
 /** Description of threads */
 struct thread
 {
-       struct thread *next;  /**< next thread of the list */
-       struct thread *upper; /**< upper same thread */
-       struct job *job;      /**< currently processed job */
-       pthread_t tid;        /**< the thread id */
-       unsigned stop: 1;     /**< stop requested */
-       unsigned lowered: 1;  /**< has a lower same thread */
-       unsigned waits: 1;    /**< is waiting? */
+       struct thread *next;   /**< next thread of the list */
+       struct thread *upper;  /**< upper same thread */
+       struct job *job;       /**< currently processed job */
+       struct events *events; /**< currently held events (event loop) or NULL */
+       pthread_t tid;         /**< the thread id */
+       unsigned stop: 1;      /**< stop requested */
+       unsigned lowered: 1;   /**< has a lower same thread */
+       unsigned waits: 1;     /**< is waiting? */
 };
 
 /* synchronisation of threads */
@@ -75,6 +86,7 @@ static int allowed = 0; /** allowed count of threads */
 static int started = 0; /** started count of threads */
 static int waiting = 0; /** waiting count of threads */
 static int remains = 0; /** allowed count of waiting jobs */
+static int nevents = 0; /** count of events */
 
 /* list of threads */
 static struct thread *threads;
@@ -82,7 +94,7 @@ static _Thread_local struct thread *current;
 
 /* queue of pending jobs */
 static struct job *first_job;
-static struct job *first_events;
+static struct events *first_events;
 static struct job *free_jobs;
 
 /**
@@ -176,16 +188,28 @@ static void job_add2(struct job *job1, struct job *job2)
 
 /**
  * Get the next job to process or NULL if none.
- * @param job the head of the list to search.
  * @return the first job that isn't blocked or NULL
  */
-static inline struct job *job_get(struct job *job)
+static inline struct job *job_get()
 {
+       struct job *job = first_job;
        while (job && job->blocked)
                job = job->next;
        return job;
 }
 
+/**
+ * Get the next events to process or NULL if none.
+ * Walks the list starting at 'first_events' and skips
+ * every events whose flag 'runs' is set.
+ * @return the first events that isn't running or NULL
+ */
+static inline struct events *events_get()
+{
+       struct events *events = first_events;
+       while (events && events->runs)
+               events = events->next;
+       return events;
+}
+
 /**
  * Releases the processed 'job': removes it
  * from the list of jobs and unblock the first
@@ -221,48 +245,6 @@ static inline void job_release(struct job *job)
        free_jobs = job;
 }
 
-/**
- * Releases the events 'job': removes it
- * from the list of events.
- * @param job the event to release
- */
-static inline void events_release(struct job *job)
-{
-       struct job *ijob, **pjob;
-
-       /* first unqueue the job */
-       pjob = &first_events;
-       ijob = first_events;
-       while (ijob != job) {
-               pjob = &ijob->next;
-               ijob = ijob->next;
-       }
-       *pjob = job->next;
-
-       /* recycle the job */
-       job->next = free_jobs;
-       free_jobs = job;
-}
-
-/**
- * Get the events of 'key' if existing.
- * @param key the key to search
- * @return the found events or NULL if none existing has key
- */
-static inline struct job *events_of_key(void *key)
-{
-       struct job *job;
-
-       if (!key)
-               job = NULL;
-       else {
-               job = first_events;
-               while (job && (job->dropped || job->group != key))
-                       job = job->next;
-       }
-       return job;
-}
-
 /**
  * Monitored normal callback for a job.
  * This function is called by the monitor
@@ -294,6 +276,23 @@ static void job_cancel(int signum, void *arg)
        job_call(SIGABRT, arg);
 }
 
+/**
+ * Monitored normal callback for events.
+ * This function is called by the monitor
+ * to run the event loop when the safe environment
+ * is set. When 'signum' isn't zero, nothing is done.
+ * NOTE(review): the value returned by sd_event_run is ignored.
+ * @param signum 0 on normal flow or the number
+ *               of the signal that interrupted the normal
+ *               flow
+ * @param arg     the events to run
+ */
+static void events_call(int signum, void *arg)
+{
+       struct events *events = arg;
+       if (!signum)
+               sd_event_run(events->event, (uint64_t) -1); /* (uint64_t)-1: wait without timeout, see sd_event_run(3) */
+}
+
 /**
  * Main processing loop of threads processing jobs.
  * The loop must be called with the mutex locked
@@ -301,10 +300,11 @@ static void job_cancel(int signum, void *arg)
  * @param me the description of the thread to use
  * TODO: how are timeout handled when reentering?
  */
-static void thread_run(struct thread *me)
+static void thread_run(volatile struct thread *me)
 {
        struct thread **prv;
        struct job *job;
+       struct events *events;
 
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
@@ -316,12 +316,13 @@ static void thread_run(struct thread *me)
                current->lowered = 1;
        else
                sig_monitor_init_timeouts();
-       current = me;
+       current = (struct thread*)me;
        me->next = threads;
-       threads = me;
+       threads = (struct thread*)me;
        started++;
 
        /* loop until stopped */
+       me->events = NULL;
        while (!me->stop) {
                /* get a job */
                job = job_get(first_job);
@@ -338,18 +339,25 @@ static void thread_run(struct thread *me)
 
                        /* release the run job */
                        job_release(job);
+
+                       /* release event if any */
+                       events = me->events;
+                       if (events) {
+                               events->runs = 0;
+                               me->events = NULL;
+                       }
                } else {
                        /* no job, check events */
-                       job = job_get(first_events);
-                       if (job) {
+                       events = events_get();
+                       if (events) {
                                /* run the events */
-                               job->blocked = 1;
+                               events->runs = 1;
+                               me->events = events;
                                pthread_mutex_unlock(&mutex);
-                               sig_monitor(job->timeout, job_call, job);
+                               sig_monitor(0, events_call, events);
                                pthread_mutex_lock(&mutex);
-                               job->blocked = 0;
-                               if (job->dropped)
-                                       events_release(job);
+                               events->runs = 0;
+                               me->events = NULL;
                        } else {
                                /* no job and not events */
                                waiting++;
@@ -812,76 +820,6 @@ void jobs_terminate()
        pthread_mutex_unlock(&mutex);
 }
 
-/**
- * Adds the events waiter/dispatcher to the list of events waiters/dispatchers
- * to monitor.
- * @param key     A key to register the events waiter/dispatcher (see
- *                'jobs_del_events')
- * @param timeout Timeout in second of the function or 0 if none
- * @param events  The callback, the first argument is 0 for normal
- *                flow or the signal number when normal flow failed
- * @param closure The closure to give to the callback as secondd argument
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_add_events(void *key, int timeout, void (*events)(int signum, void*), void *closure)
-{
-       struct job *job;
-
-       pthread_mutex_lock(&mutex);
-
-       /* look at an already existsing events for same key */
-       job = events_of_key(key);
-       if (job) {
-               pthread_mutex_unlock(&mutex);
-               ERROR("events of key %p already exist", key);
-               errno = EEXIST;
-               return -1;
-       }
-
-       /* creates the job */
-       job = job_create(key, timeout, (job_cb_t)events, closure, NULL, NULL);
-       if (!job) {
-               pthread_mutex_unlock(&mutex);
-               ERROR("Can't create events, out of memory");
-               errno = ENOMEM;
-               return -1;
-       }
-
-       /* adds the loop */
-       job->next = first_events;
-       first_events = job;
-
-       /* signal the loop */
-       if (waiting)
-               pthread_cond_signal(&cond);
-       pthread_mutex_unlock(&mutex);
-       return 0;
-}
-
-/**
- * Removes the events of 'key'
- * @param key The key of the events to remove
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_del_events(void *key)
-{
-       struct job *job;
-
-       pthread_mutex_lock(&mutex);
-       job = events_of_key(key);
-       if (job)
-               if (job->blocked)
-                       job->dropped = 1;
-               else
-                       events_release(job);
-       pthread_mutex_unlock(&mutex);
-       if (!job) {
-               ERROR("events of key %p not found", key);
-               errno = ENOENT;
-       }
-       return -!job;
-}
-
 /**
  * Adds the current thread to the pool of threads
  * processing the jobs. Returns normally when the threads are
@@ -910,3 +848,66 @@ int jobs_add_me()
 }
 
 
+/**
+ * Gets the sd_event loop attached to the calling thread,
+ * attaching one to it when it has none yet.
+ *
+ * The events already held by the current thread, or by one
+ * of its 'upper' threads, is returned when it exists.
+ * Otherwise an available events (one not marked 'runs') is
+ * searched and, when none is found, a new one is created,
+ * provided that the count of events remains lower than the
+ * count of allowed threads. A thread is started when the count
+ * of started threads is not greater than the count of events.
+ *
+ * @return the sd_event loop or NULL in case of error
+ */
+struct sd_event *jobs_get_sd_event()
+{
+       struct events *events;
+       struct thread *me;
+       int rc;
+
+       pthread_mutex_lock(&mutex);
+
+       /* search events on stack */
+       me = current;
+       while (me && !me->events)
+               me = me->upper;
+       if (me)
+               /* return the stacked events */
+               events = me->events;
+       else {
+               /* search an available events */
+               events = events_get();
+               if (!events) {
+                       /* not found, check if creation possible */
+                       if (nevents >= allowed) {
+                               ERROR("not possible to add a new event");
+                               events = NULL;
+                       } else {
+                               events = malloc(sizeof *events);
+                               if (events && (rc = sd_event_new(&events->event)) >= 0) {
+                                       if (nevents < started || start_one_thread() >= 0) {
+                                               events->runs = 0;
+                                               events->next = first_events;
+                                               first_events = events;
+                                               /* count the new events, otherwise the
+                                                * limit 'allowed' is never enforced */
+                                               nevents++;
+                                       } else {
+                                               ERROR("can't start thread for events");
+                                               sd_event_unref(events->event);
+                                               free(events);
+                                               events = NULL;
+                                       }
+                               } else {
+                                       if (!events)
+                                               ERROR("out of memory");
+                                       else {
+                                               free(events);
+                                               events = NULL;
+                                               /* sd_event_new returns a negative errno-style
+                                                * code: set errno before logging because the
+                                                * message uses %m */
+                                               errno = -rc;
+                                               ERROR("creation of sd_event failed: %m");
+                                               /* logging may have altered errno: set it
+                                                * again for the caller */
+                                               errno = -rc;
+                                       }
+                               }
+                       }
+               }
+               if (events) {
+                       /* attach the events to the current thread when it is known */
+                       me = current;
+                       if (me) {
+                               events->runs = 1;
+                               me->events = events;
+                       } else {
+                               WARNING("event returned for unknown thread!");
+                       }
+               }
+       }
+       pthread_mutex_unlock(&mutex);
+       return events ? events->event : NULL;
+}
+
index 3c0746c..0eceef3 100644 (file)
@@ -17,6 +17,8 @@
 
 #pragma once
 
+struct sd_event;
+
 extern int jobs_queue0(
                void *group,
                int timeout,
@@ -65,8 +67,7 @@ extern int jobs_invoke3(
                void *arg2,
                void *arg3);
 
-extern int jobs_add_events(void *key, int timeout, void (*events)(int, void*), void *closure);
-extern int jobs_del_events(void *key);
+extern struct sd_event *jobs_get_sd_event();
 
 extern int jobs_init(int allowed_count, int start_count, int waiter_count);
 extern int jobs_add_me();
index 3809400..9c2f3c5 100644 (file)
@@ -393,17 +393,6 @@ static int execute_command()
        return -1;
 }
 
-/*---------------------------------------------------------
- | main event processing
- +--------------------------------------------------------- */
-
-static void main_event_wait_and_dispatch(int signum, void *closure)
-{
-       struct sd_event *event = closure;
-       if (signum == 0)
-               sd_event_run(event, 30000000);
-}
-
 /*---------------------------------------------------------
  | job for starting the daemon
  +--------------------------------------------------------- */
@@ -518,15 +507,9 @@ int main(int argc, char *argv[])
                return 1;
        }
 
-       /* records the loop */
-       if (jobs_add_events(NULL, 0, main_event_wait_and_dispatch, afb_common_get_event_loop()) < 0) {
-               ERROR("failed to set main_event_wait_and_dispatch");
-               return 1;
-       }
-
        /* queue the start job */
        if (jobs_queue0(NULL, 0, start) < 0) {
-               ERROR("failed to set main_event_wait_and_dispatch");
+               ERROR("failed to start runnning jobs");
                return 1;
        }