Fix concurrency issues on event manager
[src/app-framework-binder.git] / src / jobs.c
index 03fe425..8ffd6b6 100644 (file)
@@ -26,6 +26,8 @@
 #include <errno.h>
 #include <assert.h>
 
+#include <systemd/sd-event.h>
+
 #include "jobs.h"
 #include "sig-monitor.h"
 #include "verbose.h"
@@ -54,16 +56,25 @@ struct job
        unsigned dropped: 1; /**< is removed ? */
 };
 
+/** Description of handled event loops */
+struct events
+{
+       struct events *next;
+       struct sd_event *event;
+       unsigned runs: 1;
+};
+
 /** Description of threads */
 struct thread
 {
-       struct thread *next;  /**< next thread of the list */
-       struct thread *upper; /**< upper same thread */
-       struct job *job;      /**< currently processed job */
-       pthread_t tid;        /**< the thread id */
-       unsigned stop: 1;     /**< stop requested */
-       unsigned lowered: 1;  /**< has a lower same thread */
-       unsigned waits: 1;    /**< is waiting? */
+       struct thread *next;   /**< next thread of the list */
+       struct thread *upper;  /**< upper same thread */
+       struct job *job;       /**< currently processed job */
+       struct events *events; /**< currently processed job */
+       pthread_t tid;         /**< the thread id */
+       unsigned stop: 1;      /**< stop requested */
+       unsigned lowered: 1;   /**< has a lower same thread */
+       unsigned waits: 1;     /**< is waiting? */
 };
 
 /* synchronisation of threads */
@@ -75,6 +86,7 @@ static int allowed = 0; /** allowed count of threads */
 static int started = 0; /** started count of threads */
 static int waiting = 0; /** waiting count of threads */
 static int remains = 0; /** allowed count of waiting jobs */
+static int nevents = 0; /** count of events */
 
 /* list of threads */
 static struct thread *threads;
@@ -82,7 +94,7 @@ static _Thread_local struct thread *current;
 
 /* queue of pending jobs */
 static struct job *first_job;
-static struct job *first_events;
+static struct events *first_events;
 static struct job *free_jobs;
 
 /**
@@ -176,16 +188,28 @@ static void job_add2(struct job *job1, struct job *job2)
 
 /**
  * Get the next job to process or NULL if none.
- * @param job the head of the list to search.
  * @return the first job that isn't blocked or NULL
  */
-static inline struct job *job_get(struct job *job)
+static inline struct job *job_get()
 {
+       struct job *job = first_job;
        while (job && job->blocked)
                job = job->next;
        return job;
 }
 
+/**
+ * Get the next events to process or NULL if none.
+ * @return the first events that isn't running or NULL
+ */
+static inline struct events *events_get()
+{
+       struct events *events = first_events;
+       while (events && events->runs)
+               events = events->next;
+       return events;
+}
+
 /**
  * Releases the processed 'job': removes it
  * from the list of jobs and unblock the first
@@ -221,48 +245,6 @@ static inline void job_release(struct job *job)
        free_jobs = job;
 }
 
-/**
- * Releases the events 'job': removes it
- * from the list of events.
- * @param job the event to release
- */
-static inline void events_release(struct job *job)
-{
-       struct job *ijob, **pjob;
-
-       /* first unqueue the job */
-       pjob = &first_events;
-       ijob = first_events;
-       while (ijob != job) {
-               pjob = &ijob->next;
-               ijob = ijob->next;
-       }
-       *pjob = job->next;
-
-       /* recycle the job */
-       job->next = free_jobs;
-       free_jobs = job;
-}
-
-/**
- * Get the events of 'key' if existing.
- * @param key the key to search
- * @return the found events or NULL if none existing has key
- */
-static inline struct job *events_of_key(void *key)
-{
-       struct job *job;
-
-       if (!key)
-               job = NULL;
-       else {
-               job = first_events;
-               while (job && (job->dropped || job->group != key))
-                       job = job->next;
-       }
-       return job;
-}
-
 /**
  * Monitored normal callback for a job.
  * This function is called by the monitor
@@ -294,6 +276,23 @@ static void job_cancel(int signum, void *arg)
        job_call(SIGABRT, arg);
 }
 
+/**
+ * Monitored normal callback for events.
+ * This function is called by the monitor
+ * to run the event loop when the safe environment
+ * is set.
+ * @param signum 0 on normal flow or the number
+ *               of the signal that interrupted the normal
+ *               flow
+ * @param arg     the events to run
+ */
+static void events_call(int signum, void *arg)
+{
+       struct events *events = arg;
+       if (!signum)
+               sd_event_run(events->event, (uint64_t) -1);
+}
+
 /**
  * Main processing loop of threads processing jobs.
  * The loop must be called with the mutex locked
@@ -301,10 +300,11 @@ static void job_cancel(int signum, void *arg)
  * @param me the description of the thread to use
  * TODO: how are timeout handled when reentering?
  */
