api-v3: First draft
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47 #include "fdev-epoll.h"
48 #if 0
49 #define _alert_ "do you really want to remove signal monitoring?"
50 #define sig_monitor_init_timeouts()  ((void)0)
51 #define sig_monitor_clean_timeouts() ((void)0)
52 #define sig_monitor(to,cb,arg)       (cb(0,arg))
53 #endif
54
55 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
56 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
57
58 /** Internal shortcut for callback */
59 typedef void (*job_cb_t)(int, void*);
60
61 /** Description of a pending job */
62 struct job
63 {
64         struct job *next;    /**< link to the next job enqueued */
65         const void *group;   /**< group of the request */
66         job_cb_t callback;   /**< processing callback */
67         void *arg;           /**< argument */
68         int timeout;         /**< timeout in second for processing the request */
69         unsigned blocked: 1; /**< is an other request blocking this one ? */
70         unsigned dropped: 1; /**< is removed ? */
71 };
72
73 /** Description of handled event loops */
74 struct evloop
75 {
76         unsigned state;        /**< encoded state */
77         int efd;               /**< event notification */
78         struct sd_event *sdev; /**< the systemd event loop */
79         pthread_cond_t  cond;  /**< condition */
80         struct fdev *fdev;     /**< handling of events */
81 };
82
83 #define EVLOOP_STATE_WAIT           1U
84 #define EVLOOP_STATE_RUN            2U
85 #define EVLOOP_STATE_LOCK           4U
86
87 /** Description of threads */
88 struct thread
89 {
90         struct thread *next;   /**< next thread of the list */
91         struct thread *upper;  /**< upper same thread */
92         struct job *job;       /**< currently processed job */
93         pthread_t tid;         /**< the thread id */
94         volatile unsigned stop: 1;      /**< stop requested */
95         volatile unsigned waits: 1;     /**< is waiting? */
96 };
97
98 /**
99  * Description of synchonous callback
100  */
101 struct sync
102 {
103         struct thread thread;   /**< thread loop data */
104         union {
105                 void (*callback)(int, void*);   /**< the synchronous callback */
106                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
107                                 /**< the entering synchronous routine */
108         };
109         void *arg;              /**< the argument of the callback */
110 };
111
112
113 /* synchronisation of threads */
114 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
116
117 /* count allowed, started and running threads */
118 static int allowed = 0; /** allowed count of threads */
119 static int started = 0; /** started count of threads */
120 static int running = 0; /** running count of threads */
121 static int remains = 0; /** allowed count of waiting jobs */
122
123 /* list of threads */
124 static struct thread *threads;
125 static _Thread_local struct thread *current_thread;
126 static _Thread_local struct evloop *current_evloop;
127
128 /* queue of pending jobs */
129 static struct job *first_job;
130 static struct job *free_jobs;
131
132 /* event loop */
133 static struct evloop evloop[1];
134 static struct fdev_epoll *fdevepoll;
135 #if !defined(REMOVE_SYSTEMD_EVENT)
136 __attribute__((unused))
137 #endif
138 static int waitevt;
139
140 /**
141  * Create a new job with the given parameters
142  * @param group    the group of the job
143  * @param timeout  the timeout of the job (0 if none)
144  * @param callback the function that achieves the job
145  * @param arg      the argument of the callback
146  * @return the created job unblock or NULL when no more memory
147  */
148 static struct job *job_create(
149                 const void *group,
150                 int timeout,
151                 job_cb_t callback,
152                 void *arg)
153 {
154         struct job *job;
155
156         /* try recyle existing job */
157         job = free_jobs;
158         if (job)
159                 free_jobs = job->next;
160         else {
161                 /* allocation without blocking */
162                 pthread_mutex_unlock(&mutex);
163                 job = malloc(sizeof *job);
164                 pthread_mutex_lock(&mutex);
165                 if (!job) {
166                         errno = -ENOMEM;
167                         goto end;
168                 }
169         }
170         /* initialises the job */
171         job->group = group;
172         job->timeout = timeout;
173         job->callback = callback;
174         job->arg = arg;
175         job->blocked = 0;
176         job->dropped = 0;
177 end:
178         return job;
179 }
180
181 /**
182  * Adds 'job' at the end of the list of jobs, marking it
183  * as blocked if an other job with the same group is pending.
184  * @param job the job to add
185  */
186 static void job_add(struct job *job)
187 {
188         const void *group;
189         struct job *ijob, **pjob;
190
191         /* prepare to add */
192         group = job->group;
193         job->next = NULL;
194
195         /* search end and blockers */
196         pjob = &first_job;
197         ijob = first_job;
198         while (ijob) {
199                 if (group && ijob->group == group)
200                         job->blocked = 1;
201                 pjob = &ijob->next;
202                 ijob = ijob->next;
203         }
204
205         /* queue the jobs */
206         *pjob = job;
207 }
208
209 /**
210  * Get the next job to process or NULL if none.
211  * @return the first job that isn't blocked or NULL
212  */
213 static inline struct job *job_get()
214 {
215         struct job *job = first_job;
216         while (job && job->blocked)
217                 job = job->next;
218         return job;
219 }
220
221 /**
222  * Releases the processed 'job': removes it
223  * from the list of jobs and unblock the first
224  * pending job of the same group if any.
225  * @param job the job to release
226  */
227 static inline void job_release(struct job *job)
228 {
229         struct job *ijob, **pjob;
230         const void *group;
231
232         /* first unqueue the job */
233         pjob = &first_job;
234         ijob = first_job;
235         while (ijob != job) {
236                 pjob = &ijob->next;
237                 ijob = ijob->next;
238         }
239         *pjob = job->next;
240
241         /* then unblock jobs of the same group */
242         group = job->group;
243         if (group) {
244                 ijob = job->next;
245                 while (ijob && ijob->group != group)
246                         ijob = ijob->next;
247                 if (ijob)
248                         ijob->blocked = 0;
249         }
250
251         /* recycle the job */
252         job->next = free_jobs;
253         free_jobs = job;
254 }
255
256 /**
257  * Monitored cancel callback for a job.
258  * This function is called by the monitor
259  * to cancel the job when the safe environment
260  * is set.
261  * @param signum 0 on normal flow or the number
262  *               of the signal that interrupted the normal
263  *               flow, isn't used
264  * @param arg    the job to run
265  */
266 static void job_cancel(int signum, void *arg)
267 {
268         struct job *job = arg;
269         job->callback(SIGABRT, job->arg);
270 }
271
272 /**
273  * Gets a fdev_epoll item.
274  * @return a fdev_epoll or NULL in case of error
275  */
276 static struct fdev_epoll *get_fdevepoll()
277 {
278         struct fdev_epoll *result;
279
280         result = fdevepoll;
281         if (!result)
282                 result = fdevepoll = fdev_epoll_create();
283
284         return result;
285 }
286
287 /**
288  * Monitored normal callback for events.
289  * This function is called by the monitor
290  * to run the event loop when the safe environment
291  * is set.
292  * @param signum 0 on normal flow or the number
293  *               of the signal that interrupted the normal
294  *               flow
295  * @param arg     the events to run
296  */
297 static void evloop_run(int signum, void *arg)
298 {
299         int rc;
300         struct sd_event *se;
301         struct evloop *el = arg;
302
303         if (!signum) {
304                 current_evloop = el;
305                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
306                 se = el->sdev;
307                 rc = sd_event_prepare(se);
308                 if (rc < 0) {
309                         errno = -rc;
310                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
311                 } else {
312                         if (rc == 0) {
313                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
314                                 if (rc < 0) {
315                                         errno = -rc;
316                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
317                                 }
318                         }
319                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
320
321                         if (rc > 0) {
322                                 rc = sd_event_dispatch(se);
323                                 if (rc < 0) {
324                                         errno = -rc;
325                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
326                                 }
327                         }
328                 }
329         }
330         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
331 }
332
333
334 /**
335  * Monitored normal loop for waiting events.
336  * @param signum 0 on normal flow or the number
337  *               of the signal that interrupted the normal
338  *               flow
339  * @param arg     the events to run
340  */
341 #if !defined(REMOVE_SYSTEMD_EVENT)
342 __attribute__((unused))
343 #endif
344 static void monitored_wait_and_dispatch(int signum, void *arg)
345 {
346         struct fdev_epoll *fdev_epoll = arg;
347         if (!signum) {
348                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
349         }
350 }
351
352 /**
353  * Main processing loop of threads processing jobs.
354  * The loop must be called with the mutex locked
355  * and it returns with the mutex locked.
356  * @param me the description of the thread to use
357  * TODO: how are timeout handled when reentering?
358  */
359 static void thread_run(volatile struct thread *me)
360 {
361         struct thread **prv;
362         struct job *job;
363 #if !defined(REMOVE_SYSTEMD_EVENT)
364         struct evloop *el;
365 #endif
366
367         /* initialize description of itself and link it in the list */
368         me->tid = pthread_self();
369         me->stop = 0;
370         me->waits = 0;
371         me->upper = current_thread;
372         if (!current_thread) {
373                 started++;
374                 sig_monitor_init_timeouts();
375         }
376         me->next = threads;
377         threads = (struct thread*)me;
378         current_thread = (struct thread*)me;
379
380         /* loop until stopped */
381         while (!me->stop) {
382                 /* release the event loop */
383                 if (current_evloop) {
384                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
385                         current_evloop = NULL;
386                 }
387
388                 /* get a job */
389                 job = job_get();
390                 if (job) {
391                         /* prepare running the job */
392                         remains++; /* increases count of job that can wait */
393                         job->blocked = 1; /* mark job as blocked */
394                         me->job = job; /* record the job (only for terminate) */
395
396                         /* run the job */
397                         pthread_mutex_unlock(&mutex);
398                         sig_monitor(job->timeout, job->callback, job->arg);
399                         pthread_mutex_lock(&mutex);
400
401                         /* release the run job */
402                         job_release(job);
403 #if !defined(REMOVE_SYSTEMD_EVENT)
404                 } else {
405                         /* no job, check events */
406                         el = &evloop[0];
407                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
408                                 /* run the events */
409                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
410                                 current_evloop = el;
411                                 pthread_mutex_unlock(&mutex);
412                                 sig_monitor(0, evloop_run, el);
413                                 pthread_mutex_lock(&mutex);
414                         } else {
415                                 /* no job and not events */
416                                 running--;
417                                 if (!running)
418                                         ERROR("Entering job deep sleep! Check your bindings.");
419                                 me->waits = 1;
420                                 pthread_cond_wait(&cond, &mutex);
421                                 me->waits = 0;
422                                 running++;
423                         }
424 #else
425                 } else if (waitevt) {
426                         /* no job and not events */
427                         running--;
428                         if (!running)
429                                 ERROR("Entering job deep sleep! Check your bindings.");
430                         me->waits = 1;
431                         pthread_cond_wait(&cond, &mutex);
432                         me->waits = 0;
433                         running++;
434                 } else {
435                         /* wait for events */
436                         waitevt = 1;
437                         pthread_mutex_unlock(&mutex);
438                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
439                         pthread_mutex_lock(&mutex);
440                         waitevt = 0;
441 #endif
442                 }
443         }
444
445         /* release the event loop */
446         if (current_evloop) {
447                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
448                 current_evloop = NULL;
449         }
450
451         /* unlink the current thread and cleanup */
452         prv = &threads;
453         while (*prv != me)
454                 prv = &(*prv)->next;
455         *prv = me->next;
456         current_thread = me->upper;
457         if (!current_thread) {
458                 sig_monitor_clean_timeouts();
459                 started--;
460         }
461 }
462
463 /**
464  * Entry point for created threads.
465  * @param data not used
466  * @return NULL
467  */
468 static void *thread_main(void *data)
469 {
470         struct thread me;
471
472         pthread_mutex_lock(&mutex);
473         running++;
474         thread_run(&me);
475         running--;
476         pthread_mutex_unlock(&mutex);
477         return NULL;
478 }
479
480 /**
481  * Starts a new thread
482  * @return 0 in case of success or -1 in case of error
483  */
484 static int start_one_thread()
485 {
486         pthread_t tid;
487         int rc;
488
489         rc = pthread_create(&tid, NULL, thread_main, NULL);
490         if (rc != 0) {
491                 /* errno = rc; */
492                 WARNING("not able to start thread: %m");
493                 rc = -1;
494         }
495         return rc;
496 }
497
498 /**
499  * Queues a new asynchronous job represented by 'callback' and 'arg'
500  * for the 'group' and the 'timeout'.
501  * Jobs are queued FIFO and are possibly executed in parallel
502  * concurrently except for job of the same group that are
503  * executed sequentially in FIFO order.
504  * @param group    The group of the job or NULL when no group.
505  * @param timeout  The maximum execution time in seconds of the job
506  *                 or 0 for unlimited time.
507  * @param callback The function to execute for achieving the job.
508  *                 Its first parameter is either 0 on normal flow
509  *                 or the signal number that broke the normal flow.
510  *                 The remaining parameter is the parameter 'arg1'
511  *                 given here.
512  * @param arg      The second argument for 'callback'
513  * @return 0 in case of success or -1 in case of error
514  */
515 int jobs_queue(
516                 const void *group,
517                 int timeout,
518                 void (*callback)(int, void*),
519                 void *arg)
520 {
521         const char *info;
522         struct job *job;
523         int rc;
524
525         pthread_mutex_lock(&mutex);
526
527         /* allocates the job */
528         job = job_create(group, timeout, callback, arg);
529         if (!job) {
530                 errno = ENOMEM;
531                 info = "out of memory";
532                 goto error;
533         }
534
535         /* check availability */
536         if (remains == 0) {
537                 errno = EBUSY;
538                 info = "too many jobs";
539                 goto error2;
540         }
541
542         /* start a thread if needed */
543         if (running == started && started < allowed) {
544                 /* all threads are busy and a new can be started */
545                 rc = start_one_thread();
546                 if (rc < 0 && started == 0) {
547                         info = "can't start first thread";
548                         goto error2;
549                 }
550         }
551
552         /* queues the job */
553         remains--;
554         job_add(job);
555
556         /* signal an existing job */
557         pthread_cond_signal(&cond);
558         pthread_mutex_unlock(&mutex);
559         return 0;
560
561 error2:
562         job->next = free_jobs;
563         free_jobs = job;
564 error:
565         ERROR("can't process job with threads: %s, %m", info);
566         pthread_mutex_unlock(&mutex);
567         return -1;
568 }
569
570 /**
571  * Internal helper function for 'jobs_enter'.
572  * @see jobs_enter, jobs_leave
573  */
574 static void enter_cb(int signum, void *closure)
575 {
576         struct sync *sync = closure;
577         sync->enter(signum, sync->arg, (void*)&sync->thread);
578 }
579
580 /**
581  * Internal helper function for 'jobs_call'.
582  * @see jobs_call
583  */
584 static void call_cb(int signum, void *closure)
585 {
586         struct sync *sync = closure;
587         sync->callback(signum, sync->arg);
588         jobs_leave((void*)&sync->thread);
589 }
590
591 /**
592  * Internal helper for synchronous jobs. It enters
593  * a new thread loop for evaluating the given job
594  * as recorded by the couple 'sync_cb' and 'sync'.
595  * @see jobs_call, jobs_enter, jobs_leave
596  */
597 static int do_sync(
598                 const void *group,
599                 int timeout,
600                 void (*sync_cb)(int signum, void *closure),
601                 struct sync *sync
602 )
603 {
604         struct job *job;
605
606         pthread_mutex_lock(&mutex);
607
608         /* allocates the job */
609         job = job_create(group, timeout, sync_cb, sync);
610         if (!job) {
611                 ERROR("out of memory");
612                 errno = ENOMEM;
613                 pthread_mutex_unlock(&mutex);
614                 return -1;
615         }
616
617         /* queues the job */
618         job_add(job);
619
620         /* run until stopped */
621         thread_run(&sync->thread);
622         pthread_mutex_unlock(&mutex);
623         return 0;
624 }
625
626 /**
627  * Enter a synchronisation point: activates the job given by 'callback'
628  * and 'closure' using 'group' and 'timeout' to control sequencing and
629  * execution time.
630  * @param group the group for sequencing jobs
631  * @param timeout the time in seconds allocated to the job
632  * @param callback the callback that will handle the job.
633  *                 it receives 3 parameters: 'signum' that will be 0
634  *                 on normal flow or the catched signal number in case
635  *                 of interrupted flow, the context 'closure' as given and
636  *                 a 'jobloop' reference that must be used when the job is
637  *                 terminated to unlock the current execution flow.
638  * @param closure the argument to the callback
639  * @return 0 on success or -1 in case of error
640  */
641 int jobs_enter(
642                 const void *group,
643                 int timeout,
644                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
645                 void *closure
646 )
647 {
648         struct sync sync;
649
650         sync.enter = callback;
651         sync.arg = closure;
652         return do_sync(group, timeout, enter_cb, &sync);
653 }
654
655 /**
656  * Unlocks the execution flow designed by 'jobloop'.
657  * @param jobloop indication of the flow to unlock
658  * @return 0 in case of success of -1 on error
659  */
660 int jobs_leave(struct jobloop *jobloop)
661 {
662         struct thread *t;
663
664         pthread_mutex_lock(&mutex);
665         t = threads;
666         while (t && t != (struct thread*)jobloop)
667                 t = t->next;
668         if (!t) {
669                 errno = EINVAL;
670         } else {
671                 t->stop = 1;
672                 if (t->waits)
673                         pthread_cond_broadcast(&cond);
674         }
675         pthread_mutex_unlock(&mutex);
676         return -!t;
677 }
678
679 /**
680  * Calls synchronously the job represented by 'callback' and 'arg1'
681  * for the 'group' and the 'timeout' and waits for its completion.
682  * @param group    The group of the job or NULL when no group.
683  * @param timeout  The maximum execution time in seconds of the job
684  *                 or 0 for unlimited time.
685  * @param callback The function to execute for achieving the job.
686  *                 Its first parameter is either 0 on normal flow
687  *                 or the signal number that broke the normal flow.
688  *                 The remaining parameter is the parameter 'arg1'
689  *                 given here.
690  * @param arg      The second argument for 'callback'
691  * @return 0 in case of success or -1 in case of error
692  */
693 int jobs_call(
694                 const void *group,
695                 int timeout,
696                 void (*callback)(int, void*),
697                 void *arg)
698 {
699         struct sync sync;
700
701         sync.callback = callback;
702         sync.arg = arg;
703
704         return do_sync(group, timeout, call_cb, &sync);
705 }
706
707 /**
708  * Internal callback for evloop management.
709  * The effect of this function is hidden: it exits
710  * the waiting poll if any. Then it wakes up a thread
711  * awaiting the evloop using signal.
712  */
713 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
714 {
715         uint64_t x;
716         struct evloop *evloop = userdata;
717         read(evloop->efd, &x, sizeof x);
718         pthread_mutex_lock(&mutex);
719         pthread_cond_broadcast(&evloop->cond);
720         pthread_mutex_unlock(&mutex);
721         return 1;
722 }
723
724 /* temporary hack */
725 #if !defined(REMOVE_SYSTEMD_EVENT)
726 __attribute__((unused))
727 #endif
728 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
729 {
730         sig_monitor(0, evloop_run, arg);
731 }
732
733 /**
734  * Gets a sd_event item for the current thread.
735  * @return a sd_event or NULL in case of error
736  */
737 static struct sd_event *get_sd_event_locked()
738 {
739         struct evloop *el;
740         uint64_t x;
741         int rc;
742
743         /* creates the evloop on need */
744         el = &evloop[0];
745         if (!el->sdev) {
746                 /* start the creation */
747                 el->state = 0;
748                 /* creates the eventfd for waking up polls */
749                 el->efd = eventfd(0, EFD_CLOEXEC);
750                 if (el->efd < 0) {
751                         ERROR("can't make eventfd for events");
752                         goto error1;
753                 }
754                 /* create the systemd event loop */
755                 rc = sd_event_new(&el->sdev);
756                 if (rc < 0) {
757                         ERROR("can't make new event loop");
758                         goto error2;
759                 }
760                 /* put the eventfd in the event loop */
761                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
762                 if (rc < 0) {
763                         ERROR("can't register eventfd");
764 #if !defined(REMOVE_SYSTEMD_EVENT)
765                         sd_event_unref(el->sdev);
766                         el->sdev = NULL;
767 error2:
768                         close(el->efd);
769 error1:
770                         return NULL;
771                 }
772 #else
773                         goto error3;
774                 }
775                 /* handle the event loop */
776                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
777                 if (!el->fdev) {
778                         ERROR("can't create fdev");
779 error3:
780                         sd_event_unref(el->sdev);
781 error2:
782                         close(el->efd);
783 error1:
784                         memset(el, 0, sizeof *el);
785                         return NULL;
786                 }
787                 fdev_set_autoclose(el->fdev, 0);
788                 fdev_set_events(el->fdev, EPOLLIN);
789                 fdev_set_callback(el->fdev, evloop_callback, el);
790 #endif
791         }
792
793         /* attach the event loop to the current thread */
794         if (current_evloop != el) {
795                 if (current_evloop)
796                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
797                 current_evloop = el;
798                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
799         }
800
801         /* wait for a modifiable event loop */
802         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
803                 x = 1;
804                 write(el->efd, &x, sizeof x);
805                 pthread_cond_wait(&el->cond, &mutex);
806         }
807
808         return el->sdev;
809 }
810
811 /**
812  * Gets a sd_event item for the current thread.
813  * @return a sd_event or NULL in case of error
814  */
815 struct sd_event *jobs_get_sd_event()
816 {
817         struct sd_event *result;
818
819         pthread_mutex_lock(&mutex);
820         result = get_sd_event_locked();
821         pthread_mutex_unlock(&mutex);
822
823         return result;
824 }
825
826 /**
827  * Gets the fdev_epoll item.
828  * @return a fdev_epoll or NULL in case of error
829  */
830 struct fdev_epoll *jobs_get_fdev_epoll()
831 {
832         struct fdev_epoll *result;
833
834         pthread_mutex_lock(&mutex);
835         result = get_fdevepoll();
836         pthread_mutex_unlock(&mutex);
837
838         return result;
839 }
840
841 /**
842  * Enter the jobs processing loop.
843  * @param allowed_count Maximum count of thread for jobs including this one
844  * @param start_count   Count of thread to start now, must be lower.
845  * @param waiter_count  Maximum count of jobs that can be waiting.
846  * @param start         The start routine to activate (can't be NULL)
847  * @return 0 in case of success or -1 in case of error.
848  */
849 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
850 {
851         int rc, launched;
852         struct thread me;
853         struct job *job;
854
855         assert(allowed_count >= 1);
856         assert(start_count >= 0);
857         assert(waiter_count > 0);
858         assert(start_count <= allowed_count);
859
860         rc = -1;
861         pthread_mutex_lock(&mutex);
862
863         /* check whether already running */
864         if (current_thread || allowed) {
865                 ERROR("thread already started");
866                 errno = EINVAL;
867                 goto error;
868         }
869
870         /* start */
871         if (sig_monitor_init() < 0) {
872                 ERROR("failed to initialise signal handlers");
873                 goto error;
874         }
875
876         /* records the allowed count */
877         allowed = allowed_count;
878         started = 0;
879         running = 0;
880         remains = waiter_count;
881
882 #if HAS_WATCHDOG
883         /* set the watchdog */
884         if (sd_watchdog_enabled(0, NULL))
885                 sd_event_set_watchdog(get_sd_event_locked(), 1);
886 #endif
887
888         /* start at least one thread */
889         launched = 0;
890         while ((launched + 1) < start_count) {
891                 if (start_one_thread() != 0) {
892                         ERROR("Not all threads can be started");
893                         goto error;
894                 }
895                 launched++;
896         }
897
898         /* queue the start job */
899         job = job_create(NULL, 0, start, arg);
900         if (!job) {
901                 ERROR("out of memory");
902                 errno = ENOMEM;
903                 goto error;
904         }
905         job_add(job);
906         remains--;
907
908         /* run until end */
909         thread_run(&me);
910         rc = 0;
911 error:
912         pthread_mutex_unlock(&mutex);
913         return rc;
914 }
915
916 /**
917  * Terminate all the threads and cancel all pending jobs.
918  */
919 void jobs_terminate()
920 {
921         struct job *job, *head, *tail;
922         pthread_t me, *others;
923         struct thread *t;
924         int count;
925
926         /* how am i? */
927         me = pthread_self();
928
929         /* request all threads to stop */
930         pthread_mutex_lock(&mutex);
931         allowed = 0;
932
933         /* count the number of threads */
934         count = 0;
935         t = threads;
936         while (t) {
937                 if (!t->upper && !pthread_equal(t->tid, me))
938                         count++;
939                 t = t->next;
940         }
941
942         /* fill the array of threads */
943         others = alloca(count * sizeof *others);
944         count = 0;
945         t = threads;
946         while (t) {
947                 if (!t->upper && !pthread_equal(t->tid, me))
948                         others[count++] = t->tid;
949                 t = t->next;
950         }
951
952         /* stops the threads */
953         t = threads;
954         while (t) {
955                 t->stop = 1;
956                 t = t->next;
957         }
958
959         /* wait the threads */
960         pthread_cond_broadcast(&cond);
961         pthread_mutex_unlock(&mutex);
962         while (count)
963                 pthread_join(others[--count], NULL);
964         pthread_mutex_lock(&mutex);
965
966         /* cancel pending jobs of other threads */
967         remains = 0;
968         head = first_job;
969         first_job = NULL;
970         tail = NULL;
971         while (head) {
972                 /* unlink the job */
973                 job = head;
974                 head = job->next;
975
976                 /* search if job is stacked for current */
977                 t = current_thread;
978                 while (t && t->job != job)
979                         t = t->upper;
980                 if (t) {
981                         /* yes, relink it at end */
982                         if (tail)
983                                 tail->next = job;
984                         else
985                                 first_job = job;
986                         tail = job;
987                         job->next = NULL;
988                 } else {
989                         /* no cancel the job */
990                         pthread_mutex_unlock(&mutex);
991                         sig_monitor(0, job_cancel, job);
992                         free(job);
993                         pthread_mutex_lock(&mutex);
994                 }
995         }
996         pthread_mutex_unlock(&mutex);
997 }
998