Merge changes from topic 'spec-2089'
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
49 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
50
51 struct thread;
52
53 /** Internal shortcut for callback */
54 typedef void (*job_cb_t)(int, void*);
55
56 /** Description of a pending job */
57 struct job
58 {
59         struct job *next;    /**< link to the next job enqueued */
60         const void *group;   /**< group of the request */
61         job_cb_t callback;   /**< processing callback */
62         void *arg;           /**< argument */
63         int timeout;         /**< timeout in second for processing the request */
64         unsigned blocked: 1; /**< is an other request blocking this one ? */
65         unsigned dropped: 1; /**< is removed ? */
66 };
67
68 /** Description of handled event loops */
69 struct evloop
70 {
71         unsigned state;        /**< encoded state */
72         int efd;               /**< event notification */
73         struct sd_event *sdev; /**< the systemd event loop */
74         struct fdev *fdev;     /**< handling of events */
75         struct thread *holder; /**< holder of the evloop */
76 };
77
78 #define EVLOOP_STATE_WAIT           1U
79 #define EVLOOP_STATE_RUN            2U
80
81 /** Description of threads */
82 struct thread
83 {
84         struct thread *next;   /**< next thread of the list */
85         struct thread *upper;  /**< upper same thread */
86         struct thread *nholder;/**< next holder for evloop */
87         pthread_cond_t *cwhold;/**< condition wait for holding */
88         struct job *job;       /**< currently processed job */
89         pthread_t tid;         /**< the thread id */
90         volatile unsigned stop: 1;      /**< stop requested */
91         volatile unsigned waits: 1;     /**< is waiting? */
92 };
93
94 /**
95  * Description of synchronous callback
96  */
97 struct sync
98 {
99         struct thread thread;   /**< thread loop data */
100         union {
101                 void (*callback)(int, void*);   /**< the synchronous callback */
102                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
103                                 /**< the entering synchronous routine */
104         };
105         void *arg;              /**< the argument of the callback */
106 };
107
108
109 /* synchronisation of threads */
110 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
111 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
112
113 /* count allowed, started and running threads */
114 static int allowed = 0; /** allowed count of threads */
115 static int started = 0; /** started count of threads */
116 static int running = 0; /** running count of threads */
117 static int remains = 0; /** allowed count of waiting jobs */
118
119 /* list of threads */
120 static struct thread *threads;
121 static _Thread_local struct thread *current_thread;
122
123 /* queue of pending jobs */
124 static struct job *first_job;
125 static struct job *free_jobs;
126
127 /* event loop */
128 static struct evloop evloop;
129
130 /**
131  * Create a new job with the given parameters
132  * @param group    the group of the job
133  * @param timeout  the timeout of the job (0 if none)
134  * @param callback the function that achieves the job
135  * @param arg      the argument of the callback
136  * @return the created job unblock or NULL when no more memory
137  */
138 static struct job *job_create(
139                 const void *group,
140                 int timeout,
141                 job_cb_t callback,
142                 void *arg)
143 {
144         struct job *job;
145
146         /* try recyle existing job */
147         job = free_jobs;
148         if (job)
149                 free_jobs = job->next;
150         else {
151                 /* allocation without blocking */
152                 pthread_mutex_unlock(&mutex);
153                 job = malloc(sizeof *job);
154                 pthread_mutex_lock(&mutex);
155                 if (!job) {
156                         ERROR("out of memory");
157                         errno = ENOMEM;
158                         goto end;
159                 }
160         }
161         /* initialises the job */
162         job->group = group;
163         job->timeout = timeout;
164         job->callback = callback;
165         job->arg = arg;
166         job->blocked = 0;
167         job->dropped = 0;
168 end:
169         return job;
170 }
171
172 /**
173  * Adds 'job' at the end of the list of jobs, marking it
174  * as blocked if an other job with the same group is pending.
175  * @param job the job to add
176  */
177 static void job_add(struct job *job)
178 {
179         const void *group;
180         struct job *ijob, **pjob;
181
182         /* prepare to add */
183         group = job->group;
184         job->next = NULL;
185
186         /* search end and blockers */
187         pjob = &first_job;
188         ijob = first_job;
189         while (ijob) {
190                 if (group && ijob->group == group)
191                         job->blocked = 1;
192                 pjob = &ijob->next;
193                 ijob = ijob->next;
194         }
195
196         /* queue the jobs */
197         *pjob = job;
198         remains--;
199 }
200
201 /**
202  * Get the next job to process or NULL if none.
203  * @return the first job that isn't blocked or NULL
204  */
205 static inline struct job *job_get()
206 {
207         struct job *job = first_job;
208         while (job && job->blocked)
209                 job = job->next;
210         if (job)
211                 remains++;
212         return job;
213 }
214
215 /**
216  * Releases the processed 'job': removes it
217  * from the list of jobs and unblock the first
218  * pending job of the same group if any.
219  * @param job the job to release
220  */
221 static inline void job_release(struct job *job)
222 {
223         struct job *ijob, **pjob;
224         const void *group;
225
226         /* first unqueue the job */
227         pjob = &first_job;
228         ijob = first_job;
229         while (ijob != job) {
230                 pjob = &ijob->next;
231                 ijob = ijob->next;
232         }
233         *pjob = job->next;
234
235         /* then unblock jobs of the same group */
236         group = job->group;
237         if (group) {
238                 ijob = job->next;
239                 while (ijob && ijob->group != group)
240                         ijob = ijob->next;
241                 if (ijob)
242                         ijob->blocked = 0;
243         }
244
245         /* recycle the job */
246         job->next = free_jobs;
247         free_jobs = job;
248 }
249
250 /**
251  * Monitored cancel callback for a job.
252  * This function is called by the monitor
253  * to cancel the job when the safe environment
254  * is set.
255  * @param signum 0 on normal flow or the number
256  *               of the signal that interrupted the normal
257  *               flow, isn't used
258  * @param arg    the job to run
259  */
260 static void job_cancel(int signum, void *arg)
261 {
262         struct job *job = arg;
263         job->callback(SIGABRT, job->arg);
264 }
265
266 /**
267  * Monitored normal callback for events.
268  * This function is called by the monitor
269  * to run the event loop when the safe environment
270  * is set.
271  * @param signum 0 on normal flow or the number
272  *               of the signal that interrupted the normal
273  *               flow
274  * @param arg     the events to run
275  */
276 static void evloop_run(int signum, void *arg)
277 {
278         int rc;
279         struct sd_event *se;
280
281         if (!signum) {
282                 se = evloop.sdev;
283                 rc = sd_event_prepare(se);
284                 if (rc < 0) {
285                         errno = -rc;
286                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
287                         abort();
288                 } else {
289                         if (rc == 0) {
290                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
291                                 if (rc < 0) {
292                                         errno = -rc;
293                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
294                                 }
295                         }
296                         evloop.state = EVLOOP_STATE_RUN;
297                         if (rc > 0) {
298                                 rc = sd_event_dispatch(se);
299                                 if (rc < 0) {
300                                         errno = -rc;
301                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
302                                 }
303                         }
304                 }
305         }
306 }
307
308 /**
309  * Internal callback for evloop management.
310  * The effect of this function is hidden: it exits
311  * the waiting poll if any.
312  */
313 static void evloop_on_efd_event()
314 {
315         uint64_t x;
316         read(evloop.efd, &x, sizeof x);
317 }
318
319 /**
320  * wakeup the event loop if needed by sending
321  * an event.
322  */
323 static void evloop_wakeup()
324 {
325         uint64_t x;
326
327         if (evloop.state & EVLOOP_STATE_WAIT) {
328                 x = 1;
329                 write(evloop.efd, &x, sizeof x);
330         }
331 }
332
333 /**
334  * Release the currently held event loop
335  */
336 static void evloop_release()
337 {
338         struct thread *nh, *ct = current_thread;
339
340         if (evloop.holder == ct) {
341                 nh = ct->nholder;
342                 evloop.holder = nh;
343                 if (nh)
344                         pthread_cond_signal(nh->cwhold);
345         }
346 }
347
348 /**
349  * get the eventloop for the current thread
350  */
351 static int evloop_get()
352 {
353         struct thread *ct = current_thread;
354
355         if (evloop.holder)
356                 return evloop.holder == ct;
357
358         ct->nholder = NULL;
359         evloop.holder = ct;
360         return 1;
361 }
362
363 /**
364  * acquire the eventloop for the current thread
365  */
366 static void evloop_acquire()
367 {
368         struct thread **pwait, *ct;
369         pthread_cond_t cond;
370
371         /* try to get the evloop */
372         if (!evloop_get()) {
373                 /* failed, init waiting state */
374                 ct = current_thread;
375                 ct->nholder = NULL;
376                 ct->cwhold = &cond;
377                 pthread_cond_init(&cond, NULL);
378
379                 /* queue current thread in holder list */
380                 pwait = &evloop.holder;
381                 while (*pwait)
382                         pwait = &(*pwait)->nholder;
383                 *pwait = ct;
384
385                 /* wake up the evloop */
386                 evloop_wakeup();
387
388                 /* wait to acquire the evloop */
389                 pthread_cond_wait(&cond, &mutex);
390                 pthread_cond_destroy(&cond);
391         }
392 }
393
394 /**
395  * Enter the thread
396  * @param me the description of the thread to enter
397  */
398 static void thread_enter(volatile struct thread *me)
399 {
400         /* initialize description of itself and link it in the list */
401         me->tid = pthread_self();
402         me->stop = 0;
403         me->waits = 0;
404         me->upper = current_thread;
405         me->next = threads;
406         threads = (struct thread*)me;
407         current_thread = (struct thread*)me;
408 }
409
410 /**
411  * leave the thread
412  * @param me the description of the thread to leave
413  */
414 static void thread_leave()
415 {
416         struct thread **prv, *me;
417
418         /* unlink the current thread and cleanup */
419         me = current_thread;
420         prv = &threads;
421         while (*prv != me)
422                 prv = &(*prv)->next;
423         *prv = me->next;
424
425         current_thread = me->upper;
426 }
427
428 /**
429  * Main processing loop of internal threads with processing jobs.
430  * The loop must be called with the mutex locked
431  * and it returns with the mutex locked.
432  * @param me the description of the thread to use
433  * TODO: how are timeout handled when reentering?
434  */
435 static void thread_run_internal(volatile struct thread *me)
436 {
437         struct job *job;
438
439         /* enter thread */
440         thread_enter(me);
441
442         /* loop until stopped */
443         while (!me->stop) {
444                 /* release the current event loop */
445                 evloop_release();
446
447                 /* get a job */
448                 job = job_get();
449                 if (job) {
450                         /* prepare running the job */
451                         job->blocked = 1; /* mark job as blocked */
452                         me->job = job; /* record the job (only for terminate) */
453
454                         /* run the job */
455                         pthread_mutex_unlock(&mutex);
456                         sig_monitor(job->timeout, job->callback, job->arg);
457                         pthread_mutex_lock(&mutex);
458
459                         /* release the run job */
460                         job_release(job);
461                 /* no job, check event loop wait */
462                 } else if (evloop_get()) {
463                         if (evloop.state != 0) {
464                                 /* busy ? */
465                                 CRITICAL("Can't enter dispatch while in dispatch!");
466                                 abort();
467                         }
468                         /* run the events */
469                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
470                         pthread_mutex_unlock(&mutex);
471                         sig_monitor(0, evloop_run, NULL);
472                         pthread_mutex_lock(&mutex);
473                         evloop.state = 0;
474                 } else {
475                         /* no job and no event loop */
476                         running--;
477                         if (!running)
478                                 ERROR("Entering job deep sleep! Check your bindings.");
479                         me->waits = 1;
480                         pthread_cond_wait(&cond, &mutex);
481                         me->waits = 0;
482                         running++;
483                 }
484         }
485         /* cleanup */
486         evloop_release();
487         thread_leave();
488 }
489
490 /**
491  * Main processing loop of external threads.
492  * The loop must be called with the mutex locked
493  * and it returns with the mutex locked.
494  * @param me the description of the thread to use
495  */
496 static void thread_run_external(volatile struct thread *me)
497 {
498         /* enter thread */
499         thread_enter(me);
500
501         /* loop until stopped */
502         me->waits = 1;
503         while (!me->stop)
504                 pthread_cond_wait(&cond, &mutex);
505         me->waits = 0;
506         thread_leave();
507 }
508
509 /**
510  * Root for created threads.
511  */
512 static void thread_main()
513 {
514         struct thread me;
515
516         running++;
517         started++;
518         sig_monitor_init_timeouts();
519         thread_run_internal(&me);
520         sig_monitor_clean_timeouts();
521         started--;
522         running--;
523 }
524
525 /**
526  * Entry point for created threads.
527  * @param data not used
528  * @return NULL
529  */
530 static void *thread_starter(void *data)
531 {
532         pthread_mutex_lock(&mutex);
533         thread_main();
534         pthread_mutex_unlock(&mutex);
535         return NULL;
536 }
537
538 /**
539  * Starts a new thread
540  * @return 0 in case of success or -1 in case of error
541  */
542 static int start_one_thread()
543 {
544         pthread_t tid;
545         int rc;
546
547         rc = pthread_create(&tid, NULL, thread_starter, NULL);
548         if (rc != 0) {
549                 /* errno = rc; */
550                 WARNING("not able to start thread: %m");
551                 rc = -1;
552         }
553         return rc;
554 }
555
556 /**
557  * Queues a new asynchronous job represented by 'callback' and 'arg'
558  * for the 'group' and the 'timeout'.
559  * Jobs are queued FIFO and are possibly executed in parallel
560  * concurrently except for job of the same group that are
561  * executed sequentially in FIFO order.
562  * @param group    The group of the job or NULL when no group.
563  * @param timeout  The maximum execution time in seconds of the job
564  *                 or 0 for unlimited time.
565  * @param callback The function to execute for achieving the job.
566  *                 Its first parameter is either 0 on normal flow
567  *                 or the signal number that broke the normal flow.
568  *                 The remaining parameter is the parameter 'arg1'
569  *                 given here.
570  * @param arg      The second argument for 'callback'
571  * @return 0 in case of success or -1 in case of error
572  */
573 int jobs_queue(
574                 const void *group,
575                 int timeout,
576                 void (*callback)(int, void*),
577                 void *arg)
578 {
579         struct job *job;
580         int rc;
581
582         pthread_mutex_lock(&mutex);
583
584         /* allocates the job */
585         job = job_create(group, timeout, callback, arg);
586         if (!job)
587                 goto error;
588
589         /* check availability */
590         if (remains <= 0) {
591                 ERROR("can't process job with threads: too many jobs");
592                 errno = EBUSY;
593                 goto error2;
594         }
595
596         /* start a thread if needed */
597         if (running == started && started < allowed) {
598                 /* all threads are busy and a new can be started */
599                 rc = start_one_thread();
600                 if (rc < 0 && started == 0) {
601                         ERROR("can't start initial thread: %m");
602                         goto error2;
603                 }
604         }
605
606         /* queues the job */
607         job_add(job);
608
609         /* signal an existing job */
610         pthread_cond_signal(&cond);
611         pthread_mutex_unlock(&mutex);
612         return 0;
613
614 error2:
615         job->next = free_jobs;
616         free_jobs = job;
617 error:
618         pthread_mutex_unlock(&mutex);
619         return -1;
620 }
621
622 /**
623  * Internal helper function for 'jobs_enter'.
624  * @see jobs_enter, jobs_leave
625  */
626 static void enter_cb(int signum, void *closure)
627 {
628         struct sync *sync = closure;
629         sync->enter(signum, sync->arg, (void*)&sync->thread);
630 }
631
632 /**
633  * Internal helper function for 'jobs_call'.
634  * @see jobs_call
635  */
636 static void call_cb(int signum, void *closure)
637 {
638         struct sync *sync = closure;
639         sync->callback(signum, sync->arg);
640         jobs_leave((void*)&sync->thread);
641 }
642
643 /**
644  * Internal helper for synchronous jobs. It enters
645  * a new thread loop for evaluating the given job
646  * as recorded by the couple 'sync_cb' and 'sync'.
647  * @see jobs_call, jobs_enter, jobs_leave
648  */
649 static int do_sync(
650                 const void *group,
651                 int timeout,
652                 void (*sync_cb)(int signum, void *closure),
653                 struct sync *sync
654 )
655 {
656         struct job *job;
657
658         pthread_mutex_lock(&mutex);
659
660         /* allocates the job */
661         job = job_create(group, timeout, sync_cb, sync);
662         if (!job) {
663                 pthread_mutex_unlock(&mutex);
664                 return -1;
665         }
666
667         /* queues the job */
668         job_add(job);
669
670         /* run until stopped */
671         if (current_thread)
672                 thread_run_internal(&sync->thread);
673         else
674                 thread_run_external(&sync->thread);
675         pthread_mutex_unlock(&mutex);
676         return 0;
677 }
678
679 /**
680  * Enter a synchronisation point: activates the job given by 'callback'
681  * and 'closure' using 'group' and 'timeout' to control sequencing and
682  * execution time.
683  * @param group the group for sequencing jobs
684  * @param timeout the time in seconds allocated to the job
685  * @param callback the callback that will handle the job.
686  *                 it receives 3 parameters: 'signum' that will be 0
687  *                 on normal flow or the catched signal number in case
688  *                 of interrupted flow, the context 'closure' as given and
689  *                 a 'jobloop' reference that must be used when the job is
690  *                 terminated to unlock the current execution flow.
691  * @param closure the argument to the callback
692  * @return 0 on success or -1 in case of error
693  */
694 int jobs_enter(
695                 const void *group,
696                 int timeout,
697                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
698                 void *closure
699 )
700 {
701         struct sync sync;
702
703         sync.enter = callback;
704         sync.arg = closure;
705         return do_sync(group, timeout, enter_cb, &sync);
706 }
707
708 /**
709  * Unlocks the execution flow designed by 'jobloop'.
710  * @param jobloop indication of the flow to unlock
711  * @return 0 in case of success of -1 on error
712  */
713 int jobs_leave(struct jobloop *jobloop)
714 {
715         struct thread *t;
716
717         pthread_mutex_lock(&mutex);
718         t = threads;
719         while (t && t != (struct thread*)jobloop)
720                 t = t->next;
721         if (!t) {
722                 errno = EINVAL;
723         } else {
724                 t->stop = 1;
725                 if (t->waits)
726                         pthread_cond_broadcast(&cond);
727                 else
728                         evloop_wakeup();
729         }
730         pthread_mutex_unlock(&mutex);
731         return -!t;
732 }
733
734 /**
735  * Calls synchronously the job represented by 'callback' and 'arg1'
736  * for the 'group' and the 'timeout' and waits for its completion.
737  * @param group    The group of the job or NULL when no group.
738  * @param timeout  The maximum execution time in seconds of the job
739  *                 or 0 for unlimited time.
740  * @param callback The function to execute for achieving the job.
741  *                 Its first parameter is either 0 on normal flow
742  *                 or the signal number that broke the normal flow.
743  *                 The remaining parameter is the parameter 'arg1'
744  *                 given here.
745  * @param arg      The second argument for 'callback'
746  * @return 0 in case of success or -1 in case of error
747  */
748 int jobs_call(
749                 const void *group,
750                 int timeout,
751                 void (*callback)(int, void*),
752                 void *arg)
753 {
754         struct sync sync;
755
756         sync.callback = callback;
757         sync.arg = arg;
758
759         return do_sync(group, timeout, call_cb, &sync);
760 }
761
762 /**
763  * Internal callback for evloop management.
764  * The effect of this function is hidden: it exits
765  * the waiting poll if any. Then it wakes up a thread
766  * awaiting the evloop using signal.
767  */
768 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
769 {
770         evloop_on_efd_event();
771         return 1;
772 }
773
774 /**
775  * Gets a sd_event item for the current thread.
776  * @return a sd_event or NULL in case of error
777  */
778 static struct sd_event *get_sd_event_locked()
779 {
780         int rc;
781
782         /* creates the evloop on need */
783         if (!evloop.sdev) {
784                 /* start the creation */
785                 evloop.state = 0;
786                 /* creates the eventfd for waking up polls */
787                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
788                 if (evloop.efd < 0) {
789                         ERROR("can't make eventfd for events");
790                         goto error1;
791                 }
792                 /* create the systemd event loop */
793                 rc = sd_event_new(&evloop.sdev);
794                 if (rc < 0) {
795                         ERROR("can't make new event loop");
796                         goto error2;
797                 }
798                 /* put the eventfd in the event loop */
799                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
800                 if (rc < 0) {
801                         ERROR("can't register eventfd");
802                         sd_event_unref(evloop.sdev);
803                         evloop.sdev = NULL;
804 error2:
805                         close(evloop.efd);
806 error1:
807                         return NULL;
808                 }
809         }
810
811         /* acquire the event loop */
812         evloop_acquire();
813
814         return evloop.sdev;
815 }
816
817 /**
818  * Gets a sd_event item for the current thread.
819  * @return a sd_event or NULL in case of error
820  */
821 struct sd_event *jobs_get_sd_event()
822 {
823         struct sd_event *result;
824         struct thread lt;
825
826         /* ensure an existing thread environment */
827         if (!current_thread) {
828                 memset(&lt, 0, sizeof lt);
829                 current_thread = &lt;
830         }
831
832         /* process */
833         pthread_mutex_lock(&mutex);
834         result = get_sd_event_locked();
835         pthread_mutex_unlock(&mutex);
836
837         /* release the faked thread environment if needed */
838         if (current_thread == &lt) {
839                 /*
840                  * Releasing it is needed because there is no way to guess
841                  * when it has to be released really. But here is where it is
842                  * hazardous: if the caller modifies the eventloop when it
843                  * is waiting, there is no way to make the change effective.
844                  * A workaround to achieve that goal is for the caller to
845                  * require the event loop a second time after having modified it.
846                  */
847                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
848                 if (verbose_wants(Log_Level_Info))
849                         sig_monitor_dumpstack();
850                 evloop_release();
851                 current_thread = NULL;
852         }
853
854         return result;
855 }
856
857 /**
858  * Enter the jobs processing loop.
859  * @param allowed_count Maximum count of thread for jobs including this one
860  * @param start_count   Count of thread to start now, must be lower.
861  * @param waiter_count  Maximum count of jobs that can be waiting.
862  * @param start         The start routine to activate (can't be NULL)
863  * @return 0 in case of success or -1 in case of error.
864  */
865 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
866 {
867         int rc, launched;
868         struct job *job;
869
870         assert(allowed_count >= 1);
871         assert(start_count >= 0);
872         assert(waiter_count > 0);
873         assert(start_count <= allowed_count);
874
875         rc = -1;
876         pthread_mutex_lock(&mutex);
877
878         /* check whether already running */
879         if (current_thread || allowed) {
880                 ERROR("thread already started");
881                 errno = EINVAL;
882                 goto error;
883         }
884
885         /* records the allowed count */
886         allowed = allowed_count;
887         started = 0;
888         running = 0;
889         remains = waiter_count;
890
891 #if HAS_WATCHDOG
892         /* set the watchdog */
893         if (sd_watchdog_enabled(0, NULL))
894                 sd_event_set_watchdog(get_sd_event_locked(), 1);
895 #endif
896
897         /* start at least one thread: the current one */
898         launched = 1;
899         while (launched < start_count) {
900                 if (start_one_thread() != 0) {
901                         ERROR("Not all threads can be started");
902                         goto error;
903                 }
904                 launched++;
905         }
906
907         /* queue the start job */
908         job = job_create(NULL, 0, start, arg);
909         if (!job)
910                 goto error;
911         job_add(job);
912
913         /* run until end */
914         thread_main();
915         rc = 0;
916 error:
917         pthread_mutex_unlock(&mutex);
918         return rc;
919 }
920
921 /**
922  * Terminate all the threads and cancel all pending jobs.
923  */
924 void jobs_terminate()
925 {
926         struct job *job, *head, *tail;
927         pthread_t me, *others;
928         struct thread *t;
929         int count;
930
931         /* how am i? */
932         me = pthread_self();
933
934         /* request all threads to stop */
935         pthread_mutex_lock(&mutex);
936         allowed = 0;
937
938         /* count the number of threads */
939         count = 0;
940         t = threads;
941         while (t) {
942                 if (!t->upper && !pthread_equal(t->tid, me))
943                         count++;
944                 t = t->next;
945         }
946
947         /* fill the array of threads */
948         others = alloca(count * sizeof *others);
949         count = 0;
950         t = threads;
951         while (t) {
952                 if (!t->upper && !pthread_equal(t->tid, me))
953                         others[count++] = t->tid;
954                 t = t->next;
955         }
956
957         /* stops the threads */
958         t = threads;
959         while (t) {
960                 t->stop = 1;
961                 t = t->next;
962         }
963
964         /* wait the threads */
965         pthread_cond_broadcast(&cond);
966         pthread_mutex_unlock(&mutex);
967         while (count)
968                 pthread_join(others[--count], NULL);
969         pthread_mutex_lock(&mutex);
970
971         /* cancel pending jobs of other threads */
972         remains = 0;
973         head = first_job;
974         first_job = NULL;
975         tail = NULL;
976         while (head) {
977                 /* unlink the job */
978                 job = head;
979                 head = job->next;
980
981                 /* search if job is stacked for current */
982                 t = current_thread;
983                 while (t && t->job != job)
984                         t = t->upper;
985                 if (t) {
986                         /* yes, relink it at end */
987                         if (tail)
988                                 tail->next = job;
989                         else
990                                 first_job = job;
991                         tail = job;
992                         job->next = NULL;
993                 } else {
994                         /* no cancel the job */
995                         pthread_mutex_unlock(&mutex);
996                         sig_monitor(0, job_cancel, job);
997                         free(job);
998                         pthread_mutex_lock(&mutex);
999                 }
1000         }
1001         pthread_mutex_unlock(&mutex);
1002 }
1003