-static void thread_run(struct thread *me)
+static void thread_run(volatile struct thread *me)
 {
        struct thread **prv;
        struct job *job;
+       struct events *events;
 
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
@@ -316,12 +316,13 @@ static void thread_run(struct thread *me)
                current->lowered = 1;
        else
                sig_monitor_init_timeouts();
-       current = me;
+       current = (struct thread*)me;
        me->next = threads;
-       threads = me;
+       threads = (struct thread*)me;
        started++;
 
        /* loop until stopped */
+       me->events = NULL;
        while (!me->stop) {
                /* get a job */
                job = job_get(first_job);
@@ -338,18 +339,25 @@ static void thread_run(struct thread *me)
 
                        /* release the run job */
                        job_release(job);
+
+                       /* release event if any */
+                       events = me->events;
+                       if (events) {
+                               events->runs = 0;
+                               me->events = NULL;
+                       }
                } else {
                        /* no job, check events */
-                       job = job_get(first_events);
-                       if (job) {
+                       events = events_get();
+                       if (events) {
                                /* run the events */
-                               job->blocked = 1;
+                               events->runs = 1;
+                               me->events = events;
                                pthread_mutex_unlock(&mutex);
-                               sig_monitor(job->timeout, job_call, job);
+                               sig_monitor(0, events_call, events);
                                pthread_mutex_lock(&mutex);
-                               job->blocked = 0;
-                               if (job->dropped)
-                                       events_release(job);
+                               events->runs = 0;
+                               me->events = NULL;
                        } else {
                                /* no job and not events */
                                waiting++;
@@ -812,76 +820,6 @@ void jobs_terminate()
        pthread_mutex_unlock(&mutex);
 }
 
-/**
- * Adds the events waiter/dispatcher to the list of events waiters/dispatchers
- * to monitor.
- * @param key     A key to register the events waiter/dispatcher (see
- *                'jobs_del_events')
- * @param timeout Timeout in second of the function or 0 if none
- * @param events  The callback, the first argument is 0 for normal
- *                flow or the signal number when normal flow failed
- * @param closure The closure to give to the callback as secondd argument
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_add_events(void *key, int timeout, void (*events)(int signum, void*), void *closure)
-{
-       struct job *job;
-
-       pthread_mutex_lock(&mutex);
-
-       /* look at an already existsing events for same key */
-       job = events_of_key(key);
-       if (job) {
-               pthread_mutex_unlock(&mutex);
-               ERROR("events of key %p already exist", key);
-               errno = EEXIST;
-               return -1;
-       }
-
-       /* creates the job */
-       job = job_create(key, timeout, (job_cb_t)events, closure, NULL, NULL);
-       if (!job) {
-               pthread_mutex_unlock(&mutex);
-               ERROR("Can't create events, out of memory");
-               errno = ENOMEM;
-               return -1;
-       }
-
-       /* adds the loop */
-       job->next = first_events;
-       first_events = job;
-
-       /* signal the loop */
-       if (waiting)
-               pthread_cond_signal(&cond);
-       pthread_mutex_unlock(&mutex);
-       return 0;
-}
-
-/**
- * Removes the events of 'key'
- * @param key The key of the events to remove
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_del_events(void *key)
-{
-       struct job *job;
-
-       pthread_mutex_lock(&mutex);
-       job = events_of_key(key);
-       if (job)
-               if (job->blocked)
-                       job->dropped = 1;
-               else
-                       events_release(job);
-       pthread_mutex_unlock(&mutex);
-       if (!job) {
-               ERROR("events of key %p not found", key);
-               errno = ENOENT;
-       }
-       return -!job;
-}
-
 /**
  * Adds the current thread to the pool of threads
  * processing the jobs. Returns normally when the threads are
@@ -910,3 +848,66 @@ int jobs_add_me()
 }
 
 
+struct sd_event *jobs_get_sd_event()
+{
+       struct events *events;
+       struct thread *me;
+       int rc;
+
+       pthread_mutex_lock(&mutex);
+
+       /* search events on stack */
+       me = current;
+       while (me && !me->events)
+               me = me->upper;
+       if (me)
+               /* return the stacked events */
+               events = me->events;
+       else {
+               /* search an available events */
+               events = events_get();
+               if (!events) {
+                       /* not found, check if creation possible */
+                       if (nevents >= allowed) {
+                               ERROR("not possible to add a new event");
+                               events = NULL;
+                       } else {
+                               events = malloc(sizeof *events);
+                               if (events && (rc = sd_event_new(&events->event)) >= 0) {
+                                       if (nevents < started || start_one_thread() >= 0) {
+                                               events->runs = 0;
+                                               events->next = first_events;
+                                               first_events = events;
+                                       } else {
+                                               ERROR("can't start thread for events");
+                                               sd_event_unref(events->event);
+                                               free(events);
+                                               events = NULL;
+                                       }
+                               } else {
+                                       if (!events)
+                                               ERROR("out of memory");
+                                       else {
+                                               free(events);
+                                               ERROR("creation of sd_event failed: %m");
+                                               events = NULL;
+                                               errno = -rc;
+                                       } 
+                               }
+                       }
+               }
+               if (events) {
+                       /* */
+                       me = current;
+                       if (me) {
+                               events->runs = 1;
+                               me->events = events;
+                       } else {
+                               WARNING("event returned for unknown thread!");
+                       }
+               }
+       }
+       pthread_mutex_unlock(&mutex);
+       return events ? events->event : NULL;
+}
+