jobs: optimized version of fix starve
[src/app-framework-binder.git] / src / jobs.c
index cf1bb55..ed812f8 100644 (file)
@@ -18,6 +18,7 @@
 #define _GNU_SOURCE
 
 #include <stdlib.h>
+#include <stdint.h>
 #include <unistd.h>
 #include <signal.h>
 #include <time.h>
 #define sig_monitor(to,cb,arg)       (cb(0,arg))
 #endif
 
+#define EVENT_TIMEOUT_TOP      ((uint64_t)-1)
+#define EVENT_TIMEOUT_CHILD    ((uint64_t)10000)
+
 /** Internal shortcut for callback */
-typedef void (*job_cb_t)(int, void*, void *, void*);
+typedef void (*job_cb_t)(int, void*);
 
 /** Description of a pending job */
 struct job
@@ -48,9 +52,7 @@ struct job
        struct job *next;    /**< link to the next job enqueued */
        void *group;         /**< group of the request */
        job_cb_t callback;   /**< processing callback */
-       void *arg1;          /**< first arg */
-       void *arg2;          /**< second arg */
-       void *arg3;          /**< third arg */
+       void *arg;           /**< argument */
        int timeout;         /**< timeout in second for processing the request */
        unsigned blocked: 1; /**< is an other request blocking this one ? */
        unsigned dropped: 1; /**< is removed ? */
@@ -61,6 +63,8 @@ struct events
 {
        struct events *next;
        struct sd_event *event;
+       uint64_t timeout;
+       unsigned used: 1;
        unsigned runs: 1;
 };
 
@@ -77,6 +81,21 @@ struct thread
        unsigned waits: 1;     /**< is waiting? */
 };
 
+/**
+ * Description of synchonous callback
+ */
+struct sync
+{
+       struct thread thread;   /**< thread loop data */
+       union {
+               void (*callback)(int, void*);   /**< the synchronous callback */
+               void (*enter)(int signum, void *closure, struct jobloop *jobloop);
+                               /**< the entering synchronous routine */
+       };
+       void *arg;              /**< the argument of the callback */
+};
+
+
 /* synchronisation of threads */
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
@@ -102,18 +121,14 @@ static struct job *free_jobs;
  * @param group    the group of the job
  * @param timeout  the timeout of the job (0 if none)
  * @param callback the function that achieves the job
