jobs: Refactor exiting jobs
authorJosé Bollo <jose.bollo@iot.bzh>
Fri, 15 Feb 2019 19:49:54 +0000 (20:49 +0100)
committerJosé Bollo <jose.bollo@iot.bzh>
Fri, 22 Mar 2019 11:21:54 +0000 (12:21 +0100)
The new termination function can allow the restart
because it doesn't abort the waiting jobs.

So after calling 'jobs_exit', all threads stop.
The function 'job_start' returns.

The threads that are in blocking state, i.e. in
a call to 'jobs_enter' or 'jobs_call' are stopped.
An error status -1 with errno=EINTR is returned in
that case.

But before returning, that function calls the exit
handler if any.

Change-Id: I85a4b1976b09b18804eb681af940531ae5ace6c3
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/jobs.c
src/jobs.h
src/main-afb-daemon.c

index 83ca2ed..936c6f1 100644 (file)
@@ -68,6 +68,7 @@ struct thread
        pthread_t tid;         /**< the thread id */
        volatile unsigned stop: 1;      /**< stop requested */
        volatile unsigned waits: 1;     /**< is waiting? */
+       volatile unsigned leaved: 1;    /**< was leaved? */
 };
 
 /**
@@ -84,7 +85,6 @@ struct sync
        void *arg;              /**< the argument of the callback */
 };
 
-
 /* synchronisation of threads */
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
@@ -106,6 +106,8 @@ static struct job *free_jobs;
 /* event loop */
 static struct evmgr *evmgr;
 
+static void (*exit_handler)();
+
 /**
  * Create a new job with the given parameters
  * @param group    the group of the job
@@ -236,6 +238,7 @@ static inline void job_release(struct job *job)
  *               flow, isn't used
  * @param arg    the job to run
  */
