jobs: Separate internal threads from others
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
53 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
54
55 struct thread;
56
57 /** Internal shortcut for callback */
58 typedef void (*job_cb_t)(int, void*);
59
60 /** Description of a pending job */
61 struct job
62 {
63         struct job *next;    /**< link to the next job enqueued */
64         const void *group;   /**< group of the request */
65         job_cb_t callback;   /**< processing callback */
66         void *arg;           /**< argument */
67         int timeout;         /**< timeout in second for processing the request */
68         unsigned blocked: 1; /**< is an other request blocking this one ? */
69         unsigned dropped: 1; /**< is removed ? */
70 };
71
72 /** Description of handled event loops */
73 struct evloop
74 {
75         unsigned state;        /**< encoded state */
76         int efd;               /**< event notification */
77         struct sd_event *sdev; /**< the systemd event loop */
78         struct fdev *fdev;     /**< handling of events */
79         struct thread *holder; /**< holder of the evloop */
80 };
81
82 #define EVLOOP_STATE_WAIT           1U
83 #define EVLOOP_STATE_RUN            2U
84
85 /** Description of threads */
86 struct thread
87 {
88         struct thread *next;   /**< next thread of the list */
89         struct thread *upper;  /**< upper same thread */
90         struct thread *nholder;/**< next holder for evloop */
91         pthread_cond_t *cwhold;/**< condition wait for holding */
92         struct job *job;       /**< currently processed job */
93         pthread_t tid;         /**< the thread id */
94         volatile unsigned stop: 1;      /**< stop requested */
95         volatile unsigned waits: 1;     /**< is waiting? */
96 };
97
98 /**
99  * Description of synchronous callback
100  */
101 struct sync
102 {
103         struct thread thread;   /**< thread loop data */
104         union {
105                 void (*callback)(int, void*);   /**< the synchronous callback */
106                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
107                                 /**< the entering synchronous routine */
108         };
109         void *arg;              /**< the argument of the callback */
110 };
111
112
113 /* synchronisation of threads */
114 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
116
117 /* count allowed, started and running threads */
118 static int allowed = 0; /** allowed count of threads */
119 static int started = 0; /** started count of threads */
120 static int running = 0; /** running count of threads */
121 static int remains = 0; /** allowed count of waiting jobs */
122
123 /* list of threads */
124 static struct thread *threads;
125 static _Thread_local struct thread *current_thread;
126
127 /* queue of pending jobs */
128 static struct job *first_job;
129 static struct job *free_jobs;
130
131 /* event loop */
132 static struct evloop evloop;
133
134 #if defined(REMOVE_SYSTEMD_EVENT)
135 static struct fdev_epoll *fdevepoll;
136 static int waitevt;
137 #endif
138
139 /**
140  * Create a new job with the given parameters
141  * @param group    the group of the job
142  * @param timeout  the timeout of the job (0 if none)
143  * @param callback the function that achieves the job
144  * @param arg      the argument of the callback
145  * @return the created job unblock or NULL when no more memory
146  */
147 static struct job *job_create(
148                 const void *group,
149                 int timeout,
150                 job_cb_t callback,
151                 void *arg)
152 {
153         struct job *job;
154
155         /* try recyle existing job */
156         job = free_jobs;
157         if (job)
158                 free_jobs = job->next;
159         else {
160                 /* allocation without blocking */
161                 pthread_mutex_unlock(&mutex);
162                 job = malloc(sizeof *job);
163                 pthread_mutex_lock(&mutex);
164                 if (!job) {
165                         ERROR("out of memory");
166                         errno = ENOMEM;
167                         goto end;
168                 }
169         }
170         /* initialises the job */
171         job->group = group;
172         job->timeout = timeout;
173         job->callback = callback;
174         job->arg = arg;
175         job->blocked = 0;
176         job->dropped = 0;
177 end:
178         return job;
179 }
180
181 /**
182  * Adds 'job' at the end of the list of jobs, marking it
183  * as blocked if an other job with the same group is pending.
184  * @param job the job to add
185  */
186 static void job_add(struct job *job)
187 {
188         const void *group;
189         struct job *ijob, **pjob;
190
191         /* prepare to add */
192         group = job->group;
193         job->next = NULL;
194
195         /* search end and blockers */
196         pjob = &first_job;
197         ijob = first_job;
198         while (ijob) {
199                 if (group && ijob->group == group)
200                         job->blocked = 1;
201                 pjob = &ijob->next;
202                 ijob = ijob->next;
203         }
204
205         /* queue the jobs */
206         *pjob = job;
207         remains--;
208 }
209
210 /**
211  * Get the next job to process or NULL if none.
212  * @return the first job that isn't blocked or NULL
213  */
214 static inline struct job *job_get()
215 {
216         struct job *job = first_job;
217         while (job && job->blocked)
218                 job = job->next;
219         if (job)
220                 remains++;
221         return job;
222 }
223
224 /**
225  * Releases the processed 'job': removes it
226  * from the list of jobs and unblock the first
227  * pending job of the same group if any.
228  * @param job the job to release
229  */
230 static inline void job_release(struct job *job)
231 {
232         struct job *ijob, **pjob;
233         const void *group;
234
235         /* first unqueue the job */
236         pjob = &first_job;
237         ijob = first_job;
238         while (ijob != job) {
239                 pjob = &ijob->next;
240                 ijob = ijob->next;
241         }
242         *pjob = job->next;
243
244         /* then unblock jobs of the same group */
245         group = job->group;
246         if (group) {
247                 ijob = job->next;
248                 while (ijob && ijob->group != group)
249                         ijob = ijob->next;
250                 if (ijob)
251                         ijob->blocked = 0;
252         }
253
254         /* recycle the job */
255         job->next = free_jobs;
256         free_jobs = job;
257 }
258
259 /**
260  * Monitored cancel callback for a job.
261  * This function is called by the monitor
262  * to cancel the job when the safe environment
263  * is set.
264  * @param signum 0 on normal flow or the number
265  *               of the signal that interrupted the normal
266  *               flow, isn't used
267  * @param arg    the job to run
268  */
269 static void job_cancel(int signum, void *arg)
270 {
271         struct job *job = arg;
272         job->callback(SIGABRT, job->arg);
273 }
274
275 #if defined(REMOVE_SYSTEMD_EVENT)
276 /**
277  * Gets a fdev_epoll item.
278  * @return a fdev_epoll or NULL in case of error
279  */
280 static struct fdev_epoll *get_fdevepoll()
281 {
282         struct fdev_epoll *result;
283
284         result = fdevepoll;
285         if (!result)
286                 result = fdevepoll = fdev_epoll_create();
287
288         return result;
289 }
290 #endif
291
292 /**
293  * Monitored normal callback for events.
294  * This function is called by the monitor
295  * to run the event loop when the safe environment
296  * is set.
297  * @param signum 0 on normal flow or the number
298  *               of the signal that interrupted the normal
299  *               flow
300  * @param arg     the events to run
301  */
302 static void evloop_run(int signum, void *arg)
303 {
304         int rc;
305         struct sd_event *se;
306
307         if (!signum) {
308                 se = evloop.sdev;
309                 rc = sd_event_prepare(se);
310                 if (rc < 0) {
311                         errno = -rc;
312                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
313                         abort();
314                 } else {
315                         if (rc == 0) {
316                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
317                                 if (rc < 0) {
318                                         errno = -rc;
319                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
320                                 }
321                         }
322                         evloop.state = EVLOOP_STATE_RUN;
323                         if (rc > 0) {
324                                 rc = sd_event_dispatch(se);
325                                 if (rc < 0) {
326                                         errno = -rc;
327                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
328                                 }
329                         }
330                 }
331         }
332 }
333
334 /**
335  * Internal callback for evloop management.
336  * The effect of this function is hidden: it exits
337  * the waiting poll if any.
338  */
339 static void evloop_on_efd_event()
340 {
341         uint64_t x;
342         read(evloop.efd, &x, sizeof x);
343 }
344
345 /**
346  * wakeup the event loop if needed by sending
347  * an event.
348  */
349 static void evloop_wakeup()
350 {
351         uint64_t x;
352
353         if (evloop.state & EVLOOP_STATE_WAIT) {
354                 x = 1;
355                 write(evloop.efd, &x, sizeof x);
356         }
357 }
358
359 /**
360  * Release the currently held event loop
361  */
362 static void evloop_release()
363 {
364         struct thread *nh, *ct = current_thread;
365
366         if (evloop.holder == ct) {
367                 nh = ct->nholder;
368                 evloop.holder = nh;
369                 if (nh)
370                         pthread_cond_signal(nh->cwhold);
371         }
372 }
373
374 /**
375  * get the eventloop for the current thread
376  */
377 static int evloop_get()
378 {
379         struct thread *ct = current_thread;
380
381         if (evloop.holder)
382                 return evloop.holder == ct;
383
384         ct->nholder = NULL;
385         evloop.holder = ct;
386         return 1;
387 }
388
389 /**
390  * acquire the eventloop for the current thread
391  */
392 static void evloop_acquire()
393 {
394         struct thread **pwait, *ct;
395         pthread_cond_t cond;
396
397         /* try to get the evloop */
398         if (!evloop_get()) {
399                 /* failed, init waiting state */
400                 ct = current_thread;
401                 ct->nholder = NULL;
402                 ct->cwhold = &cond;
403                 pthread_cond_init(&cond, NULL);
404
405                 /* queue current thread in holder list */
406                 pwait = &evloop.holder;
407                 while (*pwait)
408                         pwait = &(*pwait)->nholder;
409                 *pwait = ct;
410
411                 /* wake up the evloop */
412                 evloop_wakeup();
413
414                 /* wait to acquire the evloop */
415                 pthread_cond_wait(&cond, &mutex);
416                 pthread_cond_destroy(&cond);
417         }
418 }
419
420 #if defined(REMOVE_SYSTEMD_EVENT)
421 /**
422  * Monitored normal loop for waiting events.
423  * @param signum 0 on normal flow or the number
424  *               of the signal that interrupted the normal
425  *               flow
426  * @param arg     the events to run
427  */
428 static void monitored_wait_and_dispatch(int signum, void *arg)
429 {
430         struct fdev_epoll *fdev_epoll = arg;
431         if (!signum) {
432                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
433         }
434 }
435 #endif
436
437 /**
438  * Enter the thread
439  * @param me the description of the thread to enter
440  */
441 static void thread_enter(volatile struct thread *me)
442 {
443         /* initialize description of itself and link it in the list */
444         me->tid = pthread_self();
445         me->stop = 0;
446         me->waits = 0;
447         me->upper = current_thread;
448         me->next = threads;
449         threads = (struct thread*)me;
450         current_thread = (struct thread*)me;
451 }
452
453 /**
454  * leave the thread
455  * @param me the description of the thread to leave
456  */
457 static void thread_leave()
458 {
459         struct thread **prv, *me;
460
461         /* unlink the current thread and cleanup */
462         me = current_thread;
463         prv = &threads;
464         while (*prv != me)
465                 prv = &(*prv)->next;
466         *prv = me->next;
467
468         current_thread = me->upper;
469 }
470
471 /**
472  * Main processing loop of internal threads with processing jobs.
473  * The loop must be called with the mutex locked
474  * and it returns with the mutex locked.
475  * @param me the description of the thread to use
476  * TODO: how are timeout handled when reentering?
477  */
478 static void thread_run_internal(volatile struct thread *me)
479 {
480         struct job *job;
481
482         /* enter thread */
483         thread_enter(me);
484
485         /* loop until stopped */
486         while (!me->stop) {
487                 /* release the current event loop */
488                 evloop_release();
489
490                 /* get a job */
491                 job = job_get();
492                 if (job) {
493                         /* prepare running the job */
494                         job->blocked = 1; /* mark job as blocked */
495                         me->job = job; /* record the job (only for terminate) */
496
497                         /* run the job */
498                         pthread_mutex_unlock(&mutex);
499                         sig_monitor(job->timeout, job->callback, job->arg);
500                         pthread_mutex_lock(&mutex);
501
502                         /* release the run job */
503                         job_release(job);
504 #if !defined(REMOVE_SYSTEMD_EVENT)
505                 /* no job, check event loop wait */
506                 } else if (evloop_get()) {
507                         if (evloop.state != 0) {
508                                 /* busy ? */
509                                 CRITICAL("Can't enter dispatch while in dispatch!");
510                                 abort();
511                         }
512                         /* run the events */
513                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
514                         pthread_mutex_unlock(&mutex);
515                         sig_monitor(0, evloop_run, NULL);
516                         pthread_mutex_lock(&mutex);
517                         evloop.state = 0;
518                 } else {
519                         /* no job and no event loop */
520                         running--;
521                         if (!running)
522                                 ERROR("Entering job deep sleep! Check your bindings.");
523                         me->waits = 1;
524                         pthread_cond_wait(&cond, &mutex);
525                         me->waits = 0;
526                         running++;
527 #else
528                 } else if (waitevt) {
529                         /* no job and not events */
530                         running--;
531                         if (!running)
532                                 ERROR("Entering job deep sleep! Check your bindings.");
533                         me->waits = 1;
534                         pthread_cond_wait(&cond, &mutex);
535                         me->waits = 0;
536                         running++;
537                 } else {
538                         /* wait for events */
539                         waitevt = 1;
540                         pthread_mutex_unlock(&mutex);
541                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
542                         pthread_mutex_lock(&mutex);
543                         waitevt = 0;
544 #endif
545                 }
546         }
547         /* cleanup */
548         evloop_release();
549         thread_leave();
550 }
551
552 /**
553  * Main processing loop of external threads.
554  * The loop must be called with the mutex locked
555  * and it returns with the mutex locked.
556  * @param me the description of the thread to use
557  */
558 static void thread_run_external(volatile struct thread *me)
559 {
560         /* enter thread */
561         thread_enter(me);
562
563         /* loop until stopped */
564         me->waits = 1;
565         while (!me->stop)
566                 pthread_cond_wait(&cond, &mutex);
567         me->waits = 0;
568         thread_leave();
569 }
570
571 /**
572  * Root for created threads.
573  */
574 static void thread_main()
575 {
576         struct thread me;
577
578         running++;
579         started++;
580         sig_monitor_init_timeouts();
581         thread_run_internal(&me);
582         sig_monitor_clean_timeouts();
583         started--;
584         running--;
585 }
586
587 /**
588  * Entry point for created threads.
589  * @param data not used
590  * @return NULL
591  */
592 static void *thread_starter(void *data)
593 {
594         pthread_mutex_lock(&mutex);
595         thread_main();
596         pthread_mutex_unlock(&mutex);
597         return NULL;
598 }
599
600 /**
601  * Starts a new thread
602  * @return 0 in case of success or -1 in case of error
603  */
604 static int start_one_thread()
605 {
606         pthread_t tid;
607         int rc;
608
609         rc = pthread_create(&tid, NULL, thread_starter, NULL);
610         if (rc != 0) {
611                 /* errno = rc; */
612                 WARNING("not able to start thread: %m");
613                 rc = -1;
614         }
615         return rc;
616 }
617
618 /**
619  * Queues a new asynchronous job represented by 'callback' and 'arg'
620  * for the 'group' and the 'timeout'.
621  * Jobs are queued FIFO and are possibly executed in parallel
622  * concurrently except for job of the same group that are
623  * executed sequentially in FIFO order.
624  * @param group    The group of the job or NULL when no group.
625  * @param timeout  The maximum execution time in seconds of the job
626  *                 or 0 for unlimited time.
627  * @param callback The function to execute for achieving the job.
628  *                 Its first parameter is either 0 on normal flow
629  *                 or the signal number that broke the normal flow.
630  *                 The remaining parameter is the parameter 'arg1'
631  *                 given here.
632  * @param arg      The second argument for 'callback'
633  * @return 0 in case of success or -1 in case of error
634  */
635 int jobs_queue(
636                 const void *group,
637                 int timeout,
638                 void (*callback)(int, void*),
639                 void *arg)
640 {
641         struct job *job;
642         int rc;
643
644         pthread_mutex_lock(&mutex);
645
646         /* allocates the job */
647         job = job_create(group, timeout, callback, arg);
648         if (!job)
649                 goto error;
650
651         /* check availability */
652         if (remains <= 0) {
653                 ERROR("can't process job with threads: too many jobs");
654                 errno = EBUSY;
655                 goto error2;
656         }
657
658         /* start a thread if needed */
659         if (running == started && started < allowed) {
660                 /* all threads are busy and a new can be started */
661                 rc = start_one_thread();
662                 if (rc < 0 && started == 0) {
663                         ERROR("can't start initial thread: %m");
664                         goto error2;
665                 }
666         }
667
668         /* queues the job */
669         job_add(job);
670
671         /* signal an existing job */
672         pthread_cond_signal(&cond);
673         pthread_mutex_unlock(&mutex);
674         return 0;
675
676 error2:
677         job->next = free_jobs;
678         free_jobs = job;
679 error:
680         pthread_mutex_unlock(&mutex);
681         return -1;
682 }
683
684 /**
685  * Internal helper function for 'jobs_enter'.
686  * @see jobs_enter, jobs_leave
687  */
688 static void enter_cb(int signum, void *closure)
689 {
690         struct sync *sync = closure;
691         sync->enter(signum, sync->arg, (void*)&sync->thread);
692 }
693
694 /**
695  * Internal helper function for 'jobs_call'.
696  * @see jobs_call
697  */
698 static void call_cb(int signum, void *closure)
699 {
700         struct sync *sync = closure;
701         sync->callback(signum, sync->arg);
702         jobs_leave((void*)&sync->thread);
703 }
704
705 /**
706  * Internal helper for synchronous jobs. It enters
707  * a new thread loop for evaluating the given job
708  * as recorded by the couple 'sync_cb' and 'sync'.
709  * @see jobs_call, jobs_enter, jobs_leave
710  */
711 static int do_sync(
712                 const void *group,
713                 int timeout,
714                 void (*sync_cb)(int signum, void *closure),
715                 struct sync *sync
716 )
717 {
718         struct job *job;
719
720         pthread_mutex_lock(&mutex);
721
722         /* allocates the job */
723         job = job_create(group, timeout, sync_cb, sync);
724         if (!job) {
725                 pthread_mutex_unlock(&mutex);
726                 return -1;
727         }
728
729         /* queues the job */
730         job_add(job);
731
732         /* run until stopped */
733         if (current_thread)
734                 thread_run_internal(&sync->thread);
735         else
736                 thread_run_external(&sync->thread);
737         pthread_mutex_unlock(&mutex);
738         return 0;
739 }
740
741 /**
742  * Enter a synchronisation point: activates the job given by 'callback'
743  * and 'closure' using 'group' and 'timeout' to control sequencing and
744  * execution time.
745  * @param group the group for sequencing jobs
746  * @param timeout the time in seconds allocated to the job
747  * @param callback the callback that will handle the job.
748  *                 it receives 3 parameters: 'signum' that will be 0
749  *                 on normal flow or the catched signal number in case
750  *                 of interrupted flow, the context 'closure' as given and
751  *                 a 'jobloop' reference that must be used when the job is
752  *                 terminated to unlock the current execution flow.
753  * @param closure the argument to the callback
754  * @return 0 on success or -1 in case of error
755  */
756 int jobs_enter(
757                 const void *group,
758                 int timeout,
759                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
760                 void *closure
761 )
762 {
763         struct sync sync;
764
765         sync.enter = callback;
766         sync.arg = closure;
767         return do_sync(group, timeout, enter_cb, &sync);
768 }
769
770 /**
771  * Unlocks the execution flow designed by 'jobloop'.
772  * @param jobloop indication of the flow to unlock
773  * @return 0 in case of success of -1 on error
774  */
775 int jobs_leave(struct jobloop *jobloop)
776 {
777         struct thread *t;
778
779         pthread_mutex_lock(&mutex);
780         t = threads;
781         while (t && t != (struct thread*)jobloop)
782                 t = t->next;
783         if (!t) {
784                 errno = EINVAL;
785         } else {
786                 t->stop = 1;
787                 if (t->waits)
788                         pthread_cond_broadcast(&cond);
789                 else
790                         evloop_wakeup();
791         }
792         pthread_mutex_unlock(&mutex);
793         return -!t;
794 }
795
796 /**
797  * Calls synchronously the job represented by 'callback' and 'arg1'
798  * for the 'group' and the 'timeout' and waits for its completion.
799  * @param group    The group of the job or NULL when no group.
800  * @param timeout  The maximum execution time in seconds of the job
801  *                 or 0 for unlimited time.
802  * @param callback The function to execute for achieving the job.
803  *                 Its first parameter is either 0 on normal flow
804  *                 or the signal number that broke the normal flow.
805  *                 The remaining parameter is the parameter 'arg1'
806  *                 given here.
807  * @param arg      The second argument for 'callback'
808  * @return 0 in case of success or -1 in case of error
809  */
810 int jobs_call(
811                 const void *group,
812                 int timeout,
813                 void (*callback)(int, void*),
814                 void *arg)
815 {
816         struct sync sync;
817
818         sync.callback = callback;
819         sync.arg = arg;
820
821         return do_sync(group, timeout, call_cb, &sync);
822 }
823
824 /**
825  * Internal callback for evloop management.
826  * The effect of this function is hidden: it exits
827  * the waiting poll if any. Then it wakes up a thread
828  * awaiting the evloop using signal.
829  */
830 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
831 {
832         evloop_on_efd_event();
833         return 1;
834 }
835
836 /* temporary hack */
837 #if !defined(REMOVE_SYSTEMD_EVENT)
838 __attribute__((unused))
839 #endif
840 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
841 {
842         sig_monitor(0, evloop_run, arg);
843 }
844
845 /**
846  * Gets a sd_event item for the current thread.
847  * @return a sd_event or NULL in case of error
848  */
849 static struct sd_event *get_sd_event_locked()
850 {
851         int rc;
852
853         /* creates the evloop on need */
854         if (!evloop.sdev) {
855                 /* start the creation */
856                 evloop.state = 0;
857                 /* creates the eventfd for waking up polls */
858                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
859                 if (evloop.efd < 0) {
860                         ERROR("can't make eventfd for events");
861                         goto error1;
862                 }
863                 /* create the systemd event loop */
864                 rc = sd_event_new(&evloop.sdev);
865                 if (rc < 0) {
866                         ERROR("can't make new event loop");
867                         goto error2;
868                 }
869                 /* put the eventfd in the event loop */
870                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
871                 if (rc < 0) {
872                         ERROR("can't register eventfd");
873 #if !defined(REMOVE_SYSTEMD_EVENT)
874                         sd_event_unref(evloop.sdev);
875                         evloop.sdev = NULL;
876 error2:
877                         close(evloop.efd);
878 error1:
879                         return NULL;
880                 }
881 #else
882                         goto error3;
883                 }
884                 /* handle the event loop */
885                 evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
886                 if (!evloop.fdev) {
887                         ERROR("can't create fdev");
888 error3:
889                         sd_event_unref(evloop.sdev);
890 error2:
891                         close(evloop.efd);
892 error1:
893                         memset(&evloop, 0, sizeof evloop);
894                         return NULL;
895                 }
896                 fdev_set_autoclose(evloop.fdev, 0);
897                 fdev_set_events(evloop.fdev, EPOLLIN);
898                 fdev_set_callback(evloop.fdev, evloop_callback, NULL);
899 #endif
900         }
901
902         /* acquire the event loop */
903         evloop_acquire();
904
905         return evloop.sdev;
906 }
907
908 /**
909  * Gets a sd_event item for the current thread.
910  * @return a sd_event or NULL in case of error
911  */
912 struct sd_event *jobs_get_sd_event()
913 {
914         struct sd_event *result;
915         struct thread lt;
916
917         /* ensure an existing thread environment */
918         if (!current_thread) {
919                 memset(&lt, 0, sizeof lt);
920                 current_thread = &lt;
921         }
922
923         /* process */
924         pthread_mutex_lock(&mutex);
925         result = get_sd_event_locked();
926         pthread_mutex_unlock(&mutex);
927
928         /* release the faked thread environment if needed */
929         if (current_thread == &lt) {
930                 /*
931                  * Releasing it is needed because there is no way to guess
932                  * when it has to be released really. But here is where it is
933                  * hazardous: if the caller modifies the eventloop when it
934                  * is waiting, there is no way to make the change effective.
935                  * A workaround to achieve that goal is for the caller to
936                  * require the event loop a second time after having modified it.
937                  */
938                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
939                 if (verbose_wants(Log_Level_Info))
940                         sig_monitor_dumpstack();
941                 evloop_release();
942                 current_thread = NULL;
943         }
944
945         return result;
946 }
947
948 #if defined(REMOVE_SYSTEMD_EVENT)
949 /**
950  * Gets the fdev_epoll item.
951  * @return a fdev_epoll or NULL in case of error
952  */
953 struct fdev_epoll *jobs_get_fdev_epoll()
954 {
955         struct fdev_epoll *result;
956
957         pthread_mutex_lock(&mutex);
958         result = get_fdevepoll();
959         pthread_mutex_unlock(&mutex);
960
961         return result;
962 }
963 #endif
964
965 /**
966  * Enter the jobs processing loop.
967  * @param allowed_count Maximum count of thread for jobs including this one
968  * @param start_count   Count of thread to start now, must be lower.
969  * @param waiter_count  Maximum count of jobs that can be waiting.
970  * @param start         The start routine to activate (can't be NULL)
971  * @return 0 in case of success or -1 in case of error.
972  */
973 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
974 {
975         int rc, launched;
976         struct job *job;
977
978         assert(allowed_count >= 1);
979         assert(start_count >= 0);
980         assert(waiter_count > 0);
981         assert(start_count <= allowed_count);
982
983         rc = -1;
984         pthread_mutex_lock(&mutex);
985
986         /* check whether already running */
987         if (current_thread || allowed) {
988                 ERROR("thread already started");
989                 errno = EINVAL;
990                 goto error;
991         }
992
993         /* records the allowed count */
994         allowed = allowed_count;
995         started = 0;
996         running = 0;
997         remains = waiter_count;
998
999 #if HAS_WATCHDOG
1000         /* set the watchdog */
1001         if (sd_watchdog_enabled(0, NULL))
1002                 sd_event_set_watchdog(get_sd_event_locked(), 1);
1003 #endif
1004
1005         /* start at least one thread: the current one */
1006         launched = 1;
1007         while (launched < start_count) {
1008                 if (start_one_thread() != 0) {
1009                         ERROR("Not all threads can be started");
1010                         goto error;
1011                 }
1012                 launched++;
1013         }
1014
1015         /* queue the start job */
1016         job = job_create(NULL, 0, start, arg);
1017         if (!job)
1018                 goto error;
1019         job_add(job);
1020
1021         /* run until end */
1022         thread_main();
1023         rc = 0;
1024 error:
1025         pthread_mutex_unlock(&mutex);
1026         return rc;
1027 }
1028
1029 /**
1030  * Terminate all the threads and cancel all pending jobs.
1031  */
1032 void jobs_terminate()
1033 {
1034         struct job *job, *head, *tail;
1035         pthread_t me, *others;
1036         struct thread *t;
1037         int count;
1038
1039         /* how am i? */
1040         me = pthread_self();
1041
1042         /* request all threads to stop */
1043         pthread_mutex_lock(&mutex);
1044         allowed = 0;
1045
1046         /* count the number of threads */
1047         count = 0;
1048         t = threads;
1049         while (t) {
1050                 if (!t->upper && !pthread_equal(t->tid, me))
1051                         count++;
1052                 t = t->next;
1053         }
1054
1055         /* fill the array of threads */
1056         others = alloca(count * sizeof *others);
1057         count = 0;
1058         t = threads;
1059         while (t) {
1060                 if (!t->upper && !pthread_equal(t->tid, me))
1061                         others[count++] = t->tid;
1062                 t = t->next;
1063         }
1064
1065         /* stops the threads */
1066         t = threads;
1067         while (t) {
1068                 t->stop = 1;
1069                 t = t->next;
1070         }
1071
1072         /* wait the threads */
1073         pthread_cond_broadcast(&cond);
1074         pthread_mutex_unlock(&mutex);
1075         while (count)
1076                 pthread_join(others[--count], NULL);
1077         pthread_mutex_lock(&mutex);
1078
1079         /* cancel pending jobs of other threads */
1080         remains = 0;
1081         head = first_job;
1082         first_job = NULL;
1083         tail = NULL;
1084         while (head) {
1085                 /* unlink the job */
1086                 job = head;
1087                 head = job->next;
1088
1089                 /* search if job is stacked for current */
1090                 t = current_thread;
1091                 while (t && t->job != job)
1092                         t = t->upper;
1093                 if (t) {
1094                         /* yes, relink it at end */
1095                         if (tail)
1096                                 tail->next = job;
1097                         else
1098                                 first_job = job;
1099                         tail = job;
1100                         job->next = NULL;
1101                 } else {
1102                         /* no cancel the job */
1103                         pthread_mutex_unlock(&mutex);
1104                         sig_monitor(0, job_cancel, job);
1105                         free(job);
1106                         pthread_mutex_lock(&mutex);
1107                 }
1108         }
1109         pthread_mutex_unlock(&mutex);
1110 }
1111