jobs: Fix callsync hanging 68/22868/2 8.0.3 8.0.4 8.0.5 8.0.6 halibut/8.0.3 halibut/8.0.4 halibut/8.0.5 halibut/8.0.6 halibut_8.0.3 halibut_8.0.4 halibut_8.0.5 halibut_8.0.6
authorJose Bollo <jose.bollo@iot.bzh>
Wed, 6 Nov 2019 10:12:16 +0000 (11:12 +0100)
committerJan-Simon Moeller <jsmoeller@linuxfoundation.org>
Wed, 6 Nov 2019 11:08:23 +0000 (11:08 +0000)
The function implementing jobs_enter, used by
implementations of synchronous calls, was not
taking care of waking up a thread. This had the
effect of blocking calls made by an external thread.

Bug-AGL: SPEC-2937

Change-Id: I4bf0265b4c029fb619ef7128824ee9d46a45996e
Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
src/jobs.c

index 94bdce8..6bbffc8 100644 (file)
@@ -493,7 +493,7 @@ static int start_one_thread()
  * @param start    Allow to start a thread if not zero
  * @return 0 in case of success or -1 in case of error
  */
-static int queue_job(
+static int queue_job_internal(
                const void *group,
                int timeout,
                void (*callback)(int, void*),
@@ -501,9 +501,14 @@ static int queue_job(
                int start)
 {
        struct job *job;
-       int rc;
+       int rc, busy;
 
-       pthread_mutex_lock(&mutex);
+       /* check availability */
+       if (remains <= 0) {
+               ERROR("can't process job with threads: too many jobs");
+               errno = EBUSY;
+               goto error;
+       }
 
        /* allocates the job */
        job = job_create(group, timeout, callback, arg);
@@ -518,31 +523,68 @@ static int queue_job(
        }
 
        /* start a thread if needed */
-       if (start && running == started && started < allowed) {
+       busy = running == started;
+       if (start && busy && started < allowed) {
                /* all threads are busy and a new can be started */
                rc = start_one_thread();
                if (rc < 0 && started == 0) {
                        ERROR("can't start initial thread: %m");
                        goto error2;
                }
+               busy = 0;
        }
 
        /* queues the job */
        job_add(job);
 
-       /* signal an existing job */
+       /* wakeup an evloop if needed */
+       if (busy)
+               evloop_wakeup();
+
        pthread_cond_signal(&cond);
-       pthread_mutex_unlock(&mutex);
        return 0;
 
 error2:
        job->next = free_jobs;
        free_jobs = job;
 error:
-       pthread_mutex_unlock(&mutex);
        return -1;
 }
 
+/**
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group    The group of the job or NULL when no group.
+ * @param timeout  The maximum execution time in seconds of the job
+ *                 or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ *                 Its first parameter is either 0 on normal flow
+ *                 or the signal number that broke the normal flow.
+ *                 The remaining parameter is the parameter 'arg1'
+ *                 given here.
+ * @param arg      The second argument for 'callback'
+ * @param start    Allow to start a thread if not zero
+ * @return 0 in case of success or -1 in case of error
+ */
+static int queue_job(
+               const void *group,
+               int timeout,
+               void (*callback)(int, void*),
+               void *arg,
+               int start)
+{
+       int rc;
+
+       pthread_mutex_lock(&mutex);
+       rc = queue_job_internal(group, timeout, callback, arg, start);
+       pthread_mutex_unlock(&mutex);
+       return rc;
+
+}
+
 /**
  * Queues a new asynchronous job represented by 'callback' and 'arg'
  * for the 'group' and the 'timeout'.
@@ -603,30 +645,24 @@ static int do_sync(
                struct sync *sync
 )
 {
-       struct job *job;
+       int rc;
 
        pthread_mutex_lock(&mutex);
 
-       /* allocates the job */
-       job = job_create(group, timeout, sync_cb, sync);
-       if (!job) {
-               pthread_mutex_unlock(&mutex);
-               return -1;
+       rc = queue_job_internal(group, timeout, sync_cb, sync, 1);
+       if (rc == 0) {
+               /* run until stopped */
+               if (current_thread)
+                       thread_run_internal(&sync->thread);
+               else
+                       thread_run_external(&sync->thread);
+               if (!sync->thread.leaved) {
+                       errno = EINTR;
+                       rc = -1;
+               }
        }
-
-       /* queues the job */
-       job_add(job);
-
-       /* run until stopped */
-       if (current_thread)
-               thread_run_internal(&sync->thread);
-       else
-               thread_run_external(&sync->thread);
        pthread_mutex_unlock(&mutex);
-       if (sync->thread.leaved)
-               return 0;
-       errno = EINTR;
-       return -1;
+       return rc;
 }
 
 /**