- * @param arg1     the first argument of the callback
- * @param arg2     the second argument of the callback
- * @param arg3     the third argument of the callback
+ * @param arg      the argument of the callback
  * @return the created job unblock or NULL when no more memory
  */
 static struct job *job_create(
                void *group,
                int timeout,
                job_cb_t callback,
-               void *arg1,
-               void *arg2,
-               void *arg3)
+               void *arg)
 {
        struct job *job;
 
@@ -135,9 +150,7 @@ static struct job *job_create(
        job->group = group;
        job->timeout = timeout;
        job->callback = callback;
-       job->arg1 = arg1;
-       job->arg2 = arg2;
-       job->arg3 = arg3;
+       job->arg = arg;
        job->blocked = 0;
        job->dropped = 0;
 end:
@@ -191,7 +204,7 @@ static inline struct job *job_get()
 static inline struct events *events_get()
 {
        struct events *events = first_events;
-       while (events && events->runs)
+       while (events && events->used)
                events = events->next;
        return events;
 }
@@ -231,22 +244,6 @@ static inline void job_release(struct job *job)
        free_jobs = job;
 }
 
-/**
- * Monitored normal callback for a job.
- * This function is called by the monitor
- * to run the job when the safe environment
- * is set.
- * @param signum 0 on normal flow or the number
- *               of the signal that interrupted the normal
- *               flow
- * @param arg     the job to run
- */
-static void job_call(int signum, void *arg)
-{
-       struct job *job = arg;
-       job->callback(signum, job->arg1, job->arg2, job->arg3);
-}
-
 /**
  * Monitored cancel callback for a job.
  * This function is called by the monitor
@@ -259,7 +256,8 @@ static void job_call(int signum, void *arg)
  */
 static void job_cancel(int signum, void *arg)
 {
-       job_call(SIGABRT, arg);
+       struct job *job = arg;
+       job->callback(SIGABRT, job->arg);
 }
 
 /**
@@ -276,7 +274,7 @@ static void events_call(int signum, void *arg)
 {
        struct events *events = arg;
        if (!signum)
-               sd_event_run(events->event, (uint64_t) -1);
+               sd_event_run(events->event, events->timeout);
 }
 
 /**
@@ -288,9 +286,10 @@ static void events_call(int signum, void *arg)
  */
 static void thread_run(volatile struct thread *me)
 {
-       struct thread **prv;
+       struct thread **prv, *thr;
        struct job *job;
        struct events *events;
+       uint64_t evto;
 
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
@@ -298,19 +297,21 @@ static void thread_run(volatile struct thread *me)
        me->lowered = 0;
        me->waits = 0;
        me->upper = current;
-       if (current)
+       if (current) {
                current->lowered = 1;
-       else
+               evto = EVENT_TIMEOUT_CHILD;
+               me->events = current->events;
+       } else {
+               started++;
                sig_monitor_init_timeouts();
-       current = (struct thread*)me;
+               evto = EVENT_TIMEOUT_TOP;
+               me->events = NULL;
+       }
        me->next = threads;
        threads = (struct thread*)me;
-       started++;
-
-       NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
+       current = (struct thread*)me;
 
        /* loop until stopped */
-       me->events = NULL;
        while (!me->stop) {
                /* get a job */
                job = job_get(first_job);
@@ -322,7 +323,7 @@ static void thread_run(volatile struct thread *me)
 
                        /* run the job */
                        pthread_mutex_unlock(&mutex);
-                       sig_monitor(job->timeout, job_call, job);
+                       sig_monitor(job->timeout, job->callback, job->arg);
                        pthread_mutex_lock(&mutex);
 
                        /* release the run job */
@@ -331,21 +332,31 @@ static void thread_run(volatile struct thread *me)
                        /* release event if any */
                        events = me->events;
                        if (events) {
-                               events->runs = 0;
+                               events->used = 0;
                                me->events = NULL;
                        }
                } else {
                        /* no job, check events */
-                       events = events_get();
+                       events = me->events;
+                       if (!events || events->runs)
+                               events = events_get();
                        if (events) {
                                /* run the events */
+                               events->used = 1;
                                events->runs = 1;
+                               events->timeout = evto;
                                me->events = events;
                                pthread_mutex_unlock(&mutex);
                                sig_monitor(0, events_call, events);
                                pthread_mutex_lock(&mutex);
+                               events->used = 0;
                                events->runs = 0;
                                me->events = NULL;
+                               thr = me->upper;
+                               while (thr && thr->events == events) {
+                                       thr->events = NULL;
+                                       thr = thr->upper;
+                               }
                        } else {
                                /* no job and not events */
                                waiting++;
@@ -356,19 +367,19 @@ static void thread_run(volatile struct thread *me)
                        }
                }
        }
-       NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
 
        /* unlink the current thread and cleanup */
-       started--;
        prv = &threads;
        while (*prv != me)
                prv = &(*prv)->next;
        *prv = me->next;
        current = me->upper;
-       if (current)
+       if (current) {
                current->lowered = 0;
-       else
+       } else {
                sig_monitor_clean_timeouts();
+               started--;
+       }
 }
 
 /**
@@ -405,29 +416,7 @@ static int start_one_thread()
 }
 
 /**
- * Queues a new asynchronous job represented by 'callback'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group    The group of the job or NULL when no group.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue0(
-               void *group,
-               int timeout,
-               void (*callback)(int signum))
-{
-       return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg1'
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
  * for the 'group' and the 'timeout'.
  * Jobs are queued FIFO and are possibly executed in parallel
  * concurrently except for job of the same group that are
@@ -440,7 +429,7 @@ int jobs_queue0(
  *                 or the signal number that broke the normal flow.
  *                 The remaining parameter is the parameter 'arg1'
  *                 given here.
- * @param arg1     The second argument for 'callback'
+ * @param arg      The second argument for 'callback'
  * @return 0 in case of success or -1 in case of error
  */
 int jobs_queue(
@@ -448,64 +437,6 @@ int jobs_queue(
                int timeout,
                void (*callback)(int, void*),
                void *arg)
-{
-       return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group    The group of the job or NULL when no group.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- *                 The remaining parameters are the parameters 'arg[12]'
- *                 given here.
- * @param arg1     The second argument for 'callback'
- * @param arg2     The third argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue2(
-               void *group,
-               int timeout,
-               void (*callback)(int, void*, void*),
-               void *arg1,
-               void *arg2)
-{
-       return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
-}
-
-/**
- * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
- * for the 'group' and the 'timeout'.
- * Jobs are queued FIFO and are possibly executed in parallel
- * concurrently except for job of the same group that are
- * executed sequentially in FIFO order.
- * @param group    The group of the job or NULL when no group.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- *                 The remaining parameters are the parameters 'arg[123]'
- *                 given here.
- * @param arg1     The second argument for 'callback'
- * @param arg2     The third argument for 'callback'
- * @param arg3     The forth argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_queue3(
-               void *group,
-               int timeout,
-               void (*callback)(int, void*, void *, void*),
-               void *arg1,
-               void *arg2,
-               void *arg3)
 {
        const char *info;
        struct job *job;
@@ -514,7 +445,7 @@ int jobs_queue3(
        pthread_mutex_lock(&mutex);
 
        /* allocates the job */
-       job = job_create(group, timeout, callback, arg1, arg2, arg3);
+       job = job_create(group, timeout, callback, arg);
        if (!job) {
                errno = ENOMEM;
                info = "out of memory";
@@ -557,23 +488,45 @@ error:
 }
 
 /**
- * Enter a synchronisation point: activates the job given by 'callback'
- * @param group the gro
+ * Internal helper function for 'jobs_enter'.
+ * @see jobs_enter, jobs_leave
  */
-int jobs_enter(
+static void enter_cb(int signum, void *closure)
+{
+       struct sync *sync = closure;
+       sync->enter(signum, sync->arg, (void*)&sync->thread);
+}
+
+/**
+ * Internal helper function for 'jobs_call'.
+ * @see jobs_call
+ */
+static void call_cb(int signum, void *closure)
+{
+       struct sync *sync = closure;
+       sync->callback(signum, sync->arg);
+       jobs_leave((void*)&sync->thread);
+}
+
+/**
+ * Internal helper for synchronous jobs. It enters
+ * a new thread loop for evaluating the given job
+ * as recorded by the couple 'sync_cb' and 'sync'.
+ * @see jobs_call, jobs_enter, jobs_leave
+ */
+static int do_sync(
                void *group,
                int timeout,
-               void (*callback)(int signum, void *closure, struct jobloop *jobloop),
-               void *closure)
+               void (*sync_cb)(int signum, void *closure),
+               struct sync *sync
+)
 {
-       
        struct job *job;
-       struct thread me;
 
        pthread_mutex_lock(&mutex);
 
        /* allocates the job */
-       job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
+       job = job_create(group, timeout, sync_cb, sync);
        if (!job) {
                ERROR("out of memory");
                errno = ENOMEM;
@@ -585,16 +538,50 @@ int jobs_enter(
        job_add(job);
 
        /* run until stopped */
-       thread_run(&me);
+       thread_run(&sync->thread);
        pthread_mutex_unlock(&mutex);
        return 0;
 }
 
+/**
+ * Enter a synchronisation point: activates the job given by 'callback'
+ * and 'closure' using 'group' and 'timeout' to control sequencing and
+ * execution time.
+ * @param group the group for sequencing jobs
+ * @param timeout the time in seconds allocated to the job
+ * @param callback the callback that will handle the job.
+ *                 it receives 3 parameters: 'signum' that will be 0
+ *                 on normal flow or the catched signal number in case
+ *                 of interrupted flow, the context 'closure' as given and
+ *                 a 'jobloop' reference that must be used when the job is
+ *                 terminated to unlock the current execution flow.
+ * @param arg the argument to the callback
+ * @return 0 on success or -1 in case of error
+ */
+int jobs_enter(
+               void *group,
+               int timeout,
+               void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+               void *closure
+)
+{
+       struct sync sync;
+
+       sync.enter = callback;
+       sync.arg = closure;
+       return do_sync(group, timeout, enter_cb, &sync);
+}
+
+/**
+ * Unlocks the execution flow designed by 'jobloop'.
+ * @param jobloop indication of the flow to unlock
+ * @return 0 in case of success of -1 on error
+ */
 int jobs_leave(struct jobloop *jobloop)
 {
        struct thread *t;
-       pthread_mutex_lock(&mutex);
 
+       pthread_mutex_lock(&mutex);
        t = threads;
        while (t && t != (struct thread*)jobloop)
                t = t->next;
@@ -609,6 +596,34 @@ int jobs_leave(struct jobloop *jobloop)
        return -!t;
 }
 
+/**
+ * Calls synchronously the job represented by 'callback' and 'arg1'
+ * for the 'group' and the 'timeout' and waits for its completion.
+ * @param group    The group of the job or NULL when no group.
+ * @param timeout  The maximum execution time in seconds of the job
+ *                 or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ *                 Its first parameter is either 0 on normal flow
+ *                 or the signal number that broke the normal flow.
+ *                 The remaining parameter is the parameter 'arg1'
+ *                 given here.
+ * @param arg      The second argument for 'callback'
+ * @return 0 in case of success or -1 in case of error
+ */
+int jobs_call(
+               void *group,
+               int timeout,
+               void (*callback)(int, void*),
+               void *arg)
+{
+       struct sync sync;
+
+       sync.callback = callback;
+       sync.arg = arg;
+
+       return do_sync(group, timeout, call_cb, &sync);
+}
+
 /**
  * Gets a sd_event item for the current thread.
  * @return a sd_event or NULL in case of error
@@ -640,6 +655,7 @@ struct sd_event *jobs_get_sd_event()
                                events = malloc(sizeof *events);
                                if (events && (rc = sd_event_new(&events->event)) >= 0) {
                                        if (nevents < started || start_one_thread() >= 0) {
+                                               events->used = 0;
                                                events->runs = 0;
                                                events->next = first_events;
                                                first_events = events;
@@ -663,10 +679,9 @@ struct sd_event *jobs_get_sd_event()
                        }
                }
                if (events) {
-                       /* */
                        me = current;
                        if (me) {
-                               events->runs = 1;
+                               events->used = 1;
                                me->events = events;
                        } else {
                                WARNING("event returned for unknown thread!");
@@ -685,7 +700,7 @@ struct sd_event *jobs_get_sd_event()
  * @param start         The start routine to activate (can't be NULL)
  * @return 0 in case of success or -1 in case of error.
  */
-int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
 {
        int rc, launched;
        struct thread me;
@@ -729,7 +744,7 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
        }
 
        /* queue the start job */
-       job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
+       job = job_create(NULL, 0, (job_cb_t)start, NULL);
        if (!job) {
                ERROR("out of memory");
                errno = ENOMEM;