jobs: Fix minor errors
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "fdev-epoll.h"
46 #include "sig-monitor.h"
47 #include "verbose.h"
48
49 #if 0
50 #define _alert_ "do you really want to remove signal monitoring?"
51 #define sig_monitor_init_timeouts()  ((void)0)
52 #define sig_monitor_clean_timeouts() ((void)0)
53 #define sig_monitor(to,cb,arg)       (cb(0,arg))
54 #endif
55
56 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
57 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
58
59 /** Internal shortcut for callback */
60 typedef void (*job_cb_t)(int, void*);
61
62 /** Description of a pending job */
63 struct job
64 {
65         struct job *next;    /**< link to the next job enqueued */
66         const void *group;   /**< group of the request */
67         job_cb_t callback;   /**< processing callback */
68         void *arg;           /**< argument */
69         int timeout;         /**< timeout in second for processing the request */
70         unsigned blocked: 1; /**< is an other request blocking this one ? */
71         unsigned dropped: 1; /**< is removed ? */
72 };
73
74 /** Description of handled event loops */
75 struct evloop
76 {
77         unsigned state;        /**< encoded state */
78         int efd;               /**< event notification */
79         struct sd_event *sdev; /**< the systemd event loop */
80         pthread_cond_t  cond;  /**< condition */
81         struct fdev *fdev;     /**< handling of events */
82 };
83
84 #define EVLOOP_STATE_WAIT           1U
85 #define EVLOOP_STATE_RUN            2U
86 #define EVLOOP_STATE_LOCK           4U
87
88 /** Description of threads */
89 struct thread
90 {
91         struct thread *next;   /**< next thread of the list */
92         struct thread *upper;  /**< upper same thread */
93         struct job *job;       /**< currently processed job */
94         pthread_t tid;         /**< the thread id */
95         volatile unsigned stop: 1;      /**< stop requested */
96         volatile unsigned waits: 1;     /**< is waiting? */
97 };
98
99 /**
100  * Description of synchonous callback
101  */
102 struct sync
103 {
104         struct thread thread;   /**< thread loop data */
105         union {
106                 void (*callback)(int, void*);   /**< the synchronous callback */
107                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
108                                 /**< the entering synchronous routine */
109         };
110         void *arg;              /**< the argument of the callback */
111 };
112
113
114 /* synchronisation of threads */
115 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
116 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
117
118 /* count allowed, started and running threads */
119 static int allowed = 0; /** allowed count of threads */
120 static int started = 0; /** started count of threads */
121 static int running = 0; /** running count of threads */
122 static int remains = 0; /** allowed count of waiting jobs */
123
124 /* list of threads */
125 static struct thread *threads;
126 static _Thread_local struct thread *current_thread;
127 static _Thread_local struct evloop *current_evloop;
128
129 /* queue of pending jobs */
130 static struct job *first_job;
131 static struct job *free_jobs;
132
133 /* event loop */
134 static struct evloop evloop[1];
135 static struct fdev_epoll *fdevepoll;
136 static int waitevt;
137
138 /**
139  * Create a new job with the given parameters
140  * @param group    the group of the job
141  * @param timeout  the timeout of the job (0 if none)
142  * @param callback the function that achieves the job
143  * @param arg      the argument of the callback
144  * @return the created job unblock or NULL when no more memory
145  */
146 static struct job *job_create(
147                 const void *group,
148                 int timeout,
149                 job_cb_t callback,
150                 void *arg)
151 {
152         struct job *job;
153
154         /* try recyle existing job */
155         job = free_jobs;
156         if (job)
157                 free_jobs = job->next;
158         else {
159                 /* allocation without blocking */
160                 pthread_mutex_unlock(&mutex);
161                 job = malloc(sizeof *job);
162                 pthread_mutex_lock(&mutex);
163                 if (!job) {
164                         errno = -ENOMEM;
165                         goto end;
166                 }
167         }
168         /* initialises the job */
169         job->group = group;
170         job->timeout = timeout;
171         job->callback = callback;
172         job->arg = arg;
173         job->blocked = 0;
174         job->dropped = 0;
175 end:
176         return job;
177 }
178
179 /**
180  * Adds 'job' at the end of the list of jobs, marking it
181  * as blocked if an other job with the same group is pending.
182  * @param job the job to add
183  */
184 static void job_add(struct job *job)
185 {
186         const void *group;
187         struct job *ijob, **pjob;
188
189         /* prepare to add */
190         group = job->group;
191         job->next = NULL;
192
193         /* search end and blockers */
194         pjob = &first_job;
195         ijob = first_job;
196         while (ijob) {
197                 if (group && ijob->group == group)
198                         job->blocked = 1;
199                 pjob = &ijob->next;
200                 ijob = ijob->next;
201         }
202
203         /* queue the jobs */
204         *pjob = job;
205 }
206
207 /**
208  * Get the next job to process or NULL if none.
209  * @return the first job that isn't blocked or NULL
210  */
211 static inline struct job *job_get()
212 {
213         struct job *job = first_job;
214         while (job && job->blocked)
215                 job = job->next;
216         return job;
217 }
218
219 /**
220  * Releases the processed 'job': removes it
221  * from the list of jobs and unblock the first
222  * pending job of the same group if any.
223  * @param job the job to release
224  */
225 static inline void job_release(struct job *job)
226 {
227         struct job *ijob, **pjob;
228         const void *group;
229
230         /* first unqueue the job */
231         pjob = &first_job;
232         ijob = first_job;
233         while (ijob != job) {
234                 pjob = &ijob->next;
235                 ijob = ijob->next;
236         }
237         *pjob = job->next;
238
239         /* then unblock jobs of the same group */
240         group = job->group;
241         if (group) {
242                 ijob = job->next;
243                 while (ijob && ijob->group != group)
244                         ijob = ijob->next;
245                 if (ijob)
246                         ijob->blocked = 0;
247         }
248
249         /* recycle the job */
250         job->next = free_jobs;
251         free_jobs = job;
252 }
253
254 /**
255  * Monitored cancel callback for a job.
256  * This function is called by the monitor
257  * to cancel the job when the safe environment
258  * is set.
259  * @param signum 0 on normal flow or the number
260  *               of the signal that interrupted the normal
261  *               flow, isn't used
262  * @param arg    the job to run
263  */
264 static void job_cancel(int signum, void *arg)
265 {
266         struct job *job = arg;
267         job->callback(SIGABRT, job->arg);
268 }
269
270 /**
271  * Gets a fdev_epoll item.
272  * @return a fdev_epoll or NULL in case of error
273  */
274 static struct fdev_epoll *get_fdevepoll()
275 {
276         struct fdev_epoll *result;
277
278         result = fdevepoll;
279         if (!result)
280                 result = fdevepoll = fdev_epoll_create();
281
282         return result;
283 }
284
285 /**
286  * Monitored normal callback for events.
287  * This function is called by the monitor
288  * to run the event loop when the safe environment
289  * is set.
290  * @param signum 0 on normal flow or the number
291  *               of the signal that interrupted the normal
292  *               flow
293  * @param arg     the events to run
294  */
295 static void evloop_run(int signum, void *arg)
296 {
297         int rc;
298         struct sd_event *se;
299         struct evloop *el = arg;
300
301         if (!signum) {
302                 current_evloop = el;
303                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
304                 se = el->sdev;
305                 rc = sd_event_prepare(se);
306                 if (rc < 0) {
307                         errno = -rc;
308                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
309                 } else {
310                         if (rc == 0) {
311                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
312                                 if (rc < 0) {
313                                         errno = -rc;
314                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
315                                 }
316                         }
317                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
318
319                         if (rc > 0) {
320                                 rc = sd_event_dispatch(se);
321                                 if (rc < 0) {
322                                         errno = -rc;
323                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
324                                 }
325                         }
326                 }
327         }
328         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
329 }
330
331
332 /**
333  * Monitored normal loop for waiting events.
334  * @param signum 0 on normal flow or the number
335  *               of the signal that interrupted the normal
336  *               flow
337  * @param arg     the events to run
338  */
339 static void monitored_wait_and_dispatch(int signum, void *arg)
340 {
341         struct fdev_epoll *fdev_epoll = arg;
342         if (!signum) {
343                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
344         }
345 }
346
347 /**
348  * Main processing loop of threads processing jobs.
349  * The loop must be called with the mutex locked
350  * and it returns with the mutex locked.
351  * @param me the description of the thread to use
352  * TODO: how are timeout handled when reentering?
353  */
354 static void thread_run(volatile struct thread *me)
355 {
356         struct thread **prv;
357         struct job *job;
358
359         /* initialize description of itself and link it in the list */
360         me->tid = pthread_self();
361         me->stop = 0;
362         me->waits = 0;
363         me->upper = current_thread;
364         if (!current_thread) {
365                 started++;
366                 sig_monitor_init_timeouts();
367         }
368         me->next = threads;
369         threads = (struct thread*)me;
370         current_thread = (struct thread*)me;
371
372         /* loop until stopped */
373         while (!me->stop) {
374                 /* release the event loop */
375                 if (current_evloop) {
376                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
377                         current_evloop = NULL;
378                 }
379
380                 /* get a job */
381                 job = job_get();
382                 if (job) {
383                         /* prepare running the job */
384                         remains++; /* increases count of job that can wait */
385                         job->blocked = 1; /* mark job as blocked */
386                         me->job = job; /* record the job (only for terminate) */
387
388                         /* run the job */
389                         pthread_mutex_unlock(&mutex);
390                         sig_monitor(job->timeout, job->callback, job->arg);
391                         pthread_mutex_lock(&mutex);
392
393                         /* release the run job */
394                         job_release(job);
395                 } else if (waitevt) {
396                         /* no job and not events */
397                         running--;
398                         if (!running)
399                                 ERROR("Entering job deep sleep! Check your bindings.");
400                         me->waits = 1;
401                         pthread_cond_wait(&cond, &mutex);
402                         me->waits = 0;
403                         running++;
404                 } else {
405                         /* wait for events */
406                         waitevt = 1;
407                         pthread_mutex_unlock(&mutex);
408                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
409                         pthread_mutex_lock(&mutex);
410                         waitevt = 0;
411                 }
412         }
413
414         /* release the event loop */
415         if (current_evloop) {
416                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
417                 current_evloop = NULL;
418         }
419
420         /* unlink the current thread and cleanup */
421         prv = &threads;
422         while (*prv != me)
423                 prv = &(*prv)->next;
424         *prv = me->next;
425         current_thread = me->upper;
426         if (!current_thread) {
427                 sig_monitor_clean_timeouts();
428                 started--;
429         }
430 }
431
432 /**
433  * Entry point for created threads.
434  * @param data not used
435  * @return NULL
436  */
437 static void *thread_main(void *data)
438 {
439         struct thread me;
440
441         pthread_mutex_lock(&mutex);
442         running++;
443         thread_run(&me);
444         running--;
445         pthread_mutex_unlock(&mutex);
446         return NULL;
447 }
448
449 /**
450  * Starts a new thread
451  * @return 0 in case of success or -1 in case of error
452  */
453 static int start_one_thread()
454 {
455         pthread_t tid;
456         int rc;
457
458         rc = pthread_create(&tid, NULL, thread_main, NULL);
459         if (rc != 0) {
460                 /* errno = rc; */
461                 WARNING("not able to start thread: %m");
462                 rc = -1;
463         }
464         return rc;
465 }
466
467 /**
468  * Queues a new asynchronous job represented by 'callback' and 'arg'
469  * for the 'group' and the 'timeout'.
470  * Jobs are queued FIFO and are possibly executed in parallel
471  * concurrently except for job of the same group that are
472  * executed sequentially in FIFO order.
473  * @param group    The group of the job or NULL when no group.
474  * @param timeout  The maximum execution time in seconds of the job
475  *                 or 0 for unlimited time.
476  * @param callback The function to execute for achieving the job.
477  *                 Its first parameter is either 0 on normal flow
478  *                 or the signal number that broke the normal flow.
479  *                 The remaining parameter is the parameter 'arg1'
480  *                 given here.
481  * @param arg      The second argument for 'callback'
482  * @return 0 in case of success or -1 in case of error
483  */
484 int jobs_queue(
485                 const void *group,
486                 int timeout,
487                 void (*callback)(int, void*),
488                 void *arg)
489 {
490         const char *info;
491         struct job *job;
492         int rc;
493
494         pthread_mutex_lock(&mutex);
495
496         /* allocates the job */
497         job = job_create(group, timeout, callback, arg);
498         if (!job) {
499                 errno = ENOMEM;
500                 info = "out of memory";
501                 goto error;
502         }
503
504         /* check availability */
505         if (remains == 0) {
506                 errno = EBUSY;
507                 info = "too many jobs";
508                 goto error2;
509         }
510
511         /* start a thread if needed */
512         if (running == started && started < allowed) {
513                 /* all threads are busy and a new can be started */
514                 rc = start_one_thread();
515                 if (rc < 0 && started == 0) {
516                         info = "can't start first thread";
517                         goto error2;
518                 }
519         }
520
521         /* queues the job */
522         remains--;
523         job_add(job);
524
525         /* signal an existing job */
526         pthread_cond_signal(&cond);
527         pthread_mutex_unlock(&mutex);
528         return 0;
529
530 error2:
531         job->next = free_jobs;
532         free_jobs = job;
533 error:
534         ERROR("can't process job with threads: %s, %m", info);
535         pthread_mutex_unlock(&mutex);
536         return -1;
537 }
538
539 /**
540  * Internal helper function for 'jobs_enter'.
541  * @see jobs_enter, jobs_leave
542  */
543 static void enter_cb(int signum, void *closure)
544 {
545         struct sync *sync = closure;
546         sync->enter(signum, sync->arg, (void*)&sync->thread);
547 }
548
549 /**
550  * Internal helper function for 'jobs_call'.
551  * @see jobs_call
552  */
553 static void call_cb(int signum, void *closure)
554 {
555         struct sync *sync = closure;
556         sync->callback(signum, sync->arg);
557         jobs_leave((void*)&sync->thread);
558 }
559
560 /**
561  * Internal helper for synchronous jobs. It enters
562  * a new thread loop for evaluating the given job
563  * as recorded by the couple 'sync_cb' and 'sync'.
564  * @see jobs_call, jobs_enter, jobs_leave
565  */
566 static int do_sync(
567                 const void *group,
568                 int timeout,
569                 void (*sync_cb)(int signum, void *closure),
570                 struct sync *sync
571 )
572 {
573         struct job *job;
574
575         pthread_mutex_lock(&mutex);
576
577         /* allocates the job */
578         job = job_create(group, timeout, sync_cb, sync);
579         if (!job) {
580                 ERROR("out of memory");
581                 errno = ENOMEM;
582                 pthread_mutex_unlock(&mutex);
583                 return -1;
584         }
585
586         /* queues the job */
587         job_add(job);
588
589         /* run until stopped */
590         thread_run(&sync->thread);
591         pthread_mutex_unlock(&mutex);
592         return 0;
593 }
594
595 /**
596  * Enter a synchronisation point: activates the job given by 'callback'
597  * and 'closure' using 'group' and 'timeout' to control sequencing and
598  * execution time.
599  * @param group the group for sequencing jobs
600  * @param timeout the time in seconds allocated to the job
601  * @param callback the callback that will handle the job.
602  *                 it receives 3 parameters: 'signum' that will be 0
603  *                 on normal flow or the catched signal number in case
604  *                 of interrupted flow, the context 'closure' as given and
605  *                 a 'jobloop' reference that must be used when the job is
606  *                 terminated to unlock the current execution flow.
607  * @param closure the argument to the callback
608  * @return 0 on success or -1 in case of error
609  */
610 int jobs_enter(
611                 const void *group,
612                 int timeout,
613                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
614                 void *closure
615 )
616 {
617         struct sync sync;
618
619         sync.enter = callback;
620         sync.arg = closure;
621         return do_sync(group, timeout, enter_cb, &sync);
622 }
623
624 /**
625  * Unlocks the execution flow designed by 'jobloop'.
626  * @param jobloop indication of the flow to unlock
627  * @return 0 in case of success of -1 on error
628  */
629 int jobs_leave(struct jobloop *jobloop)
630 {
631         struct thread *t;
632
633         pthread_mutex_lock(&mutex);
634         t = threads;
635         while (t && t != (struct thread*)jobloop)
636                 t = t->next;
637         if (!t) {
638                 errno = EINVAL;
639         } else {
640                 t->stop = 1;
641                 if (t->waits)
642                         pthread_cond_broadcast(&cond);
643         }
644         pthread_mutex_unlock(&mutex);
645         return -!t;
646 }
647
648 /**
649  * Calls synchronously the job represented by 'callback' and 'arg1'
650  * for the 'group' and the 'timeout' and waits for its completion.
651  * @param group    The group of the job or NULL when no group.
652  * @param timeout  The maximum execution time in seconds of the job
653  *                 or 0 for unlimited time.
654  * @param callback The function to execute for achieving the job.
655  *                 Its first parameter is either 0 on normal flow
656  *                 or the signal number that broke the normal flow.
657  *                 The remaining parameter is the parameter 'arg1'
658  *                 given here.
659  * @param arg      The second argument for 'callback'
660  * @return 0 in case of success or -1 in case of error
661  */
662 int jobs_call(
663                 const void *group,
664                 int timeout,
665                 void (*callback)(int, void*),
666                 void *arg)
667 {
668         struct sync sync;
669
670         sync.callback = callback;
671         sync.arg = arg;
672
673         return do_sync(group, timeout, call_cb, &sync);
674 }
675
676 /**
677  * Internal callback for evloop management.
678  * The effect of this function is hidden: it exits
679  * the waiting poll if any. Then it wakes up a thread
680  * awaiting the evloop using signal.
681  */
682 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
683 {
684         uint64_t x;
685         struct evloop *evloop = userdata;
686         read(evloop->efd, &x, sizeof x);
687         pthread_mutex_lock(&mutex);
688         pthread_cond_broadcast(&evloop->cond);  
689         pthread_mutex_unlock(&mutex);
690         return 1;
691 }
692
693 /* temporary hack */
694 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
695 {
696         sig_monitor(0, evloop_run, arg);
697 }
698
699 /**
700  * Gets a sd_event item for the current thread.
701  * @return a sd_event or NULL in case of error
702  */
703 static struct sd_event *get_sd_event_locked()
704 {
705         struct evloop *el;
706         uint64_t x;
707         int rc;
708
709         /* creates the evloop on need */
710         el = &evloop[0];
711         if (!el->sdev) {
712                 /* start the creation */
713                 el->state = 0;
714                 /* creates the eventfd for waking up polls */
715                 el->efd = eventfd(0, EFD_CLOEXEC);
716                 if (el->efd < 0) {
717                         ERROR("can't make eventfd for events");
718                         goto error1;
719                 }
720                 /* create the systemd event loop */
721                 rc = sd_event_new(&el->sdev);
722                 if (rc < 0) {
723                         ERROR("can't make new event loop");
724                         goto error2;
725                 }
726                 /* put the eventfd in the event loop */
727                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
728                 if (rc < 0) {
729                         ERROR("can't register eventfd");
730                         goto error3;
731                 }
732                 /* handle the event loop */
733                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
734                 if (!el->fdev) {
735                         ERROR("can't create fdev");
736 error3:
737                         sd_event_unref(el->sdev);
738 error2:
739                         close(el->efd);
740 error1:
741                         memset(el, 0, sizeof *el);
742                         return NULL;
743                 }
744                 fdev_set_autoclose(el->fdev, 0);
745                 fdev_set_events(el->fdev, EPOLLIN);
746                 fdev_set_callback(el->fdev, evloop_callback, el);
747         }
748
749         /* attach the event loop to the current thread */
750         if (current_evloop != el) {
751                 if (current_evloop)
752                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
753                 current_evloop = el;
754                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
755         }
756
757         /* wait for a modifiable event loop */
758         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
759                 x = 1;
760                 write(el->efd, &x, sizeof x);
761                 pthread_cond_wait(&el->cond, &mutex);
762         }
763
764         return el->sdev;
765 }
766
767 /**
768  * Gets a sd_event item for the current thread.
769  * @return a sd_event or NULL in case of error
770  */
771 struct sd_event *jobs_get_sd_event()
772 {
773         struct sd_event *result;
774
775         pthread_mutex_lock(&mutex);
776         result = get_sd_event_locked();
777         pthread_mutex_unlock(&mutex);
778
779         return result;
780 }
781
782 /**
783  * Gets the fdev_epoll item.
784  * @return a fdev_epoll or NULL in case of error
785  */
786 struct fdev_epoll *jobs_get_fdev_epoll()
787 {
788         struct fdev_epoll *result;
789
790         pthread_mutex_lock(&mutex);
791         result = get_fdevepoll();
792         pthread_mutex_unlock(&mutex);
793
794         return result;
795 }
796
797 /**
798  * Enter the jobs processing loop.
799  * @param allowed_count Maximum count of thread for jobs including this one
800  * @param start_count   Count of thread to start now, must be lower.
801  * @param waiter_count  Maximum count of jobs that can be waiting.
802  * @param start         The start routine to activate (can't be NULL)
803  * @return 0 in case of success or -1 in case of error.
804  */
805 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
806 {
807         int rc, launched;
808         struct thread me;
809         struct job *job;
810
811         assert(allowed_count >= 1);
812         assert(start_count >= 0);
813         assert(waiter_count > 0);
814         assert(start_count <= allowed_count);
815
816         rc = -1;
817         pthread_mutex_lock(&mutex);
818
819         /* check whether already running */
820         if (current_thread || allowed) {
821                 ERROR("thread already started");
822                 errno = EINVAL;
823                 goto error;
824         }
825
826         /* start */
827         if (sig_monitor_init() < 0) {
828                 ERROR("failed to initialise signal handlers");
829                 goto error;
830         }
831
832         /* records the allowed count */
833         allowed = allowed_count;
834         started = 0;
835         running = 0;
836         remains = waiter_count;
837
838 #if HAS_WATCHDOG
839         /* set the watchdog */
840         if (sd_watchdog_enabled(0, NULL))
841                 sd_event_set_watchdog(get_sd_event_locked(), 1);
842 #endif
843
844         /* start at least one thread */
845         launched = 0;
846         while ((launched + 1) < start_count) {
847                 if (start_one_thread() != 0) {
848                         ERROR("Not all threads can be started");
849                         goto error;
850                 }
851                 launched++;
852         }
853
854         /* queue the start job */
855         job = job_create(NULL, 0, start, arg);
856         if (!job) {
857                 ERROR("out of memory");
858                 errno = ENOMEM;
859                 goto error;
860         }
861         job_add(job);
862         remains--;
863
864         /* run until end */
865         thread_run(&me);
866         rc = 0;
867 error:
868         pthread_mutex_unlock(&mutex);
869         return rc;
870 }
871
872 /**
873  * Terminate all the threads and cancel all pending jobs.
874  */
875 void jobs_terminate()
876 {
877         struct job *job, *head, *tail;
878         pthread_t me, *others;
879         struct thread *t;
880         int count;
881
882         /* how am i? */
883         me = pthread_self();
884
885         /* request all threads to stop */
886         pthread_mutex_lock(&mutex);
887         allowed = 0;
888
889         /* count the number of threads */
890         count = 0;
891         t = threads;
892         while (t) {
893                 if (!t->upper && !pthread_equal(t->tid, me))
894                         count++;
895                 t = t->next;
896         }
897
898         /* fill the array of threads */
899         others = alloca(count * sizeof *others);
900         count = 0;
901         t = threads;
902         while (t) {
903                 if (!t->upper && !pthread_equal(t->tid, me))
904                         others[count++] = t->tid;
905                 t = t->next;
906         }
907
908         /* stops the threads */
909         t = threads;
910         while (t) {
911                 t->stop = 1;
912                 t = t->next;
913         }
914
915         /* wait the threads */
916         pthread_cond_broadcast(&cond);
917         pthread_mutex_unlock(&mutex);
918         while (count)
919                 pthread_join(others[--count], NULL);
920         pthread_mutex_lock(&mutex);
921
922         /* cancel pending jobs of other threads */
923         remains = 0;
924         head = first_job;
925         first_job = NULL;
926         tail = NULL;
927         while (head) {
928                 /* unlink the job */
929                 job = head;
930                 head = job->next;
931
932                 /* search if job is stacked for current */
933                 t = current_thread;
934                 while (t && t->job != job)
935                         t = t->upper;
936                 if (t) {
937                         /* yes, relink it at end */
938                         if (tail)
939                                 tail->next = job;
940                         else
941                                 first_job = job;
942                         tail = job;
943                         job->next = NULL;
944                 } else {
945                         /* no cancel the job */
946                         pthread_mutex_unlock(&mutex);
947                         sig_monitor(0, job_cancel, job);
948                         free(job);
949                         pthread_mutex_lock(&mutex);
950                 }
951         }
952         pthread_mutex_unlock(&mutex);
953 }
954