417f7eac1fab792720c9e1a9acba6f1a19436ba1
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
53 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
54
55 struct thread;
56
57 /** Internal shortcut for callback */
58 typedef void (*job_cb_t)(int, void*);
59
60 /** Description of a pending job */
61 struct job
62 {
63         struct job *next;    /**< link to the next job enqueued */
64         const void *group;   /**< group of the request */
65         job_cb_t callback;   /**< processing callback */
66         void *arg;           /**< argument */
67         int timeout;         /**< timeout in second for processing the request */
68         unsigned blocked: 1; /**< is an other request blocking this one ? */
69         unsigned dropped: 1; /**< is removed ? */
70 };
71
72 /** Description of handled event loops */
73 struct evloop
74 {
75         unsigned state;        /**< encoded state */
76         int efd;               /**< event notification */
77         struct sd_event *sdev; /**< the systemd event loop */
78         struct fdev *fdev;     /**< handling of events */
79         struct thread *holder; /**< holder of the evloop */
80 };
81
82 #define EVLOOP_STATE_WAIT           1U
83 #define EVLOOP_STATE_RUN            2U
84
85 /** Description of threads */
86 struct thread
87 {
88         struct thread *next;   /**< next thread of the list */
89         struct thread *upper;  /**< upper same thread */
90         struct thread *nholder;/**< next holder for evloop */
91         pthread_cond_t *cwhold;/**< condition wait for holding */
92         struct job *job;       /**< currently processed job */
93         pthread_t tid;         /**< the thread id */
94         volatile unsigned stop: 1;      /**< stop requested */
95         volatile unsigned waits: 1;     /**< is waiting? */
96 };
97
98 /**
99  * Description of synchronous callback
100  */
101 struct sync
102 {
103         struct thread thread;   /**< thread loop data */
104         union {
105                 void (*callback)(int, void*);   /**< the synchronous callback */
106                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
107                                 /**< the entering synchronous routine */
108         };
109         void *arg;              /**< the argument of the callback */
110 };
111
112
113 /* synchronisation of threads */
114 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
116
117 /* count allowed, started and running threads */
118 static int allowed = 0; /** allowed count of threads */
119 static int started = 0; /** started count of threads */
120 static int running = 0; /** running count of threads */
121 static int remains = 0; /** allowed count of waiting jobs */
122
123 /* list of threads */
124 static struct thread *threads;
125 static _Thread_local struct thread *current_thread;
126
127 /* queue of pending jobs */
128 static struct job *first_job;
129 static struct job *free_jobs;
130
131 /* event loop */
132 static struct evloop evloop;
133
134 #if defined(REMOVE_SYSTEMD_EVENT)
135 static struct fdev_epoll *fdevepoll;
136 static int waitevt;
137 #endif
138
139 /**
140  * Create a new job with the given parameters
141  * @param group    the group of the job
142  * @param timeout  the timeout of the job (0 if none)
143  * @param callback the function that achieves the job
144  * @param arg      the argument of the callback
145  * @return the created job unblock or NULL when no more memory
146  */
147 static struct job *job_create(
148                 const void *group,
149                 int timeout,
150                 job_cb_t callback,
151                 void *arg)
152 {
153         struct job *job;
154
155         /* try recyle existing job */
156         job = free_jobs;
157         if (job)
158                 free_jobs = job->next;
159         else {
160                 /* allocation without blocking */
161                 pthread_mutex_unlock(&mutex);
162                 job = malloc(sizeof *job);
163                 pthread_mutex_lock(&mutex);
164                 if (!job) {
165                         errno = ENOMEM;
166                         goto end;
167                 }
168         }
169         /* initialises the job */
170         job->group = group;
171         job->timeout = timeout;
172         job->callback = callback;
173         job->arg = arg;
174         job->blocked = 0;
175         job->dropped = 0;
176 end:
177         return job;
178 }
179
180 /**
181  * Adds 'job' at the end of the list of jobs, marking it
182  * as blocked if an other job with the same group is pending.
183  * @param job the job to add
184  */
185 static void job_add(struct job *job)
186 {
187         const void *group;
188         struct job *ijob, **pjob;
189
190         /* prepare to add */
191         group = job->group;
192         job->next = NULL;
193
194         /* search end and blockers */
195         pjob = &first_job;
196         ijob = first_job;
197         while (ijob) {
198                 if (group && ijob->group == group)
199                         job->blocked = 1;
200                 pjob = &ijob->next;
201                 ijob = ijob->next;
202         }
203
204         /* queue the jobs */
205         *pjob = job;
206 }
207
208 /**
209  * Get the next job to process or NULL if none.
210  * @return the first job that isn't blocked or NULL
211  */
212 static inline struct job *job_get()
213 {
214         struct job *job = first_job;
215         while (job && job->blocked)
216                 job = job->next;
217         return job;
218 }
219
220 /**
221  * Releases the processed 'job': removes it
222  * from the list of jobs and unblock the first
223  * pending job of the same group if any.
224  * @param job the job to release
225  */
226 static inline void job_release(struct job *job)
227 {
228         struct job *ijob, **pjob;
229         const void *group;
230
231         /* first unqueue the job */
232         pjob = &first_job;
233         ijob = first_job;
234         while (ijob != job) {
235                 pjob = &ijob->next;
236                 ijob = ijob->next;
237         }
238         *pjob = job->next;
239
240         /* then unblock jobs of the same group */
241         group = job->group;
242         if (group) {
243                 ijob = job->next;
244                 while (ijob && ijob->group != group)
245                         ijob = ijob->next;
246                 if (ijob)
247                         ijob->blocked = 0;
248         }
249
250         /* recycle the job */
251         job->next = free_jobs;
252         free_jobs = job;
253 }
254
255 /**
256  * Monitored cancel callback for a job.
257  * This function is called by the monitor
258  * to cancel the job when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow, isn't used
263  * @param arg    the job to run
264  */
265 static void job_cancel(int signum, void *arg)
266 {
267         struct job *job = arg;
268         job->callback(SIGABRT, job->arg);
269 }
270
271 #if defined(REMOVE_SYSTEMD_EVENT)
272 /**
273  * Gets a fdev_epoll item.
274  * @return a fdev_epoll or NULL in case of error
275  */
276 static struct fdev_epoll *get_fdevepoll()
277 {
278         struct fdev_epoll *result;
279
280         result = fdevepoll;
281         if (!result)
282                 result = fdevepoll = fdev_epoll_create();
283
284         return result;
285 }
286 #endif
287
288 /**
289  * Monitored normal callback for events.
290  * This function is called by the monitor
291  * to run the event loop when the safe environment
292  * is set.
293  * @param signum 0 on normal flow or the number
294  *               of the signal that interrupted the normal
295  *               flow
296  * @param arg     the events to run
297  */
298 static void evloop_run(int signum, void *arg)
299 {
300         int rc;
301         struct sd_event *se;
302
303         if (!signum) {
304                 se = evloop.sdev;
305                 rc = sd_event_prepare(se);
306                 if (rc < 0) {
307                         errno = -rc;
308                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
309                         abort();
310                 } else {
311                         if (rc == 0) {
312                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
313                                 if (rc < 0) {
314                                         errno = -rc;
315                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
316                                 }
317                         }
318                         evloop.state = EVLOOP_STATE_RUN;
319                         if (rc > 0) {
320                                 rc = sd_event_dispatch(se);
321                                 if (rc < 0) {
322                                         errno = -rc;
323                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
324                                 }
325                         }
326                 }
327         }
328 }
329
330 /**
331  * Internal callback for evloop management.
332  * The effect of this function is hidden: it exits
333  * the waiting poll if any.
334  */
335 static void evloop_on_efd_event()
336 {
337         uint64_t x;
338         read(evloop.efd, &x, sizeof x);
339 }
340
341 /**
342  * wakeup the event loop if needed by sending
343  * an event.
344  */
345 static void evloop_wakeup()
346 {
347         uint64_t x;
348
349         if (evloop.state & EVLOOP_STATE_WAIT) {
350                 x = 1;
351                 write(evloop.efd, &x, sizeof x);
352         }
353 }
354
355 /**
356  * Release the currently held event loop
357  */
358 static void evloop_release()
359 {
360         struct thread *nh, *ct = current_thread;
361
362         if (evloop.holder == ct) {
363                 nh = ct->nholder;
364                 evloop.holder = nh;
365                 if (nh)
366                         pthread_cond_signal(nh->cwhold);
367         }
368 }
369
370 /**
371  * get the eventloop for the current thread
372  */
373 static int evloop_get()
374 {
375         struct thread *ct = current_thread;
376
377         if (evloop.holder)
378                 return evloop.holder == ct;
379
380         ct->nholder = NULL;
381         evloop.holder = ct;
382         return 1;
383 }
384
385 /**
386  * acquire the eventloop for the current thread
387  */
388 static void evloop_acquire()
389 {
390         struct thread **pwait, *ct;
391         pthread_cond_t cond;
392
393         /* try to get the evloop */
394         if (!evloop_get()) {
395                 /* failed, init waiting state */
396                 ct = current_thread;
397                 ct->nholder = NULL;
398                 ct->cwhold = &cond;
399                 pthread_cond_init(&cond, NULL);
400
401                 /* queue current thread in holder list */
402                 pwait = &evloop.holder;
403                 while (*pwait)
404                         pwait = &(*pwait)->nholder;
405                 *pwait = ct;
406
407                 /* wake up the evloop */
408                 evloop_wakeup();
409
410                 /* wait to acquire the evloop */
411                 pthread_cond_wait(&cond, &mutex);
412                 pthread_cond_destroy(&cond);
413         }
414 }
415
416 #if defined(REMOVE_SYSTEMD_EVENT)
417 /**
418  * Monitored normal loop for waiting events.
419  * @param signum 0 on normal flow or the number
420  *               of the signal that interrupted the normal
421  *               flow
422  * @param arg     the events to run
423  */
424 static void monitored_wait_and_dispatch(int signum, void *arg)
425 {
426         struct fdev_epoll *fdev_epoll = arg;
427         if (!signum) {
428                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
429         }
430 }
431 #endif
432
433 /**
434  * Main processing loop of threads processing jobs.
435  * The loop must be called with the mutex locked
436  * and it returns with the mutex locked.
437  * @param me the description of the thread to use
438  * TODO: how are timeout handled when reentering?
439  */
440 static void thread_run(volatile struct thread *me)
441 {
442         struct thread **prv;
443         struct job *job;
444
445         /* initialize description of itself and link it in the list */
446         me->tid = pthread_self();
447         me->stop = 0;
448         me->waits = 0;
449         me->upper = current_thread;
450         if (!current_thread) {
451                 started++;
452                 sig_monitor_init_timeouts();
453         }
454         me->next = threads;
455         threads = (struct thread*)me;
456         current_thread = (struct thread*)me;
457
458         /* loop until stopped */
459         while (!me->stop) {
460                 /* release the current event loop */
461                 evloop_release();
462
463                 /* get a job */
464                 job = job_get();
465                 if (job) {
466                         /* prepare running the job */
467                         remains++; /* increases count of job that can wait */
468                         job->blocked = 1; /* mark job as blocked */
469                         me->job = job; /* record the job (only for terminate) */
470
471                         /* run the job */
472                         pthread_mutex_unlock(&mutex);
473                         sig_monitor(job->timeout, job->callback, job->arg);
474                         pthread_mutex_lock(&mutex);
475
476                         /* release the run job */
477                         job_release(job);
478 #if !defined(REMOVE_SYSTEMD_EVENT)
479
480
481
482                 /* no job, check event loop wait */
483                 } else if (evloop_get()) {
484                         if (evloop.state != 0) {
485                                 /* busy ? */
486                                 CRITICAL("Can't enter dispatch while in dispatch!");
487                                 abort();
488                         }
489                         /* run the events */
490                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
491                         pthread_mutex_unlock(&mutex);
492                         sig_monitor(0, evloop_run, NULL);
493                         pthread_mutex_lock(&mutex);
494                         evloop.state = 0;
495                 } else {
496                         /* no job and no event loop */
497                         running--;
498                         if (!running)
499                                 ERROR("Entering job deep sleep! Check your bindings.");
500                         me->waits = 1;
501                         pthread_cond_wait(&cond, &mutex);
502                         me->waits = 0;
503                         running++;
504 #else
505                 } else if (waitevt) {
506                         /* no job and not events */
507                         running--;
508                         if (!running)
509                                 ERROR("Entering job deep sleep! Check your bindings.");
510                         me->waits = 1;
511                         pthread_cond_wait(&cond, &mutex);
512                         me->waits = 0;
513                         running++;
514                 } else {
515                         /* wait for events */
516                         waitevt = 1;
517                         pthread_mutex_unlock(&mutex);
518                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
519                         pthread_mutex_lock(&mutex);
520                         waitevt = 0;
521 #endif
522                 }
523         }
524
525         /* release the event loop */
526         evloop_release();
527
528         /* unlink the current thread and cleanup */
529         prv = &threads;
530         while (*prv != me)
531                 prv = &(*prv)->next;
532         *prv = me->next;
533         current_thread = me->upper;
534         if (!current_thread) {
535                 sig_monitor_clean_timeouts();
536                 started--;
537         }
538 }
539
540 /**
541  * Entry point for created threads.
542  * @param data not used
543  * @return NULL
544  */
545 static void *thread_main(void *data)
546 {
547         struct thread me;
548
549         pthread_mutex_lock(&mutex);
550         running++;
551         thread_run(&me);
552         running--;
553         pthread_mutex_unlock(&mutex);
554         return NULL;
555 }
556
557 /**
558  * Starts a new thread
559  * @return 0 in case of success or -1 in case of error
560  */
561 static int start_one_thread()
562 {
563         pthread_t tid;
564         int rc;
565
566         rc = pthread_create(&tid, NULL, thread_main, NULL);
567         if (rc != 0) {
568                 /* errno = rc; */
569                 WARNING("not able to start thread: %m");
570                 rc = -1;
571         }
572         return rc;
573 }
574
575 /**
576  * Queues a new asynchronous job represented by 'callback' and 'arg'
577  * for the 'group' and the 'timeout'.
578  * Jobs are queued FIFO and are possibly executed in parallel
579  * concurrently except for job of the same group that are
580  * executed sequentially in FIFO order.
581  * @param group    The group of the job or NULL when no group.
582  * @param timeout  The maximum execution time in seconds of the job
583  *                 or 0 for unlimited time.
584  * @param callback The function to execute for achieving the job.
585  *                 Its first parameter is either 0 on normal flow
586  *                 or the signal number that broke the normal flow.
587  *                 The remaining parameter is the parameter 'arg1'
588  *                 given here.
589  * @param arg      The second argument for 'callback'
590  * @return 0 in case of success or -1 in case of error
591  */
592 int jobs_queue(
593                 const void *group,
594                 int timeout,
595                 void (*callback)(int, void*),
596                 void *arg)
597 {
598         const char *info;
599         struct job *job;
600         int rc;
601
602         pthread_mutex_lock(&mutex);
603
604         /* allocates the job */
605         job = job_create(group, timeout, callback, arg);
606         if (!job) {
607                 errno = ENOMEM;
608                 info = "out of memory";
609                 goto error;
610         }
611
612         /* check availability */
613         if (remains == 0) {
614                 errno = EBUSY;
615                 info = "too many jobs";
616                 goto error2;
617         }
618
619         /* start a thread if needed */
620         if (running == started && started < allowed) {
621                 /* all threads are busy and a new can be started */
622                 rc = start_one_thread();
623                 if (rc < 0 && started == 0) {
624                         info = "can't start first thread";
625                         goto error2;
626                 }
627         }
628
629         /* queues the job */
630         remains--;
631         job_add(job);
632
633         /* signal an existing job */
634         pthread_cond_signal(&cond);
635         pthread_mutex_unlock(&mutex);
636         return 0;
637
638 error2:
639         job->next = free_jobs;
640         free_jobs = job;
641 error:
642         ERROR("can't process job with threads: %s, %m", info);
643         pthread_mutex_unlock(&mutex);
644         return -1;
645 }
646
647 /**
648  * Internal helper function for 'jobs_enter'.
649  * @see jobs_enter, jobs_leave
650  */
651 static void enter_cb(int signum, void *closure)
652 {
653         struct sync *sync = closure;
654         sync->enter(signum, sync->arg, (void*)&sync->thread);
655 }
656
657 /**
658  * Internal helper function for 'jobs_call'.
659  * @see jobs_call
660  */
661 static void call_cb(int signum, void *closure)
662 {
663         struct sync *sync = closure;
664         sync->callback(signum, sync->arg);
665         jobs_leave((void*)&sync->thread);
666 }
667
668 /**
669  * Internal helper for synchronous jobs. It enters
670  * a new thread loop for evaluating the given job
671  * as recorded by the couple 'sync_cb' and 'sync'.
672  * @see jobs_call, jobs_enter, jobs_leave
673  */
674 static int do_sync(
675                 const void *group,
676                 int timeout,
677                 void (*sync_cb)(int signum, void *closure),
678                 struct sync *sync
679 )
680 {
681         struct job *job;
682
683         pthread_mutex_lock(&mutex);
684
685         /* allocates the job */
686         job = job_create(group, timeout, sync_cb, sync);
687         if (!job) {
688                 ERROR("out of memory");
689                 errno = ENOMEM;
690                 pthread_mutex_unlock(&mutex);
691                 return -1;
692         }
693
694         /* queues the job */
695         job_add(job);
696
697         /* run until stopped */
698         thread_run(&sync->thread);
699         pthread_mutex_unlock(&mutex);
700         return 0;
701 }
702
703 /**
704  * Enter a synchronisation point: activates the job given by 'callback'
705  * and 'closure' using 'group' and 'timeout' to control sequencing and
706  * execution time.
707  * @param group the group for sequencing jobs
708  * @param timeout the time in seconds allocated to the job
709  * @param callback the callback that will handle the job.
710  *                 it receives 3 parameters: 'signum' that will be 0
711  *                 on normal flow or the catched signal number in case
712  *                 of interrupted flow, the context 'closure' as given and
713  *                 a 'jobloop' reference that must be used when the job is
714  *                 terminated to unlock the current execution flow.
715  * @param closure the argument to the callback
716  * @return 0 on success or -1 in case of error
717  */
718 int jobs_enter(
719                 const void *group,
720                 int timeout,
721                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
722                 void *closure
723 )
724 {
725         struct sync sync;
726
727         sync.enter = callback;
728         sync.arg = closure;
729         return do_sync(group, timeout, enter_cb, &sync);
730 }
731
732 /**
733  * Unlocks the execution flow designed by 'jobloop'.
734  * @param jobloop indication of the flow to unlock
735  * @return 0 in case of success of -1 on error
736  */
737 int jobs_leave(struct jobloop *jobloop)
738 {
739         struct thread *t;
740
741         pthread_mutex_lock(&mutex);
742         t = threads;
743         while (t && t != (struct thread*)jobloop)
744                 t = t->next;
745         if (!t) {
746                 errno = EINVAL;
747         } else {
748                 t->stop = 1;
749                 if (t->waits)
750                         pthread_cond_broadcast(&cond);
751                 else
752                         evloop_wakeup();
753         }
754         pthread_mutex_unlock(&mutex);
755         return -!t;
756 }
757
758 /**
759  * Calls synchronously the job represented by 'callback' and 'arg1'
760  * for the 'group' and the 'timeout' and waits for its completion.
761  * @param group    The group of the job or NULL when no group.
762  * @param timeout  The maximum execution time in seconds of the job
763  *                 or 0 for unlimited time.
764  * @param callback The function to execute for achieving the job.
765  *                 Its first parameter is either 0 on normal flow
766  *                 or the signal number that broke the normal flow.
767  *                 The remaining parameter is the parameter 'arg1'
768  *                 given here.
769  * @param arg      The second argument for 'callback'
770  * @return 0 in case of success or -1 in case of error
771  */
772 int jobs_call(
773                 const void *group,
774                 int timeout,
775                 void (*callback)(int, void*),
776                 void *arg)
777 {
778         struct sync sync;
779
780         sync.callback = callback;
781         sync.arg = arg;
782
783         return do_sync(group, timeout, call_cb, &sync);
784 }
785
786 /**
787  * Internal callback for evloop management.
788  * The effect of this function is hidden: it exits
789  * the waiting poll if any. Then it wakes up a thread
790  * awaiting the evloop using signal.
791  */
792 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
793 {
794         evloop_on_efd_event();
795         return 1;
796 }
797
798 /* temporary hack */
799 #if !defined(REMOVE_SYSTEMD_EVENT)
800 __attribute__((unused))
801 #endif
802 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
803 {
804         sig_monitor(0, evloop_run, arg);
805 }
806
807 /**
808  * Gets a sd_event item for the current thread.
809  * @return a sd_event or NULL in case of error
810  */
811 static struct sd_event *get_sd_event_locked()
812 {
813         int rc;
814
815         /* creates the evloop on need */
816         if (!evloop.sdev) {
817                 /* start the creation */
818                 evloop.state = 0;
819                 /* creates the eventfd for waking up polls */
820                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
821                 if (evloop.efd < 0) {
822                         ERROR("can't make eventfd for events");
823                         goto error1;
824                 }
825                 /* create the systemd event loop */
826                 rc = sd_event_new(&evloop.sdev);
827                 if (rc < 0) {
828                         ERROR("can't make new event loop");
829                         goto error2;
830                 }
831                 /* put the eventfd in the event loop */
832                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
833                 if (rc < 0) {
834                         ERROR("can't register eventfd");
835 #if !defined(REMOVE_SYSTEMD_EVENT)
836                         sd_event_unref(evloop.sdev);
837                         evloop.sdev = NULL;
838 error2:
839                         close(evloop.efd);
840 error1:
841                         return NULL;
842                 }
843 #else
844                         goto error3;
845                 }
846                 /* handle the event loop */
847                 evloop.fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(evloop.sdev));
848                 if (!evloop.fdev) {
849                         ERROR("can't create fdev");
850 error3:
851                         sd_event_unref(evloop.sdev);
852 error2:
853                         close(evloop.efd);
854 error1:
855                         memset(&evloop, 0, sizeof evloop);
856                         return NULL;
857                 }
858                 fdev_set_autoclose(evloop.fdev, 0);
859                 fdev_set_events(evloop.fdev, EPOLLIN);
860                 fdev_set_callback(evloop.fdev, evloop_callback, NULL);
861 #endif
862         }
863
864         /* acquire the event loop */
865         evloop_acquire();
866
867         return evloop.sdev;
868 }
869
870 /**
871  * Gets a sd_event item for the current thread.
872  * @return a sd_event or NULL in case of error
873  */
874 struct sd_event *jobs_get_sd_event()
875 {
876         struct sd_event *result;
877         struct thread lt;
878
879         /* ensure an existing thread environment */
880         if (!current_thread) {
881                 memset(&lt, 0, sizeof lt);
882                 current_thread = &lt;
883         }
884
885         /* process */
886         pthread_mutex_lock(&mutex);
887         result = get_sd_event_locked();
888         pthread_mutex_unlock(&mutex);
889
890         /* release the faked thread environment if needed */
891         if (current_thread == &lt) {
892                 /*
893                  * Releasing it is needed because there is no way to guess
894                  * when it has to be released really. But here is where it is
895                  * hazardous: if the caller modifies the eventloop when it
896                  * is waiting, there is no way to make the change effective.
897                  * A workaround to achieve that goal is for the caller to
898                  * require the event loop a second time after having modified it.
899                  */
900                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
901                 if (verbose_wants(Log_Level_Info))
902                         sig_monitor_dumpstack();
903                 evloop_release();
904                 current_thread = NULL;
905         }
906
907         return result;
908 }
909
910 #if defined(REMOVE_SYSTEMD_EVENT)
911 /**
912  * Gets the fdev_epoll item.
913  * @return a fdev_epoll or NULL in case of error
914  */
915 struct fdev_epoll *jobs_get_fdev_epoll()
916 {
917         struct fdev_epoll *result;
918
919         pthread_mutex_lock(&mutex);
920         result = get_fdevepoll();
921         pthread_mutex_unlock(&mutex);
922
923         return result;
924 }
925 #endif
926
927 /**
928  * Enter the jobs processing loop.
929  * @param allowed_count Maximum count of thread for jobs including this one
930  * @param start_count   Count of thread to start now, must be lower.
931  * @param waiter_count  Maximum count of jobs that can be waiting.
932  * @param start         The start routine to activate (can't be NULL)
933  * @return 0 in case of success or -1 in case of error.
934  */
935 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
936 {
937         int rc, launched;
938         struct thread me;
939         struct job *job;
940
941         assert(allowed_count >= 1);
942         assert(start_count >= 0);
943         assert(waiter_count > 0);
944         assert(start_count <= allowed_count);
945
946         rc = -1;
947         pthread_mutex_lock(&mutex);
948
949         /* check whether already running */
950         if (current_thread || allowed) {
951                 ERROR("thread already started");
952                 errno = EINVAL;
953                 goto error;
954         }
955
956         /* records the allowed count */
957         allowed = allowed_count;
958         started = 0;
959         running = 0;
960         remains = waiter_count;
961
962 #if HAS_WATCHDOG
963         /* set the watchdog */
964         if (sd_watchdog_enabled(0, NULL))
965                 sd_event_set_watchdog(get_sd_event_locked(), 1);
966 #endif
967
968         /* start at least one thread */
969         launched = 0;
970         while ((launched + 1) < start_count) {
971                 if (start_one_thread() != 0) {
972                         ERROR("Not all threads can be started");
973                         goto error;
974                 }
975                 launched++;
976         }
977
978         /* queue the start job */
979         job = job_create(NULL, 0, start, arg);
980         if (!job) {
981                 ERROR("out of memory");
982                 errno = ENOMEM;
983                 goto error;
984         }
985         job_add(job);
986         remains--;
987
988         /* run until end */
989         running++;
990         thread_run(&me);
991         running--;
992         rc = 0;
993 error:
994         pthread_mutex_unlock(&mutex);
995         return rc;
996 }
997
998 /**
999  * Terminate all the threads and cancel all pending jobs.
1000  */
1001 void jobs_terminate()
1002 {
1003         struct job *job, *head, *tail;
1004         pthread_t me, *others;
1005         struct thread *t;
1006         int count;
1007
1008         /* how am i? */
1009         me = pthread_self();
1010
1011         /* request all threads to stop */
1012         pthread_mutex_lock(&mutex);
1013         allowed = 0;
1014
1015         /* count the number of threads */
1016         count = 0;
1017         t = threads;
1018         while (t) {
1019                 if (!t->upper && !pthread_equal(t->tid, me))
1020                         count++;
1021                 t = t->next;
1022         }
1023
1024         /* fill the array of threads */
1025         others = alloca(count * sizeof *others);
1026         count = 0;
1027         t = threads;
1028         while (t) {
1029                 if (!t->upper && !pthread_equal(t->tid, me))
1030                         others[count++] = t->tid;
1031                 t = t->next;
1032         }
1033
1034         /* stops the threads */
1035         t = threads;
1036         while (t) {
1037                 t->stop = 1;
1038                 t = t->next;
1039         }
1040
1041         /* wait the threads */
1042         pthread_cond_broadcast(&cond);
1043         pthread_mutex_unlock(&mutex);
1044         while (count)
1045                 pthread_join(others[--count], NULL);
1046         pthread_mutex_lock(&mutex);
1047
1048         /* cancel pending jobs of other threads */
1049         remains = 0;
1050         head = first_job;
1051         first_job = NULL;
1052         tail = NULL;
1053         while (head) {
1054                 /* unlink the job */
1055                 job = head;
1056                 head = job->next;
1057
1058                 /* search if job is stacked for current */
1059                 t = current_thread;
1060                 while (t && t->job != job)
1061                         t = t->upper;
1062                 if (t) {
1063                         /* yes, relink it at end */
1064                         if (tail)
1065                                 tail->next = job;
1066                         else
1067                                 first_job = job;
1068                         tail = job;
1069                         job->next = NULL;
1070                 } else {
1071                         /* no cancel the job */
1072                         pthread_mutex_unlock(&mutex);
1073                         sig_monitor(0, job_cancel, job);
1074                         free(job);
1075                         pthread_mutex_lock(&mutex);
1076                 }
1077         }
1078         pthread_mutex_unlock(&mutex);
1079 }
1080