Rework the jobs
authorJosé Bollo <jose.bollo@iot.bzh>
Mon, 3 Apr 2017 15:22:39 +0000 (17:22 +0200)
committerJosé Bollo <jose.bollo@iot.bzh>
Mon, 3 Apr 2017 15:22:39 +0000 (17:22 +0200)
Enforce starting jobs with acquiring the
calling thread.

Removes invoke methods in favour of
enter/leave synchronisation.

Change-Id: I7086f7f53b919b43ddafd2355316abc0d3516f49
Signed-off-by: José Bollo <jose.bollo@iot.bzh>
src/jobs.c
src/jobs.h
src/main.c
src/tests/test-thread.c
src/tests/test-thread.sh

index 3d912a5..cf1bb55 100644 (file)
@@ -145,45 +145,31 @@ end:
 }
 
 /**
- * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
+ * Adds 'job' at the end of the list of jobs, marking it
  * as blocked if an other job with the same group is pending.
- * @param job1 the first job to add
- * @param job2 the second job to add or NULL
+ * @param job the job to add
  */
-static void job_add2(struct job *job1, struct job *job2)
+static void job_add(struct job *job)
 {
-       void *group1, *group2, *group;
+       void *group;
        struct job *ijob, **pjob;
 
        /* prepare to add */
-       group1 = job1->group;
-       job1->next = job2;
-       if (!job2)
-               group2 = NULL;
-       else {
-               job2->next = NULL;
-               group2 = job2->group;
-               if (group2 && group2 == group1)
-                       job2->blocked = 1;
-       }
+       group = job->group;
+       job->next = NULL;
 
        /* search end and blockers */
        pjob = &first_job;
        ijob = first_job;
        while (ijob) {
-               group = ijob->group;
-               if (group) {
-                       if (group == group1)
-                               job1->blocked = 1;
-                       if (group == group2)
-                               job2->blocked = 1;
-               }
+               if (group && ijob->group == group)
+                       job->blocked = 1;
                pjob = &ijob->next;
                ijob = ijob->next;
        }
 
        /* queue the jobs */
-       *pjob = job1;
+       *pjob = job;
 }
 
 /**
@@ -321,6 +307,8 @@ static void thread_run(volatile struct thread *me)
        threads = (struct thread*)me;
        started++;
 
+       NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
+
        /* loop until stopped */
        me->events = NULL;
        while (!me->stop) {
@@ -368,6 +356,7 @@ static void thread_run(volatile struct thread *me)
                        }
                }
        }
+       NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
 
        /* unlink the current thread and cleanup */
        started--;
@@ -551,7 +540,7 @@ int jobs_queue3(
 
        /* queues the job */
        remains--;
-       job_add2(job, NULL);
+       job_add(job);
 
        /* signal an existing job */
        pthread_cond_signal(&cond);
@@ -568,155 +557,161 @@ error:
 }
 
 /**
- * Run a asynchronous job represented by 'callback'
- * with the 'timeout' but only returns after job completion.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- * @return 0 in case of success or -1 in case of error
+ * Enter a synchronisation point: activates the job given by 'callback'
+ * @param group the gro
  */
-int jobs_invoke0(
+int jobs_enter(
+               void *group,
                int timeout,
-               void (*callback)(int signum))
+               void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+               void *closure)
 {
-       return jobs_invoke3(timeout, (job_cb_t)callback, NULL, NULL, NULL);
-}
+       
+       struct job *job;
+       struct thread me;
 
-/**
- * Run a asynchronous job represented by 'callback' and 'arg1'
- * with the 'timeout' but only returns after job completion.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- *                 The remaining parameter is the parameter 'arg1'
- *                 given here.
- * @param arg1     The second argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_invoke(
-               int timeout,
-               void (*callback)(int, void*),
-               void *arg)
-{
-       return jobs_invoke3(timeout, (job_cb_t)callback, arg, NULL, NULL);
-}
+       pthread_mutex_lock(&mutex);
 
-/**
- * Run a asynchronous job represented by 'callback' and 'arg[12]'
- * with the 'timeout' but only returns after job completion.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- *                 The remaining parameters are the parameters 'arg[12]'
- *                 given here.
- * @param arg1     The second argument for 'callback'
- * @param arg2     The third argument for 'callback'
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_invoke2(
-               int timeout,
-               void (*callback)(int, void*, void*),
-               void *arg1,
-               void *arg2)
-{
-       return jobs_invoke3(timeout, (job_cb_t)callback, arg1, arg2, NULL);
+       /* allocates the job */
+       job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
+       if (!job) {
+               ERROR("out of memory");
+               errno = ENOMEM;
+               pthread_mutex_unlock(&mutex);
+               return -1;
+       }
+
+       /* queues the job */
+       job_add(job);
+
+       /* run until stopped */
+       thread_run(&me);
+       pthread_mutex_unlock(&mutex);
+       return 0;
 }
 
