Update copyright dates
[src/app-framework-binder.git] / src / jobs.c
index a9773aa..c2a2ec3 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2016-2019 "IoT.bzh"
+ * Copyright (C) 2015-2020 "IoT.bzh"
  * Author José Bollo <jose.bollo@iot.bzh>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -502,7 +502,7 @@ static int start_one_thread()
  * @param start    The start mode for threads
  * @return 0 in case of success or -1 in case of error
  */
-static int queue_job(
+static int queue_job_internal(
                const void *group,
                int timeout,
                void (*callback)(int, void*),
@@ -510,9 +510,7 @@ static int queue_job(
                enum start_mode start_mode)
 {
        struct job *job;
-       int rc;
-
-       pthread_mutex_lock(&mutex);
+       int rc, busy;
 
        /* check availability */
        if (remaining_job_count <= 0) {
@@ -527,8 +525,9 @@ static int queue_job(
                goto error;
 
        /* start a thread if needed */
+       busy = busy_thread_count == started_thread_count;
        if (start_mode != Start_Lazy
-        && busy_thread_count == started_thread_count
+        && busy
         && (start_mode == Start_Urgent || remaining_job_count + started_thread_count < allowed_job_count)
         && started_thread_count < allowed_thread_count) {
                /* all threads are busy and a new can be started */
@@ -537,24 +536,60 @@ static int queue_job(
                        ERROR("can't start initial thread: %m");
                        goto error2;
                }
+               busy = 0;
        }
 
        /* queues the job */
        job_add(job);
 
-       /* signal an existing job */
+       /* wakeup an evloop if needed */
+       if (busy)
+               evloop_wakeup();
+
        pthread_cond_signal(&cond);
-       pthread_mutex_unlock(&mutex);
        return 0;
 
 error2:
        job->next = first_free_job;
        first_free_job = job;
 error:
-       pthread_mutex_unlock(&mutex);
        return -1;
 }
 
+/**
+ * Queues a new asynchronous job represented by 'callback' and 'arg'
+ * for the 'group' and the 'timeout'.
+ * Jobs are queued FIFO and are possibly executed in parallel
+ * concurrently except for job of the same group that are
+ * executed sequentially in FIFO order.
+ * @param group    The group of the job or NULL when no group.
+ * @param timeout  The maximum execution time in seconds of the job
+ *                 or 0 for unlimited time.
+ * @param callback The function to execute for achieving the job.
+ *                 Its first parameter is either 0 on normal flow
+ *                 or the signal number that broke the normal flow.
+ *                 The remaining parameter is the parameter 'arg1'
+ *                 given here.
+ * @param arg      The second argument for 'callback'
+ * @param start    The start mode for threads
+ * @return 0 in case of success or -1 in case of error
+ */
+static int queue_job(
+               const void *group,
+               int timeout,
+               void (*callback)(int, void*),
+               void *arg,
+               enum start_mode start_mode)
+{
+       int rc;
+
+       pthread_mutex_lock(&mutex);
+       rc = queue_job_internal(group, timeout, callback, arg, start_mode);
+       pthread_mutex_unlock(&mutex);
+       return rc;
+
+}
+
 /**
  * Queues a new asynchronous job represented by 'callback' and 'arg'
  * for the 'group' and the 'timeout'.
@@ -667,30 +702,24 @@ static int do_sync(
                struct sync *sync
 )
 {
-       struct job *job;
+       int rc;
 
        pthread_mutex_lock(&mutex);
 
-       /* allocates the job */
-       job = job_create(group, timeout, sync_cb, sync);
-       if (!job) {
-               pthread_mutex_unlock(&mutex);
-               return -1;
+       rc = queue_job_internal(group, timeout, sync_cb, sync, Start_Default);
+       if (rc == 0) {
+               /* run until stopped */
+               if (current_thread)
+                       thread_run_internal(&sync->thread);
+               else
+                       thread_run_external(&sync->thread);
+               if (!sync->thread.leaved) {
+                       errno = EINTR;
+                       rc = -1;
+               }
        }
-
-       /* queues the job */
-       job_add(job);
-
-       /* run until stopped */
-       if (current_thread)
-               thread_run_internal(&sync->thread);
-       else
-               thread_run_external(&sync->thread);
        pthread_mutex_unlock(&mutex);
-       if (sync->thread.leaved)
-               return 0;
-       errno = EINTR;
-       return -1;
+       return rc;
 }
 
 /**
@@ -909,6 +938,7 @@ void jobs_exit(void (*handler)())
        }
 
        /* wake up the threads */
+       evloop_wakeup();
        pthread_cond_broadcast(&cond);
 
        /* leave */