f2c9d52be375cf756ab904b4debce1e08db53f4b
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
53 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
54
55 struct thread;
56
57 /** Internal shortcut for callback */
58 typedef void (*job_cb_t)(int, void*);
59
60 /** Description of a pending job */
61 struct job
62 {
63         struct job *next;    /**< link to the next job enqueued */
64         const void *group;   /**< group of the request */
65         job_cb_t callback;   /**< processing callback */
66         void *arg;           /**< argument */
67         int timeout;         /**< timeout in second for processing the request */
68         unsigned blocked: 1; /**< is an other request blocking this one ? */
69         unsigned dropped: 1; /**< is removed ? */
70 };
71
72 /** Description of handled event loops */
73 struct evloop
74 {
75         unsigned state;        /**< encoded state */
76         int efd;               /**< event notification */
77         struct sd_event *sdev; /**< the systemd event loop */
78         pthread_cond_t  cond;  /**< condition */
79         struct fdev *fdev;     /**< handling of events */
80         struct thread *holder; /**< holder of the evloop */
81 };
82
83 #define EVLOOP_STATE_WAIT           1U
84 #define EVLOOP_STATE_RUN            2U
85 #define EVLOOP_STATE_LOCK           4U
86
87 /** Description of threads */
88 struct thread
89 {
90         struct thread *next;   /**< next thread of the list */
91         struct thread *upper;  /**< upper same thread */
92         struct job *job;       /**< currently processed job */
93         pthread_t tid;         /**< the thread id */
94         volatile unsigned stop: 1;      /**< stop requested */
95         volatile unsigned waits: 1;     /**< is waiting? */
96 };
97
98 /**
99  * Description of synchonous callback
100  */
101 struct sync
102 {
103         struct thread thread;   /**< thread loop data */
104         union {
105                 void (*callback)(int, void*);   /**< the synchronous callback */
106                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
107                                 /**< the entering synchronous routine */
108         };
109         void *arg;              /**< the argument of the callback */
110 };
111
112
113 /* synchronisation of threads */
114 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
116
117 /* count allowed, started and running threads */
118 static int allowed = 0; /** allowed count of threads */
119 static int started = 0; /** started count of threads */
120 static int running = 0; /** running count of threads */
121 static int remains = 0; /** allowed count of waiting jobs */
122
123 /* list of threads */
124 static struct thread *threads;
125 static _Thread_local struct thread *current_thread;
126 static _Thread_local struct evloop *current_evloop;
127
128 /* queue of pending jobs */
129 static struct job *first_job;
130 static struct job *free_jobs;
131
132 /* event loop */
133 static struct evloop evloop[1];
134
135 #if defined(REMOVE_SYSTEMD_EVENT)
136 static struct fdev_epoll *fdevepoll;
137 static int waitevt;
138 #endif
139
140 /**
141  * Create a new job with the given parameters
142  * @param group    the group of the job
143  * @param timeout  the timeout of the job (0 if none)
144  * @param callback the function that achieves the job
145  * @param arg      the argument of the callback
146  * @return the created job unblock or NULL when no more memory
147  */
148 static struct job *job_create(
149                 const void *group,
150                 int timeout,
151                 job_cb_t callback,
152                 void *arg)
153 {
154         struct job *job;
155
156         /* try recyle existing job */
157         job = free_jobs;
158         if (job)
159                 free_jobs = job->next;
160         else {
161                 /* allocation without blocking */
162                 pthread_mutex_unlock(&mutex);
163                 job = malloc(sizeof *job);
164                 pthread_mutex_lock(&mutex);
165                 if (!job) {
166                         errno = ENOMEM;
167                         goto end;
168                 }
169         }
170         /* initialises the job */
171         job->group = group;
172         job->timeout = timeout;
173         job->callback = callback;
174         job->arg = arg;
175         job->blocked = 0;
176         job->dropped = 0;
177 end:
178         return job;
179 }
180
181 /**
182  * Adds 'job' at the end of the list of jobs, marking it
183  * as blocked if an other job with the same group is pending.
184  * @param job the job to add
185  */
186 static void job_add(struct job *job)
187 {
188         const void *group;
189         struct job *ijob, **pjob;
190
191         /* prepare to add */
192         group = job->group;
193         job->next = NULL;
194
195         /* search end and blockers */
196         pjob = &first_job;
197         ijob = first_job;
198         while (ijob) {
199                 if (group && ijob->group == group)
200                         job->blocked = 1;
201                 pjob = &ijob->next;
202                 ijob = ijob->next;
203         }
204
205         /* queue the jobs */
206         *pjob = job;
207 }
208
209 /**
210  * Get the next job to process or NULL if none.
211  * @return the first job that isn't blocked or NULL
212  */
213 static inline struct job *job_get()
214 {
215         struct job *job = first_job;
216         while (job && job->blocked)
217                 job = job->next;
218         return job;
219 }
220
221 /**
222  * Releases the processed 'job': removes it
223  * from the list of jobs and unblock the first
224  * pending job of the same group if any.
225  * @param job the job to release
226  */
227 static inline void job_release(struct job *job)
228 {
229         struct job *ijob, **pjob;
230         const void *group;
231
232         /* first unqueue the job */
233         pjob = &first_job;
234         ijob = first_job;
235         while (ijob != job) {
236                 pjob = &ijob->next;
237                 ijob = ijob->next;
238         }
239         *pjob = job->next;
240
241         /* then unblock jobs of the same group */
242         group = job->group;
243         if (group) {
244                 ijob = job->next;
245                 while (ijob && ijob->group != group)
246                         ijob = ijob->next;
247                 if (ijob)
248                         ijob->blocked = 0;
249         }
250
251         /* recycle the job */
252         job->next = free_jobs;
253         free_jobs = job;
254 }
255
256 /**
257  * Monitored cancel callback for a job.
258  * This function is called by the monitor
259  * to cancel the job when the safe environment
260  * is set.
261  * @param signum 0 on normal flow or the number
262  *               of the signal that interrupted the normal
263  *               flow, isn't used
264  * @param arg    the job to run
265  */
266 static void job_cancel(int signum, void *arg)
267 {
268         struct job *job = arg;
269         job->callback(SIGABRT, job->arg);
270 }
271
272 #if defined(REMOVE_SYSTEMD_EVENT)
273 /**
274  * Gets a fdev_epoll item.
275  * @return a fdev_epoll or NULL in case of error
276  */
277 static struct fdev_epoll *get_fdevepoll()
278 {
279         struct fdev_epoll *result;
280
281         result = fdevepoll;
282         if (!result)
283                 result = fdevepoll = fdev_epoll_create();
284
285         return result;
286 }
287 #endif
288
289 /**
290  * Monitored normal callback for events.
291  * This function is called by the monitor
292  * to run the event loop when the safe environment
293  * is set.
294  * @param signum 0 on normal flow or the number
295  *               of the signal that interrupted the normal
296  *               flow
297  * @param arg     the events to run
298  */
299 static void evloop_run(int signum, void *arg)
300 {
301         int rc;
302         struct sd_event *se;
303         struct evloop *el = arg;
304
305         if (!signum) {
306                 current_evloop = el;
307                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
308                 __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
309                 se = el->sdev;
310                 rc = sd_event_prepare(se);
311                 if (rc < 0) {
312                         errno = -rc;
313                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
314                         abort();
315                 } else {
316                         if (rc == 0) {
317                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
318                                 if (rc < 0) {
319                                         errno = -rc;
320                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
321                                 }
322                         }
323                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
324
325                         if (rc > 0) {
326                                 rc = sd_event_dispatch(se);
327                                 if (rc < 0) {
328                                         errno = -rc;
329                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
330                                 }
331                         }
332                 }
333         }
334         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
335 }
336
337
338 #if defined(REMOVE_SYSTEMD_EVENT)
339 /**
340  * Monitored normal loop for waiting events.
341  * @param signum 0 on normal flow or the number
342  *               of the signal that interrupted the normal
343  *               flow
344  * @param arg     the events to run
345  */
346 static void monitored_wait_and_dispatch(int signum, void *arg)
347 {
348         struct fdev_epoll *fdev_epoll = arg;
349         if (!signum) {
350                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
351         }
352 }
353 #endif
354
355 /**
356  * Main processing loop of threads processing jobs.
357  * The loop must be called with the mutex locked
358  * and it returns with the mutex locked.
359  * @param me the description of the thread to use
360  * TODO: how are timeout handled when reentering?
361  */
362 static void thread_run(volatile struct thread *me)
363 {
364         struct thread **prv;
365         struct job *job;
366 #if !defined(REMOVE_SYSTEMD_EVENT)
367         struct evloop *el;
368 #endif
369
370         /* initialize description of itself and link it in the list */
371         me->tid = pthread_self();
372         me->stop = 0;
373         me->waits = 0;
374         me->upper = current_thread;
375         if (!current_thread) {
376                 started++;
377                 sig_monitor_init_timeouts();
378         }
379         me->next = threads;
380         threads = (struct thread*)me;
381         current_thread = (struct thread*)me;
382
383         /* loop until stopped */
384         while (!me->stop) {
385                 /* release the event loop */
386                 if (current_evloop) {
387                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
388                         __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
389                         current_evloop = NULL;
390                 }
391
392                 /* get a job */
393                 job = job_get();
394                 if (job) {
395                         /* prepare running the job */
396                         remains++; /* increases count of job that can wait */
397                         job->blocked = 1; /* mark job as blocked */
398                         me->job = job; /* record the job (only for terminate) */
399
400                         /* run the job */
401                         pthread_mutex_unlock(&mutex);
402                         sig_monitor(job->timeout, job->callback, job->arg);
403                         pthread_mutex_lock(&mutex);
404
405                         /* release the run job */
406                         job_release(job);
407 #if !defined(REMOVE_SYSTEMD_EVENT)
408                 } else {
409                         /* no job, check events */
410                         el = &evloop[0];
411                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
412                                 /* run the events */
413                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
414                                 __atomic_store_n(&el->holder, me, __ATOMIC_RELAXED);
415                                 current_evloop = el;
416                                 pthread_mutex_unlock(&mutex);
417                                 sig_monitor(0, evloop_run, el);
418                                 pthread_mutex_lock(&mutex);
419                         } else {
420                                 /* no job and not events */
421                                 running--;
422                                 if (!running)
423                                         ERROR("Entering job deep sleep! Check your bindings.");
424                                 me->waits = 1;
425                                 pthread_cond_wait(&cond, &mutex);
426                                 me->waits = 0;
427                                 running++;
428                         }
429 #else
430                 } else if (waitevt) {
431                         /* no job and not events */
432                         running--;
433                         if (!running)
434                                 ERROR("Entering job deep sleep! Check your bindings.");
435                         me->waits = 1;
436                         pthread_cond_wait(&cond, &mutex);
437                         me->waits = 0;
438                         running++;
439                 } else {
440                         /* wait for events */
441                         waitevt = 1;
442                         pthread_mutex_unlock(&mutex);
443                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
444                         pthread_mutex_lock(&mutex);
445                         waitevt = 0;
446 #endif
447                 }
448         }
449
450         /* release the event loop */
451         if (current_evloop) {
452                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
453                 __atomic_store_n(&el->holder, NULL, __ATOMIC_RELAXED);
454                 current_evloop = NULL;
455         }
456
457         /* unlink the current thread and cleanup */
458         prv = &threads;
459         while (*prv != me)
460                 prv = &(*prv)->next;
461         *prv = me->next;
462         current_thread = me->upper;
463         if (!current_thread) {
464                 sig_monitor_clean_timeouts();
465                 started--;
466         }
467 }
468
469 /**
470  * Entry point for created threads.
471  * @param data not used
472  * @return NULL
473  */
474 static void *thread_main(void *data)
475 {
476         struct thread me;
477
478         pthread_mutex_lock(&mutex);
479         running++;
480         thread_run(&me);
481         running--;
482         pthread_mutex_unlock(&mutex);
483         return NULL;
484 }
485
486 /**
487  * Starts a new thread
488  * @return 0 in case of success or -1 in case of error
489  */
490 static int start_one_thread()
491 {
492         pthread_t tid;
493         int rc;
494
495         rc = pthread_create(&tid, NULL, thread_main, NULL);
496         if (rc != 0) {
497                 /* errno = rc; */
498                 WARNING("not able to start thread: %m");
499                 rc = -1;
500         }
501         return rc;
502 }
503
504 /**
505  * Queues a new asynchronous job represented by 'callback' and 'arg'
506  * for the 'group' and the 'timeout'.
507  * Jobs are queued FIFO and are possibly executed in parallel
508  * concurrently except for job of the same group that are
509  * executed sequentially in FIFO order.
510  * @param group    The group of the job or NULL when no group.
511  * @param timeout  The maximum execution time in seconds of the job
512  *                 or 0 for unlimited time.
513  * @param callback The function to execute for achieving the job.
514  *                 Its first parameter is either 0 on normal flow
515  *                 or the signal number that broke the normal flow.
516  *                 The remaining parameter is the parameter 'arg1'
517  *                 given here.
518  * @param arg      The second argument for 'callback'
519  * @return 0 in case of success or -1 in case of error
520  */
521 int jobs_queue(
522                 const void *group,
523                 int timeout,
524                 void (*callback)(int, void*),
525                 void *arg)
526 {
527         const char *info;
528         struct job *job;
529         int rc;
530
531         pthread_mutex_lock(&mutex);
532
533         /* allocates the job */
534         job = job_create(group, timeout, callback, arg);
535         if (!job) {
536                 errno = ENOMEM;
537                 info = "out of memory";
538                 goto error;
539         }
540
541         /* check availability */
542         if (remains == 0) {
543                 errno = EBUSY;
544                 info = "too many jobs";
545                 goto error2;
546         }
547
548         /* start a thread if needed */
549         if (running == started && started < allowed) {
550                 /* all threads are busy and a new can be started */
551                 rc = start_one_thread();
552                 if (rc < 0 && started == 0) {
553                         info = "can't start first thread";
554                         goto error2;
555                 }
556         }
557
558         /* queues the job */
559         remains--;
560         job_add(job);
561
562         /* signal an existing job */
563         pthread_cond_signal(&cond);
564         pthread_mutex_unlock(&mutex);
565         return 0;
566
567 error2:
568         job->next = free_jobs;
569         free_jobs = job;
570 error:
571         ERROR("can't process job with threads: %s, %m", info);
572         pthread_mutex_unlock(&mutex);
573         return -1;
574 }
575
576 /**
577  * Internal helper function for 'jobs_enter'.
578  * @see jobs_enter, jobs_leave
579  */
580 static void enter_cb(int signum, void *closure)
581 {
582         struct sync *sync = closure;
583         sync->enter(signum, sync->arg, (void*)&sync->thread);
584 }
585
586 /**
587  * Internal helper function for 'jobs_call'.
588  * @see jobs_call
589  */
590 static void call_cb(int signum, void *closure)
591 {
592         struct sync *sync = closure;
593         sync->callback(signum, sync->arg);
594         jobs_leave((void*)&sync->thread);
595 }
596
597 /**
598  * Internal helper for synchronous jobs. It enters
599  * a new thread loop for evaluating the given job
600  * as recorded by the couple 'sync_cb' and 'sync'.
601  * @see jobs_call, jobs_enter, jobs_leave
602  */
603 static int do_sync(
604                 const void *group,
605                 int timeout,
606                 void (*sync_cb)(int signum, void *closure),
607                 struct sync *sync
608 )
609 {
610         struct job *job;
611
612         pthread_mutex_lock(&mutex);
613
614         /* allocates the job */
615         job = job_create(group, timeout, sync_cb, sync);
616         if (!job) {
617                 ERROR("out of memory");
618                 errno = ENOMEM;
619                 pthread_mutex_unlock(&mutex);
620                 return -1;
621         }
622
623         /* queues the job */
624         job_add(job);
625
626         /* run until stopped */
627         thread_run(&sync->thread);
628         pthread_mutex_unlock(&mutex);
629         return 0;
630 }
631
632 /**
633  * Enter a synchronisation point: activates the job given by 'callback'
634  * and 'closure' using 'group' and 'timeout' to control sequencing and
635  * execution time.
636  * @param group the group for sequencing jobs
637  * @param timeout the time in seconds allocated to the job
638  * @param callback the callback that will handle the job.
639  *                 it receives 3 parameters: 'signum' that will be 0
640  *                 on normal flow or the catched signal number in case
641  *                 of interrupted flow, the context 'closure' as given and
642  *                 a 'jobloop' reference that must be used when the job is
643  *                 terminated to unlock the current execution flow.
644  * @param closure the argument to the callback
645  * @return 0 on success or -1 in case of error
646  */
647 int jobs_enter(
648                 const void *group,
649                 int timeout,
650                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
651                 void *closure
652 )
653 {
654         struct sync sync;
655
656         sync.enter = callback;
657         sync.arg = closure;
658         return do_sync(group, timeout, enter_cb, &sync);
659 }
660
661 /**
662  * Internal callback for evloop management.
663  * The effect of this function is hidden: it exits
664  * the waiting poll if any. Then it wakes up a thread
665  * awaiting the evloop using signal.
666  */
667 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
668 {
669         uint64_t x;
670         struct evloop *evloop = userdata;
671         read(evloop->efd, &x, sizeof x);
672         pthread_mutex_lock(&mutex);
673         pthread_cond_broadcast(&evloop->cond);
674         pthread_mutex_unlock(&mutex);
675         return 1;
676 }
677
678 /**
679  * unlock the event loop if needed by sending
680  * an event.
681  * @param el the event loop to unlock
682  * @param wait wait the unlocked state of the event loop
683  */
684 static void unlock_evloop(struct evloop *el, int wait)
685 {
686         /* wait for a modifiable event loop */
687         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
688                 uint64_t x = 1;
689                 write(el->efd, &x, sizeof x);
690                 if (!wait)
691                         break;
692                 pthread_cond_wait(&el->cond, &mutex);
693         }
694 }
695
696 /**
697  * Unlocks the execution flow designed by 'jobloop'.
698  * @param jobloop indication of the flow to unlock
699  * @return 0 in case of success of -1 on error
700  */
701 int jobs_leave(struct jobloop *jobloop)
702 {
703         struct thread *t;
704         int i;
705
706         pthread_mutex_lock(&mutex);
707         t = threads;
708         while (t && t != (struct thread*)jobloop)
709                 t = t->next;
710         if (!t) {
711                 errno = EINVAL;
712         } else {
713                 t->stop = 1;
714                 if (t->waits)
715                         pthread_cond_broadcast(&cond);
716                 else {
717                         i = (int)(sizeof evloop / sizeof *evloop);
718                         while(i) {
719                                 if (evloop[--i].holder == t) {
720                                         unlock_evloop(&evloop[i], 0);
721                                         break;
722                                 }
723                         }
724                 }
725         }
726         pthread_mutex_unlock(&mutex);
727         return -!t;
728 }
729
730 /**
731  * Calls synchronously the job represented by 'callback' and 'arg1'
732  * for the 'group' and the 'timeout' and waits for its completion.
733  * @param group    The group of the job or NULL when no group.
734  * @param timeout  The maximum execution time in seconds of the job
735  *                 or 0 for unlimited time.
736  * @param callback The function to execute for achieving the job.
737  *                 Its first parameter is either 0 on normal flow
738  *                 or the signal number that broke the normal flow.
739  *                 The remaining parameter is the parameter 'arg1'
740  *                 given here.
741  * @param arg      The second argument for 'callback'
742  * @return 0 in case of success or -1 in case of error
743  */
744 int jobs_call(
745                 const void *group,
746                 int timeout,
747                 void (*callback)(int, void*),
748                 void *arg)
749 {
750         struct sync sync;
751
752         sync.callback = callback;
753         sync.arg = arg;
754
755         return do_sync(group, timeout, call_cb, &sync);
756 }
757
758 /* temporary hack */
759 #if !defined(REMOVE_SYSTEMD_EVENT)
760 __attribute__((unused))
761 #endif
762 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
763 {
764         sig_monitor(0, evloop_run, arg);
765 }
766
767 /**
768  * Gets a sd_event item for the current thread.
769  * @return a sd_event or NULL in case of error
770  */
771 static struct sd_event *get_sd_event_locked()
772 {
773         struct evloop *el;
774         int rc;
775
776         /* creates the evloop on need */
777         el = &evloop[0];
778         if (!el->sdev) {
779                 /* start the creation */
780                 el->state = 0;
781                 /* creates the eventfd for waking up polls */
782                 el->efd = eventfd(0, EFD_CLOEXEC);
783                 if (el->efd < 0) {
784                         ERROR("can't make eventfd for events");
785                         goto error1;
786                 }
787                 /* create the systemd event loop */
788                 rc = sd_event_new(&el->sdev);
789                 if (rc < 0) {
790                         ERROR("can't make new event loop");
791                         goto error2;
792                 }
793                 /* put the eventfd in the event loop */
794                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
795                 if (rc < 0) {
796                         ERROR("can't register eventfd");
797 #if !defined(REMOVE_SYSTEMD_EVENT)
798                         sd_event_unref(el->sdev);
799                         el->sdev = NULL;
800 error2:
801                         close(el->efd);
802 error1:
803                         return NULL;
804                 }
805 #else
806                         goto error3;
807                 }
808                 /* handle the event loop */
809                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
810                 if (!el->fdev) {
811                         ERROR("can't create fdev");
812 error3:
813                         sd_event_unref(el->sdev);
814 error2:
815                         close(el->efd);
816 error1:
817                         memset(el, 0, sizeof *el);
818                         return NULL;
819                 }
820                 fdev_set_autoclose(el->fdev, 0);
821                 fdev_set_events(el->fdev, EPOLLIN);
822                 fdev_set_callback(el->fdev, evloop_callback, el);
823 #endif
824         }
825
826         /* attach the event loop to the current thread */
827         if (current_evloop != el) {
828                 if (current_evloop) {
829                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
830                         __atomic_store_n(&current_evloop->holder, NULL, __ATOMIC_RELAXED);
831                 }
832                 current_evloop = el;
833                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
834                 __atomic_store_n(&el->holder, current_thread, __ATOMIC_RELAXED);
835         }
836
837         /* wait for a modifiable event loop */
838         unlock_evloop(el, 1);
839
840         return el->sdev;
841 }
842
843 /**
844  * Gets a sd_event item for the current thread.
845  * @return a sd_event or NULL in case of error
846  */
847 struct sd_event *jobs_get_sd_event()
848 {
849         struct sd_event *result;
850
851         pthread_mutex_lock(&mutex);
852         result = get_sd_event_locked();
853         pthread_mutex_unlock(&mutex);
854
855         return result;
856 }
857
858 #if defined(REMOVE_SYSTEMD_EVENT)
859 /**
860  * Gets the fdev_epoll item.
861  * @return a fdev_epoll or NULL in case of error
862  */
863 struct fdev_epoll *jobs_get_fdev_epoll()
864 {
865         struct fdev_epoll *result;
866
867         pthread_mutex_lock(&mutex);
868         result = get_fdevepoll();
869         pthread_mutex_unlock(&mutex);
870
871         return result;
872 }
873 #endif
874
875 /**
876  * Enter the jobs processing loop.
877  * @param allowed_count Maximum count of thread for jobs including this one
878  * @param start_count   Count of thread to start now, must be lower.
879  * @param waiter_count  Maximum count of jobs that can be waiting.
880  * @param start         The start routine to activate (can't be NULL)
881  * @return 0 in case of success or -1 in case of error.
882  */
883 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
884 {
885         int rc, launched;
886         struct thread me;
887         struct job *job;
888
889         assert(allowed_count >= 1);
890         assert(start_count >= 0);
891         assert(waiter_count > 0);
892         assert(start_count <= allowed_count);
893
894         rc = -1;
895         pthread_mutex_lock(&mutex);
896
897         /* check whether already running */
898         if (current_thread || allowed) {
899                 ERROR("thread already started");
900                 errno = EINVAL;
901                 goto error;
902         }
903
904         /* records the allowed count */
905         allowed = allowed_count;
906         started = 0;
907         running = 0;
908         remains = waiter_count;
909
910 #if HAS_WATCHDOG
911         /* set the watchdog */
912         if (sd_watchdog_enabled(0, NULL))
913                 sd_event_set_watchdog(get_sd_event_locked(), 1);
914 #endif
915
916         /* start at least one thread */
917         launched = 0;
918         while ((launched + 1) < start_count) {
919                 if (start_one_thread() != 0) {
920                         ERROR("Not all threads can be started");
921                         goto error;
922                 }
923                 launched++;
924         }
925
926         /* queue the start job */
927         job = job_create(NULL, 0, start, arg);
928         if (!job) {
929                 ERROR("out of memory");
930                 errno = ENOMEM;
931                 goto error;
932         }
933         job_add(job);
934         remains--;
935
936         /* run until end */
937         running++;
938         thread_run(&me);
939         running--;
940         rc = 0;
941 error:
942         pthread_mutex_unlock(&mutex);
943         return rc;
944 }
945
946 /**
947  * Terminate all the threads and cancel all pending jobs.
948  */
949 void jobs_terminate()
950 {
951         struct job *job, *head, *tail;
952         pthread_t me, *others;
953         struct thread *t;
954         int count;
955
956         /* how am i? */
957         me = pthread_self();
958
959         /* request all threads to stop */
960         pthread_mutex_lock(&mutex);
961         allowed = 0;
962
963         /* count the number of threads */
964         count = 0;
965         t = threads;
966         while (t) {
967                 if (!t->upper && !pthread_equal(t->tid, me))
968                         count++;
969                 t = t->next;
970         }
971
972         /* fill the array of threads */
973         others = alloca(count * sizeof *others);
974         count = 0;
975         t = threads;
976         while (t) {
977                 if (!t->upper && !pthread_equal(t->tid, me))
978                         others[count++] = t->tid;
979                 t = t->next;
980         }
981
982         /* stops the threads */
983         t = threads;
984         while (t) {
985                 t->stop = 1;
986                 t = t->next;
987         }
988
989         /* wait the threads */
990         pthread_cond_broadcast(&cond);
991         pthread_mutex_unlock(&mutex);
992         while (count)
993                 pthread_join(others[--count], NULL);
994         pthread_mutex_lock(&mutex);
995
996         /* cancel pending jobs of other threads */
997         remains = 0;
998         head = first_job;
999         first_job = NULL;
1000         tail = NULL;
1001         while (head) {
1002                 /* unlink the job */
1003                 job = head;
1004                 head = job->next;
1005
1006                 /* search if job is stacked for current */
1007                 t = current_thread;
1008                 while (t && t->job != job)
1009                         t = t->upper;
1010                 if (t) {
1011                         /* yes, relink it at end */
1012                         if (tail)
1013                                 tail->next = job;
1014                         else
1015                                 first_job = job;
1016                         tail = job;
1017                         job->next = NULL;
1018                 } else {
1019                         /* no cancel the job */
1020                         pthread_mutex_unlock(&mutex);
1021                         sig_monitor(0, job_cancel, job);
1022                         free(job);
1023                         pthread_mutex_lock(&mutex);
1024                 }
1025         }
1026         pthread_mutex_unlock(&mutex);
1027 }
1028