-/**
- * Stops the thread pointed by 'arg1'. Used with
- * invoke familly to return to the caller after completion.
- * @param signum Unused
- * @param arg1   The thread to stop
- * @param arg2   Unused
- * @param arg3   Unused
- */
-static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
+int jobs_leave(struct jobloop *jobloop)
 {
-       struct thread *t = arg1;
+       struct thread *t;
        pthread_mutex_lock(&mutex);
-       t->stop = 1;
-       if (t->waits)
-               pthread_cond_broadcast(&cond);
+
+       t = threads;
+       while (t && t != (struct thread*)jobloop)
+               t = t->next;
+       if (!t) {
+               errno = EINVAL;
+       } else {
+               t->stop = 1;
+               if (t->waits)
+                       pthread_cond_broadcast(&cond);
+       }
        pthread_mutex_unlock(&mutex);
+       return -!t;
 }
 
 /**
- * Run a asynchronous job represented by 'callback' and 'arg[123]'
- * with the 'timeout' but only returns after job completion.
- * @param timeout  The maximum execution time in seconds of the job
- *                 or 0 for unlimited time.
- * @param callback The function to execute for achieving the job.
- *                 Its first parameter is either 0 on normal flow
- *                 or the signal number that broke the normal flow.
- *                 The remaining parameters are the parameters 'arg[123]'
- *                 given here.
- * @param arg1     The second argument for 'callback'
- * @param arg2     The third argument for 'callback'
- * @param arg3     The forth argument for 'callback'
- * @return 0 in case of success or -1 in case of error
+ * Gets a sd_event item for the current thread.
+ * @return a sd_event or NULL in case of error
  */
