jobs: Add starting mode for jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "evmgr.h"
31 #include "sig-monitor.h"
32 #include "verbose.h"
33
34 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
35 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
36
37 /** Internal shortcut for callback */
38 typedef void (*job_cb_t)(int, void*);
39
40 /** starting mode for jobs */
41 enum start_mode
42 {
43         Start_Default,  /**< Start a thread if more than one jobs is pending */
44         Start_Urgent,   /**< Always start a thread */
45         Start_Lazy      /**< Never start a thread */
46 };
47
48 /** Description of a pending job */
49 struct job
50 {
51         struct job *next;    /**< link to the next job enqueued */
52         const void *group;   /**< group of the request */
53         job_cb_t callback;   /**< processing callback */
54         void *arg;           /**< argument */
55         int timeout;         /**< timeout in second for processing the request */
56         unsigned blocked: 1; /**< is an other request blocking this one ? */
57         unsigned dropped: 1; /**< is removed ? */
58 };
59
60 /** Description of threads */
61 struct thread
62 {
63         struct thread *next;   /**< next thread of the list */
64         struct thread *upper;  /**< upper same thread */
65         struct thread *nholder;/**< next holder for evloop */
66         pthread_cond_t *cwhold;/**< condition wait for holding */
67         struct job *job;       /**< currently processed job */
68         pthread_t tid;         /**< the thread id */
69         volatile unsigned stop: 1;      /**< stop requested */
70         volatile unsigned waits: 1;     /**< is waiting? */
71         volatile unsigned leaved: 1;    /**< was leaved? */
72 };
73
74 /**
75  * Description of synchronous callback
76  */
77 struct sync
78 {
79         struct thread thread;   /**< thread loop data */
80         union {
81                 void (*callback)(int, void*);   /**< the synchronous callback */
82                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
83                                 /**< the entering synchronous routine */
84         };
85         void *arg;              /**< the argument of the callback */
86 };
87
88 /* synchronisation of threads */
89 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
90 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
91
92 /* counts for threads */
93 static int allowed_thread_count = 0; /** allowed count of threads */
94 static int started_thread_count = 0; /** started count of threads */
95 static int busy_thread_count = 0;    /** count of busy threads */
96
97 /* list of threads */
98 static struct thread *threads;
99 static _Thread_local struct thread *current_thread;
100
101 /* counts for jobs */
102 static int remaining_job_count = 0;  /** count of job that can be created */
103 static int allowed_job_count = 0;    /** allowed count of pending jobs */
104
105 /* queue of pending jobs */
106 static struct job *first_pending_job;
107 static struct job *first_free_job;
108
109 /* event loop */
110 static struct evmgr *evmgr;
111
112 static void (*exit_handler)();
113
114 /**
115  * Create a new job with the given parameters
116  * @param group    the group of the job
117  * @param timeout  the timeout of the job (0 if none)
118  * @param callback the function that achieves the job
119  * @param arg      the argument of the callback
120  * @return the created job unblock or NULL when no more memory
121  */
122 static struct job *job_create(
123                 const void *group,
124                 int timeout,
125                 job_cb_t callback,
126                 void *arg)
127 {
128         struct job *job;
129
130         /* try recyle existing job */
131         job = first_free_job;
132         if (job)
133                 first_free_job = job->next;
134         else {
135                 /* allocation without blocking */
136                 pthread_mutex_unlock(&mutex);
137                 job = malloc(sizeof *job);
138                 pthread_mutex_lock(&mutex);
139                 if (!job) {
140                         ERROR("out of memory");
141                         errno = ENOMEM;
142                         goto end;
143                 }
144         }
145         /* initialises the job */
146         job->group = group;
147         job->timeout = timeout;
148         job->callback = callback;
149         job->arg = arg;
150         job->blocked = 0;
151         job->dropped = 0;
152 end:
153         return job;
154 }
155
156 /**
157  * Adds 'job' at the end of the list of jobs, marking it
158  * as blocked if an other job with the same group is pending.
159  * @param job the job to add
160  */
161 static void job_add(struct job *job)
162 {
163         const void *group;
164         struct job *ijob, **pjob;
165
166         /* prepare to add */
167         group = job->group;
168         job->next = NULL;
169
170         /* search end and blockers */
171         pjob = &first_pending_job;
172         ijob = first_pending_job;
173         while (ijob) {
174                 if (group && ijob->group == group)
175                         job->blocked = 1;
176                 pjob = &ijob->next;
177                 ijob = ijob->next;
178         }
179
180         /* queue the jobs */
181         *pjob = job;
182         remaining_job_count--;
183 }
184
185 /**
186  * Get the next job to process or NULL if none.
187  * @return the first job that isn't blocked or NULL
188  */
189 static inline struct job *job_get()
190 {
191         struct job *job = first_pending_job;
192         while (job && job->blocked)
193                 job = job->next;
194         if (job)
195                 remaining_job_count++;
196         return job;
197 }
198
199 /**
200  * Releases the processed 'job': removes it
201  * from the list of jobs and unblock the first
202  * pending job of the same group if any.
203  * @param job the job to release
204  */
205 static inline void job_release(struct job *job)
206 {
207         struct job *ijob, **pjob;
208         const void *group;
209
210         /* first unqueue the job */
211         pjob = &first_pending_job;
212         ijob = first_pending_job;
213         while (ijob != job) {
214                 pjob = &ijob->next;
215                 ijob = ijob->next;
216         }
217         *pjob = job->next;
218
219         /* then unblock jobs of the same group */
220         group = job->group;
221         if (group) {
222                 ijob = job->next;
223                 while (ijob && ijob->group != group)
224                         ijob = ijob->next;
225                 if (ijob)
226                         ijob->blocked = 0;
227         }
228
229         /* recycle the job */
230         job->next = first_free_job;
231         first_free_job = job;
232 }
233
234 /**
235  * Monitored cancel callback for a job.
236  * This function is called by the monitor
237  * to cancel the job when the safe environment
238  * is set.
239  * @param signum 0 on normal flow or the number
240  *               of the signal that interrupted the normal
241  *               flow, isn't used
242  * @param arg    the job to run
243  */
244 __attribute__((unused))
245 static void job_cancel(int signum, void *arg)
246 {
247         struct job *job = arg;
248         job->callback(SIGABRT, job->arg);
249 }
250
251 /**
252  * wakeup the event loop if needed by sending
253  * an event.
254  */
255 static void evloop_wakeup()
256 {
257         if (evmgr)
258                 evmgr_wakeup(evmgr);
259 }
260
261 /**
262  * Release the currently held event loop
263  */
264 static void evloop_release()
265 {
266         struct thread *nh, *ct = current_thread;
267
268         if (ct && evmgr && evmgr_release_if(evmgr, ct)) {
269                 nh = ct->nholder;
270                 ct->nholder = 0;
271                 if (nh) {
272                         evmgr_try_hold(evmgr, nh);
273                         pthread_cond_signal(nh->cwhold);
274                 }
275         }
276 }
277
278 /**
279  * get the eventloop for the current thread
280  */
281 static int evloop_get()
282 {
283         return evmgr && evmgr_try_hold(evmgr, current_thread);
284 }
285
286 /**
287  * acquire the eventloop for the current thread
288  */
289 static void evloop_acquire()
290 {
291         struct thread *pwait, *ct;
292         pthread_cond_t cond;
293
294         /* try to get the evloop */
295         if (!evloop_get()) {
296                 /* failed, init waiting state */
297                 ct = current_thread;
298                 ct->nholder = NULL;
299                 ct->cwhold = &cond;
300                 pthread_cond_init(&cond, NULL);
301
302                 /* queue current thread in holder list */
303                 pwait = evmgr_holder(evmgr);
304                 while (pwait->nholder)
305                         pwait = pwait->nholder;
306                 pwait->nholder = ct;
307
308                 /* wake up the evloop */
309                 evloop_wakeup();
310
311                 /* wait to acquire the evloop */
312                 pthread_cond_wait(&cond, &mutex);
313                 pthread_cond_destroy(&cond);
314         }
315 }
316
317 /**
318  * Enter the thread
319  * @param me the description of the thread to enter
320  */
321 static void thread_enter(volatile struct thread *me)
322 {
323         evloop_release();
324         /* initialize description of itself and link it in the list */
325         me->tid = pthread_self();
326         me->stop = 0;
327         me->waits = 0;
328         me->leaved = 0;
329         me->nholder = 0;
330         me->upper = current_thread;
331         me->next = threads;
332         threads = (struct thread*)me;
333         current_thread = (struct thread*)me;
334 }
335
336 /**
337  * leave the thread
338  * @param me the description of the thread to leave
339  */
340 static void thread_leave()
341 {
342         struct thread **prv, *me;
343
344         /* unlink the current thread and cleanup */
345         me = current_thread;
346         prv = &threads;
347         while (*prv != me)
348                 prv = &(*prv)->next;
349         *prv = me->next;
350
351         current_thread = me->upper;
352 }
353
354 /**
355  * Main processing loop of internal threads with processing jobs.
356  * The loop must be called with the mutex locked
357  * and it returns with the mutex locked.
358  * @param me the description of the thread to use
359  * TODO: how are timeout handled when reentering?
360  */
361 static void thread_run_internal(volatile struct thread *me)
362 {
363         struct job *job;
364
365         /* enter thread */
366         thread_enter(me);
367
368         /* loop until stopped */
369         while (!me->stop) {
370                 /* release the current event loop */
371                 evloop_release();
372
373                 /* get a job */
374                 job = job_get();
375                 if (job) {
376                         /* prepare running the job */
377                         job->blocked = 1; /* mark job as blocked */
378                         me->job = job; /* record the job (only for terminate) */
379
380                         /* run the job */
381                         pthread_mutex_unlock(&mutex);
382                         sig_monitor(job->timeout, job->callback, job->arg);
383                         pthread_mutex_lock(&mutex);
384
385                         /* release the run job */
386                         job_release(job);
387                 /* no job, check event loop wait */
388                 } else if (evloop_get()) {
389                         if (!evmgr_can_run(evmgr)) {
390                                 /* busy ? */
391                                 CRITICAL("Can't enter dispatch while in dispatch!");
392                                 abort();
393                         }
394                         /* run the events */
395                         pthread_mutex_unlock(&mutex);
396                         sig_monitor(0, (void(*)(int,void*))evmgr_job_run, evmgr);
397                         pthread_mutex_lock(&mutex);
398                 } else {
399                         /* no job and no event loop */
400                         busy_thread_count--;
401                         if (!busy_thread_count)
402                                 ERROR("Entering job deep sleep! Check your bindings.");
403                         me->waits = 1;
404                         pthread_cond_wait(&cond, &mutex);
405                         me->waits = 0;
406                         busy_thread_count++;
407                 }
408         }
409         /* cleanup */
410         evloop_release();
411         thread_leave();
412 }
413
414 /**
415  * Main processing loop of external threads.
416  * The loop must be called with the mutex locked
417  * and it returns with the mutex locked.
418  * @param me the description of the thread to use
419  */
420 static void thread_run_external(volatile struct thread *me)
421 {
422         /* enter thread */
423         thread_enter(me);
424
425         /* loop until stopped */
426         me->waits = 1;
427         while (!me->stop)
428                 pthread_cond_wait(&cond, &mutex);
429         me->waits = 0;
430         thread_leave();
431 }
432
433 /**
434  * Root for created threads.
435  */
436 static void thread_main()
437 {
438         struct thread me;
439
440         busy_thread_count++;
441         started_thread_count++;
442         sig_monitor_init_timeouts();
443         thread_run_internal(&me);
444         sig_monitor_clean_timeouts();
445         started_thread_count--;
446         busy_thread_count--;
447 }
448
449 /**
450  * Entry point for created threads.
451  * @param data not used
452  * @return NULL
453  */
454 static void *thread_starter(void *data)
455 {
456         pthread_mutex_lock(&mutex);
457         thread_main();
458         pthread_mutex_unlock(&mutex);
459         return NULL;
460 }
461
462 /**
463  * Starts a new thread
464  * @return 0 in case of success or -1 in case of error
465  */
466 static int start_one_thread()
467 {
468         pthread_t tid;
469         int rc;
470
471         rc = pthread_create(&tid, NULL, thread_starter, NULL);
472         if (rc != 0) {
473                 /* errno = rc; */
474                 WARNING("not able to start thread: %m");
475                 rc = -1;
476         }
477         return rc;
478 }
479
480 /**
481  * Queues a new asynchronous job represented by 'callback' and 'arg'
482  * for the 'group' and the 'timeout'.
483  * Jobs are queued FIFO and are possibly executed in parallel
484  * concurrently except for job of the same group that are
485  * executed sequentially in FIFO order.
486  * @param group    The group of the job or NULL when no group.
487  * @param timeout  The maximum execution time in seconds of the job
488  *                 or 0 for unlimited time.
489  * @param callback The function to execute for achieving the job.
490  *                 Its first parameter is either 0 on normal flow
491  *                 or the signal number that broke the normal flow.
492  *                 The remaining parameter is the parameter 'arg1'
493  *                 given here.
494  * @param arg      The second argument for 'callback'
495  * @param start    The start mode for threads
496  * @return 0 in case of success or -1 in case of error
497  */
498 static int queue_job(
499                 const void *group,
500                 int timeout,
501                 void (*callback)(int, void*),
502                 void *arg,
503                 enum start_mode start_mode)
504 {
505         struct job *job;
506         int rc;
507
508         pthread_mutex_lock(&mutex);
509
510         /* check availability */
511         if (remaining_job_count <= 0) {
512                 ERROR("can't process job with threads: too many jobs");
513                 errno = EBUSY;
514                 goto error;
515         }
516
517         /* allocates the job */
518         job = job_create(group, timeout, callback, arg);
519         if (!job)
520                 goto error;
521
522         /* start a thread if needed */
523         if (start_mode != Start_Lazy
524          && busy_thread_count == started_thread_count
525          && (start_mode == Start_Urgent || remaining_job_count + started_thread_count < allowed_job_count)
526          && started_thread_count < allowed_thread_count) {
527                 /* all threads are busy and a new can be started */
528                 rc = start_one_thread();
529                 if (rc < 0 && started_thread_count == 0) {
530                         ERROR("can't start initial thread: %m");
531                         goto error2;
532                 }
533         }
534
535         /* queues the job */
536         job_add(job);
537
538         /* signal an existing job */
539         pthread_cond_signal(&cond);
540         pthread_mutex_unlock(&mutex);
541         return 0;
542
543 error2:
544         job->next = first_free_job;
545         first_free_job = job;
546 error:
547         pthread_mutex_unlock(&mutex);
548         return -1;
549 }
550
551 /**
552  * Queues a new asynchronous job represented by 'callback' and 'arg'
553  * for the 'group' and the 'timeout'.
554  * Jobs are queued FIFO and are possibly executed in parallel
555  * concurrently except for job of the same group that are
556  * executed sequentially in FIFO order.
557  * @param group    The group of the job or NULL when no group.
558  * @param timeout  The maximum execution time in seconds of the job
559  *                 or 0 for unlimited time.
560  * @param callback The function to execute for achieving the job.
561  *                 Its first parameter is either 0 on normal flow
562  *                 or the signal number that broke the normal flow.
563  *                 The remaining parameter is the parameter 'arg1'
564  *                 given here.
565  * @param arg      The second argument for 'callback'
566  * @return 0 in case of success or -1 in case of error
567  */
568 int jobs_queue(
569                 const void *group,
570                 int timeout,
571                 void (*callback)(int, void*),
572                 void *arg)
573 {
574         return queue_job(group, timeout, callback, arg, Start_Default);
575 }
576
577 /**
578  * Queues lazyly a new asynchronous job represented by 'callback' and 'arg'
579  * for the 'group' and the 'timeout'.
580  * Jobs are queued FIFO and are possibly executed in parallel
581  * concurrently except for job of the same group that are
582  * executed sequentially in FIFO order.
583  * @param group    The group of the job or NULL when no group.
584  * @param timeout  The maximum execution time in seconds of the job
585  *                 or 0 for unlimited time.
586  * @param callback The function to execute for achieving the job.
587  *                 Its first parameter is either 0 on normal flow
588  *                 or the signal number that broke the normal flow.
589  *                 The remaining parameter is the parameter 'arg1'
590  *                 given here.
591  * @param arg      The second argument for 'callback'
592  * @return 0 in case of success or -1 in case of error
593  */
594 int jobs_queue_lazy(
595                 const void *group,
596                 int timeout,
597                 void (*callback)(int, void*),
598                 void *arg)
599 {
600         return queue_job(group, timeout, callback, arg, Start_Lazy);
601 }
602
603 /**
604  * Queues urgently a new asynchronous job represented by 'callback' and 'arg'
605  * for the 'group' and the 'timeout'.
606  * Jobs are queued FIFO and are possibly executed in parallel
607  * concurrently except for job of the same group that are
608  * executed sequentially in FIFO order.
609  * @param group    The group of the job or NULL when no group.
610  * @param timeout  The maximum execution time in seconds of the job
611  *                 or 0 for unlimited time.
612  * @param callback The function to execute for achieving the job.
613  *                 Its first parameter is either 0 on normal flow
614  *                 or the signal number that broke the normal flow.
615  *                 The remaining parameter is the parameter 'arg1'
616  *                 given here.
617  * @param arg      The second argument for 'callback'
618  * @return 0 in case of success or -1 in case of error
619  */
620 int jobs_queue_urgent(
621                 const void *group,
622                 int timeout,
623                 void (*callback)(int, void*),
624                 void *arg)
625 {
626         return queue_job(group, timeout, callback, arg, Start_Urgent);
627 }
628
629 /**
630  * Internal helper function for 'jobs_enter'.
631  * @see jobs_enter, jobs_leave
632  */
633 static void enter_cb(int signum, void *closure)
634 {
635         struct sync *sync = closure;
636         sync->enter(signum, sync->arg, (void*)&sync->thread);
637 }
638
639 /**
640  * Internal helper function for 'jobs_call'.
641  * @see jobs_call
642  */
643 static void call_cb(int signum, void *closure)
644 {
645         struct sync *sync = closure;
646         sync->callback(signum, sync->arg);
647         jobs_leave((void*)&sync->thread);
648 }
649
650 /**
651  * Internal helper for synchronous jobs. It enters
652  * a new thread loop for evaluating the given job
653  * as recorded by the couple 'sync_cb' and 'sync'.
654  * @see jobs_call, jobs_enter, jobs_leave
655  */
656 static int do_sync(
657                 const void *group,
658                 int timeout,
659                 void (*sync_cb)(int signum, void *closure),
660                 struct sync *sync
661 )
662 {
663         struct job *job;
664
665         pthread_mutex_lock(&mutex);
666
667         /* allocates the job */
668         job = job_create(group, timeout, sync_cb, sync);
669         if (!job) {
670                 pthread_mutex_unlock(&mutex);
671                 return -1;
672         }
673
674         /* queues the job */
675         job_add(job);
676
677         /* run until stopped */
678         if (current_thread)
679                 thread_run_internal(&sync->thread);
680         else
681                 thread_run_external(&sync->thread);
682         pthread_mutex_unlock(&mutex);
683         if (sync->thread.leaved)
684                 return 0;
685         errno = EINTR;
686         return -1;
687 }
688
689 /**
690  * Enter a synchronisation point: activates the job given by 'callback'
691  * and 'closure' using 'group' and 'timeout' to control sequencing and
692  * execution time.
693  * @param group the group for sequencing jobs
694  * @param timeout the time in seconds allocated to the job
695  * @param callback the callback that will handle the job.
696  *                 it receives 3 parameters: 'signum' that will be 0
697  *                 on normal flow or the catched signal number in case
698  *                 of interrupted flow, the context 'closure' as given and
699  *                 a 'jobloop' reference that must be used when the job is
700  *                 terminated to unlock the current execution flow.
701  * @param closure the argument to the callback
702  * @return 0 on success or -1 in case of error
703  */
704 int jobs_enter(
705                 const void *group,
706                 int timeout,
707                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
708                 void *closure
709 )
710 {
711         struct sync sync;
712
713         sync.enter = callback;
714         sync.arg = closure;
715         return do_sync(group, timeout, enter_cb, &sync);
716 }
717
718 /**
719  * Unlocks the execution flow designed by 'jobloop'.
720  * @param jobloop indication of the flow to unlock
721  * @return 0 in case of success of -1 on error
722  */
723 int jobs_leave(struct jobloop *jobloop)
724 {
725         struct thread *t;
726
727         pthread_mutex_lock(&mutex);
728         t = threads;
729         while (t && t != (struct thread*)jobloop)
730                 t = t->next;
731         if (!t) {
732                 errno = EINVAL;
733         } else {
734                 t->leaved = 1;
735                 t->stop = 1;
736                 if (t->waits)
737                         pthread_cond_broadcast(&cond);
738                 else
739                         evloop_wakeup();
740         }
741         pthread_mutex_unlock(&mutex);
742         return -!t;
743 }
744
745 /**
746  * Calls synchronously the job represented by 'callback' and 'arg1'
747  * for the 'group' and the 'timeout' and waits for its completion.
748  * @param group    The group of the job or NULL when no group.
749  * @param timeout  The maximum execution time in seconds of the job
750  *                 or 0 for unlimited time.
751  * @param callback The function to execute for achieving the job.
752  *                 Its first parameter is either 0 on normal flow
753  *                 or the signal number that broke the normal flow.
754  *                 The remaining parameter is the parameter 'arg1'
755  *                 given here.
756  * @param arg      The second argument for 'callback'
757  * @return 0 in case of success or -1 in case of error
758  */
759 int jobs_call(
760                 const void *group,
761                 int timeout,
762                 void (*callback)(int, void*),
763                 void *arg)
764 {
765         struct sync sync;
766
767         sync.callback = callback;
768         sync.arg = arg;
769
770         return do_sync(group, timeout, call_cb, &sync);
771 }
772
773 /**
774  * Ensure that the current running thread can control the event loop.
775  */
776 struct evmgr *jobs_acquire_event_manager()
777 {
778         struct thread lt;
779
780         /* ensure an existing thread environment */
781         if (!current_thread) {
782                 memset(&lt, 0, sizeof lt);
783                 current_thread = &lt;
784         }
785
786         /* lock */
787         pthread_mutex_lock(&mutex);
788
789         /* creates the evloop on need */
790         if (!evmgr)
791                 evmgr_create(&evmgr);
792
793         /* acquire the event loop under lock */
794         if (evmgr)
795                 evloop_acquire();
796
797         /* unlock */
798         pthread_mutex_unlock(&mutex);
799
800         /* release the faked thread environment if needed */
801         if (current_thread == &lt) {
802                 /*
803                  * Releasing it is needed because there is no way to guess
804                  * when it has to be released really. But here is where it is
805                  * hazardous: if the caller modifies the eventloop when it
806                  * is waiting, there is no way to make the change effective.
807                  * A workaround to achieve that goal is for the caller to
808                  * require the event loop a second time after having modified it.
809                  */
810                 NOTICE("Requiring event manager/loop from outside of binder's callback is hazardous!");
811                 if (verbose_wants(Log_Level_Info))
812                         sig_monitor_dumpstack();
813                 evloop_release();
814                 current_thread = NULL;
815         }
816         return evmgr;
817 }
818
819 /**
820  * Enter the jobs processing loop.
821  * @param allowed_count Maximum count of thread for jobs including this one
822  * @param start_count   Count of thread to start now, must be lower.
823  * @param waiter_count  Maximum count of jobs that can be waiting.
824  * @param start         The start routine to activate (can't be NULL)
825  * @return 0 in case of success or -1 in case of error.
826  */
827 int jobs_start(
828         int allowed_count,
829         int start_count,
830         int waiter_count,
831         void (*start)(int signum, void* arg),
832         void *arg)
833 {
834         int rc, launched;
835         struct job *job;
836
837         assert(allowed_count >= 1);
838         assert(start_count >= 0);
839         assert(waiter_count > 0);
840         assert(start_count <= allowed_count);
841
842         rc = -1;
843         pthread_mutex_lock(&mutex);
844
845         /* check whether already running */
846         if (current_thread || allowed_thread_count) {
847                 ERROR("thread already started");
848                 errno = EINVAL;
849                 goto error;
850         }
851
852         /* records the allowed count */
853         allowed_thread_count = allowed_count;
854         started_thread_count = 0;
855         busy_thread_count = 0;
856         remaining_job_count = waiter_count;
857         allowed_job_count = waiter_count;
858
859         /* start at least one thread: the current one */
860         launched = 1;
861         while (launched < start_count) {
862                 if (start_one_thread() != 0) {
863                         ERROR("Not all threads can be started");
864                         goto error;
865                 }
866                 launched++;
867         }
868
869         /* queue the start job */
870         job = job_create(NULL, 0, start, arg);
871         if (!job)
872                 goto error;
873         job_add(job);
874
875         /* run until end */
876         thread_main();
877         rc = 0;
878 error:
879         pthread_mutex_unlock(&mutex);
880         if (exit_handler)
881                 exit_handler();
882         return rc;
883 }
884
885 /**
886  * Exit jobs threads and call handler if not NULL.
887  */
888 void jobs_exit(void (*handler)())
889 {
890         struct thread *t;
891
892         /* request all threads to stop */
893         pthread_mutex_lock(&mutex);
894
895         /* set the handler */
896         exit_handler = handler;
897
898         /* stops the threads */
899         t = threads;
900         while (t) {
901                 t->stop = 1;
902                 t = t->next;
903         }
904
905         /* wake up the threads */
906         pthread_cond_broadcast(&cond);
907
908         /* leave */
909         pthread_mutex_unlock(&mutex);
910 }