Overall integration of job initialisation
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include <systemd/sd-event.h>
30
31 #include "jobs.h"
32 #include "sig-monitor.h"
33 #include "verbose.h"
34
35 #if 0
36 #define _alert_ "do you really want to remove monitoring?"
37 #define sig_monitor_init_timeouts()  ((void)0)
38 #define sig_monitor_clean_timeouts() ((void)0)
39 #define sig_monitor(to,cb,arg)       (cb(0,arg))
40 #endif
41
42 /** Internal shortcut for callback */
43 typedef void (*job_cb_t)(int, void*, void *, void*);
44
45 /** Description of a pending job */
46 struct job
47 {
48         struct job *next;    /**< link to the next job enqueued */
49         void *group;         /**< group of the request */
50         job_cb_t callback;   /**< processing callback */
51         void *arg1;          /**< first arg */
52         void *arg2;          /**< second arg */
53         void *arg3;          /**< third arg */
54         int timeout;         /**< timeout in second for processing the request */
55         unsigned blocked: 1; /**< is an other request blocking this one ? */
56         unsigned dropped: 1; /**< is removed ? */
57 };
58
59 /** Description of handled event loops */
60 struct events
61 {
62         struct events *next;
63         struct sd_event *event;
64         unsigned runs: 1;
65 };
66
67 /** Description of threads */
68 struct thread
69 {
70         struct thread *next;   /**< next thread of the list */
71         struct thread *upper;  /**< upper same thread */
72         struct job *job;       /**< currently processed job */
73         struct events *events; /**< currently processed job */
74         pthread_t tid;         /**< the thread id */
75         unsigned stop: 1;      /**< stop requested */
76         unsigned lowered: 1;   /**< has a lower same thread */
77         unsigned waits: 1;     /**< is waiting? */
78 };
79
80 /* synchronisation of threads */
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
83
84 /* count allowed, started and waiting threads */
85 static int allowed = 0; /** allowed count of threads */
86 static int started = 0; /** started count of threads */
87 static int waiting = 0; /** waiting count of threads */
88 static int remains = 0; /** allowed count of waiting jobs */
89 static int nevents = 0; /** count of events */
90
91 /* list of threads */
92 static struct thread *threads;
93 static _Thread_local struct thread *current;
94
95 /* queue of pending jobs */
96 static struct job *first_job;
97 static struct events *first_events;
98 static struct job *free_jobs;
99
100 /**
101  * Create a new job with the given parameters
102  * @param group    the group of the job
103  * @param timeout  the timeout of the job (0 if none)
104  * @param callback the function that achieves the job
105  * @param arg1     the first argument of the callback
106  * @param arg2     the second argument of the callback
107  * @param arg3     the third argument of the callback
108  * @return the created job unblock or NULL when no more memory
109  */
110 static struct job *job_create(
111                 void *group,
112                 int timeout,
113                 job_cb_t callback,
114                 void *arg1,
115                 void *arg2,
116                 void *arg3)
117 {
118         struct job *job;
119
120         /* try recyle existing job */
121         job = free_jobs;
122         if (job)
123                 free_jobs = job->next;
124         else {
125                 /* allocation  without blocking */
126                 pthread_mutex_unlock(&mutex);
127                 job = malloc(sizeof *job);
128                 pthread_mutex_lock(&mutex);
129                 if (!job) {
130                         errno = -ENOMEM;
131                         goto end;
132                 }
133         }
134         /* initialises the job */
135         job->group = group;
136         job->timeout = timeout;
137         job->callback = callback;
138         job->arg1 = arg1;
139         job->arg2 = arg2;
140         job->arg3 = arg3;
141         job->blocked = 0;
142         job->dropped = 0;
143 end:
144         return job;
145 }
146
147 /**
148  * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
149  * as blocked if an other job with the same group is pending.
150  * @param job1 the first job to add
151  * @param job2 the second job to add or NULL
152  */
153 static void job_add2(struct job *job1, struct job *job2)
154 {
155         void *group1, *group2, *group;
156         struct job *ijob, **pjob;
157
158         /* prepare to add */
159         group1 = job1->group;
160         job1->next = job2;
161         if (!job2)
162                 group2 = NULL;
163         else {
164                 job2->next = NULL;
165                 group2 = job2->group;
166                 if (group2 && group2 == group1)
167                         job2->blocked = 1;
168         }
169
170         /* search end and blockers */
171         pjob = &first_job;
172         ijob = first_job;
173         while (ijob) {
174                 group = ijob->group;
175                 if (group) {
176                         if (group == group1)
177                                 job1->blocked = 1;
178                         if (group == group2)
179                                 job2->blocked = 1;
180                 }
181                 pjob = &ijob->next;
182                 ijob = ijob->next;
183         }
184
185         /* queue the jobs */
186         *pjob = job1;
187 }
188
189 /**
190  * Get the next job to process or NULL if none.
191  * @return the first job that isn't blocked or NULL
192  */
193 static inline struct job *job_get()
194 {
195         struct job *job = first_job;
196         while (job && job->blocked)
197                 job = job->next;
198         return job;
199 }
200
201 /**
202  * Get the next events to process or NULL if none.
203  * @return the first events that isn't running or NULL
204  */
205 static inline struct events *events_get()
206 {
207         struct events *events = first_events;
208         while (events && events->runs)
209                 events = events->next;
210         return events;
211 }
212
213 /**
214  * Releases the processed 'job': removes it
215  * from the list of jobs and unblock the first
216  * pending job of the same group if any.
217  * @param job the job to release
218  */
219 static inline void job_release(struct job *job)
220 {
221         struct job *ijob, **pjob;
222         void *group;
223
224         /* first unqueue the job */
225         pjob = &first_job;
226         ijob = first_job;
227         while (ijob != job) {
228                 pjob = &ijob->next;
229                 ijob = ijob->next;
230         }
231         *pjob = job->next;
232
233         /* then unblock jobs of the same group */
234         group = job->group;
235         if (group) {
236                 ijob = job->next;
237                 while (ijob && ijob->group != group)
238                         ijob = ijob->next;
239                 if (ijob)
240                         ijob->blocked = 0;
241         }
242
243         /* recycle the job */
244         job->next = free_jobs;
245         free_jobs = job;
246 }
247
248 /**
249  * Monitored normal callback for a job.
250  * This function is called by the monitor
251  * to run the job when the safe environment
252  * is set.
253  * @param signum 0 on normal flow or the number
254  *               of the signal that interrupted the normal
255  *               flow
256  * @param arg     the job to run
257  */
258 static void job_call(int signum, void *arg)
259 {
260         struct job *job = arg;
261         job->callback(signum, job->arg1, job->arg2, job->arg3);
262 }
263
264 /**
265  * Monitored cancel callback for a job.
266  * This function is called by the monitor
267  * to cancel the job when the safe environment
268  * is set.
269  * @param signum 0 on normal flow or the number
270  *               of the signal that interrupted the normal
271  *               flow, isn't used
272  * @param arg    the job to run
273  */
274 static void job_cancel(int signum, void *arg)
275 {
276         job_call(SIGABRT, arg);
277 }
278
279 /**
280  * Monitored normal callback for events.
281  * This function is called by the monitor
282  * to run the event loop when the safe environment
283  * is set.
284  * @param signum 0 on normal flow or the number
285  *               of the signal that interrupted the normal
286  *               flow
287  * @param arg     the events to run
288  */
289 static void events_call(int signum, void *arg)
290 {
291         struct events *events = arg;
292         if (!signum)
293                 sd_event_run(events->event, (uint64_t) -1);
294 }
295
296 /**
297  * Main processing loop of threads processing jobs.
298  * The loop must be called with the mutex locked
299  * and it returns with the mutex locked.
300  * @param me the description of the thread to use
301  * TODO: how are timeout handled when reentering?
302  */
303 static void thread_run(volatile struct thread *me)
304 {
305         struct thread **prv;
306         struct job *job;
307         struct events *events;
308
309         /* initialize description of itself and link it in the list */
310         me->tid = pthread_self();
311         me->stop = 0;
312         me->lowered = 0;
313         me->waits = 0;
314         me->upper = current;
315         if (current)
316                 current->lowered = 1;
317         else
318                 sig_monitor_init_timeouts();
319         current = (struct thread*)me;
320         me->next = threads;
321         threads = (struct thread*)me;
322         started++;
323
324         /* loop until stopped */
325         me->events = NULL;
326         while (!me->stop) {
327                 /* get a job */
328                 job = job_get(first_job);
329                 if (job) {
330                         /* prepare running the job */
331                         remains++; /* increases count of job that can wait */
332                         job->blocked = 1; /* mark job as blocked */
333                         me->job = job; /* record the job (only for terminate) */
334
335                         /* run the job */
336                         pthread_mutex_unlock(&mutex);
337                         sig_monitor(job->timeout, job_call, job);
338                         pthread_mutex_lock(&mutex);
339
340                         /* release the run job */
341                         job_release(job);
342
343                         /* release event if any */
344                         events = me->events;
345                         if (events) {
346                                 events->runs = 0;
347                                 me->events = NULL;
348                         }
349                 } else {
350                         /* no job, check events */
351                         events = events_get();
352                         if (events) {
353                                 /* run the events */
354                                 events->runs = 1;
355                                 me->events = events;
356                                 pthread_mutex_unlock(&mutex);
357                                 sig_monitor(0, events_call, events);
358                                 pthread_mutex_lock(&mutex);
359                                 events->runs = 0;
360                                 me->events = NULL;
361                         } else {
362                                 /* no job and not events */
363                                 waiting++;
364                                 me->waits = 1;
365                                 pthread_cond_wait(&cond, &mutex);
366                                 me->waits = 0;
367                                 waiting--;
368                         }
369                 }
370         }
371
372         /* unlink the current thread and cleanup */
373         started--;
374         prv = &threads;
375         while (*prv != me)
376                 prv = &(*prv)->next;
377         *prv = me->next;
378         current = me->upper;
379         if (current)
380                 current->lowered = 0;
381         else
382                 sig_monitor_clean_timeouts();
383 }
384
385 /**
386  * Entry point for created threads.
387  * @param data not used
388  * @return NULL
389  */
390 static void *thread_main(void *data)
391 {
392         struct thread me;
393
394         pthread_mutex_lock(&mutex);
395         thread_run(&me);
396         pthread_mutex_unlock(&mutex);
397         return NULL;
398 }
399
400 /**
401  * Starts a new thread
402  * @return 0 in case of success or -1 in case of error
403  */
404 static int start_one_thread()
405 {
406         pthread_t tid;
407         int rc;
408
409         rc = pthread_create(&tid, NULL, thread_main, NULL);
410         if (rc != 0) {
411                 /* errno = rc; */
412                 WARNING("not able to start thread: %m");
413                 rc = -1;
414         }
415         return rc;
416 }
417
418 /**
419  * Queues a new asynchronous job represented by 'callback'
420  * for the 'group' and the 'timeout'.
421  * Jobs are queued FIFO and are possibly executed in parallel
422  * concurrently except for job of the same group that are
423  * executed sequentially in FIFO order.
424  * @param group    The group of the job or NULL when no group.
425  * @param timeout  The maximum execution time in seconds of the job
426  *                 or 0 for unlimited time.
427  * @param callback The function to execute for achieving the job.
428  *                 Its first parameter is either 0 on normal flow
429  *                 or the signal number that broke the normal flow.
430  * @return 0 in case of success or -1 in case of error
431  */
432 int jobs_queue0(
433                 void *group,
434                 int timeout,
435                 void (*callback)(int signum))
436 {
437         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
438 }
439
440 /**
441  * Queues a new asynchronous job represented by 'callback' and 'arg1'
442  * for the 'group' and the 'timeout'.
443  * Jobs are queued FIFO and are possibly executed in parallel
444  * concurrently except for job of the same group that are
445  * executed sequentially in FIFO order.
446  * @param group    The group of the job or NULL when no group.
447  * @param timeout  The maximum execution time in seconds of the job
448  *                 or 0 for unlimited time.
449  * @param callback The function to execute for achieving the job.
450  *                 Its first parameter is either 0 on normal flow
451  *                 or the signal number that broke the normal flow.
452  *                 The remaining parameter is the parameter 'arg1'
453  *                 given here.
454  * @param arg1     The second argument for 'callback'
455  * @return 0 in case of success or -1 in case of error
456  */
457 int jobs_queue(
458                 void *group,
459                 int timeout,
460                 void (*callback)(int, void*),
461                 void *arg)
462 {
463         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
464 }
465
466 /**
467  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
468  * for the 'group' and the 'timeout'.
469  * Jobs are queued FIFO and are possibly executed in parallel
470  * concurrently except for job of the same group that are
471  * executed sequentially in FIFO order.
472  * @param group    The group of the job or NULL when no group.
473  * @param timeout  The maximum execution time in seconds of the job
474  *                 or 0 for unlimited time.
475  * @param callback The function to execute for achieving the job.
476  *                 Its first parameter is either 0 on normal flow
477  *                 or the signal number that broke the normal flow.
478  *                 The remaining parameters are the parameters 'arg[12]'
479  *                 given here.
480  * @param arg1     The second argument for 'callback'
481  * @param arg2     The third argument for 'callback'
482  * @return 0 in case of success or -1 in case of error
483  */
484 int jobs_queue2(
485                 void *group,
486                 int timeout,
487                 void (*callback)(int, void*, void*),
488                 void *arg1,
489                 void *arg2)
490 {
491         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
492 }
493
494 /**
495  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
496  * for the 'group' and the 'timeout'.
497  * Jobs are queued FIFO and are possibly executed in parallel
498  * concurrently except for job of the same group that are
499  * executed sequentially in FIFO order.
500  * @param group    The group of the job or NULL when no group.
501  * @param timeout  The maximum execution time in seconds of the job
502  *                 or 0 for unlimited time.
503  * @param callback The function to execute for achieving the job.
504  *                 Its first parameter is either 0 on normal flow
505  *                 or the signal number that broke the normal flow.
506  *                 The remaining parameters are the parameters 'arg[123]'
507  *                 given here.
508  * @param arg1     The second argument for 'callback'
509  * @param arg2     The third argument for 'callback'
510  * @param arg3     The forth argument for 'callback'
511  * @return 0 in case of success or -1 in case of error
512  */
513 int jobs_queue3(
514                 void *group,
515                 int timeout,
516                 void (*callback)(int, void*, void *, void*),
517                 void *arg1,
518                 void *arg2,
519                 void *arg3)
520 {
521         const char *info;
522         struct job *job;
523         int rc;
524
525         pthread_mutex_lock(&mutex);
526
527         /* allocates the job */
528         job = job_create(group, timeout, callback, arg1, arg2, arg3);
529         if (!job) {
530                 errno = ENOMEM;
531                 info = "out of memory";
532                 goto error;
533         }
534
535         /* check availability */
536         if (remains == 0) {
537                 errno = EBUSY;
538                 info = "too many jobs";
539                 goto error2;
540         }
541
542         /* start a thread if needed */
543         if (waiting == 0 && started < allowed) {
544                 /* all threads are busy and a new can be started */
545                 rc = start_one_thread();
546                 if (rc < 0 && started == 0) {
547                         info = "can't start first thread";
548                         goto error2;
549                 }
550         }
551
552         /* queues the job */
553         remains--;
554         job_add2(job, NULL);
555
556         /* signal an existing job */
557         pthread_cond_signal(&cond);
558         pthread_mutex_unlock(&mutex);
559         return 0;
560
561 error2:
562         job->next = free_jobs;
563         free_jobs = job;
564 error:
565         ERROR("can't process job with threads: %s, %m", info);
566         pthread_mutex_unlock(&mutex);
567         return -1;
568 }
569
570 /**
571  * Run a asynchronous job represented by 'callback'
572  * with the 'timeout' but only returns after job completion.
573  * @param timeout  The maximum execution time in seconds of the job
574  *                 or 0 for unlimited time.
575  * @param callback The function to execute for achieving the job.
576  *                 Its first parameter is either 0 on normal flow
577  *                 or the signal number that broke the normal flow.
578  * @return 0 in case of success or -1 in case of error
579  */
580 int jobs_invoke0(
581                 int timeout,
582                 void (*callback)(int signum))
583 {
584         return jobs_invoke3(timeout, (job_cb_t)callback, NULL, NULL, NULL);
585 }
586
587 /**
588  * Run a asynchronous job represented by 'callback' and 'arg1'
589  * with the 'timeout' but only returns after job completion.
590  * @param timeout  The maximum execution time in seconds of the job
591  *                 or 0 for unlimited time.
592  * @param callback The function to execute for achieving the job.
593  *                 Its first parameter is either 0 on normal flow
594  *                 or the signal number that broke the normal flow.
595  *                 The remaining parameter is the parameter 'arg1'
596  *                 given here.
597  * @param arg1     The second argument for 'callback'
598  * @return 0 in case of success or -1 in case of error
599  */
600 int jobs_invoke(
601                 int timeout,
602                 void (*callback)(int, void*),
603                 void *arg)
604 {
605         return jobs_invoke3(timeout, (job_cb_t)callback, arg, NULL, NULL);
606 }
607
608 /**
609  * Run a asynchronous job represented by 'callback' and 'arg[12]'
610  * with the 'timeout' but only returns after job completion.
611  * @param timeout  The maximum execution time in seconds of the job
612  *                 or 0 for unlimited time.
613  * @param callback The function to execute for achieving the job.
614  *                 Its first parameter is either 0 on normal flow
615  *                 or the signal number that broke the normal flow.
616  *                 The remaining parameters are the parameters 'arg[12]'
617  *                 given here.
618  * @param arg1     The second argument for 'callback'
619  * @param arg2     The third argument for 'callback'
620  * @return 0 in case of success or -1 in case of error
621  */
622 int jobs_invoke2(
623                 int timeout,
624                 void (*callback)(int, void*, void*),
625                 void *arg1,
626                 void *arg2)
627 {
628         return jobs_invoke3(timeout, (job_cb_t)callback, arg1, arg2, NULL);
629 }
630
631 /**
632  * Stops the thread pointed by 'arg1'. Used with
633  * invoke familly to return to the caller after completion.
634  * @param signum Unused
635  * @param arg1   The thread to stop
636  * @param arg2   Unused
637  * @param arg3   Unused
638  */
639 static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
640 {
641         struct thread *t = arg1;
642         pthread_mutex_lock(&mutex);
643         t->stop = 1;
644         if (t->waits)
645                 pthread_cond_broadcast(&cond);
646         pthread_mutex_unlock(&mutex);
647 }
648
649 /**
650  * Run a asynchronous job represented by 'callback' and 'arg[123]'
651  * with the 'timeout' but only returns after job completion.
652  * @param timeout  The maximum execution time in seconds of the job
653  *                 or 0 for unlimited time.
654  * @param callback The function to execute for achieving the job.
655  *                 Its first parameter is either 0 on normal flow
656  *                 or the signal number that broke the normal flow.
657  *                 The remaining parameters are the parameters 'arg[123]'
658  *                 given here.
659  * @param arg1     The second argument for 'callback'
660  * @param arg2     The third argument for 'callback'
661  * @param arg3     The forth argument for 'callback'
662  * @return 0 in case of success or -1 in case of error
663  */
664 int jobs_invoke3(
665                 int timeout,
666                 void (*callback)(int, void*, void *, void*),
667                 void *arg1,
668                 void *arg2,
669                 void *arg3)
670 {
671         struct job *job1, *job2;
672         struct thread me;
673         
674         pthread_mutex_lock(&mutex);
675
676         /* allocates the job */
677         job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
678         job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
679         if (!job1 || !job2) {
680                 ERROR("out of memory");
681                 errno = ENOMEM;
682                 if (job1) {
683                         job1->next = free_jobs;
684                         free_jobs = job1;
685                 }
686                 if (job2) {
687                         job2->next = free_jobs;
688                         free_jobs = job2;
689                 }
690                 pthread_mutex_unlock(&mutex);
691                 return -1;
692         }
693
694         /* queues the job */
695         job_add2(job1, job2);
696
697         /* run until stopped */
698         thread_run(&me);
699         pthread_mutex_unlock(&mutex);
700         return 0;
701 }
702
703 /**
704  * Initialise the job stuff.
705  * @param allowed_count Maximum count of thread for jobs (can be 0,
706  *                      see 'jobs_add_me' for merging new threads)
707  * @param start_count   Count of thread to start now, must be lower.
708  * @param waiter_count  Maximum count of jobs that can be waiting.
709  * @return 0 in case of success or -1 in case of error.
710  */
711 int jobs_init(int allowed_count, int start_count, int waiter_count)
712 {
713         int rc, launched;
714
715         assert(allowed_count >= 0);
716         assert(start_count >= 0);
717         assert(waiter_count > 0);
718         assert(start_count <= allowed_count);
719
720         /* records the allowed count */
721         allowed = allowed_count;
722         started = 0;
723         waiting = 0;
724         remains = waiter_count;
725
726         /* start at least one thread */
727         pthread_mutex_lock(&mutex);
728         launched = 0;
729         while (launched < start_count && start_one_thread() == 0)
730                 launched++;
731         rc = -(launched != start_count);
732         pthread_mutex_unlock(&mutex);
733
734         /* end */
735         if (rc)
736                 ERROR("Not all threads can be started");
737         return rc;
738 }
739
740 /**
741  * Terminate all the threads and cancel all pending jobs.
742  */
743 void jobs_terminate()
744 {
745         struct job *job, *head, *tail;
746         pthread_t me, *others;
747         struct thread *t;
748         int count;
749
750         /* how am i? */
751         me = pthread_self();
752
753         /* request all threads to stop */
754         pthread_mutex_lock(&mutex);
755         allowed = 0;
756
757         /* count the number of threads */
758         count = 0;
759         t = threads;
760         while (t) {
761                 if (!t->upper && !pthread_equal(t->tid, me))
762                         count++;
763                 t = t->next;
764         }
765
766         /* fill the array of threads */
767         others = alloca(count * sizeof *others);
768         count = 0;
769         t = threads;
770         while (t) {
771                 if (!t->upper && !pthread_equal(t->tid, me))
772                         others[count++] = t->tid;
773                 t = t->next;
774         }
775
776         /* stops the threads */
777         t = threads;
778         while (t) {
779                 t->stop = 1;
780                 t = t->next;
781         }
782
783         /* wait the threads */
784         pthread_cond_broadcast(&cond);
785         pthread_mutex_unlock(&mutex);
786         while (count)
787                 pthread_join(others[--count], NULL);
788         pthread_mutex_lock(&mutex);
789
790         /* cancel pending jobs of other threads */
791         remains = 0;
792         head = first_job;
793         first_job = NULL;
794         tail = NULL;
795         while (head) {
796                 /* unlink the job */
797                 job = head;
798                 head = job->next;
799
800                 /* search if job is stacked for current */
801                 t = current;
802                 while (t && t->job != job)
803                         t = t->upper;
804                 if (t) {
805                         /* yes, relink it at end */
806                         if (tail)
807                                 tail->next = job;
808                         else
809                                 first_job = job;
810                         tail = job;
811                         job->next = NULL;
812                 } else {
813                         /* no cancel the job */
814                         pthread_mutex_unlock(&mutex);
815                         sig_monitor(0, job_cancel, job);
816                         free(job);
817                         pthread_mutex_lock(&mutex);
818                 }
819         }
820         pthread_mutex_unlock(&mutex);
821 }
822
823 /**
824  * Adds the current thread to the pool of threads
825  * processing the jobs. Returns normally when the threads are
826  * terminated or immediately with an error if the thread is
827  * already in the pool.
828  * @return 0 in case of success or -1 in case of error
829  */
830 int jobs_add_me()
831 {
832         struct thread me;
833
834         /* check whether already running */
835         if (current) {
836                 ERROR("thread already running");
837                 errno = EINVAL;
838                 return -1;
839         }
840
841         /* allowed... */
842         pthread_mutex_lock(&mutex);
843         allowed++;
844         thread_run(&me);
845         allowed--;
846         pthread_mutex_unlock(&mutex);
847         return 0;
848 }
849
850
851 struct sd_event *jobs_get_sd_event()
852 {
853         struct events *events;
854         struct thread *me;
855         int rc;
856
857         pthread_mutex_lock(&mutex);
858
859         /* search events on stack */
860         me = current;
861         while (me && !me->events)
862                 me = me->upper;
863         if (me)
864                 /* return the stacked events */
865                 events = me->events;
866         else {
867                 /* search an available events */
868                 events = events_get();
869                 if (!events) {
870                         /* not found, check if creation possible */
871                         if (nevents >= allowed) {
872                                 ERROR("not possible to add a new event");
873                                 events = NULL;
874                         } else {
875                                 events = malloc(sizeof *events);
876                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
877                                         if (nevents < started || start_one_thread() >= 0) {
878                                                 events->runs = 0;
879                                                 events->next = first_events;
880                                                 first_events = events;
881                                         } else {
882                                                 ERROR("can't start thread for events");
883                                                 sd_event_unref(events->event);
884                                                 free(events);
885                                                 events = NULL;
886                                         }
887                                 } else {
888                                         if (!events)
889                                                 ERROR("out of memory");
890                                         else {
891                                                 free(events);
892                                                 ERROR("creation of sd_event failed: %m");
893                                                 events = NULL;
894                                                 errno = -rc;
895                                         } 
896                                 }
897                         }
898                 }
899                 if (events) {
900                         /* */
901                         me = current;
902                         if (me) {
903                                 events->runs = 1;
904                                 me->events = events;
905                         } else {
906                                 WARNING("event returned for unknown thread!");
907                         }
908                 }
909         }
910         pthread_mutex_unlock(&mutex);
911         return events ? events->event : NULL;
912 }
913
914 /**
915  * run the jobs as 
916  * @param allowed_count Maximum count of thread for jobs including this one
917  * @param start_count   Count of thread to start now, must be lower.
918  * @param waiter_count  Maximum count of jobs that can be waiting.
919  * @param start         The start routine to activate (can't be NULL)
920  * @return 0 in case of success or -1 in case of error.
921  */
922 int jobs_enter(int allowed_count, int start_count, int waiter_count, void (*start)())
923 {
924         /* start */
925         if (sig_monitor_init() < 0) {
926                 ERROR("failed to initialise signal handlers");
927                 return -1;
928         }
929
930         /* init job processing */
931         if (jobs_init(allowed_count, start_count, waiter_count) < 0) {
932                 ERROR("failed to initialise threading");
933                 return -1;
934         }
935
936         /* queue the start job */
937         if (jobs_queue0(NULL, 0, (void(*)(int))start) < 0) {
938                 ERROR("failed to start runnning jobs");
939                 return -1;
940         }
941
942         /* turn as processing thread */
943         return jobs_add_me();
944 };