X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fjobs.c;h=6bbffc8d55d25fcdf90f1f373768044092ec06c0;hb=aeb015becc84a457afd5a5fb82e26dfca0978c83;hp=936c6f1de93e827392af72d2d8069b84972a9bb4;hpb=901a38c28bf3fe7cc3e58e3fad36190fbae585be;p=src%2Fapp-framework-binder.git diff --git a/src/jobs.c b/src/jobs.c index 936c6f1d..6bbffc8d 100644 --- a/src/jobs.c +++ b/src/jobs.c @@ -389,6 +389,7 @@ static void thread_run_internal(volatile struct thread *me) abort(); } /* run the events */ + evmgr_prepare_run(evmgr); pthread_mutex_unlock(&mutex); sig_monitor(0, (void(*)(int,void*))evmgr_job_run, evmgr); pthread_mutex_lock(&mutex); @@ -492,7 +493,7 @@ static int start_one_thread() * @param start Allow to start a thread if not zero * @return 0 in case of success or -1 in case of error */ -static int queue_job( +static int queue_job_internal( const void *group, int timeout, void (*callback)(int, void*), @@ -500,9 +501,14 @@ static int queue_job( int start) { struct job *job; - int rc; + int rc, busy; - pthread_mutex_lock(&mutex); + /* check availability */ + if (remains <= 0) { + ERROR("can't process job with threads: too many jobs"); + errno = EBUSY; + goto error; + } /* allocates the job */ job = job_create(group, timeout, callback, arg); @@ -517,31 +523,68 @@ static int queue_job( } /* start a thread if needed */ - if (start && running == started && started < allowed) { + busy = running == started; + if (start && busy && started < allowed) { /* all threads are busy and a new can be started */ rc = start_one_thread(); if (rc < 0 && started == 0) { ERROR("can't start initial thread: %m"); goto error2; } + busy = 0; } /* queues the job */ job_add(job); - /* signal an existing job */ + /* wakeup an evloop if needed */ + if (busy) + evloop_wakeup(); + pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); return 0; error2: job->next = free_jobs; free_jobs = job; error: - pthread_mutex_unlock(&mutex); return -1; } +/** + * Queues a new asynchronous job represented by 'callback' and 'arg' + * for the 'group' and the 'timeout'. + * Jobs are queued FIFO and are possibly executed in parallel + * concurrently except for job of the same group that are + * executed sequentially in FIFO order. + * @param group The group of the job or NULL when no group. + * @param timeout The maximum execution time in seconds of the job + * or 0 for unlimited time. + * @param callback The function to execute for achieving the job. + * Its first parameter is either 0 on normal flow + * or the signal number that broke the normal flow. + * The remaining parameter is the parameter 'arg1' + * given here. + * @param arg The second argument for 'callback' + * @param start Allow to start a thread if not zero + * @return 0 in case of success or -1 in case of error + */ +static int queue_job( + const void *group, + int timeout, + void (*callback)(int, void*), + void *arg, + int start) +{ + int rc; + + pthread_mutex_lock(&mutex); + rc = queue_job_internal(group, timeout, callback, arg, start); + pthread_mutex_unlock(&mutex); + return rc; + +} + /** * Queues a new asynchronous job represented by 'callback' and 'arg' * for the 'group' and the 'timeout'. @@ -602,30 +645,24 @@ static int do_sync( struct sync *sync ) { - struct job *job; + int rc; pthread_mutex_lock(&mutex); - /* allocates the job */ - job = job_create(group, timeout, sync_cb, sync); - if (!job) { - pthread_mutex_unlock(&mutex); - return -1; + rc = queue_job_internal(group, timeout, sync_cb, sync, 1); + if (rc == 0) { + /* run until stopped */ + if (current_thread) + thread_run_internal(&sync->thread); + else + thread_run_external(&sync->thread); + if (!sync->thread.leaved) { + errno = EINTR; + rc = -1; + } } - - /* queues the job */ - job_add(job); - - /* run until stopped */ - if (current_thread) - thread_run_internal(&sync->thread); - else - thread_run_external(&sync->thread); pthread_mutex_unlock(&mutex); - if (sync->thread.leaved) - return 0; - errno = EINTR; - return -1; + return rc; } /** @@ -837,7 +874,8 @@ void jobs_exit(void (*handler)()) t = t->next; } - /* wait the threads */ + /* wake up the threads */ + evloop_wakeup(); pthread_cond_broadcast(&cond); /* leave */