Make main thread used for common jobs
[src/app-framework-binder.git] / src / jobs.c
index c4f3224..1be6ec7 100644 (file)
 #include "sig-monitor.h"
 #include "verbose.h"
 
-/* control of threads */
+/** control of threads */
 struct thread
 {
-       pthread_t tid;     /* the thread id */
-       unsigned stop: 1;  /* stop request */
-       unsigned ended: 1; /* ended status */
-       unsigned works: 1; /* is it processing a job? */
+       struct thread *next; /**< next thread of the list */
+       pthread_t tid;     /**< the thread id */
+       unsigned stop: 1;  /**< stop request */
 };
 
 /* describes pending job */
@@ -56,42 +55,54 @@ struct job
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
 
-/* queue of pending jobs */
-static struct job *first_job = NULL;
-
 /* count allowed, started and running threads */
-static int allowed = 0;
-static int started = 0;
-static int running = 0;
-static int remains = 0;
+static int allowed = 0; /** allowed count of threads */
+static int started = 0; /** started count of threads */
+static int running = 0; /** running count of threads */
+static int remains = 0; /** remaining count of jobs that can be created */
 
 /* list of threads */
-static struct thread *threads = NULL;
+static struct thread *threads;
 
-/* add the job to the list */
+/* queue of pending jobs */
+static struct job *first_job;
+static struct job *first_evloop;
+static struct job *free_jobs;
+
+/**
+ * Adds the 'job' at the end of the list of jobs, marking it
+ * as blocked if an other job with the same group is pending.
+ * @param job the job to add
+ */
 static inline void job_add(struct job *job)
 {
-       void *group = job->group;
+       void *group;
        struct job *ijob, **pjob;
 
        pjob = &first_job;
        ijob = first_job;
-       group = job->group ? : job;
+       group = job->group ? : (void*)(intptr_t)1;
        while (ijob) {
                if (ijob->group == group)
                        job->blocked = 1;
                pjob = &ijob->next;
                ijob = ijob->next;
        }
-       *pjob = job;
        job->next = NULL;
+       *pjob = job;
        remains--;
 }
 
-/* get the next job to process or NULL if none */
+/**
+ * Get the next job to process or NULL if none.
+ * The returned job if any is removed from the list of
+ * jobs.
+ * @return the job to process
+ */
 static inline struct job *job_get()
 {
        struct job *job, **pjob;
+
        pjob = &first_job;
        job = first_job;
        while (job && job->blocked) {
@@ -105,7 +116,10 @@ static inline struct job *job_get()
        return job;
 }
 
-/* unblock a group of job */
+/**
+ * Unblock the first pending job of a group (if any)
+ * @param group the group to unblock
+ */
 static inline void job_unblock(void *group)
 {
        struct job *job;
@@ -120,74 +134,140 @@ static inline void job_unblock(void *group)
        }
 }
 
-/* call the job */
-static inline void job_call(int signum, void *arg)
+static struct job *job_create(
+               void *group,
+               int timeout,
+               void (*callback)(int, void*, void *, void*),
+               void *arg1,
+               void *arg2,
+               void *arg3)
+{
+       struct job *job;
+
+       /* allocates the job */
+       job = free_jobs;
+       if (!job) {
+               pthread_mutex_unlock(&mutex);
+               job = malloc(sizeof *job);
+               pthread_mutex_lock(&mutex);
+               if (!job) {
+                       errno = -ENOMEM;
+                       goto end;
+               }
+       }
+       job->group = group;
+       job->timeout = timeout;
+       job->callback = callback;
+       job->arg1 = arg1;
+       job->arg2 = arg2;
+       job->arg3 = arg3;
+       job->blocked = 0;
+end:
+       return job;
+}
+
+static inline void job_destroy(struct job *job)
+{
+       job->next = free_jobs;
+       free_jobs = job;
+}
+
+static inline void job_release(struct job *job)
+{
+       if (job->group)
+               job_unblock(job->group);
+       job_destroy(job);
+}
+
+/** monitored call to the job */
+static void job_call(int signum, void *arg)
 {
        struct job *job = arg;
        job->callback(signum, job->arg1, job->arg2, job->arg3);
 }
 
