jobs: Clean unneeded code
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #if HAS_WATCHDOG
40 #include <systemd/sd-daemon.h>
41 #endif
42
43 #include "jobs.h"
44 #include "sig-monitor.h"
45 #include "verbose.h"
46
47 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
48 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
49
50 struct thread;
51
52 /** Internal shortcut for callback */
53 typedef void (*job_cb_t)(int, void*);
54
55 /** Description of a pending job */
56 struct job
57 {
58         struct job *next;    /**< link to the next job enqueued */
59         const void *group;   /**< group of the request */
60         job_cb_t callback;   /**< processing callback */
61         void *arg;           /**< argument */
62         int timeout;         /**< timeout in second for processing the request */
63         unsigned blocked: 1; /**< is an other request blocking this one ? */
64         unsigned dropped: 1; /**< is removed ? */
65 };
66
67 /** Description of handled event loops */
68 struct evloop
69 {
70         unsigned state;        /**< encoded state */
71         int efd;               /**< event notification */
72         struct sd_event *sdev; /**< the systemd event loop */
73         struct thread *holder; /**< holder of the evloop */
74 };
75
76 #define EVLOOP_STATE_WAIT           1U
77 #define EVLOOP_STATE_RUN            2U
78
79 /** Description of threads */
80 struct thread
81 {
82         struct thread *next;   /**< next thread of the list */
83         struct thread *upper;  /**< upper same thread */
84         struct thread *nholder;/**< next holder for evloop */
85         pthread_cond_t *cwhold;/**< condition wait for holding */
86         struct job *job;       /**< currently processed job */
87         pthread_t tid;         /**< the thread id */
88         volatile unsigned stop: 1;      /**< stop requested */
89         volatile unsigned waits: 1;     /**< is waiting? */
90 };
91
92 /**
93  * Description of synchronous callback
94  */
95 struct sync
96 {
97         struct thread thread;   /**< thread loop data */
98         union {
99                 void (*callback)(int, void*);   /**< the synchronous callback */
100                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
101                                 /**< the entering synchronous routine */
102         };
103         void *arg;              /**< the argument of the callback */
104 };
105
106
107 /* synchronisation of threads */
108 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
109 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
110
111 /* count allowed, started and running threads */
112 static int allowed = 0; /** allowed count of threads */
113 static int started = 0; /** started count of threads */
114 static int running = 0; /** running count of threads */
115 static int remains = 0; /** allowed count of waiting jobs */
116
117 /* list of threads */
118 static struct thread *threads;
119 static _Thread_local struct thread *current_thread;
120
121 /* queue of pending jobs */
122 static struct job *first_job;
123 static struct job *free_jobs;
124
125 /* event loop */
126 static struct evloop evloop;
127
128 /**
129  * Create a new job with the given parameters
130  * @param group    the group of the job
131  * @param timeout  the timeout of the job (0 if none)
132  * @param callback the function that achieves the job
133  * @param arg      the argument of the callback
134  * @return the created job unblock or NULL when no more memory
135  */
136 static struct job *job_create(
137                 const void *group,
138                 int timeout,
139                 job_cb_t callback,
140                 void *arg)
141 {
142         struct job *job;
143
144         /* try recyle existing job */
145         job = free_jobs;
146         if (job)
147                 free_jobs = job->next;
148         else {
149                 /* allocation without blocking */
150                 pthread_mutex_unlock(&mutex);
151                 job = malloc(sizeof *job);
152                 pthread_mutex_lock(&mutex);
153                 if (!job) {
154                         ERROR("out of memory");
155                         errno = ENOMEM;
156                         goto end;
157                 }
158         }
159         /* initialises the job */
160         job->group = group;
161         job->timeout = timeout;
162         job->callback = callback;
163         job->arg = arg;
164         job->blocked = 0;
165         job->dropped = 0;
166 end:
167         return job;
168 }
169
170 /**
171  * Adds 'job' at the end of the list of jobs, marking it
172  * as blocked if an other job with the same group is pending.
173  * @param job the job to add
174  */
175 static void job_add(struct job *job)
176 {
177         const void *group;
178         struct job *ijob, **pjob;
179
180         /* prepare to add */
181         group = job->group;
182         job->next = NULL;
183
184         /* search end and blockers */
185         pjob = &first_job;
186         ijob = first_job;
187         while (ijob) {
188                 if (group && ijob->group == group)
189                         job->blocked = 1;
190                 pjob = &ijob->next;
191                 ijob = ijob->next;
192         }
193
194         /* queue the jobs */
195         *pjob = job;
196         remains--;
197 }
198
199 /**
200  * Get the next job to process or NULL if none.
201  * @return the first job that isn't blocked or NULL
202  */
203 static inline struct job *job_get()
204 {
205         struct job *job = first_job;
206         while (job && job->blocked)
207                 job = job->next;
208         if (job)
209                 remains++;
210         return job;
211 }
212
213 /**
214  * Releases the processed 'job': removes it
215  * from the list of jobs and unblock the first
216  * pending job of the same group if any.
217  * @param job the job to release
218  */
219 static inline void job_release(struct job *job)
220 {
221         struct job *ijob, **pjob;
222         const void *group;
223
224         /* first unqueue the job */
225         pjob = &first_job;
226         ijob = first_job;
227         while (ijob != job) {
228                 pjob = &ijob->next;
229                 ijob = ijob->next;
230         }
231         *pjob = job->next;
232
233         /* then unblock jobs of the same group */
234         group = job->group;
235         if (group) {
236                 ijob = job->next;
237                 while (ijob && ijob->group != group)
238                         ijob = ijob->next;
239                 if (ijob)
240                         ijob->blocked = 0;
241         }
242
243         /* recycle the job */
244         job->next = free_jobs;
245         free_jobs = job;
246 }
247
248 /**
249  * Monitored cancel callback for a job.
250  * This function is called by the monitor
251  * to cancel the job when the safe environment
252  * is set.
253  * @param signum 0 on normal flow or the number
254  *               of the signal that interrupted the normal
255  *               flow, isn't used
256  * @param arg    the job to run
257  */
258 static void job_cancel(int signum, void *arg)
259 {
260         struct job *job = arg;
261         job->callback(SIGABRT, job->arg);
262 }
263
264 /**
265  * Monitored normal callback for events.
266  * This function is called by the monitor
267  * to run the event loop when the safe environment
268  * is set.
269  * @param signum 0 on normal flow or the number
270  *               of the signal that interrupted the normal
271  *               flow
272  * @param arg     the events to run
273  */
274 static void evloop_run(int signum, void *arg)
275 {
276         int rc;
277         struct sd_event *se;
278
279         if (!signum) {
280                 se = evloop.sdev;
281                 rc = sd_event_prepare(se);
282                 if (rc < 0) {
283                         errno = -rc;
284                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
285                         abort();
286                 } else {
287                         if (rc == 0) {
288                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                         evloop.state = EVLOOP_STATE_RUN;
295                         if (rc > 0) {
296                                 rc = sd_event_dispatch(se);
297                                 if (rc < 0) {
298                                         errno = -rc;
299                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
300                                 }
301                         }
302                 }
303         }
304 }
305
306 /**
307  * Internal callback for evloop management.
308  * The effect of this function is hidden: it exits
309  * the waiting poll if any.
310  */
311 static void evloop_on_efd_event()
312 {
313         uint64_t x;
314         read(evloop.efd, &x, sizeof x);
315 }
316
317 /**
318  * wakeup the event loop if needed by sending
319  * an event.
320  */
321 static void evloop_wakeup()
322 {
323         uint64_t x;
324
325         if (evloop.state & EVLOOP_STATE_WAIT) {
326                 x = 1;
327                 write(evloop.efd, &x, sizeof x);
328         }
329 }
330
331 /**
332  * Release the currently held event loop
333  */
334 static void evloop_release()
335 {
336         struct thread *nh, *ct = current_thread;
337
338         if (ct && evloop.holder == ct) {
339                 nh = ct->nholder;
340                 evloop.holder = nh;
341                 if (nh)
342                         pthread_cond_signal(nh->cwhold);
343         }
344 }
345
346 /**
347  * get the eventloop for the current thread
348  */
349 static int evloop_get()
350 {
351         struct thread *ct = current_thread;
352
353         if (evloop.holder)
354                 return evloop.holder == ct;
355
356         if (!evloop.sdev)
357                 return 0;
358
359         ct->nholder = NULL;
360         evloop.holder = ct;
361         return 1;
362 }
363
364 /**
365  * acquire the eventloop for the current thread
366  */
367 static void evloop_acquire()
368 {
369         struct thread **pwait, *ct;
370         pthread_cond_t cond;
371
372         /* try to get the evloop */
373         if (!evloop_get()) {
374                 /* failed, init waiting state */
375                 ct = current_thread;
376                 ct->nholder = NULL;
377                 ct->cwhold = &cond;
378                 pthread_cond_init(&cond, NULL);
379
380                 /* queue current thread in holder list */
381                 pwait = &evloop.holder;
382                 while (*pwait)
383                         pwait = &(*pwait)->nholder;
384                 *pwait = ct;
385
386                 /* wake up the evloop */
387                 evloop_wakeup();
388
389                 /* wait to acquire the evloop */
390                 pthread_cond_wait(&cond, &mutex);
391                 pthread_cond_destroy(&cond);
392         }
393 }
394
395 /**
396  * Enter the thread
397  * @param me the description of the thread to enter
398  */
399 static void thread_enter(volatile struct thread *me)
400 {
401         evloop_release();
402         /* initialize description of itself and link it in the list */
403         me->tid = pthread_self();
404         me->stop = 0;
405         me->waits = 0;
406         me->upper = current_thread;
407         me->next = threads;
408         threads = (struct thread*)me;
409         current_thread = (struct thread*)me;
410 }
411
412 /**
413  * leave the thread
414  * @param me the description of the thread to leave
415  */
416 static void thread_leave()
417 {
418         struct thread **prv, *me;
419
420         /* unlink the current thread and cleanup */
421         me = current_thread;
422         prv = &threads;
423         while (*prv != me)
424                 prv = &(*prv)->next;
425         *prv = me->next;
426
427         current_thread = me->upper;
428 }
429
430 /**
431  * Main processing loop of internal threads with processing jobs.
432  * The loop must be called with the mutex locked
433  * and it returns with the mutex locked.
434  * @param me the description of the thread to use
435  * TODO: how are timeout handled when reentering?
436  */
437 static void thread_run_internal(volatile struct thread *me)
438 {
439         struct job *job;
440
441         /* enter thread */
442         thread_enter(me);
443
444         /* loop until stopped */
445         while (!me->stop) {
446                 /* release the current event loop */
447                 evloop_release();
448
449                 /* get a job */
450                 job = job_get();
451                 if (job) {
452                         /* prepare running the job */
453                         job->blocked = 1; /* mark job as blocked */
454                         me->job = job; /* record the job (only for terminate) */
455
456                         /* run the job */
457                         pthread_mutex_unlock(&mutex);
458                         sig_monitor(job->timeout, job->callback, job->arg);
459                         pthread_mutex_lock(&mutex);
460
461                         /* release the run job */
462                         job_release(job);
463                 /* no job, check event loop wait */
464                 } else if (evloop_get()) {
465                         if (evloop.state != 0) {
466                                 /* busy ? */
467                                 CRITICAL("Can't enter dispatch while in dispatch!");
468                                 abort();
469                         }
470                         /* run the events */
471                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
472                         pthread_mutex_unlock(&mutex);
473                         sig_monitor(0, evloop_run, NULL);
474                         pthread_mutex_lock(&mutex);
475                         evloop.state = 0;
476                 } else {
477                         /* no job and no event loop */
478                         running--;
479                         if (!running)
480                                 ERROR("Entering job deep sleep! Check your bindings.");
481                         me->waits = 1;
482                         pthread_cond_wait(&cond, &mutex);
483                         me->waits = 0;
484                         running++;
485                 }
486         }
487         /* cleanup */
488         evloop_release();
489         thread_leave();
490 }
491
492 /**
493  * Main processing loop of external threads.
494  * The loop must be called with the mutex locked
495  * and it returns with the mutex locked.
496  * @param me the description of the thread to use
497  */
498 static void thread_run_external(volatile struct thread *me)
499 {
500         /* enter thread */
501         thread_enter(me);
502
503         /* loop until stopped */
504         me->waits = 1;
505         while (!me->stop)
506                 pthread_cond_wait(&cond, &mutex);
507         me->waits = 0;
508         thread_leave();
509 }
510
511 /**
512  * Root for created threads.
513  */
514 static void thread_main()
515 {
516         struct thread me;
517
518         running++;
519         started++;
520         sig_monitor_init_timeouts();
521         thread_run_internal(&me);
522         sig_monitor_clean_timeouts();
523         started--;
524         running--;
525 }
526
527 /**
528  * Entry point for created threads.
529  * @param data not used
530  * @return NULL
531  */
532 static void *thread_starter(void *data)
533 {
534         pthread_mutex_lock(&mutex);
535         thread_main();
536         pthread_mutex_unlock(&mutex);
537         return NULL;
538 }
539
540 /**
541  * Starts a new thread
542  * @return 0 in case of success or -1 in case of error
543  */
544 static int start_one_thread()
545 {
546         pthread_t tid;
547         int rc;
548
549         rc = pthread_create(&tid, NULL, thread_starter, NULL);
550         if (rc != 0) {
551                 /* errno = rc; */
552                 WARNING("not able to start thread: %m");
553                 rc = -1;
554         }
555         return rc;
556 }
557
558 /**
559  * Queues a new asynchronous job represented by 'callback' and 'arg'
560  * for the 'group' and the 'timeout'.
561  * Jobs are queued FIFO and are possibly executed in parallel
562  * concurrently except for job of the same group that are
563  * executed sequentially in FIFO order.
564  * @param group    The group of the job or NULL when no group.
565  * @param timeout  The maximum execution time in seconds of the job
566  *                 or 0 for unlimited time.
567  * @param callback The function to execute for achieving the job.
568  *                 Its first parameter is either 0 on normal flow
569  *                 or the signal number that broke the normal flow.
570  *                 The remaining parameter is the parameter 'arg1'
571  *                 given here.
572  * @param arg      The second argument for 'callback'
573  * @return 0 in case of success or -1 in case of error
574  */
575 int jobs_queue(
576                 const void *group,
577                 int timeout,
578                 void (*callback)(int, void*),
579                 void *arg)
580 {
581         struct job *job;
582         int rc;
583
584         pthread_mutex_lock(&mutex);
585
586         /* allocates the job */
587         job = job_create(group, timeout, callback, arg);
588         if (!job)
589                 goto error;
590
591         /* check availability */
592         if (remains <= 0) {
593                 ERROR("can't process job with threads: too many jobs");
594                 errno = EBUSY;
595                 goto error2;
596         }
597
598         /* start a thread if needed */
599         if (running == started && started < allowed) {
600                 /* all threads are busy and a new can be started */
601                 rc = start_one_thread();
602                 if (rc < 0 && started == 0) {
603                         ERROR("can't start initial thread: %m");
604                         goto error2;
605                 }
606         }
607
608         /* queues the job */
609         job_add(job);
610
611         /* signal an existing job */
612         pthread_cond_signal(&cond);
613         pthread_mutex_unlock(&mutex);
614         return 0;
615
616 error2:
617         job->next = free_jobs;
618         free_jobs = job;
619 error:
620         pthread_mutex_unlock(&mutex);
621         return -1;
622 }
623
624 /**
625  * Internal helper function for 'jobs_enter'.
626  * @see jobs_enter, jobs_leave
627  */
628 static void enter_cb(int signum, void *closure)
629 {
630         struct sync *sync = closure;
631         sync->enter(signum, sync->arg, (void*)&sync->thread);
632 }
633
634 /**
635  * Internal helper function for 'jobs_call'.
636  * @see jobs_call
637  */
638 static void call_cb(int signum, void *closure)
639 {
640         struct sync *sync = closure;
641         sync->callback(signum, sync->arg);
642         jobs_leave((void*)&sync->thread);
643 }
644
645 /**
646  * Internal helper for synchronous jobs. It enters
647  * a new thread loop for evaluating the given job
648  * as recorded by the couple 'sync_cb' and 'sync'.
649  * @see jobs_call, jobs_enter, jobs_leave
650  */
651 static int do_sync(
652                 const void *group,
653                 int timeout,
654                 void (*sync_cb)(int signum, void *closure),
655                 struct sync *sync
656 )
657 {
658         struct job *job;
659
660         pthread_mutex_lock(&mutex);
661
662         /* allocates the job */
663         job = job_create(group, timeout, sync_cb, sync);
664         if (!job) {
665                 pthread_mutex_unlock(&mutex);
666                 return -1;
667         }
668
669         /* queues the job */
670         job_add(job);
671
672         /* run until stopped */
673         if (current_thread)
674                 thread_run_internal(&sync->thread);
675         else
676                 thread_run_external(&sync->thread);
677         pthread_mutex_unlock(&mutex);
678         return 0;
679 }
680
681 /**
682  * Enter a synchronisation point: activates the job given by 'callback'
683  * and 'closure' using 'group' and 'timeout' to control sequencing and
684  * execution time.
685  * @param group the group for sequencing jobs
686  * @param timeout the time in seconds allocated to the job
687  * @param callback the callback that will handle the job.
688  *                 it receives 3 parameters: 'signum' that will be 0
689  *                 on normal flow or the catched signal number in case
690  *                 of interrupted flow, the context 'closure' as given and
691  *                 a 'jobloop' reference that must be used when the job is
692  *                 terminated to unlock the current execution flow.
693  * @param closure the argument to the callback
694  * @return 0 on success or -1 in case of error
695  */
696 int jobs_enter(
697                 const void *group,
698                 int timeout,
699                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
700                 void *closure
701 )
702 {
703         struct sync sync;
704
705         sync.enter = callback;
706         sync.arg = closure;
707         return do_sync(group, timeout, enter_cb, &sync);
708 }
709
710 /**
711  * Unlocks the execution flow designed by 'jobloop'.
712  * @param jobloop indication of the flow to unlock
713  * @return 0 in case of success of -1 on error
714  */
715 int jobs_leave(struct jobloop *jobloop)
716 {
717         struct thread *t;
718
719         pthread_mutex_lock(&mutex);
720         t = threads;
721         while (t && t != (struct thread*)jobloop)
722                 t = t->next;
723         if (!t) {
724                 errno = EINVAL;
725         } else {
726                 t->stop = 1;
727                 if (t->waits)
728                         pthread_cond_broadcast(&cond);
729                 else
730                         evloop_wakeup();
731         }
732         pthread_mutex_unlock(&mutex);
733         return -!t;
734 }
735
736 /**
737  * Calls synchronously the job represented by 'callback' and 'arg1'
738  * for the 'group' and the 'timeout' and waits for its completion.
739  * @param group    The group of the job or NULL when no group.
740  * @param timeout  The maximum execution time in seconds of the job
741  *                 or 0 for unlimited time.
742  * @param callback The function to execute for achieving the job.
743  *                 Its first parameter is either 0 on normal flow
744  *                 or the signal number that broke the normal flow.
745  *                 The remaining parameter is the parameter 'arg1'
746  *                 given here.
747  * @param arg      The second argument for 'callback'
748  * @return 0 in case of success or -1 in case of error
749  */
750 int jobs_call(
751                 const void *group,
752                 int timeout,
753                 void (*callback)(int, void*),
754                 void *arg)
755 {
756         struct sync sync;
757
758         sync.callback = callback;
759         sync.arg = arg;
760
761         return do_sync(group, timeout, call_cb, &sync);
762 }
763
764 /**
765  * Internal callback for evloop management.
766  * The effect of this function is hidden: it exits
767  * the waiting poll if any. Then it wakes up a thread
768  * awaiting the evloop using signal.
769  */
770 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
771 {
772         evloop_on_efd_event();
773         return 1;
774 }
775
776 /**
777  * Gets a sd_event item for the current thread.
778  * @return a sd_event or NULL in case of error
779  */
780 static struct sd_event *get_sd_event_locked()
781 {
782         int rc;
783
784         /* creates the evloop on need */
785         if (!evloop.sdev) {
786                 /* start the creation */
787                 evloop.state = 0;
788                 /* creates the eventfd for waking up polls */
789                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
790                 if (evloop.efd < 0) {
791                         ERROR("can't make eventfd for events");
792                         goto error1;
793                 }
794                 /* create the systemd event loop */
795                 rc = sd_event_new(&evloop.sdev);
796                 if (rc < 0) {
797                         ERROR("can't make new event loop");
798                         goto error2;
799                 }
800                 /* put the eventfd in the event loop */
801                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
802                 if (rc < 0) {
803                         ERROR("can't register eventfd");
804                         sd_event_unref(evloop.sdev);
805                         evloop.sdev = NULL;
806 error2:
807                         close(evloop.efd);
808 error1:
809                         return NULL;
810                 }
811         }
812
813         /* acquire the event loop */
814         evloop_acquire();
815
816         return evloop.sdev;
817 }
818
819 /**
820  * Gets a sd_event item for the current thread.
821  * @return a sd_event or NULL in case of error
822  */
823 struct sd_event *jobs_get_sd_event()
824 {
825         struct sd_event *result;
826         struct thread lt;
827
828         /* ensure an existing thread environment */
829         if (!current_thread) {
830                 memset(&lt, 0, sizeof lt);
831                 current_thread = &lt;
832         }
833
834         /* process */
835         pthread_mutex_lock(&mutex);
836         result = get_sd_event_locked();
837         pthread_mutex_unlock(&mutex);
838
839         /* release the faked thread environment if needed */
840         if (current_thread == &lt) {
841                 /*
842                  * Releasing it is needed because there is no way to guess
843                  * when it has to be released really. But here is where it is
844                  * hazardous: if the caller modifies the eventloop when it
845                  * is waiting, there is no way to make the change effective.
846                  * A workaround to achieve that goal is for the caller to
847                  * require the event loop a second time after having modified it.
848                  */
849                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
850                 if (verbose_wants(Log_Level_Info))
851                         sig_monitor_dumpstack();
852                 evloop_release();
853                 current_thread = NULL;
854         }
855
856         return result;
857 }
858
859 /**
860  * Enter the jobs processing loop.
861  * @param allowed_count Maximum count of thread for jobs including this one
862  * @param start_count   Count of thread to start now, must be lower.
863  * @param waiter_count  Maximum count of jobs that can be waiting.
864  * @param start         The start routine to activate (can't be NULL)
865  * @return 0 in case of success or -1 in case of error.
866  */
867 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
868 {
869         int rc, launched;
870         struct job *job;
871
872         assert(allowed_count >= 1);
873         assert(start_count >= 0);
874         assert(waiter_count > 0);
875         assert(start_count <= allowed_count);
876
877         rc = -1;
878         pthread_mutex_lock(&mutex);
879
880         /* check whether already running */
881         if (current_thread || allowed) {
882                 ERROR("thread already started");
883                 errno = EINVAL;
884                 goto error;
885         }
886
887         /* records the allowed count */
888         allowed = allowed_count;
889         started = 0;
890         running = 0;
891         remains = waiter_count;
892
893 #if HAS_WATCHDOG
894         /* set the watchdog */
895         if (sd_watchdog_enabled(0, NULL))
896                 sd_event_set_watchdog(get_sd_event_locked(), 1);
897 #endif
898
899         /* start at least one thread: the current one */
900         launched = 1;
901         while (launched < start_count) {
902                 if (start_one_thread() != 0) {
903                         ERROR("Not all threads can be started");
904                         goto error;
905                 }
906                 launched++;
907         }
908
909         /* queue the start job */
910         job = job_create(NULL, 0, start, arg);
911         if (!job)
912                 goto error;
913         job_add(job);
914
915         /* run until end */
916         thread_main();
917         rc = 0;
918 error:
919         pthread_mutex_unlock(&mutex);
920         return rc;
921 }
922
923 /**
924  * Terminate all the threads and cancel all pending jobs.
925  */
926 void jobs_terminate()
927 {
928         struct job *job, *head, *tail;
929         pthread_t me, *others;
930         struct thread *t;
931         int count;
932
933         /* how am i? */
934         me = pthread_self();
935
936         /* request all threads to stop */
937         pthread_mutex_lock(&mutex);
938         allowed = 0;
939
940         /* count the number of threads */
941         count = 0;
942         t = threads;
943         while (t) {
944                 if (!t->upper && !pthread_equal(t->tid, me))
945                         count++;
946                 t = t->next;
947         }
948
949         /* fill the array of threads */
950         others = alloca(count * sizeof *others);
951         count = 0;
952         t = threads;
953         while (t) {
954                 if (!t->upper && !pthread_equal(t->tid, me))
955                         others[count++] = t->tid;
956                 t = t->next;
957         }
958
959         /* stops the threads */
960         t = threads;
961         while (t) {
962                 t->stop = 1;
963                 t = t->next;
964         }
965
966         /* wait the threads */
967         pthread_cond_broadcast(&cond);
968         pthread_mutex_unlock(&mutex);
969         while (count)
970                 pthread_join(others[--count], NULL);
971         pthread_mutex_lock(&mutex);
972
973         /* cancel pending jobs of other threads */
974         remains = 0;
975         head = first_job;
976         first_job = NULL;
977         tail = NULL;
978         while (head) {
979                 /* unlink the job */
980                 job = head;
981                 head = job->next;
982
983                 /* search if job is stacked for current */
984                 t = current_thread;
985                 while (t && t->job != job)
986                         t = t->upper;
987                 if (t) {
988                         /* yes, relink it at end */
989                         if (tail)
990                                 tail->next = job;
991                         else
992                                 first_job = job;
993                         tail = job;
994                         job->next = NULL;
995                 } else {
996                         /* no cancel the job */
997                         pthread_mutex_unlock(&mutex);
998                         sig_monitor(0, job_cancel, job);
999                         free(job);
1000                         pthread_mutex_lock(&mutex);
1001                 }
1002         }
1003         pthread_mutex_unlock(&mutex);
1004 }
1005