jobs: Ensure validy of event loop
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
49 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
50
51 struct thread;
52
53 /** Internal shortcut for callback */
54 typedef void (*job_cb_t)(int, void*);
55
56 /** Description of a pending job */
57 struct job
58 {
59         struct job *next;    /**< link to the next job enqueued */
60         const void *group;   /**< group of the request */
61         job_cb_t callback;   /**< processing callback */
62         void *arg;           /**< argument */
63         int timeout;         /**< timeout in second for processing the request */
64         unsigned blocked: 1; /**< is an other request blocking this one ? */
65         unsigned dropped: 1; /**< is removed ? */
66 };
67
68 /** Description of handled event loops */
69 struct evloop
70 {
71         unsigned state;        /**< encoded state */
72         int efd;               /**< event notification */
73         struct sd_event *sdev; /**< the systemd event loop */
74         struct fdev *fdev;     /**< handling of events */
75         struct thread *holder; /**< holder of the evloop */
76 };
77
78 #define EVLOOP_STATE_WAIT           1U
79 #define EVLOOP_STATE_RUN            2U
80
81 /** Description of threads */
82 struct thread
83 {
84         struct thread *next;   /**< next thread of the list */
85         struct thread *upper;  /**< upper same thread */
86         struct thread *nholder;/**< next holder for evloop */
87         pthread_cond_t *cwhold;/**< condition wait for holding */
88         struct job *job;       /**< currently processed job */
89         pthread_t tid;         /**< the thread id */
90         volatile unsigned stop: 1;      /**< stop requested */
91         volatile unsigned waits: 1;     /**< is waiting? */
92 };
93
94 /**
95  * Description of synchronous callback
96  */
97 struct sync
98 {
99         struct thread thread;   /**< thread loop data */
100         union {
101                 void (*callback)(int, void*);   /**< the synchronous callback */
102                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
103                                 /**< the entering synchronous routine */
104         };
105         void *arg;              /**< the argument of the callback */
106 };
107
108
109 /* synchronisation of threads */
110 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
111 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
112
113 /* count allowed, started and running threads */
114 static int allowed = 0; /** allowed count of threads */
115 static int started = 0; /** started count of threads */
116 static int running = 0; /** running count of threads */
117 static int remains = 0; /** allowed count of waiting jobs */
118
119 /* list of threads */
120 static struct thread *threads;
121 static _Thread_local struct thread *current_thread;
122
123 /* queue of pending jobs */
124 static struct job *first_job;
125 static struct job *free_jobs;
126
127 /* event loop */
128 static struct evloop evloop;
129
130 /**
131  * Create a new job with the given parameters
132  * @param group    the group of the job
133  * @param timeout  the timeout of the job (0 if none)
134  * @param callback the function that achieves the job
135  * @param arg      the argument of the callback
136  * @return the created job unblock or NULL when no more memory
137  */
138 static struct job *job_create(
139                 const void *group,
140                 int timeout,
141                 job_cb_t callback,
142                 void *arg)
143 {
144         struct job *job;
145
146         /* try recyle existing job */
147         job = free_jobs;
148         if (job)
149                 free_jobs = job->next;
150         else {
151                 /* allocation without blocking */
152                 pthread_mutex_unlock(&mutex);
153                 job = malloc(sizeof *job);
154                 pthread_mutex_lock(&mutex);
155                 if (!job) {
156                         ERROR("out of memory");
157                         errno = ENOMEM;
158                         goto end;
159                 }
160         }
161         /* initialises the job */
162         job->group = group;
163         job->timeout = timeout;
164         job->callback = callback;
165         job->arg = arg;
166         job->blocked = 0;
167         job->dropped = 0;
168 end:
169         return job;
170 }
171
172 /**
173  * Adds 'job' at the end of the list of jobs, marking it
174  * as blocked if an other job with the same group is pending.
175  * @param job the job to add
176  */
177 static void job_add(struct job *job)
178 {
179         const void *group;
180         struct job *ijob, **pjob;
181
182         /* prepare to add */
183         group = job->group;
184         job->next = NULL;
185
186         /* search end and blockers */
187         pjob = &first_job;
188         ijob = first_job;
189         while (ijob) {
190                 if (group && ijob->group == group)
191                         job->blocked = 1;
192                 pjob = &ijob->next;
193                 ijob = ijob->next;
194         }
195
196         /* queue the jobs */
197         *pjob = job;
198         remains--;
199 }
200
201 /**
202  * Get the next job to process or NULL if none.
203  * @return the first job that isn't blocked or NULL
204  */
205 static inline struct job *job_get()
206 {
207         struct job *job = first_job;
208         while (job && job->blocked)
209                 job = job->next;
210         if (job)
211                 remains++;
212         return job;
213 }
214
215 /**
216  * Releases the processed 'job': removes it
217  * from the list of jobs and unblock the first
218  * pending job of the same group if any.
219  * @param job the job to release
220  */
221 static inline void job_release(struct job *job)
222 {
223         struct job *ijob, **pjob;
224         const void *group;
225
226         /* first unqueue the job */
227         pjob = &first_job;
228         ijob = first_job;
229         while (ijob != job) {
230                 pjob = &ijob->next;
231                 ijob = ijob->next;
232         }
233         *pjob = job->next;
234
235         /* then unblock jobs of the same group */
236         group = job->group;
237         if (group) {
238                 ijob = job->next;
239                 while (ijob && ijob->group != group)
240                         ijob = ijob->next;
241                 if (ijob)
242                         ijob->blocked = 0;
243         }
244
245         /* recycle the job */
246         job->next = free_jobs;
247         free_jobs = job;
248 }
249
250 /**
251  * Monitored cancel callback for a job.
252  * This function is called by the monitor
253  * to cancel the job when the safe environment
254  * is set.
255  * @param signum 0 on normal flow or the number
256  *               of the signal that interrupted the normal
257  *               flow, isn't used
258  * @param arg    the job to run
259  */
260 static void job_cancel(int signum, void *arg)
261 {
262         struct job *job = arg;
263         job->callback(SIGABRT, job->arg);
264 }
265
266 /**
267  * Monitored normal callback for events.
268  * This function is called by the monitor
269  * to run the event loop when the safe environment
270  * is set.
271  * @param signum 0 on normal flow or the number
272  *               of the signal that interrupted the normal
273  *               flow
274  * @param arg     the events to run
275  */
276 static void evloop_run(int signum, void *arg)
277 {
278         int rc;
279         struct sd_event *se;
280
281         if (!signum) {
282                 se = evloop.sdev;
283                 rc = sd_event_prepare(se);
284                 if (rc < 0) {
285                         errno = -rc;
286                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
287                         abort();
288                 } else {
289                         if (rc == 0) {
290                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
291                                 if (rc < 0) {
292                                         errno = -rc;
293                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
294                                 }
295                         }
296                         evloop.state = EVLOOP_STATE_RUN;
297                         if (rc > 0) {
298                                 rc = sd_event_dispatch(se);
299                                 if (rc < 0) {
300                                         errno = -rc;
301                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
302                                 }
303                         }
304                 }
305         }
306 }
307
308 /**
309  * Internal callback for evloop management.
310  * The effect of this function is hidden: it exits
311  * the waiting poll if any.
312  */
313 static void evloop_on_efd_event()
314 {
315         uint64_t x;
316         read(evloop.efd, &x, sizeof x);
317 }
318
319 /**
320  * wakeup the event loop if needed by sending
321  * an event.
322  */
323 static void evloop_wakeup()
324 {
325         uint64_t x;
326
327         if (evloop.state & EVLOOP_STATE_WAIT) {
328                 x = 1;
329                 write(evloop.efd, &x, sizeof x);
330         }
331 }
332
333 /**
334  * Release the currently held event loop
335  */
336 static void evloop_release()
337 {
338         struct thread *nh, *ct = current_thread;
339
340         if (ct && evloop.holder == ct) {
341                 nh = ct->nholder;
342                 evloop.holder = nh;
343                 if (nh)
344                         pthread_cond_signal(nh->cwhold);
345         }
346 }
347
348 /**
349  * get the eventloop for the current thread
350  */
351 static int evloop_get()
352 {
353         struct thread *ct = current_thread;
354
355         if (evloop.holder)
356                 return evloop.holder == ct;
357
358         if (!evloop.sdev)
359                 return 0;
360
361         ct->nholder = NULL;
362         evloop.holder = ct;
363         return 1;
364 }
365
366 /**
367  * acquire the eventloop for the current thread
368  */
369 static void evloop_acquire()
370 {
371         struct thread **pwait, *ct;
372         pthread_cond_t cond;
373
374         /* try to get the evloop */
375         if (!evloop_get()) {
376                 /* failed, init waiting state */
377                 ct = current_thread;
378                 ct->nholder = NULL;
379                 ct->cwhold = &cond;
380                 pthread_cond_init(&cond, NULL);
381
382                 /* queue current thread in holder list */
383                 pwait = &evloop.holder;
384                 while (*pwait)
385                         pwait = &(*pwait)->nholder;
386                 *pwait = ct;
387
388                 /* wake up the evloop */
389                 evloop_wakeup();
390
391                 /* wait to acquire the evloop */
392                 pthread_cond_wait(&cond, &mutex);
393                 pthread_cond_destroy(&cond);
394         }
395 }
396
397 /**
398  * Enter the thread
399  * @param me the description of the thread to enter
400  */
401 static void thread_enter(volatile struct thread *me)
402 {
403         evloop_release();
404         /* initialize description of itself and link it in the list */
405         me->tid = pthread_self();
406         me->stop = 0;
407         me->waits = 0;
408         me->upper = current_thread;
409         me->next = threads;
410         threads = (struct thread*)me;
411         current_thread = (struct thread*)me;
412 }
413
414 /**
415  * leave the thread
416  * @param me the description of the thread to leave
417  */
418 static void thread_leave()
419 {
420         struct thread **prv, *me;
421
422         /* unlink the current thread and cleanup */
423         me = current_thread;
424         prv = &threads;
425         while (*prv != me)
426                 prv = &(*prv)->next;
427         *prv = me->next;
428
429         current_thread = me->upper;
430 }
431
432 /**
433  * Main processing loop of internal threads with processing jobs.
434  * The loop must be called with the mutex locked
435  * and it returns with the mutex locked.
436  * @param me the description of the thread to use
437  * TODO: how are timeout handled when reentering?
438  */
439 static void thread_run_internal(volatile struct thread *me)
440 {
441         struct job *job;
442
443         /* enter thread */
444         thread_enter(me);
445
446         /* loop until stopped */
447         while (!me->stop) {
448                 /* release the current event loop */
449                 evloop_release();
450
451                 /* get a job */
452                 job = job_get();
453                 if (job) {
454                         /* prepare running the job */
455                         job->blocked = 1; /* mark job as blocked */
456                         me->job = job; /* record the job (only for terminate) */
457
458                         /* run the job */
459                         pthread_mutex_unlock(&mutex);
460                         sig_monitor(job->timeout, job->callback, job->arg);
461                         pthread_mutex_lock(&mutex);
462
463                         /* release the run job */
464                         job_release(job);
465                 /* no job, check event loop wait */
466                 } else if (evloop_get()) {
467                         if (evloop.state != 0) {
468                                 /* busy ? */
469                                 CRITICAL("Can't enter dispatch while in dispatch!");
470                                 abort();
471                         }
472                         /* run the events */
473                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
474                         pthread_mutex_unlock(&mutex);
475                         sig_monitor(0, evloop_run, NULL);
476                         pthread_mutex_lock(&mutex);
477                         evloop.state = 0;
478                 } else {
479                         /* no job and no event loop */
480                         running--;
481                         if (!running)
482                                 ERROR("Entering job deep sleep! Check your bindings.");
483                         me->waits = 1;
484                         pthread_cond_wait(&cond, &mutex);
485                         me->waits = 0;
486                         running++;
487                 }
488         }
489         /* cleanup */
490         evloop_release();
491         thread_leave();
492 }
493
494 /**
495  * Main processing loop of external threads.
496  * The loop must be called with the mutex locked
497  * and it returns with the mutex locked.
498  * @param me the description of the thread to use
499  */
500 static void thread_run_external(volatile struct thread *me)
501 {
502         /* enter thread */
503         thread_enter(me);
504
505         /* loop until stopped */
506         me->waits = 1;
507         while (!me->stop)
508                 pthread_cond_wait(&cond, &mutex);
509         me->waits = 0;
510         thread_leave();
511 }
512
513 /**
514  * Root for created threads.
515  */
516 static void thread_main()
517 {
518         struct thread me;
519
520         running++;
521         started++;
522         sig_monitor_init_timeouts();
523         thread_run_internal(&me);
524         sig_monitor_clean_timeouts();
525         started--;
526         running--;
527 }
528
529 /**
530  * Entry point for created threads.
531  * @param data not used
532  * @return NULL
533  */
534 static void *thread_starter(void *data)
535 {
536         pthread_mutex_lock(&mutex);
537         thread_main();
538         pthread_mutex_unlock(&mutex);
539         return NULL;
540 }
541
542 /**
543  * Starts a new thread
544  * @return 0 in case of success or -1 in case of error
545  */
546 static int start_one_thread()
547 {
548         pthread_t tid;
549         int rc;
550
551         rc = pthread_create(&tid, NULL, thread_starter, NULL);
552         if (rc != 0) {
553                 /* errno = rc; */
554                 WARNING("not able to start thread: %m");
555                 rc = -1;
556         }
557         return rc;
558 }
559
560 /**
561  * Queues a new asynchronous job represented by 'callback' and 'arg'
562  * for the 'group' and the 'timeout'.
563  * Jobs are queued FIFO and are possibly executed in parallel
564  * concurrently except for job of the same group that are
565  * executed sequentially in FIFO order.
566  * @param group    The group of the job or NULL when no group.
567  * @param timeout  The maximum execution time in seconds of the job
568  *                 or 0 for unlimited time.
569  * @param callback The function to execute for achieving the job.
570  *                 Its first parameter is either 0 on normal flow
571  *                 or the signal number that broke the normal flow.
572  *                 The remaining parameter is the parameter 'arg1'
573  *                 given here.
574  * @param arg      The second argument for 'callback'
575  * @return 0 in case of success or -1 in case of error
576  */
577 int jobs_queue(
578                 const void *group,
579                 int timeout,
580                 void (*callback)(int, void*),
581                 void *arg)
582 {
583         struct job *job;
584         int rc;
585
586         pthread_mutex_lock(&mutex);
587
588         /* allocates the job */
589         job = job_create(group, timeout, callback, arg);
590         if (!job)
591                 goto error;
592
593         /* check availability */
594         if (remains <= 0) {
595                 ERROR("can't process job with threads: too many jobs");
596                 errno = EBUSY;
597                 goto error2;
598         }
599
600         /* start a thread if needed */
601         if (running == started && started < allowed) {
602                 /* all threads are busy and a new can be started */
603                 rc = start_one_thread();
604                 if (rc < 0 && started == 0) {
605                         ERROR("can't start initial thread: %m");
606                         goto error2;
607                 }
608         }
609
610         /* queues the job */
611         job_add(job);
612
613         /* signal an existing job */
614         pthread_cond_signal(&cond);
615         pthread_mutex_unlock(&mutex);
616         return 0;
617
618 error2:
619         job->next = free_jobs;
620         free_jobs = job;
621 error:
622         pthread_mutex_unlock(&mutex);
623         return -1;
624 }
625
626 /**
627  * Internal helper function for 'jobs_enter'.
628  * @see jobs_enter, jobs_leave
629  */
630 static void enter_cb(int signum, void *closure)
631 {
632         struct sync *sync = closure;
633         sync->enter(signum, sync->arg, (void*)&sync->thread);
634 }
635
636 /**
637  * Internal helper function for 'jobs_call'.
638  * @see jobs_call
639  */
640 static void call_cb(int signum, void *closure)
641 {
642         struct sync *sync = closure;
643         sync->callback(signum, sync->arg);
644         jobs_leave((void*)&sync->thread);
645 }
646
647 /**
648  * Internal helper for synchronous jobs. It enters
649  * a new thread loop for evaluating the given job
650  * as recorded by the couple 'sync_cb' and 'sync'.
651  * @see jobs_call, jobs_enter, jobs_leave
652  */
653 static int do_sync(
654                 const void *group,
655                 int timeout,
656                 void (*sync_cb)(int signum, void *closure),
657                 struct sync *sync
658 )
659 {
660         struct job *job;
661
662         pthread_mutex_lock(&mutex);
663
664         /* allocates the job */
665         job = job_create(group, timeout, sync_cb, sync);
666         if (!job) {
667                 pthread_mutex_unlock(&mutex);
668                 return -1;
669         }
670
671         /* queues the job */
672         job_add(job);
673
674         /* run until stopped */
675         if (current_thread)
676                 thread_run_internal(&sync->thread);
677         else
678                 thread_run_external(&sync->thread);
679         pthread_mutex_unlock(&mutex);
680         return 0;
681 }
682
683 /**
684  * Enter a synchronisation point: activates the job given by 'callback'
685  * and 'closure' using 'group' and 'timeout' to control sequencing and
686  * execution time.
687  * @param group the group for sequencing jobs
688  * @param timeout the time in seconds allocated to the job
689  * @param callback the callback that will handle the job.
690  *                 it receives 3 parameters: 'signum' that will be 0
691  *                 on normal flow or the catched signal number in case
692  *                 of interrupted flow, the context 'closure' as given and
693  *                 a 'jobloop' reference that must be used when the job is
694  *                 terminated to unlock the current execution flow.
695  * @param closure the argument to the callback
696  * @return 0 on success or -1 in case of error
697  */
698 int jobs_enter(
699                 const void *group,
700                 int timeout,
701                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
702                 void *closure
703 )
704 {
705         struct sync sync;
706
707         sync.enter = callback;
708         sync.arg = closure;
709         return do_sync(group, timeout, enter_cb, &sync);
710 }
711
712 /**
713  * Unlocks the execution flow designed by 'jobloop'.
714  * @param jobloop indication of the flow to unlock
715  * @return 0 in case of success of -1 on error
716  */
717 int jobs_leave(struct jobloop *jobloop)
718 {
719         struct thread *t;
720
721         pthread_mutex_lock(&mutex);
722         t = threads;
723         while (t && t != (struct thread*)jobloop)
724                 t = t->next;
725         if (!t) {
726                 errno = EINVAL;
727         } else {
728                 t->stop = 1;
729                 if (t->waits)
730                         pthread_cond_broadcast(&cond);
731                 else
732                         evloop_wakeup();
733         }
734         pthread_mutex_unlock(&mutex);
735         return -!t;
736 }
737
738 /**
739  * Calls synchronously the job represented by 'callback' and 'arg1'
740  * for the 'group' and the 'timeout' and waits for its completion.
741  * @param group    The group of the job or NULL when no group.
742  * @param timeout  The maximum execution time in seconds of the job
743  *                 or 0 for unlimited time.
744  * @param callback The function to execute for achieving the job.
745  *                 Its first parameter is either 0 on normal flow
746  *                 or the signal number that broke the normal flow.
747  *                 The remaining parameter is the parameter 'arg1'
748  *                 given here.
749  * @param arg      The second argument for 'callback'
750  * @return 0 in case of success or -1 in case of error
751  */
752 int jobs_call(
753                 const void *group,
754                 int timeout,
755                 void (*callback)(int, void*),
756                 void *arg)
757 {
758         struct sync sync;
759
760         sync.callback = callback;
761         sync.arg = arg;
762
763         return do_sync(group, timeout, call_cb, &sync);
764 }
765
766 /**
767  * Internal callback for evloop management.
768  * The effect of this function is hidden: it exits
769  * the waiting poll if any. Then it wakes up a thread
770  * awaiting the evloop using signal.
771  */
772 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
773 {
774         evloop_on_efd_event();
775         return 1;
776 }
777
778 /**
779  * Gets a sd_event item for the current thread.
780  * @return a sd_event or NULL in case of error
781  */
782 static struct sd_event *get_sd_event_locked()
783 {
784         int rc;
785
786         /* creates the evloop on need */
787         if (!evloop.sdev) {
788                 /* start the creation */
789                 evloop.state = 0;
790                 /* creates the eventfd for waking up polls */
791                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
792                 if (evloop.efd < 0) {
793                         ERROR("can't make eventfd for events");
794                         goto error1;
795                 }
796                 /* create the systemd event loop */
797                 rc = sd_event_new(&evloop.sdev);
798                 if (rc < 0) {
799                         ERROR("can't make new event loop");
800                         goto error2;
801                 }
802                 /* put the eventfd in the event loop */
803                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
804                 if (rc < 0) {
805                         ERROR("can't register eventfd");
806                         sd_event_unref(evloop.sdev);
807                         evloop.sdev = NULL;
808 error2:
809                         close(evloop.efd);
810 error1:
811                         return NULL;
812                 }
813         }
814
815         /* acquire the event loop */
816         evloop_acquire();
817
818         return evloop.sdev;
819 }
820
821 /**
822  * Gets a sd_event item for the current thread.
823  * @return a sd_event or NULL in case of error
824  */
825 struct sd_event *jobs_get_sd_event()
826 {
827         struct sd_event *result;
828         struct thread lt;
829
830         /* ensure an existing thread environment */
831         if (!current_thread) {
832                 memset(&lt, 0, sizeof lt);
833                 current_thread = &lt;
834         }
835
836         /* process */
837         pthread_mutex_lock(&mutex);
838         result = get_sd_event_locked();
839         pthread_mutex_unlock(&mutex);
840
841         /* release the faked thread environment if needed */
842         if (current_thread == &lt) {
843                 /*
844                  * Releasing it is needed because there is no way to guess
845                  * when it has to be released really. But here is where it is
846                  * hazardous: if the caller modifies the eventloop when it
847                  * is waiting, there is no way to make the change effective.
848                  * A workaround to achieve that goal is for the caller to
849                  * require the event loop a second time after having modified it.
850                  */
851                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
852                 if (verbose_wants(Log_Level_Info))
853                         sig_monitor_dumpstack();
854                 evloop_release();
855                 current_thread = NULL;
856         }
857
858         return result;
859 }
860
861 /**
862  * Enter the jobs processing loop.
863  * @param allowed_count Maximum count of thread for jobs including this one
864  * @param start_count   Count of thread to start now, must be lower.
865  * @param waiter_count  Maximum count of jobs that can be waiting.
866  * @param start         The start routine to activate (can't be NULL)
867  * @return 0 in case of success or -1 in case of error.
868  */
869 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
870 {
871         int rc, launched;
872         struct job *job;
873
874         assert(allowed_count >= 1);
875         assert(start_count >= 0);
876         assert(waiter_count > 0);
877         assert(start_count <= allowed_count);
878
879         rc = -1;
880         pthread_mutex_lock(&mutex);
881
882         /* check whether already running */
883         if (current_thread || allowed) {
884                 ERROR("thread already started");
885                 errno = EINVAL;
886                 goto error;
887         }
888
889         /* records the allowed count */
890         allowed = allowed_count;
891         started = 0;
892         running = 0;
893         remains = waiter_count;
894
895 #if HAS_WATCHDOG
896         /* set the watchdog */
897         if (sd_watchdog_enabled(0, NULL))
898                 sd_event_set_watchdog(get_sd_event_locked(), 1);
899 #endif
900
901         /* start at least one thread: the current one */
902         launched = 1;
903         while (launched < start_count) {
904                 if (start_one_thread() != 0) {
905                         ERROR("Not all threads can be started");
906                         goto error;
907                 }
908                 launched++;
909         }
910
911         /* queue the start job */
912         job = job_create(NULL, 0, start, arg);
913         if (!job)
914                 goto error;
915         job_add(job);
916
917         /* run until end */
918         thread_main();
919         rc = 0;
920 error:
921         pthread_mutex_unlock(&mutex);
922         return rc;
923 }
924
925 /**
926  * Terminate all the threads and cancel all pending jobs.
927  */
928 void jobs_terminate()
929 {
930         struct job *job, *head, *tail;
931         pthread_t me, *others;
932         struct thread *t;
933         int count;
934
935         /* how am i? */
936         me = pthread_self();
937
938         /* request all threads to stop */
939         pthread_mutex_lock(&mutex);
940         allowed = 0;
941
942         /* count the number of threads */
943         count = 0;
944         t = threads;
945         while (t) {
946                 if (!t->upper && !pthread_equal(t->tid, me))
947                         count++;
948                 t = t->next;
949         }
950
951         /* fill the array of threads */
952         others = alloca(count * sizeof *others);
953         count = 0;
954         t = threads;
955         while (t) {
956                 if (!t->upper && !pthread_equal(t->tid, me))
957                         others[count++] = t->tid;
958                 t = t->next;
959         }
960
961         /* stops the threads */
962         t = threads;
963         while (t) {
964                 t->stop = 1;
965                 t = t->next;
966         }
967
968         /* wait the threads */
969         pthread_cond_broadcast(&cond);
970         pthread_mutex_unlock(&mutex);
971         while (count)
972                 pthread_join(others[--count], NULL);
973         pthread_mutex_lock(&mutex);
974
975         /* cancel pending jobs of other threads */
976         remains = 0;
977         head = first_job;
978         first_job = NULL;
979         tail = NULL;
980         while (head) {
981                 /* unlink the job */
982                 job = head;
983                 head = job->next;
984
985                 /* search if job is stacked for current */
986                 t = current_thread;
987                 while (t && t->job != job)
988                         t = t->upper;
989                 if (t) {
990                         /* yes, relink it at end */
991                         if (tail)
992                                 tail->next = job;
993                         else
994                                 first_job = job;
995                         tail = job;
996                         job->next = NULL;
997                 } else {
998                         /* no cancel the job */
999                         pthread_mutex_unlock(&mutex);
1000                         sig_monitor(0, job_cancel, job);
1001                         free(job);
1002                         pthread_mutex_lock(&mutex);
1003                 }
1004         }
1005         pthread_mutex_unlock(&mutex);
1006 }
1007