-/* cancel the job */
-static inline void job_cancel(int signum, void *arg)
+/** monitored cancel of the job */
+static void job_cancel(int signum, void *arg)
 {
-       struct job *job = arg;
-       job->callback(SIGABRT, job->arg1, job->arg2, job->arg3);
+       job_call(SIGABRT, arg);
 }
 
 /* main loop of processing threads */
 static void *thread_main_loop(void *data)
 {
-       struct thread *me = data;
+       struct thread me, **prv;
        struct job *job;
 
-       me->works = 0;
-       me->ended = 0;
+       /* init */
+       me.tid = pthread_self();
+       me.stop = 0;
        sig_monitor_init_timeouts();
+
+       /* chain in */
        pthread_mutex_lock(&mutex);
-       while (!me->stop) {
+       me.next = threads;
+       threads = &me;
+
+       /* loop until stopped */
+       running++;
+       while (!me.stop) {
                /* get a job */
                job = job_get();
-               if (job == NULL && first_job != NULL && running == 0) {
+               if (!job && first_job && running == 0) {
                        /* sad situation!! should not happen */
                        ERROR("threads are blocked!");
                        job = first_job;
                        first_job = job->next;
                }
-               if (job == NULL) {
-                       /* no job... */
-                       pthread_cond_wait(&cond, &mutex);
-               } else {
+               if (job) {
                        /* run the job */
-                       running++;
-                       me->works = 1;
                        pthread_mutex_unlock(&mutex);
                        sig_monitor(job->timeout, job_call, job);
                        pthread_mutex_lock(&mutex);
-                       me->works = 0;
-                       running--;
-                       if (job->group != NULL)
-                               job_unblock(job->group);
-                       free(job);
+                       job_release(job);
+               } else {
+                       /* no job, check evloop */
+                       job = first_evloop;
+                       if (job) {
+                               /* evloop */
+                               first_evloop = job->next;
+                               pthread_mutex_unlock(&mutex);
+                               sig_monitor(job->timeout, job_call, job);
+                               pthread_mutex_lock(&mutex);
+                               job->next = first_evloop;
+                               first_evloop = job;
+                       } else {
+                               /* no job and not evloop */
+                               running--;
+                               pthread_cond_wait(&cond, &mutex);
+                               running++;
+                       }
                }
-
        }
-       me->ended = 1;
+       running--;
+
+       /* chain out */
+       prv = &threads;
+       while (*prv != &me)
+               prv = &(*prv)->next;
+       *prv = me.next;
        pthread_mutex_unlock(&mutex);
+
+       /* uninit and terminate */
        sig_monitor_clean_timeouts();
-       return me;
+       return NULL;
 }
 
 /* start a new thread */
 static int start_one_thread()
 {
-       struct thread *t;
+       pthread_t tid;
        int rc;
 
        assert(started < allowed);
 
-       t = &threads[started++];
-       t->stop = 0;
-       rc = pthread_create(&t->tid, NULL, thread_main_loop, t);
+       started++;
+       rc = pthread_create(&tid, NULL, thread_main_loop, NULL);
        if (rc != 0) {
                started--;
                errno = rc;
@@ -229,16 +309,17 @@ int jobs_queue3(
        struct job *job;
        int rc;
 
+       pthread_mutex_lock(&mutex);
+
        /* allocates the job */
-       job = malloc(sizeof *job);
-       if (job == NULL) {
+       job = job_create(group, timeout, callback, arg1, arg2, arg3);
+       if (!job) {
                errno = ENOMEM;
                info = "out of memory";
                goto error;
        }
 
        /* start a thread if needed */
-       pthread_mutex_lock(&mutex);
        if (remains == 0) {
                errno = EBUSY;
                info = "too many jobs";
@@ -253,14 +334,7 @@ int jobs_queue3(
                }
        }
 
-       /* fills and queues the job */
-       job->group = group;
-       job->timeout = timeout;
-       job->callback = callback;
-       job->arg1 = arg1;
-       job->arg2 = arg2;
-       job->arg3 = arg3;
-       job->blocked = 0;
+       /* queues the job */
        job_add(job);
        pthread_mutex_unlock(&mutex);
 
@@ -269,23 +343,16 @@ int jobs_queue3(
        return 0;
 
 error2:
-       pthread_mutex_unlock(&mutex);
-       free(job);
+       job_destroy(job);
 error:
        ERROR("can't process job with threads: %s, %m", info);
+       pthread_mutex_unlock(&mutex);
        return -1;
 }
 
 /* initialise the threads */
 int jobs_init(int allowed_count, int start_count, int waiter_count)
 {
-       threads = calloc(allowed_count, sizeof *threads);
-       if (threads == NULL) {
-               errno = ENOMEM;
-               ERROR("can't allocate threads");
-               return -1;
-       }
-
        /* records the allowed count */
        allowed = allowed_count;
        started = 0;
@@ -304,34 +371,31 @@ int jobs_init(int allowed_count, int start_count, int waiter_count)
 /* terminate all the threads and all pending requests */
 void jobs_terminate()
 {
-       int i, n;
        struct job *job;
+       pthread_t me, other;
+       struct thread *t;
+
+       /* how am i? */
+       me = pthread_self();
 
        /* request all threads to stop */
        pthread_mutex_lock(&mutex);
        allowed = 0;
-       n = started;
-       for (i = 0 ; i < n ; i++)
-               threads[i].stop = 1;
-
-       /* wait until all thread are terminated */
-       while (started != 0) {
-               /* signal threads */
+       for(;;) {
+               /* search the next thread to stop */
+               t = threads;
+               while (t && pthread_equal(t->tid, me))
+                       t = t->next;
+               if (!t)
+                       break;
+               /* stop it */
+               other = t->tid;
+               t->stop = 1;
                pthread_mutex_unlock(&mutex);
                pthread_cond_broadcast(&cond);
+               pthread_join(other, NULL);
                pthread_mutex_lock(&mutex);
-
-               /* join the terminated threads */
-               for (i = 0 ; i < n ; i++) {
-                       if (threads[i].tid && threads[i].ended) {
-                               pthread_join(threads[i].tid, NULL);
-                               threads[i].tid = 0;
-                               started--;
-                       }
-               }
        }
-       pthread_mutex_unlock(&mutex);
-       free(threads);
 
        /* cancel pending jobs */
        while (first_job) {
@@ -342,3 +406,57 @@ void jobs_terminate()
        }
 }
 
+int jobs_add_event_loop(void *key, int timeout, void (*evloop)(int signum, void*), void *closure)
+{
+       struct job *job;
+
+       pthread_mutex_lock(&mutex);
+       job = job_create(key, timeout, (void (*)(int,  void *, void *, void *))evloop, closure, NULL, NULL);
+       if (job) {
+               /* adds the loop */
+               job->next = first_evloop;
+               first_evloop = job;
+
+               /* signal the loop */
+               pthread_cond_signal(&cond);
+       }
+       pthread_mutex_unlock(&mutex);
+       return -!job;
+}
+
+int jobs_add_me()
+{
+       pthread_t me;
+       struct thread *t;
+
+       /* how am i? */
+       me = pthread_self();
+
+       /* request all threads to stop */
+       pthread_mutex_lock(&mutex);
+       t = threads;
+       while (t) {
+               if (pthread_equal(t->tid, me)) {
+                       pthread_mutex_unlock(&mutex);
+                       ERROR("thread already running");
+                       errno = EINVAL;
+                       return -1;
+               }
+               t = t->next;
+       }
+
+       /* allowed... */
+       allowed++;
+       pthread_mutex_unlock(&mutex);
+
+       /* run */
+       thread_main_loop(NULL);
+
+       /* returns */
+       pthread_mutex_lock(&mutex);
+       allowed--;
+       pthread_mutex_unlock(&mutex);
+       return 0;
+}
+
+