-int jobs_invoke3(
-               int timeout,
-               void (*callback)(int, void*, void *, void*),
-               void *arg1,
-               void *arg2,
-               void *arg3)
+struct sd_event *jobs_get_sd_event()
 {
-       struct job *job1, *job2;
-       struct thread me;
-       
+       struct events *events;
+       struct thread *me;
+       int rc;
+
        pthread_mutex_lock(&mutex);
 
-       /* allocates the job */
-       job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
-       job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
-       if (!job1 || !job2) {
-               ERROR("out of memory");
-               errno = ENOMEM;
-               if (job1) {
-                       job1->next = free_jobs;
-                       free_jobs = job1;
+       /* search events on stack */
+       me = current;
+       while (me && !me->events)
+               me = me->upper;
+       if (me)
+               /* return the stacked events */
+               events = me->events;
+       else {
+               /* search an available events */
+               events = events_get();
+               if (!events) {
+                       /* not found, check if creation possible */
+                       if (nevents >= allowed) {
+                               ERROR("not possible to add a new event");
+                               events = NULL;
+                       } else {
+                               events = malloc(sizeof *events);
+                               if (events && (rc = sd_event_new(&events->event)) >= 0) {
+                                       if (nevents < started || start_one_thread() >= 0) {
+                                               events->runs = 0;
+                                               events->next = first_events;
+                                               first_events = events;
+                                       } else {
+                                               ERROR("can't start thread for events");
+                                               sd_event_unref(events->event);
+                                               free(events);
+                                               events = NULL;
+                                       }
+                               } else {
+                                       if (!events) {
+                                               ERROR("out of memory");
+                                               errno = ENOMEM;
+                                       } else {
+                                               free(events);
+                                               ERROR("creation of sd_event failed: %m");
+                                               events = NULL;
+                                               errno = -rc;
+                                       } 
+                               }
+                       }
                }
-               if (job2) {
-                       job2->next = free_jobs;
-                       free_jobs = job2;
+               if (events) {
+                       /* */
+                       me = current;
+                       if (me) {
+                               events->runs = 1;
+                               me->events = events;
+                       } else {
+                               WARNING("event returned for unknown thread!");
+                       }
                }
-               pthread_mutex_unlock(&mutex);
-               return -1;
        }
-
-       /* queues the job */
-       job_add2(job1, job2);
-
-       /* run until stopped */
-       thread_run(&me);
        pthread_mutex_unlock(&mutex);
-       return 0;
+       return events ? events->event : NULL;
 }
 
 /**
- * Initialise the job stuff.
- * @param allowed_count Maximum count of thread for jobs (can be 0,
- *                      see 'jobs_add_me' for merging new threads)
+ * Enter the jobs processing loop.
+ * @param allowed_count Maximum count of thread for jobs including this one
  * @param start_count   Count of thread to start now, must be lower.
  * @param waiter_count  Maximum count of jobs that can be waiting.
+ * @param start         The start routine to activate (can't be NULL)
  * @return 0 in case of success or -1 in case of error.
  */
-int jobs_init(int allowed_count, int start_count, int waiter_count)
+int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
 {
        int rc, launched;
+       struct thread me;
+       struct job *job;
 
-       assert(allowed_count >= 0);
+       assert(allowed_count >= 1);
        assert(start_count >= 0);
        assert(waiter_count > 0);
        assert(start_count <= allowed_count);
 
+       rc = -1;
+       pthread_mutex_lock(&mutex);
+
+       /* check whether already running */
+       if (current || allowed) {
+               ERROR("thread already started");
+               errno = EINVAL;
+               goto error;
+       }
+
+       /* start */
+       if (sig_monitor_init() < 0) {
+               ERROR("failed to initialise signal handlers");
+               goto error;
+       }
+
        /* records the allowed count */
        allowed = allowed_count;
        started = 0;
@@ -724,16 +719,30 @@ int jobs_init(int allowed_count, int start_count, int waiter_count)
        remains = waiter_count;
 
        /* start at least one thread */
-       pthread_mutex_lock(&mutex);
        launched = 0;
-       while (launched < start_count && start_one_thread() == 0)
+       while ((launched + 1) < start_count) {
+               if (start_one_thread() != 0) {
+                       ERROR("Not all threads can be started");
+                       goto error;
+               }
                launched++;
-       rc = -(launched != start_count);
-       pthread_mutex_unlock(&mutex);
+       }
+
+       /* queue the start job */
+       job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
+       if (!job) {
+               ERROR("out of memory");
+               errno = ENOMEM;
+               goto error;
+       }
+       job_add(job);
+       remains--;
 
-       /* end */
-       if (rc)
-               ERROR("Not all threads can be started");
+       /* run until end */
+       thread_run(&me);
+       rc = 0;
+error:
+       pthread_mutex_unlock(&mutex);
        return rc;
 }
 
@@ -820,130 +829,3 @@ void jobs_terminate()
        pthread_mutex_unlock(&mutex);
 }
 
-/**
- * Adds the current thread to the pool of threads
- * processing the jobs. Returns normally when the threads are
- * terminated or immediately with an error if the thread is
- * already in the pool.
- * @return 0 in case of success or -1 in case of error
- */
-int jobs_add_me()
-{
-       struct thread me;
-
-       /* check whether already running */
-       if (current) {
-               ERROR("thread already running");
-               errno = EINVAL;
-               return -1;
-       }
-
-       /* allowed... */
-       pthread_mutex_lock(&mutex);
-       allowed++;
-       thread_run(&me);
-       allowed--;
-       pthread_mutex_unlock(&mutex);
-       return 0;
-}
-
-/**
- * Gets a sd_event item for the current thread.
- * @return a sd_event or NULL in case of error
- */
-struct sd_event *jobs_get_sd_event()
-{
-       struct events *events;
-       struct thread *me;
-       int rc;
-
-       pthread_mutex_lock(&mutex);
-
-       /* search events on stack */
-       me = current;
-       while (me && !me->events)
-               me = me->upper;
-       if (me)
-               /* return the stacked events */
-               events = me->events;
-       else {
-               /* search an available events */
-               events = events_get();
-               if (!events) {
-                       /* not found, check if creation possible */
-                       if (nevents >= allowed) {
-                               ERROR("not possible to add a new event");
-                               events = NULL;
-                       } else {
-                               events = malloc(sizeof *events);
-                               if (events && (rc = sd_event_new(&events->event)) >= 0) {
-                                       if (nevents < started || start_one_thread() >= 0) {
-                                               events->runs = 0;
-                                               events->next = first_events;
-                                               first_events = events;
-                                       } else {
-                                               ERROR("can't start thread for events");
-                                               sd_event_unref(events->event);
-                                               free(events);
-                                               events = NULL;
-                                       }
-                               } else {
-                                       if (!events) {
-                                               ERROR("out of memory");
-                                               errno = ENOMEM;
-                                       } else {
-                                               free(events);
-                                               ERROR("creation of sd_event failed: %m");
-                                               events = NULL;
-                                               errno = -rc;
-                                       } 
-                               }
-                       }
-               }
-               if (events) {
-                       /* */
-                       me = current;
-                       if (me) {
-                               events->runs = 1;
-                               me->events = events;
-                       } else {
-                               WARNING("event returned for unknown thread!");
-                       }
-               }
-       }
-       pthread_mutex_unlock(&mutex);
-       return events ? events->event : NULL;
-}
-
-/**
- * Enter the jobs processing loop.
- * @param allowed_count Maximum count of thread for jobs including this one
- * @param start_count   Count of thread to start now, must be lower.
- * @param waiter_count  Maximum count of jobs that can be waiting.
- * @param start         The start routine to activate (can't be NULL)
- * @return 0 in case of success or -1 in case of error.
- */
-int jobs_enter(int allowed_count, int start_count, int waiter_count, void (*start)())
-{
-       /* start */
-       if (sig_monitor_init() < 0) {
-               ERROR("failed to initialise signal handlers");
-               return -1;
-       }
-
-       /* init job processing */
-       if (jobs_init(allowed_count, start_count, waiter_count) < 0) {
-               ERROR("failed to initialise threading");
-               return -1;
-       }
-
-       /* queue the start job */
-       if (jobs_queue0(NULL, 0, (void(*)(int))start) < 0) {
-               ERROR("failed to start runnning jobs");
-               return -1;
-       }
-
-       /* turn as processing thread */
-       return jobs_add_me();
-}
-
index 6eb0a83..f508e3c 100644 (file)
@@ -18,6 +18,7 @@
 #pragma once
 
 struct sd_event;
+struct jobloop;
 
 extern int jobs_queue0(
                void *group,
@@ -45,33 +46,17 @@ extern int jobs_queue3(
                void *arg2,
                void *arg3);
 
-extern int jobs_invoke0(
-               int timeout,
-               void (*callback)(int signum));
-
-extern int jobs_invoke(
-               int timeout,
-               void (*callback)(int signum, void* arg),
-               void *arg);
-
-extern int jobs_invoke2(
+extern int jobs_enter(
+               void *group,
                int timeout,
-               void (*callback)(int signum, void* arg1, void *arg2),
-               void *arg1,
-               void *arg2);
+               void (*callback)(int signum, void *closure, struct jobloop *jobloop),
+               void *closure);
 
-extern int jobs_invoke3(
-               int timeout,
-               void (*callback)(int signum, void* arg1, void *arg2, void *arg3),
-               void *arg1,
-               void *arg2,
-               void *arg3);
+extern int jobs_leave(struct jobloop *jobloop);
 
 extern struct sd_event *jobs_get_sd_event();
 
-extern int jobs_init(int allowed_count, int start_count, int waiter_count);
-extern int jobs_add_me();
 extern void jobs_terminate();
 
-extern int jobs_enter(int allowed_count, int start_count, int waiter_count, void (*start)());
+extern int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)());
 
index 71b2138..1c85411 100644 (file)
@@ -489,13 +489,13 @@ int main(int argc, char *argv[])
        }
 
        /* handle groups */
