watchdog: Isolate the watchdog from jobs
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <time.h>
26 #include <sys/syscall.h>
27 #include <pthread.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <sys/eventfd.h>
31
32 #include <systemd/sd-event.h>
33
34 #include "jobs.h"
35 #include "sig-monitor.h"
36 #include "verbose.h"
37
38 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
39 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
40
41 struct thread;
42
43 /** Internal shortcut for callback */
44 typedef void (*job_cb_t)(int, void*);
45
46 /** Description of a pending job */
47 struct job
48 {
49         struct job *next;    /**< link to the next job enqueued */
50         const void *group;   /**< group of the request */
51         job_cb_t callback;   /**< processing callback */
52         void *arg;           /**< argument */
53         int timeout;         /**< timeout in second for processing the request */
54         unsigned blocked: 1; /**< is an other request blocking this one ? */
55         unsigned dropped: 1; /**< is removed ? */
56 };
57
58 /** Description of handled event loops */
59 struct evloop
60 {
61         unsigned state;        /**< encoded state */
62         int efd;               /**< event notification */
63         struct sd_event *sdev; /**< the systemd event loop */
64         struct thread *holder; /**< holder of the evloop */
65 };
66
67 #define EVLOOP_STATE_WAIT           1U
68 #define EVLOOP_STATE_RUN            2U
69
70 /** Description of threads */
71 struct thread
72 {
73         struct thread *next;   /**< next thread of the list */
74         struct thread *upper;  /**< upper same thread */
75         struct thread *nholder;/**< next holder for evloop */
76         pthread_cond_t *cwhold;/**< condition wait for holding */
77         struct job *job;       /**< currently processed job */
78         pthread_t tid;         /**< the thread id */
79         volatile unsigned stop: 1;      /**< stop requested */
80         volatile unsigned waits: 1;     /**< is waiting? */
81 };
82
83 /**
84  * Description of synchronous callback
85  */
86 struct sync
87 {
88         struct thread thread;   /**< thread loop data */
89         union {
90                 void (*callback)(int, void*);   /**< the synchronous callback */
91                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
92                                 /**< the entering synchronous routine */
93         };
94         void *arg;              /**< the argument of the callback */
95 };
96
97
98 /* synchronisation of threads */
99 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
100 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
101
102 /* count allowed, started and running threads */
103 static int allowed = 0; /** allowed count of threads */
104 static int started = 0; /** started count of threads */
105 static int running = 0; /** running count of threads */
106 static int remains = 0; /** allowed count of waiting jobs */
107
108 /* list of threads */
109 static struct thread *threads;
110 static _Thread_local struct thread *current_thread;
111
112 /* queue of pending jobs */
113 static struct job *first_job;
114 static struct job *free_jobs;
115
116 /* event loop */
117 static struct evloop evloop;
118
119 /**
120  * Create a new job with the given parameters
121  * @param group    the group of the job
122  * @param timeout  the timeout of the job (0 if none)
123  * @param callback the function that achieves the job
124  * @param arg      the argument of the callback
125  * @return the created job unblock or NULL when no more memory
126  */
127 static struct job *job_create(
128                 const void *group,
129                 int timeout,
130                 job_cb_t callback,
131                 void *arg)
132 {
133         struct job *job;
134
135         /* try recyle existing job */
136         job = free_jobs;
137         if (job)
138                 free_jobs = job->next;
139         else {
140                 /* allocation without blocking */
141                 pthread_mutex_unlock(&mutex);
142                 job = malloc(sizeof *job);
143                 pthread_mutex_lock(&mutex);
144                 if (!job) {
145                         ERROR("out of memory");
146                         errno = ENOMEM;
147                         goto end;
148                 }
149         }
150         /* initialises the job */
151         job->group = group;
152         job->timeout = timeout;
153         job->callback = callback;
154         job->arg = arg;
155         job->blocked = 0;
156         job->dropped = 0;
157 end:
158         return job;
159 }
160
161 /**
162  * Adds 'job' at the end of the list of jobs, marking it
163  * as blocked if an other job with the same group is pending.
164  * @param job the job to add
165  */
166 static void job_add(struct job *job)
167 {
168         const void *group;
169         struct job *ijob, **pjob;
170
171         /* prepare to add */
172         group = job->group;
173         job->next = NULL;
174
175         /* search end and blockers */
176         pjob = &first_job;
177         ijob = first_job;
178         while (ijob) {
179                 if (group && ijob->group == group)
180                         job->blocked = 1;
181                 pjob = &ijob->next;
182                 ijob = ijob->next;
183         }
184
185         /* queue the jobs */
186         *pjob = job;
187         remains--;
188 }
189
190 /**
191  * Get the next job to process or NULL if none.
192  * @return the first job that isn't blocked or NULL
193  */
194 static inline struct job *job_get()
195 {
196         struct job *job = first_job;
197         while (job && job->blocked)
198                 job = job->next;
199         if (job)
200                 remains++;
201         return job;
202 }
203
204 /**
205  * Releases the processed 'job': removes it
206  * from the list of jobs and unblock the first
207  * pending job of the same group if any.
208  * @param job the job to release
209  */
210 static inline void job_release(struct job *job)
211 {
212         struct job *ijob, **pjob;
213         const void *group;
214
215         /* first unqueue the job */
216         pjob = &first_job;
217         ijob = first_job;
218         while (ijob != job) {
219                 pjob = &ijob->next;
220                 ijob = ijob->next;
221         }
222         *pjob = job->next;
223
224         /* then unblock jobs of the same group */
225         group = job->group;
226         if (group) {
227                 ijob = job->next;
228                 while (ijob && ijob->group != group)
229                         ijob = ijob->next;
230                 if (ijob)
231                         ijob->blocked = 0;
232         }
233
234         /* recycle the job */
235         job->next = free_jobs;
236         free_jobs = job;
237 }
238
239 /**
240  * Monitored cancel callback for a job.
241  * This function is called by the monitor
242  * to cancel the job when the safe environment
243  * is set.
244  * @param signum 0 on normal flow or the number
245  *               of the signal that interrupted the normal
246  *               flow, isn't used
247  * @param arg    the job to run
248  */
249 static void job_cancel(int signum, void *arg)
250 {
251         struct job *job = arg;
252         job->callback(SIGABRT, job->arg);
253 }
254
255 /**
256  * Monitored normal callback for events.
257  * This function is called by the monitor
258  * to run the event loop when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow
263  * @param arg     the events to run
264  */
265 static void evloop_run(int signum, void *arg)
266 {
267         int rc;
268         struct sd_event *se;
269
270         if (!signum) {
271                 se = evloop.sdev;
272                 rc = sd_event_prepare(se);
273                 if (rc < 0) {
274                         errno = -rc;
275                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
276                         abort();
277                 } else {
278                         if (rc == 0) {
279                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
280                                 if (rc < 0) {
281                                         errno = -rc;
282                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
283                                 }
284                         }
285                         evloop.state = EVLOOP_STATE_RUN;
286                         if (rc > 0) {
287                                 rc = sd_event_dispatch(se);
288                                 if (rc < 0) {
289                                         errno = -rc;
290                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
291                                 }
292                         }
293                 }
294         }
295 }
296
297 /**
298  * Internal callback for evloop management.
299  * The effect of this function is hidden: it exits
300  * the waiting poll if any.
301  */
302 static void evloop_on_efd_event()
303 {
304         uint64_t x;
305         read(evloop.efd, &x, sizeof x);
306 }
307
308 /**
309  * wakeup the event loop if needed by sending
310  * an event.
311  */
312 static void evloop_wakeup()
313 {
314         uint64_t x;
315
316         if (evloop.state & EVLOOP_STATE_WAIT) {
317                 x = 1;
318                 write(evloop.efd, &x, sizeof x);
319         }
320 }
321
322 /**
323  * Release the currently held event loop
324  */
325 static void evloop_release()
326 {
327         struct thread *nh, *ct = current_thread;
328
329         if (ct && evloop.holder == ct) {
330                 nh = ct->nholder;
331                 evloop.holder = nh;
332                 if (nh)
333                         pthread_cond_signal(nh->cwhold);
334         }
335 }
336
337 /**
338  * get the eventloop for the current thread
339  */
340 static int evloop_get()
341 {
342         struct thread *ct = current_thread;
343
344         if (evloop.holder)
345                 return evloop.holder == ct;
346
347         if (!evloop.sdev)
348                 return 0;
349
350         ct->nholder = NULL;
351         evloop.holder = ct;
352         return 1;
353 }
354
355 /**
356  * acquire the eventloop for the current thread
357  */
358 static void evloop_acquire()
359 {
360         struct thread **pwait, *ct;
361         pthread_cond_t cond;
362
363         /* try to get the evloop */
364         if (!evloop_get()) {
365                 /* failed, init waiting state */
366                 ct = current_thread;
367                 ct->nholder = NULL;
368                 ct->cwhold = &cond;
369                 pthread_cond_init(&cond, NULL);
370
371                 /* queue current thread in holder list */
372                 pwait = &evloop.holder;
373                 while (*pwait)
374                         pwait = &(*pwait)->nholder;
375                 *pwait = ct;
376
377                 /* wake up the evloop */
378                 evloop_wakeup();
379
380                 /* wait to acquire the evloop */
381                 pthread_cond_wait(&cond, &mutex);
382                 pthread_cond_destroy(&cond);
383         }
384 }
385
386 /**
387  * Enter the thread
388  * @param me the description of the thread to enter
389  */
390 static void thread_enter(volatile struct thread *me)
391 {
392         evloop_release();
393         /* initialize description of itself and link it in the list */
394         me->tid = pthread_self();
395         me->stop = 0;
396         me->waits = 0;
397         me->upper = current_thread;
398         me->next = threads;
399         threads = (struct thread*)me;
400         current_thread = (struct thread*)me;
401 }
402
403 /**
404  * leave the thread
405  * @param me the description of the thread to leave
406  */
407 static void thread_leave()
408 {
409         struct thread **prv, *me;
410
411         /* unlink the current thread and cleanup */
412         me = current_thread;
413         prv = &threads;
414         while (*prv != me)
415                 prv = &(*prv)->next;
416         *prv = me->next;
417
418         current_thread = me->upper;
419 }
420
421 /**
422  * Main processing loop of internal threads with processing jobs.
423  * The loop must be called with the mutex locked
424  * and it returns with the mutex locked.
425  * @param me the description of the thread to use
426  * TODO: how are timeout handled when reentering?
427  */
428 static void thread_run_internal(volatile struct thread *me)
429 {
430         struct job *job;
431
432         /* enter thread */
433         thread_enter(me);
434
435         /* loop until stopped */
436         while (!me->stop) {
437                 /* release the current event loop */
438                 evloop_release();
439
440                 /* get a job */
441                 job = job_get();
442                 if (job) {
443                         /* prepare running the job */
444                         job->blocked = 1; /* mark job as blocked */
445                         me->job = job; /* record the job (only for terminate) */
446
447                         /* run the job */
448                         pthread_mutex_unlock(&mutex);
449                         sig_monitor(job->timeout, job->callback, job->arg);
450                         pthread_mutex_lock(&mutex);
451
452                         /* release the run job */
453                         job_release(job);
454                 /* no job, check event loop wait */
455                 } else if (evloop_get()) {
456                         if (evloop.state != 0) {
457                                 /* busy ? */
458                                 CRITICAL("Can't enter dispatch while in dispatch!");
459                                 abort();
460                         }
461                         /* run the events */
462                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
463                         pthread_mutex_unlock(&mutex);
464                         sig_monitor(0, evloop_run, NULL);
465                         pthread_mutex_lock(&mutex);
466                         evloop.state = 0;
467                 } else {
468                         /* no job and no event loop */
469                         running--;
470                         if (!running)
471                                 ERROR("Entering job deep sleep! Check your bindings.");
472                         me->waits = 1;
473                         pthread_cond_wait(&cond, &mutex);
474                         me->waits = 0;
475                         running++;
476                 }
477         }
478         /* cleanup */
479         evloop_release();
480         thread_leave();
481 }
482
483 /**
484  * Main processing loop of external threads.
485  * The loop must be called with the mutex locked
486  * and it returns with the mutex locked.
487  * @param me the description of the thread to use
488  */
489 static void thread_run_external(volatile struct thread *me)
490 {
491         /* enter thread */
492         thread_enter(me);
493
494         /* loop until stopped */
495         me->waits = 1;
496         while (!me->stop)
497                 pthread_cond_wait(&cond, &mutex);
498         me->waits = 0;
499         thread_leave();
500 }
501
502 /**
503  * Root for created threads.
504  */
505 static void thread_main()
506 {
507         struct thread me;
508
509         running++;
510         started++;
511         sig_monitor_init_timeouts();
512         thread_run_internal(&me);
513         sig_monitor_clean_timeouts();
514         started--;
515         running--;
516 }
517
518 /**
519  * Entry point for created threads.
520  * @param data not used
521  * @return NULL
522  */
523 static void *thread_starter(void *data)
524 {
525         pthread_mutex_lock(&mutex);
526         thread_main();
527         pthread_mutex_unlock(&mutex);
528         return NULL;
529 }
530
531 /**
532  * Starts a new thread
533  * @return 0 in case of success or -1 in case of error
534  */
535 static int start_one_thread()
536 {
537         pthread_t tid;
538         int rc;
539
540         rc = pthread_create(&tid, NULL, thread_starter, NULL);
541         if (rc != 0) {
542                 /* errno = rc; */
543                 WARNING("not able to start thread: %m");
544                 rc = -1;
545         }
546         return rc;
547 }
548
549 /**
550  * Queues a new asynchronous job represented by 'callback' and 'arg'
551  * for the 'group' and the 'timeout'.
552  * Jobs are queued FIFO and are possibly executed in parallel
553  * concurrently except for job of the same group that are
554  * executed sequentially in FIFO order.
555  * @param group    The group of the job or NULL when no group.
556  * @param timeout  The maximum execution time in seconds of the job
557  *                 or 0 for unlimited time.
558  * @param callback The function to execute for achieving the job.
559  *                 Its first parameter is either 0 on normal flow
560  *                 or the signal number that broke the normal flow.
561  *                 The remaining parameter is the parameter 'arg1'
562  *                 given here.
563  * @param arg      The second argument for 'callback'
564  * @return 0 in case of success or -1 in case of error
565  */
566 int jobs_queue(
567                 const void *group,
568                 int timeout,
569                 void (*callback)(int, void*),
570                 void *arg)
571 {
572         struct job *job;
573         int rc;
574
575         pthread_mutex_lock(&mutex);
576
577         /* allocates the job */
578         job = job_create(group, timeout, callback, arg);
579         if (!job)
580                 goto error;
581
582         /* check availability */
583         if (remains <= 0) {
584                 ERROR("can't process job with threads: too many jobs");
585                 errno = EBUSY;
586                 goto error2;
587         }
588
589         /* start a thread if needed */
590         if (running == started && started < allowed) {
591                 /* all threads are busy and a new can be started */
592                 rc = start_one_thread();
593                 if (rc < 0 && started == 0) {
594                         ERROR("can't start initial thread: %m");
595                         goto error2;
596                 }
597         }
598
599         /* queues the job */
600         job_add(job);
601
602         /* signal an existing job */
603         pthread_cond_signal(&cond);
604         pthread_mutex_unlock(&mutex);
605         return 0;
606
607 error2:
608         job->next = free_jobs;
609         free_jobs = job;
610 error:
611         pthread_mutex_unlock(&mutex);
612         return -1;
613 }
614
615 /**
616  * Internal helper function for 'jobs_enter'.
617  * @see jobs_enter, jobs_leave
618  */
619 static void enter_cb(int signum, void *closure)
620 {
621         struct sync *sync = closure;
622         sync->enter(signum, sync->arg, (void*)&sync->thread);
623 }
624
625 /**
626  * Internal helper function for 'jobs_call'.
627  * @see jobs_call
628  */
629 static void call_cb(int signum, void *closure)
630 {
631         struct sync *sync = closure;
632         sync->callback(signum, sync->arg);
633         jobs_leave((void*)&sync->thread);
634 }
635
636 /**
637  * Internal helper for synchronous jobs. It enters
638  * a new thread loop for evaluating the given job
639  * as recorded by the couple 'sync_cb' and 'sync'.
640  * @see jobs_call, jobs_enter, jobs_leave
641  */
642 static int do_sync(
643                 const void *group,
644                 int timeout,
645                 void (*sync_cb)(int signum, void *closure),
646                 struct sync *sync
647 )
648 {
649         struct job *job;
650
651         pthread_mutex_lock(&mutex);
652
653         /* allocates the job */
654         job = job_create(group, timeout, sync_cb, sync);
655         if (!job) {
656                 pthread_mutex_unlock(&mutex);
657                 return -1;
658         }
659
660         /* queues the job */
661         job_add(job);
662
663         /* run until stopped */
664         if (current_thread)
665                 thread_run_internal(&sync->thread);
666         else
667                 thread_run_external(&sync->thread);
668         pthread_mutex_unlock(&mutex);
669         return 0;
670 }
671
672 /**
673  * Enter a synchronisation point: activates the job given by 'callback'
674  * and 'closure' using 'group' and 'timeout' to control sequencing and
675  * execution time.
676  * @param group the group for sequencing jobs
677  * @param timeout the time in seconds allocated to the job
678  * @param callback the callback that will handle the job.
679  *                 it receives 3 parameters: 'signum' that will be 0
680  *                 on normal flow or the catched signal number in case
681  *                 of interrupted flow, the context 'closure' as given and
682  *                 a 'jobloop' reference that must be used when the job is
683  *                 terminated to unlock the current execution flow.
684  * @param closure the argument to the callback
685  * @return 0 on success or -1 in case of error
686  */
687 int jobs_enter(
688                 const void *group,
689                 int timeout,
690                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
691                 void *closure
692 )
693 {
694         struct sync sync;
695
696         sync.enter = callback;
697         sync.arg = closure;
698         return do_sync(group, timeout, enter_cb, &sync);
699 }
700
701 /**
702  * Unlocks the execution flow designed by 'jobloop'.
703  * @param jobloop indication of the flow to unlock
704  * @return 0 in case of success of -1 on error
705  */
706 int jobs_leave(struct jobloop *jobloop)
707 {
708         struct thread *t;
709
710         pthread_mutex_lock(&mutex);
711         t = threads;
712         while (t && t != (struct thread*)jobloop)
713                 t = t->next;
714         if (!t) {
715                 errno = EINVAL;
716         } else {
717                 t->stop = 1;
718                 if (t->waits)
719                         pthread_cond_broadcast(&cond);
720                 else
721                         evloop_wakeup();
722         }
723         pthread_mutex_unlock(&mutex);
724         return -!t;
725 }
726
727 /**
728  * Calls synchronously the job represented by 'callback' and 'arg1'
729  * for the 'group' and the 'timeout' and waits for its completion.
730  * @param group    The group of the job or NULL when no group.
731  * @param timeout  The maximum execution time in seconds of the job
732  *                 or 0 for unlimited time.
733  * @param callback The function to execute for achieving the job.
734  *                 Its first parameter is either 0 on normal flow
735  *                 or the signal number that broke the normal flow.
736  *                 The remaining parameter is the parameter 'arg1'
737  *                 given here.
738  * @param arg      The second argument for 'callback'
739  * @return 0 in case of success or -1 in case of error
740  */
741 int jobs_call(
742                 const void *group,
743                 int timeout,
744                 void (*callback)(int, void*),
745                 void *arg)
746 {
747         struct sync sync;
748
749         sync.callback = callback;
750         sync.arg = arg;
751
752         return do_sync(group, timeout, call_cb, &sync);
753 }
754
755 /**
756  * Internal callback for evloop management.
757  * The effect of this function is hidden: it exits
758  * the waiting poll if any. Then it wakes up a thread
759  * awaiting the evloop using signal.
760  */
761 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
762 {
763         evloop_on_efd_event();
764         return 1;
765 }
766
767 /**
768  * Gets a sd_event item for the current thread.
769  * @return a sd_event or NULL in case of error
770  */
771 static struct sd_event *get_sd_event_locked()
772 {
773         int rc;
774
775         /* creates the evloop on need */
776         if (!evloop.sdev) {
777                 /* start the creation */
778                 evloop.state = 0;
779                 /* creates the eventfd for waking up polls */
780                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
781                 if (evloop.efd < 0) {
782                         ERROR("can't make eventfd for events");
783                         goto error1;
784                 }
785                 /* create the systemd event loop */
786                 rc = sd_event_new(&evloop.sdev);
787                 if (rc < 0) {
788                         ERROR("can't make new event loop");
789                         goto error2;
790                 }
791                 /* put the eventfd in the event loop */
792                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
793                 if (rc < 0) {
794                         ERROR("can't register eventfd");
795                         sd_event_unref(evloop.sdev);
796                         evloop.sdev = NULL;
797 error2:
798                         close(evloop.efd);
799 error1:
800                         return NULL;
801                 }
802         }
803
804         /* acquire the event loop */
805         evloop_acquire();
806
807         return evloop.sdev;
808 }
809
810 /**
811  * Gets a sd_event item for the current thread.
812  * @return a sd_event or NULL in case of error
813  */
814 struct sd_event *jobs_get_sd_event()
815 {
816         struct sd_event *result;
817         struct thread lt;
818
819         /* ensure an existing thread environment */
820         if (!current_thread) {
821                 memset(&lt, 0, sizeof lt);
822                 current_thread = &lt;
823         }
824
825         /* process */
826         pthread_mutex_lock(&mutex);
827         result = get_sd_event_locked();
828         pthread_mutex_unlock(&mutex);
829
830         /* release the faked thread environment if needed */
831         if (current_thread == &lt) {
832                 /*
833                  * Releasing it is needed because there is no way to guess
834                  * when it has to be released really. But here is where it is
835                  * hazardous: if the caller modifies the eventloop when it
836                  * is waiting, there is no way to make the change effective.
837                  * A workaround to achieve that goal is for the caller to
838                  * require the event loop a second time after having modified it.
839                  */
840                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
841                 if (verbose_wants(Log_Level_Info))
842                         sig_monitor_dumpstack();
843                 evloop_release();
844                 current_thread = NULL;
845         }
846
847         return result;
848 }
849
850 /**
851  * Enter the jobs processing loop.
852  * @param allowed_count Maximum count of thread for jobs including this one
853  * @param start_count   Count of thread to start now, must be lower.
854  * @param waiter_count  Maximum count of jobs that can be waiting.
855  * @param start         The start routine to activate (can't be NULL)
856  * @return 0 in case of success or -1 in case of error.
857  */
858 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
859 {
860         int rc, launched;
861         struct job *job;
862
863         assert(allowed_count >= 1);
864         assert(start_count >= 0);
865         assert(waiter_count > 0);
866         assert(start_count <= allowed_count);
867
868         rc = -1;
869         pthread_mutex_lock(&mutex);
870
871         /* check whether already running */
872         if (current_thread || allowed) {
873                 ERROR("thread already started");
874                 errno = EINVAL;
875                 goto error;
876         }
877
878         /* records the allowed count */
879         allowed = allowed_count;
880         started = 0;
881         running = 0;
882         remains = waiter_count;
883
884         /* start at least one thread: the current one */
885         launched = 1;
886         while (launched < start_count) {
887                 if (start_one_thread() != 0) {
888                         ERROR("Not all threads can be started");
889                         goto error;
890                 }
891                 launched++;
892         }
893
894         /* queue the start job */
895         job = job_create(NULL, 0, start, arg);
896         if (!job)
897                 goto error;
898         job_add(job);
899
900         /* run until end */
901         thread_main();
902         rc = 0;
903 error:
904         pthread_mutex_unlock(&mutex);
905         return rc;
906 }
907
908 /**
909  * Terminate all the threads and cancel all pending jobs.
910  */
911 void jobs_terminate()
912 {
913         struct job *job, *head, *tail;
914         pthread_t me, *others;
915         struct thread *t;
916         int count;
917
918         /* how am i? */
919         me = pthread_self();
920
921         /* request all threads to stop */
922         pthread_mutex_lock(&mutex);
923         allowed = 0;
924
925         /* count the number of threads */
926         count = 0;
927         t = threads;
928         while (t) {
929                 if (!t->upper && !pthread_equal(t->tid, me))
930                         count++;
931                 t = t->next;
932         }
933
934         /* fill the array of threads */
935         others = alloca(count * sizeof *others);
936         count = 0;
937         t = threads;
938         while (t) {
939                 if (!t->upper && !pthread_equal(t->tid, me))
940                         others[count++] = t->tid;
941                 t = t->next;
942         }
943
944         /* stops the threads */
945         t = threads;
946         while (t) {
947                 t->stop = 1;
948                 t = t->next;
949         }
950
951         /* wait the threads */
952         pthread_cond_broadcast(&cond);
953         pthread_mutex_unlock(&mutex);
954         while (count)
955                 pthread_join(others[--count], NULL);
956         pthread_mutex_lock(&mutex);
957
958         /* cancel pending jobs of other threads */
959         remains = 0;
960         head = first_job;
961         first_job = NULL;
962         tail = NULL;
963         while (head) {
964                 /* unlink the job */
965                 job = head;
966                 head = job->next;
967
968                 /* search if job is stacked for current */
969                 t = current_thread;
970                 while (t && t->job != job)
971                         t = t->upper;
972                 if (t) {
973                         /* yes, relink it at end */
974                         if (tail)
975                                 tail->next = job;
976                         else
977                                 first_job = job;
978                         tail = job;
979                         job->next = NULL;
980                 } else {
981                         /* no cancel the job */
982                         pthread_mutex_unlock(&mutex);
983                         sig_monitor(0, job_cancel, job);
984                         free(job);
985                         pthread_mutex_lock(&mutex);
986                 }
987         }
988         pthread_mutex_unlock(&mutex);
989 }
990