+__attribute__((unused))
 static void job_cancel(int signum, void *arg)
 {
        struct job *job = arg;
@@ -319,6 +322,7 @@ static void thread_enter(volatile struct thread *me)
        me->tid = pthread_self();
        me->stop = 0;
        me->waits = 0;
+       me->leaved = 0;
        me->nholder = 0;
        me->upper = current_thread;
        me->next = threads;
@@ -485,13 +489,15 @@ static int start_one_thread()
  *                 The remaining parameter is the parameter 'arg1'
  *                 given here.
  * @param arg      The second argument for 'callback'
+ * @param start    Allow to start a thread if not zero
  * @return 0 in case of success or -1 in case of error
  */
-int jobs_queue(
+static int queue_job(
                const void *group,
                int timeout,
                void (*callback)(int, void*),
-               void *arg)
+               void *arg,
+               int start)
 {
        struct job *job;
        int rc;
@@ -511,7 +517,7 @@ int jobs_queue(
        }
 
        /* start a thread if needed */
-       if (running == started && started < allowed) {
+       if (start && running == started && started < allowed) {
                /* all threads are busy and a new can be started */
                rc = start_one_thread();
                if (rc < 0 && started == 0) {
@@ -536,6 +542,32 @@ error:
        return -1;
 }
 
+/**
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group    The group of the job or NULL when no group.
+ * @param timeout  The maximum execution time in seconds of the job
+ *                 or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ *                 Its first parameter is either 0 on normal flow
+ *                 or the signal number that broke the normal flow.
+ *                 The remaining parameter is the parameter 'arg1'
+ *                 given here.
+ * @param arg      The second argument for 'callback'
+ * @return 0 in case of success or -1 in case of error
+ */
+int jobs_queue(
+               const void *group,
+               int timeout,
+               void (*callback)(int, void*),
+               void *arg)
+{
+       return queue_job(group, timeout, callback, arg, 1);
+}
+
 /**
  * Internal helper function for 'jobs_enter'.
  * @see jobs_enter, jobs_leave
@@ -590,7 +622,10 @@ static int do_sync(
        else
                thread_run_external(&sync->thread);
        pthread_mutex_unlock(&mutex);
-       return 0;
+       if (sync->thread.leaved)
+               return 0;
+       errno = EINTR;
+       return -1;
 }
 
 /**
@@ -638,6 +673,7 @@ int jobs_leave(struct jobloop *jobloop)
        if (!t) {
                errno = EINVAL;
        } else {
+               t->leaved = 1;
                t->stop = 1;
                if (t->waits)
                        pthread_cond_broadcast(&cond);
@@ -776,44 +812,23 @@ int jobs_start(int allowed_count, int start_count, int waiter_count, void (*star
        rc = 0;
 error:
        pthread_mutex_unlock(&mutex);
+       if (exit_handler)
+               exit_handler();
        return rc;
 }
 
 /**
- * Terminate all the threads and cancel all pending jobs.
+ * Exit jobs threads and call handler if not NULL.
  */
-void jobs_terminate()
+void jobs_exit(void (*handler)())
 {
-       struct job *job, *head, *tail;
-       pthread_t me, *others;
        struct thread *t;
-       int count;
-
-       /* how am i? */
-       me = pthread_self();
 
        /* request all threads to stop */
        pthread_mutex_lock(&mutex);
-       allowed = 0;
-
-       /* count the number of threads */
-       count = 0;
-       t = threads;
-       while (t) {
-               if (!t->upper && !pthread_equal(t->tid, me))
-                       count++;
-               t = t->next;
-       }
 
-       /* fill the array of threads */
-       others = alloca(count * sizeof *others);
-       count = 0;
-       t = threads;
-       while (t) {
-               if (!t->upper && !pthread_equal(t->tid, me))
-                       others[count++] = t->tid;
-               t = t->next;
-       }
+       /* set the handler */
+       exit_handler = handler;
 
        /* stops the threads */
        t = threads;
@@ -824,41 +839,7 @@ void jobs_terminate()
 
        /* wait the threads */
        pthread_cond_broadcast(&cond);
-       pthread_mutex_unlock(&mutex);
-       while (count)
-               pthread_join(others[--count], NULL);
-       pthread_mutex_lock(&mutex);
 
-       /* cancel pending jobs of other threads */
-       remains = 0;
-       head = first_job;
-       first_job = NULL;
-       tail = NULL;
-       while (head) {
-               /* unlink the job */
-               job = head;
-               head = job->next;
-
-               /* search if job is stacked for current */
-               t = current_thread;
-               while (t && t->job != job)
-                       t = t->upper;
-               if (t) {
-                       /* yes, relink it at end */
-                       if (tail)
-                               tail->next = job;
-                       else
-                               first_job = job;
-                       tail = job;
-                       job->next = NULL;
-               } else {
-                       /* no cancel the job */
-                       pthread_mutex_unlock(&mutex);
-                       sig_monitor(0, job_cancel, job);
-                       free(job);
-                       pthread_mutex_lock(&mutex);
-               }
-       }
+       /* leave */
        pthread_mutex_unlock(&mutex);
 }
-
index a99c962..4b0fa8b 100644 (file)
@@ -39,8 +39,6 @@ extern int jobs_call(
                void (*callback)(int, void*),
                void *arg);
 
-extern void jobs_terminate();
-
 extern int jobs_start(
                int allowed_count,
                int start_count,
@@ -50,3 +48,4 @@ extern int jobs_start(
 
 extern void jobs_acquire_event_manager();
 
+extern void jobs_exit(void (*handler)());
index 4ef4b28..c2a5351 100644 (file)
@@ -399,6 +399,17 @@ static struct afb_hsrv *start_http_server()
  | execute_command
  +--------------------------------------------------------- */
 
+static void wait_child_and_exit()
+{
+       pid_t pidchld = childpid;
+
+       childpid = 0;
+       if (!SELF_PGROUP)
+               killpg(pidchld, SIGKILL);
+       waitpid(pidchld, NULL, 0);
+       exit(0);
+}
+
 static void on_sigchld(int signum, siginfo_t *info, void *uctx)
 {
        if (info->si_pid == childpid) {
@@ -406,11 +417,9 @@ static void on_sigchld(int signum, siginfo_t *info, void *uctx)
                case CLD_EXITED:
                case CLD_KILLED:
                case CLD_DUMPED:
-                       childpid = 0;
-                       if (!SELF_PGROUP)
-                               killpg(info->si_pid, SIGKILL);
-                       waitpid(info->si_pid, NULL, 0);
-                       exit(0);
+                       jobs_exit(wait_child_and_exit);
+               default:
+                       break;
                }
        }
 }