-       atexit(exit_handler);
+//     atexit(exit_handler);
 
        /* ignore any SIGPIPE */
        signal(SIGPIPE, SIG_IGN);
 
        /* enter job processing */
-       jobs_enter(3, 1, 20, start);
+       jobs_start(3, 0, 50, start);
        WARNING("hoops returned from jobs_enter! [report bug]");
        return 1;
 }
index 30e27a2..d3ce08c 100644 (file)
@@ -81,7 +81,7 @@ void terminate(int signum)
        exit(0);
 }
 
-int main()
+void start()
 {
        int i;
        struct foo *foo;
@@ -89,7 +89,6 @@ int main()
        struct timespec ts;
 
        req.itf = &itf;
-       jobs_init(4, 0, 20000);
        for (i = 0 ; i  < 10000 ; i++) {
                req.closure = foo = malloc(sizeof *foo);
                foo->value = i;
@@ -97,18 +96,26 @@ int main()
                afb_thread_req_call(req, process, 5, (&ts) + (i % 7));
                unref(foo);
                if (i == 5000)
-#if 0
-                       jobs_invoke0(0, terminate);
-#else
                        jobs_queue0(NULL, 0, terminate);
-#endif
                ts.tv_sec = 0;
                ts.tv_nsec = 1000000;
 //             nanosleep(&ts, NULL);
        }
-       return -jobs_add_me();
 }
 
 
 
+int main()
+{
+       int i;
+       struct foo *foo;
+       struct afb_req req;
+       struct timespec ts;
+
+       req.itf = &itf;
+       jobs_start(4, 0, 20000, start);
+       return 1;
+}
+
+
 
index a9e2ee3..a7bb45a 100755 (executable)
@@ -1,4 +1,4 @@
 #!/bin/sh
 
-cc test-thread.c ../afb-thread.c ../verbose.c ../sig-monitor.c ../jobs.c -o test-thread -lrt -lpthread -lsystemd I../../include -g 
+cc test-thread.c ../afb-thread.c ../verbose.c ../sig-monitor.c ../jobs.c -o test-thread -lrt -lpthread -lsystemd -I../../include -g 
 ./test-thread