X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fjobs.c;fp=src%2Fjobs.c;h=936c6f1de93e827392af72d2d8069b84972a9bb4;hb=79d63b68dd5f4047a49ac178094afb2c206c167f;hp=83ca2ed4e11a23321837f96c6bd47efc36b86278;hpb=81f9cf2f0ca3ec74ffeb3092231d85023013315d;p=src%2Fapp-framework-binder.git diff --git a/src/jobs.c b/src/jobs.c index 83ca2ed4..936c6f1d 100644 --- a/src/jobs.c +++ b/src/jobs.c @@ -68,6 +68,7 @@ struct thread pthread_t tid; /**< the thread id */ volatile unsigned stop: 1; /**< stop requested */ volatile unsigned waits: 1; /**< is waiting? */ + volatile unsigned leaved: 1; /**< was leaved? */ }; /** @@ -84,7 +85,6 @@ struct sync void *arg; /**< the argument of the callback */ }; - /* synchronisation of threads */ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; @@ -106,6 +106,8 @@ static struct job *free_jobs; /* event loop */ static struct evmgr *evmgr; +static void (*exit_handler)(); + /** * Create a new job with the given parameters * @param group the group of the job @@ -236,6 +238,7 @@ static inline void job_release(struct job *job) * flow, isn't used * @param arg the job to run */ +__attribute__((unused)) static void job_cancel(int signum, void *arg) { struct job *job = arg; @@ -319,6 +322,7 @@ static void thread_enter(volatile struct thread *me) me->tid = pthread_self(); me->stop = 0; me->waits = 0; + me->leaved = 0; me->nholder = 0; me->upper = current_thread; me->next = threads; @@ -485,13 +489,15 @@ static int start_one_thread() * The remaining parameter is the parameter 'arg1' * given here. * @param arg The second argument for 'callback' + * @param start Allow to start a thread if not zero * @return 0 in case of success or -1 in case of error */ -int jobs_queue( +static int queue_job( const void *group, int timeout, void (*callback)(int, void*), - void *arg) + void *arg, + int start) { struct job *job; int rc; @@ -511,7 +517,7 @@ int jobs_queue( } /* start a thread if needed */ - if (running == started && started < allowed) { + if (start && running == started && started < allowed) { /* all threads are busy and a new can be started */ rc = start_one_thread(); if (rc < 0 && started == 0) { @@ -536,6 +542,32 @@ error: return -1; } +/** + * Queues a new asynchronous job represented by 'callback' and 'arg' + * for the 'group' and the 'timeout'. + * Jobs are queued FIFO and are possibly executed in parallel + * concurrently except for job of the same group that are + * executed sequentially in FIFO order. + * @param group The group of the job or NULL when no group. + * @param timeout The maximum execution time in seconds of the job + * or 0 for unlimited time. + * @param callback The function to execute for achieving the job. + * Its first parameter is either 0 on normal flow + * or the signal number that broke the normal flow. + * The remaining parameter is the parameter 'arg1' + * given here. + * @param arg The second argument for 'callback' + * @return 0 in case of success or -1 in case of error + */ +int jobs_queue( + const void *group, + int timeout, + void (*callback)(int, void*), + void *arg) +{ + return queue_job(group, timeout, callback, arg, 1); +} + /** * Internal helper function for 'jobs_enter'. * @see jobs_enter, jobs_leave @@ -590,7 +622,10 @@ static int do_sync( else thread_run_external(&sync->thread); pthread_mutex_unlock(&mutex); - return 0; + if (sync->thread.leaved) + return 0; + errno = EINTR; + return -1; } /** @@ -638,6 +673,7 @@ int jobs_leave(struct jobloop *jobloop) if (!t) { errno = EINVAL; } else { + t->leaved = 1; t->stop = 1; if (t->waits) pthread_cond_broadcast(&cond); @@ -776,44 +812,23 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star rc = 0; error: pthread_mutex_unlock(&mutex); + if (exit_handler) + exit_handler(); return rc; } /** - * Terminate all the threads and cancel all pending jobs. + * Exit jobs threads and call handler if not NULL. */ -void jobs_terminate() +void jobs_exit(void (*handler)()) { - struct job *job, *head, *tail; - pthread_t me, *others; struct thread *t; - int count; - - /* how am i? */ - me = pthread_self(); /* request all threads to stop */ pthread_mutex_lock(&mutex); - allowed = 0; - - /* count the number of threads */ - count = 0; - t = threads; - while (t) { - if (!t->upper && !pthread_equal(t->tid, me)) - count++; - t = t->next; - } - /* fill the array of threads */ - others = alloca(count * sizeof *others); - count = 0; - t = threads; - while (t) { - if (!t->upper && !pthread_equal(t->tid, me)) - others[count++] = t->tid; - t = t->next; - } + /* set the handler */ + exit_handler = handler; /* stops the threads */ t = threads; @@ -824,41 +839,7 @@ void jobs_terminate() /* wait the threads */ pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&mutex); - while (count) - pthread_join(others[--count], NULL); - pthread_mutex_lock(&mutex); - /* cancel pending jobs of other threads */ - remains = 0; - head = first_job; - first_job = NULL; - tail = NULL; - while (head) { - /* unlink the job */ - job = head; - head = job->next; - - /* search if job is stacked for current */ - t = current_thread; - while (t && t->job != job) - t = t->upper; - if (t) { - /* yes, relink it at end */ - if (tail) - tail->next = job; - else - first_job = job; - tail = job; - job->next = NULL; - } else { - /* no cancel the job */ - pthread_mutex_unlock(&mutex); - sig_monitor(0, job_cancel, job); - free(job); - pthread_mutex_lock(&mutex); - } - } + /* leave */ pthread_mutex_unlock(&mutex); } -