Tune dependency to fdev-epoll
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #if 0
53 #define _alert_ "do you really want to remove signal monitoring?"
54 #define sig_monitor_init_timeouts()  ((void)0)
55 #define sig_monitor_clean_timeouts() ((void)0)
56 #define sig_monitor(to,cb,arg)       (cb(0,arg))
57 #endif
58
59 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
60 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
61
62 /** Internal shortcut for callback */
63 typedef void (*job_cb_t)(int, void*);
64
65 /** Description of a pending job */
66 struct job
67 {
68         struct job *next;    /**< link to the next job enqueued */
69         const void *group;   /**< group of the request */
70         job_cb_t callback;   /**< processing callback */
71         void *arg;           /**< argument */
72         int timeout;         /**< timeout in second for processing the request */
73         unsigned blocked: 1; /**< is an other request blocking this one ? */
74         unsigned dropped: 1; /**< is removed ? */
75 };
76
77 /** Description of handled event loops */
78 struct evloop
79 {
80         unsigned state;        /**< encoded state */
81         int efd;               /**< event notification */
82         struct sd_event *sdev; /**< the systemd event loop */
83         pthread_cond_t  cond;  /**< condition */
84         struct fdev *fdev;     /**< handling of events */
85 };
86
87 #define EVLOOP_STATE_WAIT           1U
88 #define EVLOOP_STATE_RUN            2U
89 #define EVLOOP_STATE_LOCK           4U
90
91 /** Description of threads */
92 struct thread
93 {
94         struct thread *next;   /**< next thread of the list */
95         struct thread *upper;  /**< upper same thread */
96         struct job *job;       /**< currently processed job */
97         pthread_t tid;         /**< the thread id */
98         volatile unsigned stop: 1;      /**< stop requested */
99         volatile unsigned waits: 1;     /**< is waiting? */
100 };
101
102 /**
103  * Description of synchonous callback
104  */
105 struct sync
106 {
107         struct thread thread;   /**< thread loop data */
108         union {
109                 void (*callback)(int, void*);   /**< the synchronous callback */
110                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
111                                 /**< the entering synchronous routine */
112         };
113         void *arg;              /**< the argument of the callback */
114 };
115
116
117 /* synchronisation of threads */
118 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
119 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
120
121 /* count allowed, started and running threads */
122 static int allowed = 0; /** allowed count of threads */
123 static int started = 0; /** started count of threads */
124 static int running = 0; /** running count of threads */
125 static int remains = 0; /** allowed count of waiting jobs */
126
127 /* list of threads */
128 static struct thread *threads;
129 static _Thread_local struct thread *current_thread;
130 static _Thread_local struct evloop *current_evloop;
131
132 /* queue of pending jobs */
133 static struct job *first_job;
134 static struct job *free_jobs;
135
136 /* event loop */
137 static struct evloop evloop[1];
138
139 #if defined(REMOVE_SYSTEMD_EVENT)
140 static struct fdev_epoll *fdevepoll;
141 static int waitevt;
142 #endif
143
144 /**
145  * Create a new job with the given parameters
146  * @param group    the group of the job
147  * @param timeout  the timeout of the job (0 if none)
148  * @param callback the function that achieves the job
149  * @param arg      the argument of the callback
150  * @return the created job unblock or NULL when no more memory
151  */
152 static struct job *job_create(
153                 const void *group,
154                 int timeout,
155                 job_cb_t callback,
156                 void *arg)
157 {
158         struct job *job;
159
160         /* try recyle existing job */
161         job = free_jobs;
162         if (job)
163                 free_jobs = job->next;
164         else {
165                 /* allocation without blocking */
166                 pthread_mutex_unlock(&mutex);
167                 job = malloc(sizeof *job);
168                 pthread_mutex_lock(&mutex);
169                 if (!job) {
170                         errno = -ENOMEM;
171                         goto end;
172                 }
173         }
174         /* initialises the job */
175         job->group = group;
176         job->timeout = timeout;
177         job->callback = callback;
178         job->arg = arg;
179         job->blocked = 0;
180         job->dropped = 0;
181 end:
182         return job;
183 }
184
185 /**
186  * Adds 'job' at the end of the list of jobs, marking it
187  * as blocked if an other job with the same group is pending.
188  * @param job the job to add
189  */
190 static void job_add(struct job *job)
191 {
192         const void *group;
193         struct job *ijob, **pjob;
194
195         /* prepare to add */
196         group = job->group;
197         job->next = NULL;
198
199         /* search end and blockers */
200         pjob = &first_job;
201         ijob = first_job;
202         while (ijob) {
203                 if (group && ijob->group == group)
204                         job->blocked = 1;
205                 pjob = &ijob->next;
206                 ijob = ijob->next;
207         }
208
209         /* queue the jobs */
210         *pjob = job;
211 }
212
213 /**
214  * Get the next job to process or NULL if none.
215  * @return the first job that isn't blocked or NULL
216  */
217 static inline struct job *job_get()
218 {
219         struct job *job = first_job;
220         while (job && job->blocked)
221                 job = job->next;
222         return job;
223 }
224
225 /**
226  * Releases the processed 'job': removes it
227  * from the list of jobs and unblock the first
228  * pending job of the same group if any.
229  * @param job the job to release
230  */
231 static inline void job_release(struct job *job)
232 {
233         struct job *ijob, **pjob;
234         const void *group;
235
236         /* first unqueue the job */
237         pjob = &first_job;
238         ijob = first_job;
239         while (ijob != job) {
240                 pjob = &ijob->next;
241                 ijob = ijob->next;
242         }
243         *pjob = job->next;
244
245         /* then unblock jobs of the same group */
246         group = job->group;
247         if (group) {
248                 ijob = job->next;
249                 while (ijob && ijob->group != group)
250                         ijob = ijob->next;
251                 if (ijob)
252                         ijob->blocked = 0;
253         }
254
255         /* recycle the job */
256         job->next = free_jobs;
257         free_jobs = job;
258 }
259
260 /**
261  * Monitored cancel callback for a job.
262  * This function is called by the monitor
263  * to cancel the job when the safe environment
264  * is set.
265  * @param signum 0 on normal flow or the number
266  *               of the signal that interrupted the normal
267  *               flow, isn't used
268  * @param arg    the job to run
269  */
270 static void job_cancel(int signum, void *arg)
271 {
272         struct job *job = arg;
273         job->callback(SIGABRT, job->arg);
274 }
275
276 #if defined(REMOVE_SYSTEMD_EVENT)
277 /**
278  * Gets a fdev_epoll item.
279  * @return a fdev_epoll or NULL in case of error
280  */
281 static struct fdev_epoll *get_fdevepoll()
282 {
283         struct fdev_epoll *result;
284
285         result = fdevepoll;
286         if (!result)
287                 result = fdevepoll = fdev_epoll_create();
288
289         return result;
290 }
291 #endif
292
293 /**
294  * Monitored normal callback for events.
295  * This function is called by the monitor
296  * to run the event loop when the safe environment
297  * is set.
298  * @param signum 0 on normal flow or the number
299  *               of the signal that interrupted the normal
300  *               flow
301  * @param arg     the events to run
302  */
303 static void evloop_run(int signum, void *arg)
304 {
305         int rc;
306         struct sd_event *se;
307         struct evloop *el = arg;
308
309         if (!signum) {
310                 current_evloop = el;
311                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
312                 se = el->sdev;
313                 rc = sd_event_prepare(se);
314                 if (rc < 0) {
315                         errno = -rc;
316                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
317                 } else {
318                         if (rc == 0) {
319                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
320                                 if (rc < 0) {
321                                         errno = -rc;
322                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
323                                 }
324                         }
325                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
326
327                         if (rc > 0) {
328                                 rc = sd_event_dispatch(se);
329                                 if (rc < 0) {
330                                         errno = -rc;
331                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
332                                 }
333                         }
334                 }
335         }
336         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
337 }
338
339
340 #if defined(REMOVE_SYSTEMD_EVENT)
341 /**
342  * Monitored normal loop for waiting events.
343  * @param signum 0 on normal flow or the number
344  *               of the signal that interrupted the normal
345  *               flow
346  * @param arg     the events to run
347  */
348 static void monitored_wait_and_dispatch(int signum, void *arg)
349 {
350         struct fdev_epoll *fdev_epoll = arg;
351         if (!signum) {
352                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
353         }
354 }
355 #endif
356
357 /**
358  * Main processing loop of threads processing jobs.
359  * The loop must be called with the mutex locked
360  * and it returns with the mutex locked.
361  * @param me the description of the thread to use
362  * TODO: how are timeout handled when reentering?
363  */
364 static void thread_run(volatile struct thread *me)
365 {
366         struct thread **prv;
367         struct job *job;
368 #if !defined(REMOVE_SYSTEMD_EVENT)
369         struct evloop *el;
370 #endif
371
372         /* initialize description of itself and link it in the list */
373         me->tid = pthread_self();
374         me->stop = 0;
375         me->waits = 0;
376         me->upper = current_thread;
377         if (!current_thread) {
378                 started++;
379                 sig_monitor_init_timeouts();
380         }
381         me->next = threads;
382         threads = (struct thread*)me;
383         current_thread = (struct thread*)me;
384
385         /* loop until stopped */
386         while (!me->stop) {
387                 /* release the event loop */
388                 if (current_evloop) {
389                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
390                         current_evloop = NULL;
391                 }
392
393                 /* get a job */
394                 job = job_get();
395                 if (job) {
396                         /* prepare running the job */
397                         remains++; /* increases count of job that can wait */
398                         job->blocked = 1; /* mark job as blocked */
399                         me->job = job; /* record the job (only for terminate) */
400
401                         /* run the job */
402                         pthread_mutex_unlock(&mutex);
403                         sig_monitor(job->timeout, job->callback, job->arg);
404                         pthread_mutex_lock(&mutex);
405
406                         /* release the run job */
407                         job_release(job);
408 #if !defined(REMOVE_SYSTEMD_EVENT)
409                 } else {
410                         /* no job, check events */
411                         el = &evloop[0];
412                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
413                                 /* run the events */
414                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
415                                 current_evloop = el;
416                                 pthread_mutex_unlock(&mutex);
417                                 sig_monitor(0, evloop_run, el);
418                                 pthread_mutex_lock(&mutex);
419                         } else {
420                                 /* no job and not events */
421                                 running--;
422                                 if (!running)
423                                         ERROR("Entering job deep sleep! Check your bindings.");
424                                 me->waits = 1;
425                                 pthread_cond_wait(&cond, &mutex);
426                                 me->waits = 0;
427                                 running++;
428                         }
429 #else
430                 } else if (waitevt) {
431                         /* no job and not events */
432                         running--;
433                         if (!running)
434                                 ERROR("Entering job deep sleep! Check your bindings.");
435                         me->waits = 1;
436                         pthread_cond_wait(&cond, &mutex);
437                         me->waits = 0;
438                         running++;
439                 } else {
440                         /* wait for events */
441                         waitevt = 1;
442                         pthread_mutex_unlock(&mutex);
443                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
444                         pthread_mutex_lock(&mutex);
445                         waitevt = 0;
446 #endif
447                 }
448         }
449
450         /* release the event loop */
451         if (current_evloop) {
452                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
453                 current_evloop = NULL;
454         }
455
456         /* unlink the current thread and cleanup */
457         prv = &threads;
458         while (*prv != me)
459                 prv = &(*prv)->next;
460         *prv = me->next;
461         current_thread = me->upper;
462         if (!current_thread) {
463                 sig_monitor_clean_timeouts();
464                 started--;
465         }
466 }
467
468 /**
469  * Entry point for created threads.
470  * @param data not used
471  * @return NULL
472  */
473 static void *thread_main(void *data)
474 {
475         struct thread me;
476
477         pthread_mutex_lock(&mutex);
478         running++;
479         thread_run(&me);
480         running--;
481         pthread_mutex_unlock(&mutex);
482         return NULL;
483 }
484
485 /**
486  * Starts a new thread
487  * @return 0 in case of success or -1 in case of error
488  */
489 static int start_one_thread()
490 {
491         pthread_t tid;
492         int rc;
493
494         rc = pthread_create(&tid, NULL, thread_main, NULL);
495         if (rc != 0) {
496                 /* errno = rc; */
497                 WARNING("not able to start thread: %m");
498                 rc = -1;
499         }
500         return rc;
501 }
502
503 /**
504  * Queues a new asynchronous job represented by 'callback' and 'arg'
505  * for the 'group' and the 'timeout'.
506  * Jobs are queued FIFO and are possibly executed in parallel
507  * concurrently except for job of the same group that are
508  * executed sequentially in FIFO order.
509  * @param group    The group of the job or NULL when no group.
510  * @param timeout  The maximum execution time in seconds of the job
511  *                 or 0 for unlimited time.
512  * @param callback The function to execute for achieving the job.
513  *                 Its first parameter is either 0 on normal flow
514  *                 or the signal number that broke the normal flow.
515  *                 The remaining parameter is the parameter 'arg1'
516  *                 given here.
517  * @param arg      The second argument for 'callback'
518  * @return 0 in case of success or -1 in case of error
519  */
520 int jobs_queue(
521                 const void *group,
522                 int timeout,
523                 void (*callback)(int, void*),
524                 void *arg)
525 {
526         const char *info;
527         struct job *job;
528         int rc;
529
530         pthread_mutex_lock(&mutex);
531
532         /* allocates the job */
533         job = job_create(group, timeout, callback, arg);
534         if (!job) {
535                 errno = ENOMEM;
536                 info = "out of memory";
537                 goto error;
538         }
539
540         /* check availability */
541         if (remains == 0) {
542                 errno = EBUSY;
543                 info = "too many jobs";
544                 goto error2;
545         }
546
547         /* start a thread if needed */
548         if (running == started && started < allowed) {
549                 /* all threads are busy and a new can be started */
550                 rc = start_one_thread();
551                 if (rc < 0 && started == 0) {
552                         info = "can't start first thread";
553                         goto error2;
554                 }
555         }
556
557         /* queues the job */
558         remains--;
559         job_add(job);
560
561         /* signal an existing job */
562         pthread_cond_signal(&cond);
563         pthread_mutex_unlock(&mutex);
564         return 0;
565
566 error2:
567         job->next = free_jobs;
568         free_jobs = job;
569 error:
570         ERROR("can't process job with threads: %s, %m", info);
571         pthread_mutex_unlock(&mutex);
572         return -1;
573 }
574
575 /**
576  * Internal helper function for 'jobs_enter'.
577  * @see jobs_enter, jobs_leave
578  */
579 static void enter_cb(int signum, void *closure)
580 {
581         struct sync *sync = closure;
582         sync->enter(signum, sync->arg, (void*)&sync->thread);
583 }
584
585 /**
586  * Internal helper function for 'jobs_call'.
587  * @see jobs_call
588  */
589 static void call_cb(int signum, void *closure)
590 {
591         struct sync *sync = closure;
592         sync->callback(signum, sync->arg);
593         jobs_leave((void*)&sync->thread);
594 }
595
596 /**
597  * Internal helper for synchronous jobs. It enters
598  * a new thread loop for evaluating the given job
599  * as recorded by the couple 'sync_cb' and 'sync'.
600  * @see jobs_call, jobs_enter, jobs_leave
601  */
602 static int do_sync(
603                 const void *group,
604                 int timeout,
605                 void (*sync_cb)(int signum, void *closure),
606                 struct sync *sync
607 )
608 {
609         struct job *job;
610
611         pthread_mutex_lock(&mutex);
612
613         /* allocates the job */
614         job = job_create(group, timeout, sync_cb, sync);
615         if (!job) {
616                 ERROR("out of memory");
617                 errno = ENOMEM;
618                 pthread_mutex_unlock(&mutex);
619                 return -1;
620         }
621
622         /* queues the job */
623         job_add(job);
624
625         /* run until stopped */
626         thread_run(&sync->thread);
627         pthread_mutex_unlock(&mutex);
628         return 0;
629 }
630
631 /**
632  * Enter a synchronisation point: activates the job given by 'callback'
633  * and 'closure' using 'group' and 'timeout' to control sequencing and
634  * execution time.
635  * @param group the group for sequencing jobs
636  * @param timeout the time in seconds allocated to the job
637  * @param callback the callback that will handle the job.
638  *                 it receives 3 parameters: 'signum' that will be 0
639  *                 on normal flow or the catched signal number in case
640  *                 of interrupted flow, the context 'closure' as given and
641  *                 a 'jobloop' reference that must be used when the job is
642  *                 terminated to unlock the current execution flow.
643  * @param closure the argument to the callback
644  * @return 0 on success or -1 in case of error
645  */
646 int jobs_enter(
647                 const void *group,
648                 int timeout,
649                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
650                 void *closure
651 )
652 {
653         struct sync sync;
654
655         sync.enter = callback;
656         sync.arg = closure;
657         return do_sync(group, timeout, enter_cb, &sync);
658 }
659
660 /**
661  * Unlocks the execution flow designed by 'jobloop'.
662  * @param jobloop indication of the flow to unlock
663  * @return 0 in case of success of -1 on error
664  */
665 int jobs_leave(struct jobloop *jobloop)
666 {
667         struct thread *t;
668
669         pthread_mutex_lock(&mutex);
670         t = threads;
671         while (t && t != (struct thread*)jobloop)
672                 t = t->next;
673         if (!t) {
674                 errno = EINVAL;
675         } else {
676                 t->stop = 1;
677                 if (t->waits)
678                         pthread_cond_broadcast(&cond);
679         }
680         pthread_mutex_unlock(&mutex);
681         return -!t;
682 }
683
684 /**
685  * Calls synchronously the job represented by 'callback' and 'arg1'
686  * for the 'group' and the 'timeout' and waits for its completion.
687  * @param group    The group of the job or NULL when no group.
688  * @param timeout  The maximum execution time in seconds of the job
689  *                 or 0 for unlimited time.
690  * @param callback The function to execute for achieving the job.
691  *                 Its first parameter is either 0 on normal flow
692  *                 or the signal number that broke the normal flow.
693  *                 The remaining parameter is the parameter 'arg1'
694  *                 given here.
695  * @param arg      The second argument for 'callback'
696  * @return 0 in case of success or -1 in case of error
697  */
698 int jobs_call(
699                 const void *group,
700                 int timeout,
701                 void (*callback)(int, void*),
702                 void *arg)
703 {
704         struct sync sync;
705
706         sync.callback = callback;
707         sync.arg = arg;
708
709         return do_sync(group, timeout, call_cb, &sync);
710 }
711
712 /**
713  * Internal callback for evloop management.
714  * The effect of this function is hidden: it exits
715  * the waiting poll if any. Then it wakes up a thread
716  * awaiting the evloop using signal.
717  */
718 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
719 {
720         uint64_t x;
721         struct evloop *evloop = userdata;
722         read(evloop->efd, &x, sizeof x);
723         pthread_mutex_lock(&mutex);
724         pthread_cond_broadcast(&evloop->cond);
725         pthread_mutex_unlock(&mutex);
726         return 1;
727 }
728
729 /* temporary hack */
730 #if !defined(REMOVE_SYSTEMD_EVENT)
731 __attribute__((unused))
732 #endif
733 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
734 {
735         sig_monitor(0, evloop_run, arg);
736 }
737
738 /**
739  * Gets a sd_event item for the current thread.
740  * @return a sd_event or NULL in case of error
741  */
742 static struct sd_event *get_sd_event_locked()
743 {
744         struct evloop *el;
745         uint64_t x;
746         int rc;
747
748         /* creates the evloop on need */
749         el = &evloop[0];
750         if (!el->sdev) {
751                 /* start the creation */
752                 el->state = 0;
753                 /* creates the eventfd for waking up polls */
754                 el->efd = eventfd(0, EFD_CLOEXEC);
755                 if (el->efd < 0) {
756                         ERROR("can't make eventfd for events");
757                         goto error1;
758                 }
759                 /* create the systemd event loop */
760                 rc = sd_event_new(&el->sdev);
761                 if (rc < 0) {
762                         ERROR("can't make new event loop");
763                         goto error2;
764                 }
765                 /* put the eventfd in the event loop */
766                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
767                 if (rc < 0) {
768                         ERROR("can't register eventfd");
769 #if !defined(REMOVE_SYSTEMD_EVENT)
770                         sd_event_unref(el->sdev);
771                         el->sdev = NULL;
772 error2:
773                         close(el->efd);
774 error1:
775                         return NULL;
776                 }
777 #else
778                         goto error3;
779                 }
780                 /* handle the event loop */
781                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
782                 if (!el->fdev) {
783                         ERROR("can't create fdev");
784 error3:
785                         sd_event_unref(el->sdev);
786 error2:
787                         close(el->efd);
788 error1:
789                         memset(el, 0, sizeof *el);
790                         return NULL;
791                 }
792                 fdev_set_autoclose(el->fdev, 0);
793                 fdev_set_events(el->fdev, EPOLLIN);
794                 fdev_set_callback(el->fdev, evloop_callback, el);
795 #endif
796         }
797
798         /* attach the event loop to the current thread */
799         if (current_evloop != el) {
800                 if (current_evloop)
801                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
802                 current_evloop = el;
803                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
804         }
805
806         /* wait for a modifiable event loop */
807         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
808                 x = 1;
809                 write(el->efd, &x, sizeof x);
810                 pthread_cond_wait(&el->cond, &mutex);
811         }
812
813         return el->sdev;
814 }
815
816 /**
817  * Gets a sd_event item for the current thread.
818  * @return a sd_event or NULL in case of error
819  */
820 struct sd_event *jobs_get_sd_event()
821 {
822         struct sd_event *result;
823
824         pthread_mutex_lock(&mutex);
825         result = get_sd_event_locked();
826         pthread_mutex_unlock(&mutex);
827
828         return result;
829 }
830
831 #if defined(REMOVE_SYSTEMD_EVENT)
832 /**
833  * Gets the fdev_epoll item.
834  * @return a fdev_epoll or NULL in case of error
835  */
836 struct fdev_epoll *jobs_get_fdev_epoll()
837 {
838         struct fdev_epoll *result;
839
840         pthread_mutex_lock(&mutex);
841         result = get_fdevepoll();
842         pthread_mutex_unlock(&mutex);
843
844         return result;
845 }
846 #endif
847
848 /**
849  * Enter the jobs processing loop.
850  * @param allowed_count Maximum count of thread for jobs including this one
851  * @param start_count   Count of thread to start now, must be lower.
852  * @param waiter_count  Maximum count of jobs that can be waiting.
853  * @param start         The start routine to activate (can't be NULL)
854  * @return 0 in case of success or -1 in case of error.
855  */
856 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
857 {
858         int rc, launched;
859         struct thread me;
860         struct job *job;
861
862         assert(allowed_count >= 1);
863         assert(start_count >= 0);
864         assert(waiter_count > 0);
865         assert(start_count <= allowed_count);
866
867         rc = -1;
868         pthread_mutex_lock(&mutex);
869
870         /* check whether already running */
871         if (current_thread || allowed) {
872                 ERROR("thread already started");
873                 errno = EINVAL;
874                 goto error;
875         }
876
877         /* start */
878         if (sig_monitor_init() < 0) {
879                 ERROR("failed to initialise signal handlers");
880                 goto error;
881         }
882
883         /* records the allowed count */
884         allowed = allowed_count;
885         started = 0;
886         running = 0;
887         remains = waiter_count;
888
889 #if HAS_WATCHDOG
890         /* set the watchdog */
891         if (sd_watchdog_enabled(0, NULL))
892                 sd_event_set_watchdog(get_sd_event_locked(), 1);
893 #endif
894
895         /* start at least one thread */
896         launched = 0;
897         while ((launched + 1) < start_count) {
898                 if (start_one_thread() != 0) {
899                         ERROR("Not all threads can be started");
900                         goto error;
901                 }
902                 launched++;
903         }
904
905         /* queue the start job */
906         job = job_create(NULL, 0, start, arg);
907         if (!job) {
908                 ERROR("out of memory");
909                 errno = ENOMEM;
910                 goto error;
911         }
912         job_add(job);
913         remains--;
914
915         /* run until end */
916         thread_run(&me);
917         rc = 0;
918 error:
919         pthread_mutex_unlock(&mutex);
920         return rc;
921 }
922
923 /**
924  * Terminate all the threads and cancel all pending jobs.
925  */
926 void jobs_terminate()
927 {
928         struct job *job, *head, *tail;
929         pthread_t me, *others;
930         struct thread *t;
931         int count;
932
933         /* how am i? */
934         me = pthread_self();
935
936         /* request all threads to stop */
937         pthread_mutex_lock(&mutex);
938         allowed = 0;
939
940         /* count the number of threads */
941         count = 0;
942         t = threads;
943         while (t) {
944                 if (!t->upper && !pthread_equal(t->tid, me))
945                         count++;
946                 t = t->next;
947         }
948
949         /* fill the array of threads */
950         others = alloca(count * sizeof *others);
951         count = 0;
952         t = threads;
953         while (t) {
954                 if (!t->upper && !pthread_equal(t->tid, me))
955                         others[count++] = t->tid;
956                 t = t->next;
957         }
958
959         /* stops the threads */
960         t = threads;
961         while (t) {
962                 t->stop = 1;
963                 t = t->next;
964         }
965
966         /* wait the threads */
967         pthread_cond_broadcast(&cond);
968         pthread_mutex_unlock(&mutex);
969         while (count)
970                 pthread_join(others[--count], NULL);
971         pthread_mutex_lock(&mutex);
972
973         /* cancel pending jobs of other threads */
974         remains = 0;
975         head = first_job;
976         first_job = NULL;
977         tail = NULL;
978         while (head) {
979                 /* unlink the job */
980                 job = head;
981                 head = job->next;
982
983                 /* search if job is stacked for current */
984                 t = current_thread;
985                 while (t && t->job != job)
986                         t = t->upper;
987                 if (t) {
988                         /* yes, relink it at end */
989                         if (tail)
990                                 tail->next = job;
991                         else
992                                 first_job = job;
993                         tail = job;
994                         job->next = NULL;
995                 } else {
996                         /* no cancel the job */
997                         pthread_mutex_unlock(&mutex);
998                         sig_monitor(0, job_cancel, job);
999                         free(job);
1000                         pthread_mutex_lock(&mutex);
1001                 }
1002         }
1003         pthread_mutex_unlock(&mutex);
1004 }
1005