From 901a38c28bf3fe7cc3e58e3fad36190fbae585be Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jos=C3=A9=20Bollo?= Date: Fri, 15 Feb 2019 20:49:54 +0100 Subject: [PATCH] jobs: Refactor exiting jobs MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The new termination function can allow the restart because it doesn't abort the waiting jobs. So after calling 'jobs_exit', all threads stop. The function 'job_start' returns. The threads that are in blocking state, i.e. in a call to 'jobs_enter' or 'jobs_call' are stopped. An error status -1 with errno=EINTR is returned in that case. But before returning, that function calls the exit handler if any. Change-Id: I85a4b1976b09b18804eb681af940531ae5ace6c3 Signed-off-by: José Bollo --- src/jobs.c | 115 +++++++++++++++++++++----------------------------- src/jobs.h | 3 +- src/main-afb-daemon.c | 19 ++++++--- 3 files changed, 63 insertions(+), 74 deletions(-) diff --git a/src/jobs.c b/src/jobs.c index 83ca2ed4..936c6f1d 100644 --- a/src/jobs.c +++ b/src/jobs.c @@ -68,6 +68,7 @@ struct thread pthread_t tid; /**< the thread id */ volatile unsigned stop: 1; /**< stop requested */ volatile unsigned waits: 1; /**< is waiting? */ + volatile unsigned leaved: 1; /**< was leaved? */ }; /** @@ -84,7 +85,6 @@ struct sync void *arg; /**< the argument of the callback */ }; - /* synchronisation of threads */ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; @@ -106,6 +106,8 @@ static struct job *free_jobs; /* event loop */ static struct evmgr *evmgr; +static void (*exit_handler)(); + /** * Create a new job with the given parameters * @param group the group of the job @@ -236,6 +238,7 @@ static inline void job_release(struct job *job) * flow, isn't used * @param arg the job to run */ +__attribute__((unused)) static void job_cancel(int signum, void *arg) { struct job *job = arg; @@ -319,6 +322,7 @@ static void thread_enter(volatile struct thread *me) me->tid = pthread_self(); me->stop = 0; me->waits = 0; + me->leaved = 0; me->nholder = 0; me->upper = current_thread; me->next = threads; @@ -485,13 +489,15 @@ static int start_one_thread() * The remaining parameter is the parameter 'arg1' * given here. * @param arg The second argument for 'callback' + * @param start Allow to start a thread if not zero * @return 0 in case of success or -1 in case of error */ -int jobs_queue( +static int queue_job( const void *group, int timeout, void (*callback)(int, void*), - void *arg) + void *arg, + int start) { struct job *job; int rc; @@ -511,7 +517,7 @@ int jobs_queue( } /* start a thread if needed */ - if (running == started && started < allowed) { + if (start && running == started && started < allowed) { /* all threads are busy and a new can be started */ rc = start_one_thread(); if (rc < 0 && started == 0) { @@ -536,6 +542,32 @@ error: return -1; } +/** + * Queues a new asynchronous job represented by 'callback' and 'arg' + * for the 'group' and the 'timeout'. + * Jobs are queued FIFO and are possibly executed in parallel + * concurrently except for job of the same group that are + * executed sequentially in FIFO order. + * @param group The group of the job or NULL when no group. + * @param timeout The maximum execution time in seconds of the job + * or 0 for unlimited time. + * @param callback The function to execute for achieving the job. + * Its first parameter is either 0 on normal flow + * or the signal number that broke the normal flow. + * The remaining parameter is the parameter 'arg1' + * given here. + * @param arg The second argument for 'callback' + * @return 0 in case of success or -1 in case of error + */ +int jobs_queue( + const void *group, + int timeout, + void (*callback)(int, void*), + void *arg) +{ + return queue_job(group, timeout, callback, arg, 1); +} + /** * Internal helper function for 'jobs_enter'. * @see jobs_enter, jobs_leave @@ -590,7 +622,10 @@ static int do_sync( else thread_run_external(&sync->thread); pthread_mutex_unlock(&mutex); - return 0; + if (sync->thread.leaved) + return 0; + errno = EINTR; + return -1; } /** @@ -638,6 +673,7 @@ int jobs_leave(struct jobloop *jobloop) if (!t) { errno = EINVAL; } else { + t->leaved = 1; t->stop = 1; if (t->waits) pthread_cond_broadcast(&cond); @@ -776,44 +812,23 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star rc = 0; error: pthread_mutex_unlock(&mutex); + if (exit_handler) + exit_handler(); return rc; } /** - * Terminate all the threads and cancel all pending jobs. + * Exit jobs threads and call handler if not NULL. */ -void jobs_terminate() +void jobs_exit(void (*handler)()) { - struct job *job, *head, *tail; - pthread_t me, *others; struct thread *t; - int count; - - /* how am i? */ - me = pthread_self(); /* request all threads to stop */ pthread_mutex_lock(&mutex); - allowed = 0; - - /* count the number of threads */ - count = 0; - t = threads; - while (t) { - if (!t->upper && !pthread_equal(t->tid, me)) - count++; - t = t->next; - } - /* fill the array of threads */ - others = alloca(count * sizeof *others); - count = 0; - t = threads; - while (t) { - if (!t->upper && !pthread_equal(t->tid, me)) - others[count++] = t->tid; - t = t->next; - } + /* set the handler */ + exit_handler = handler; /* stops the threads */ t = threads; @@ -824,41 +839,7 @@ void jobs_terminate() /* wait the threads */ pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&mutex); - while (count) - pthread_join(others[--count], NULL); - pthread_mutex_lock(&mutex); - /* cancel pending jobs of other threads */ - remains = 0; - head = first_job; - first_job = NULL; - tail = NULL; - while (head) { - /* unlink the job */ - job = head; - head = job->next; - - /* search if job is stacked for current */ - t = current_thread; - while (t && t->job != job) - t = t->upper; - if (t) { - /* yes, relink it at end */ - if (tail) - tail->next = job; - else - first_job = job; - tail = job; - job->next = NULL; - } else { - /* no cancel the job */ - pthread_mutex_unlock(&mutex); - sig_monitor(0, job_cancel, job); - free(job); - pthread_mutex_lock(&mutex); - } - } + /* leave */ pthread_mutex_unlock(&mutex); } - diff --git a/src/jobs.h b/src/jobs.h index a99c9622..4b0fa8bb 100644 --- a/src/jobs.h +++ b/src/jobs.h @@ -39,8 +39,6 @@ extern int jobs_call( void (*callback)(int, void*), void *arg); -extern void jobs_terminate(); - extern int jobs_start( int allowed_count, int start_count, @@ -50,3 +48,4 @@ extern int jobs_start( extern void jobs_acquire_event_manager(); +extern void jobs_exit(void (*handler)()); diff --git a/src/main-afb-daemon.c b/src/main-afb-daemon.c index 4ef4b280..c2a53513 100644 --- a/src/main-afb-daemon.c +++ b/src/main-afb-daemon.c @@ -399,6 +399,17 @@ static struct afb_hsrv *start_http_server() | execute_command +--------------------------------------------------------- */ +static void wait_child_and_exit() +{ + pid_t pidchld = childpid; + + childpid = 0; + if (!SELF_PGROUP) + killpg(pidchld, SIGKILL); + waitpid(pidchld, NULL, 0); + exit(0); +} + static void on_sigchld(int signum, siginfo_t *info, void *uctx) { if (info->si_pid == childpid) { @@ -406,11 +417,9 @@ static void on_sigchld(int signum, siginfo_t *info, void *uctx) case CLD_EXITED: case CLD_KILLED: case CLD_DUMPED: - childpid = 0; - if (!SELF_PGROUP) - killpg(info->si_pid, SIGKILL); - waitpid(info->si_pid, NULL, 0); - exit(0); + jobs_exit(wait_child_and_exit); + default: + break; } } } -- 2.16.6