Merge "main-afb-daemon: Export API after initialization"
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
53 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
54
55 /** Internal shortcut for callback */
56 typedef void (*job_cb_t)(int, void*);
57
58 /** Description of a pending job */
59 struct job
60 {
61         struct job *next;    /**< link to the next job enqueued */
62         const void *group;   /**< group of the request */
63         job_cb_t callback;   /**< processing callback */
64         void *arg;           /**< argument */
65         int timeout;         /**< timeout in second for processing the request */
66         unsigned blocked: 1; /**< is an other request blocking this one ? */
67         unsigned dropped: 1; /**< is removed ? */
68 };
69
70 /** Description of handled event loops */
71 struct evloop
72 {
73         unsigned state;        /**< encoded state */
74         int efd;               /**< event notification */
75         struct sd_event *sdev; /**< the systemd event loop */
76         pthread_cond_t  cond;  /**< condition */
77         struct fdev *fdev;     /**< handling of events */
78 };
79
80 #define EVLOOP_STATE_WAIT           1U
81 #define EVLOOP_STATE_RUN            2U
82 #define EVLOOP_STATE_LOCK           4U
83
84 /** Description of threads */
85 struct thread
86 {
87         struct thread *next;   /**< next thread of the list */
88         struct thread *upper;  /**< upper same thread */
89         struct job *job;       /**< currently processed job */
90         pthread_t tid;         /**< the thread id */
91         volatile unsigned stop: 1;      /**< stop requested */
92         volatile unsigned waits: 1;     /**< is waiting? */
93 };
94
95 /**
96  * Description of synchonous callback
97  */
98 struct sync
99 {
100         struct thread thread;   /**< thread loop data */
101         union {
102                 void (*callback)(int, void*);   /**< the synchronous callback */
103                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
104                                 /**< the entering synchronous routine */
105         };
106         void *arg;              /**< the argument of the callback */
107 };
108
109
110 /* synchronisation of threads */
111 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
112 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
113
114 /* count allowed, started and running threads */
115 static int allowed = 0; /** allowed count of threads */
116 static int started = 0; /** started count of threads */
117 static int running = 0; /** running count of threads */
118 static int remains = 0; /** allowed count of waiting jobs */
119
120 /* list of threads */
121 static struct thread *threads;
122 static _Thread_local struct thread *current_thread;
123 static _Thread_local struct evloop *current_evloop;
124
125 /* queue of pending jobs */
126 static struct job *first_job;
127 static struct job *free_jobs;
128
129 /* event loop */
130 static struct evloop evloop[1];
131
132 #if defined(REMOVE_SYSTEMD_EVENT)
133 static struct fdev_epoll *fdevepoll;
134 static int waitevt;
135 #endif
136
137 /**
138  * Create a new job with the given parameters
139  * @param group    the group of the job
140  * @param timeout  the timeout of the job (0 if none)
141  * @param callback the function that achieves the job
142  * @param arg      the argument of the callback
143  * @return the created job unblock or NULL when no more memory
144  */
145 static struct job *job_create(
146                 const void *group,
147                 int timeout,
148                 job_cb_t callback,
149                 void *arg)
150 {
151         struct job *job;
152
153         /* try recyle existing job */
154         job = free_jobs;
155         if (job)
156                 free_jobs = job->next;
157         else {
158                 /* allocation without blocking */
159                 pthread_mutex_unlock(&mutex);
160                 job = malloc(sizeof *job);
161                 pthread_mutex_lock(&mutex);
162                 if (!job) {
163                         errno = ENOMEM;
164                         goto end;
165                 }
166         }
167         /* initialises the job */
168         job->group = group;
169         job->timeout = timeout;
170         job->callback = callback;
171         job->arg = arg;
172         job->blocked = 0;
173         job->dropped = 0;
174 end:
175         return job;
176 }
177
178 /**
179  * Adds 'job' at the end of the list of jobs, marking it
180  * as blocked if an other job with the same group is pending.
181  * @param job the job to add
182  */
183 static void job_add(struct job *job)
184 {
185         const void *group;
186         struct job *ijob, **pjob;
187
188         /* prepare to add */
189         group = job->group;
190         job->next = NULL;
191
192         /* search end and blockers */
193         pjob = &first_job;
194         ijob = first_job;
195         while (ijob) {
196                 if (group && ijob->group == group)
197                         job->blocked = 1;
198                 pjob = &ijob->next;
199                 ijob = ijob->next;
200         }
201
202         /* queue the jobs */
203         *pjob = job;
204 }
205
206 /**
207  * Get the next job to process or NULL if none.
208  * @return the first job that isn't blocked or NULL
209  */
210 static inline struct job *job_get()
211 {
212         struct job *job = first_job;
213         while (job && job->blocked)
214                 job = job->next;
215         return job;
216 }
217
218 /**
219  * Releases the processed 'job': removes it
220  * from the list of jobs and unblock the first
221  * pending job of the same group if any.
222  * @param job the job to release
223  */
224 static inline void job_release(struct job *job)
225 {
226         struct job *ijob, **pjob;
227         const void *group;
228
229         /* first unqueue the job */
230         pjob = &first_job;
231         ijob = first_job;
232         while (ijob != job) {
233                 pjob = &ijob->next;
234                 ijob = ijob->next;
235         }
236         *pjob = job->next;
237
238         /* then unblock jobs of the same group */
239         group = job->group;
240         if (group) {
241                 ijob = job->next;
242                 while (ijob && ijob->group != group)
243                         ijob = ijob->next;
244                 if (ijob)
245                         ijob->blocked = 0;
246         }
247
248         /* recycle the job */
249         job->next = free_jobs;
250         free_jobs = job;
251 }
252
253 /**
254  * Monitored cancel callback for a job.
255  * This function is called by the monitor
256  * to cancel the job when the safe environment
257  * is set.
258  * @param signum 0 on normal flow or the number
259  *               of the signal that interrupted the normal
260  *               flow, isn't used
261  * @param arg    the job to run
262  */
263 static void job_cancel(int signum, void *arg)
264 {
265         struct job *job = arg;
266         job->callback(SIGABRT, job->arg);
267 }
268
269 #if defined(REMOVE_SYSTEMD_EVENT)
270 /**
271  * Gets a fdev_epoll item.
272  * @return a fdev_epoll or NULL in case of error
273  */
274 static struct fdev_epoll *get_fdevepoll()
275 {
276         struct fdev_epoll *result;
277
278         result = fdevepoll;
279         if (!result)
280                 result = fdevepoll = fdev_epoll_create();
281
282         return result;
283 }
284 #endif
285
286 /**
287  * Monitored normal callback for events.
288  * This function is called by the monitor
289  * to run the event loop when the safe environment
290  * is set.
291  * @param signum 0 on normal flow or the number
292  *               of the signal that interrupted the normal
293  *               flow
294  * @param arg     the events to run
295  */
296 static void evloop_run(int signum, void *arg)
297 {
298         int rc;
299         struct sd_event *se;
300         struct evloop *el = arg;
301
302         if (!signum) {
303                 current_evloop = el;
304                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
305                 se = el->sdev;
306                 rc = sd_event_prepare(se);
307                 if (rc < 0) {
308                         errno = -rc;
309                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
310                         abort();
311                 } else {
312                         if (rc == 0) {
313                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
314                                 if (rc < 0) {
315                                         errno = -rc;
316                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
317                                 }
318                         }
319                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
320
321                         if (rc > 0) {
322                                 rc = sd_event_dispatch(se);
323                                 if (rc < 0) {
324                                         errno = -rc;
325                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
326                                 }
327                         }
328                 }
329         }
330         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
331 }
332
333
334 #if defined(REMOVE_SYSTEMD_EVENT)
335 /**
336  * Monitored normal loop for waiting events.
337  * @param signum 0 on normal flow or the number
338  *               of the signal that interrupted the normal
339  *               flow
340  * @param arg     the events to run
341  */
342 static void monitored_wait_and_dispatch(int signum, void *arg)
343 {
344         struct fdev_epoll *fdev_epoll = arg;
345         if (!signum) {
346                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
347         }
348 }
349 #endif
350
351 /**
352  * Main processing loop of threads processing jobs.
353  * The loop must be called with the mutex locked
354  * and it returns with the mutex locked.
355  * @param me the description of the thread to use
356  * TODO: how are timeout handled when reentering?
357  */
358 static void thread_run(volatile struct thread *me)
359 {
360         struct thread **prv;
361         struct job *job;
362 #if !defined(REMOVE_SYSTEMD_EVENT)
363         struct evloop *el;
364 #endif
365
366         /* initialize description of itself and link it in the list */
367         me->tid = pthread_self();
368         me->stop = 0;
369         me->waits = 0;
370         me->upper = current_thread;
371         if (!current_thread) {
372                 started++;
373                 sig_monitor_init_timeouts();
374         }
375         me->next = threads;
376         threads = (struct thread*)me;
377         current_thread = (struct thread*)me;
378
379         /* loop until stopped */
380         while (!me->stop) {
381                 /* release the event loop */
382                 if (current_evloop) {
383                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
384                         current_evloop = NULL;
385                 }
386
387                 /* get a job */
388                 job = job_get();
389                 if (job) {
390                         /* prepare running the job */
391                         remains++; /* increases count of job that can wait */
392                         job->blocked = 1; /* mark job as blocked */
393                         me->job = job; /* record the job (only for terminate) */
394
395                         /* run the job */
396                         pthread_mutex_unlock(&mutex);
397                         sig_monitor(job->timeout, job->callback, job->arg);
398                         pthread_mutex_lock(&mutex);
399
400                         /* release the run job */
401                         job_release(job);
402 #if !defined(REMOVE_SYSTEMD_EVENT)
403                 } else {
404                         /* no job, check events */
405                         el = &evloop[0];
406                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
407                                 /* run the events */
408                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
409                                 current_evloop = el;
410                                 pthread_mutex_unlock(&mutex);
411                                 sig_monitor(0, evloop_run, el);
412                                 pthread_mutex_lock(&mutex);
413                         } else {
414                                 /* no job and not events */
415                                 running--;
416                                 if (!running)
417                                         ERROR("Entering job deep sleep! Check your bindings.");
418                                 me->waits = 1;
419                                 pthread_cond_wait(&cond, &mutex);
420                                 me->waits = 0;
421                                 running++;
422                         }
423 #else
424                 } else if (waitevt) {
425                         /* no job and not events */
426                         running--;
427                         if (!running)
428                                 ERROR("Entering job deep sleep! Check your bindings.");
429                         me->waits = 1;
430                         pthread_cond_wait(&cond, &mutex);
431                         me->waits = 0;
432                         running++;
433                 } else {
434                         /* wait for events */
435                         waitevt = 1;
436                         pthread_mutex_unlock(&mutex);
437                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
438                         pthread_mutex_lock(&mutex);
439                         waitevt = 0;
440 #endif
441                 }
442         }
443
444         /* release the event loop */
445         if (current_evloop) {
446                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
447                 current_evloop = NULL;
448         }
449
450         /* unlink the current thread and cleanup */
451         prv = &threads;
452         while (*prv != me)
453                 prv = &(*prv)->next;
454         *prv = me->next;
455         current_thread = me->upper;
456         if (!current_thread) {
457                 sig_monitor_clean_timeouts();
458                 started--;
459         }
460 }
461
462 /**
463  * Entry point for created threads.
464  * @param data not used
465  * @return NULL
466  */
467 static void *thread_main(void *data)
468 {
469         struct thread me;
470
471         pthread_mutex_lock(&mutex);
472         running++;
473         thread_run(&me);
474         running--;
475         pthread_mutex_unlock(&mutex);
476         return NULL;
477 }
478
479 /**
480  * Starts a new thread
481  * @return 0 in case of success or -1 in case of error
482  */
483 static int start_one_thread()
484 {
485         pthread_t tid;
486         int rc;
487
488         rc = pthread_create(&tid, NULL, thread_main, NULL);
489         if (rc != 0) {
490                 /* errno = rc; */
491                 WARNING("not able to start thread: %m");
492                 rc = -1;
493         }
494         return rc;
495 }
496
497 /**
498  * Queues a new asynchronous job represented by 'callback' and 'arg'
499  * for the 'group' and the 'timeout'.
500  * Jobs are queued FIFO and are possibly executed in parallel
501  * concurrently except for job of the same group that are
502  * executed sequentially in FIFO order.
503  * @param group    The group of the job or NULL when no group.
504  * @param timeout  The maximum execution time in seconds of the job
505  *                 or 0 for unlimited time.
506  * @param callback The function to execute for achieving the job.
507  *                 Its first parameter is either 0 on normal flow
508  *                 or the signal number that broke the normal flow.
509  *                 The remaining parameter is the parameter 'arg1'
510  *                 given here.
511  * @param arg      The second argument for 'callback'
512  * @return 0 in case of success or -1 in case of error
513  */
514 int jobs_queue(
515                 const void *group,
516                 int timeout,
517                 void (*callback)(int, void*),
518                 void *arg)
519 {
520         const char *info;
521         struct job *job;
522         int rc;
523
524         pthread_mutex_lock(&mutex);
525
526         /* allocates the job */
527         job = job_create(group, timeout, callback, arg);
528         if (!job) {
529                 errno = ENOMEM;
530                 info = "out of memory";
531                 goto error;
532         }
533
534         /* check availability */
535         if (remains == 0) {
536                 errno = EBUSY;
537                 info = "too many jobs";
538                 goto error2;
539         }
540
541         /* start a thread if needed */
542         if (running == started && started < allowed) {
543                 /* all threads are busy and a new can be started */
544                 rc = start_one_thread();
545                 if (rc < 0 && started == 0) {
546                         info = "can't start first thread";
547                         goto error2;
548                 }
549         }
550
551         /* queues the job */
552         remains--;
553         job_add(job);
554
555         /* signal an existing job */
556         pthread_cond_signal(&cond);
557         pthread_mutex_unlock(&mutex);
558         return 0;
559
560 error2:
561         job->next = free_jobs;
562         free_jobs = job;
563 error:
564         ERROR("can't process job with threads: %s, %m", info);
565         pthread_mutex_unlock(&mutex);
566         return -1;
567 }
568
569 /**
570  * Internal helper function for 'jobs_enter'.
571  * @see jobs_enter, jobs_leave
572  */
573 static void enter_cb(int signum, void *closure)
574 {
575         struct sync *sync = closure;
576         sync->enter(signum, sync->arg, (void*)&sync->thread);
577 }
578
579 /**
580  * Internal helper function for 'jobs_call'.
581  * @see jobs_call
582  */
583 static void call_cb(int signum, void *closure)
584 {
585         struct sync *sync = closure;
586         sync->callback(signum, sync->arg);
587         jobs_leave((void*)&sync->thread);
588 }
589
590 /**
591  * Internal helper for synchronous jobs. It enters
592  * a new thread loop for evaluating the given job
593  * as recorded by the couple 'sync_cb' and 'sync'.
594  * @see jobs_call, jobs_enter, jobs_leave
595  */
596 static int do_sync(
597                 const void *group,
598                 int timeout,
599                 void (*sync_cb)(int signum, void *closure),
600                 struct sync *sync
601 )
602 {
603         struct job *job;
604
605         pthread_mutex_lock(&mutex);
606
607         /* allocates the job */
608         job = job_create(group, timeout, sync_cb, sync);
609         if (!job) {
610                 ERROR("out of memory");
611                 errno = ENOMEM;
612                 pthread_mutex_unlock(&mutex);
613                 return -1;
614         }
615
616         /* queues the job */
617         job_add(job);
618
619         /* run until stopped */
620         thread_run(&sync->thread);
621         pthread_mutex_unlock(&mutex);
622         return 0;
623 }
624
625 /**
626  * Enter a synchronisation point: activates the job given by 'callback'
627  * and 'closure' using 'group' and 'timeout' to control sequencing and
628  * execution time.
629  * @param group the group for sequencing jobs
630  * @param timeout the time in seconds allocated to the job
631  * @param callback the callback that will handle the job.
632  *                 it receives 3 parameters: 'signum' that will be 0
633  *                 on normal flow or the catched signal number in case
634  *                 of interrupted flow, the context 'closure' as given and
635  *                 a 'jobloop' reference that must be used when the job is
636  *                 terminated to unlock the current execution flow.
637  * @param closure the argument to the callback
638  * @return 0 on success or -1 in case of error
639  */
640 int jobs_enter(
641                 const void *group,
642                 int timeout,
643                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
644                 void *closure
645 )
646 {
647         struct sync sync;
648
649         sync.enter = callback;
650         sync.arg = closure;
651         return do_sync(group, timeout, enter_cb, &sync);
652 }
653
654 /**
655  * Unlocks the execution flow designed by 'jobloop'.
656  * @param jobloop indication of the flow to unlock
657  * @return 0 in case of success of -1 on error
658  */
659 int jobs_leave(struct jobloop *jobloop)
660 {
661         struct thread *t;
662
663         pthread_mutex_lock(&mutex);
664         t = threads;
665         while (t && t != (struct thread*)jobloop)
666                 t = t->next;
667         if (!t) {
668                 errno = EINVAL;
669         } else {
670                 t->stop = 1;
671                 if (t->waits)
672                         pthread_cond_broadcast(&cond);
673         }
674         pthread_mutex_unlock(&mutex);
675         return -!t;
676 }
677
678 /**
679  * Calls synchronously the job represented by 'callback' and 'arg1'
680  * for the 'group' and the 'timeout' and waits for its completion.
681  * @param group    The group of the job or NULL when no group.
682  * @param timeout  The maximum execution time in seconds of the job
683  *                 or 0 for unlimited time.
684  * @param callback The function to execute for achieving the job.
685  *                 Its first parameter is either 0 on normal flow
686  *                 or the signal number that broke the normal flow.
687  *                 The remaining parameter is the parameter 'arg1'
688  *                 given here.
689  * @param arg      The second argument for 'callback'
690  * @return 0 in case of success or -1 in case of error
691  */
692 int jobs_call(
693                 const void *group,
694                 int timeout,
695                 void (*callback)(int, void*),
696                 void *arg)
697 {
698         struct sync sync;
699
700         sync.callback = callback;
701         sync.arg = arg;
702
703         return do_sync(group, timeout, call_cb, &sync);
704 }
705
706 /**
707  * Internal callback for evloop management.
708  * The effect of this function is hidden: it exits
709  * the waiting poll if any. Then it wakes up a thread
710  * awaiting the evloop using signal.
711  */
712 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
713 {
714         uint64_t x;
715         struct evloop *evloop = userdata;
716         read(evloop->efd, &x, sizeof x);
717         pthread_mutex_lock(&mutex);
718         pthread_cond_broadcast(&evloop->cond);
719         pthread_mutex_unlock(&mutex);
720         return 1;
721 }
722
723 /* temporary hack */
724 #if !defined(REMOVE_SYSTEMD_EVENT)
725 __attribute__((unused))
726 #endif
727 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
728 {
729         sig_monitor(0, evloop_run, arg);
730 }
731
732 /**
733  * Gets a sd_event item for the current thread.
734  * @return a sd_event or NULL in case of error
735  */
736 static struct sd_event *get_sd_event_locked()
737 {
738         struct evloop *el;
739         uint64_t x;
740         int rc;
741
742         /* creates the evloop on need */
743         el = &evloop[0];
744         if (!el->sdev) {
745                 /* start the creation */
746                 el->state = 0;
747                 /* creates the eventfd for waking up polls */
748                 el->efd = eventfd(0, EFD_CLOEXEC);
749                 if (el->efd < 0) {
750                         ERROR("can't make eventfd for events");
751                         goto error1;
752                 }
753                 /* create the systemd event loop */
754                 rc = sd_event_new(&el->sdev);
755                 if (rc < 0) {
756                         ERROR("can't make new event loop");
757                         goto error2;
758                 }
759                 /* put the eventfd in the event loop */
760                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
761                 if (rc < 0) {
762                         ERROR("can't register eventfd");
763 #if !defined(REMOVE_SYSTEMD_EVENT)
764                         sd_event_unref(el->sdev);
765                         el->sdev = NULL;
766 error2:
767                         close(el->efd);
768 error1:
769                         return NULL;
770                 }
771 #else
772                         goto error3;
773                 }
774                 /* handle the event loop */
775                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
776                 if (!el->fdev) {
777                         ERROR("can't create fdev");
778 error3:
779                         sd_event_unref(el->sdev);
780 error2:
781                         close(el->efd);
782 error1:
783                         memset(el, 0, sizeof *el);
784                         return NULL;
785                 }
786                 fdev_set_autoclose(el->fdev, 0);
787                 fdev_set_events(el->fdev, EPOLLIN);
788                 fdev_set_callback(el->fdev, evloop_callback, el);
789 #endif
790         }
791
792         /* attach the event loop to the current thread */
793         if (current_evloop != el) {
794                 if (current_evloop)
795                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
796                 current_evloop = el;
797                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
798         }
799
800         /* wait for a modifiable event loop */
801         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
802                 x = 1;
803                 write(el->efd, &x, sizeof x);
804                 pthread_cond_wait(&el->cond, &mutex);
805         }
806
807         return el->sdev;
808 }
809
810 /**
811  * Gets a sd_event item for the current thread.
812  * @return a sd_event or NULL in case of error
813  */
814 struct sd_event *jobs_get_sd_event()
815 {
816         struct sd_event *result;
817
818         pthread_mutex_lock(&mutex);
819         result = get_sd_event_locked();
820         pthread_mutex_unlock(&mutex);
821
822         return result;
823 }
824
825 #if defined(REMOVE_SYSTEMD_EVENT)
826 /**
827  * Gets the fdev_epoll item.
828  * @return a fdev_epoll or NULL in case of error
829  */
830 struct fdev_epoll *jobs_get_fdev_epoll()
831 {
832         struct fdev_epoll *result;
833
834         pthread_mutex_lock(&mutex);
835         result = get_fdevepoll();
836         pthread_mutex_unlock(&mutex);
837
838         return result;
839 }
840 #endif
841
842 /**
843  * Enter the jobs processing loop.
844  * @param allowed_count Maximum count of thread for jobs including this one
845  * @param start_count   Count of thread to start now, must be lower.
846  * @param waiter_count  Maximum count of jobs that can be waiting.
847  * @param start         The start routine to activate (can't be NULL)
848  * @return 0 in case of success or -1 in case of error.
849  */
850 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
851 {
852         int rc, launched;
853         struct thread me;
854         struct job *job;
855
856         assert(allowed_count >= 1);
857         assert(start_count >= 0);
858         assert(waiter_count > 0);
859         assert(start_count <= allowed_count);
860
861         rc = -1;
862         pthread_mutex_lock(&mutex);
863
864         /* check whether already running */
865         if (current_thread || allowed) {
866                 ERROR("thread already started");
867                 errno = EINVAL;
868                 goto error;
869         }
870
871         /* records the allowed count */
872         allowed = allowed_count;
873         started = 0;
874         running = 0;
875         remains = waiter_count;
876
877 #if HAS_WATCHDOG
878         /* set the watchdog */
879         if (sd_watchdog_enabled(0, NULL))
880                 sd_event_set_watchdog(get_sd_event_locked(), 1);
881 #endif
882
883         /* start at least one thread */
884         launched = 0;
885         while ((launched + 1) < start_count) {
886                 if (start_one_thread() != 0) {
887                         ERROR("Not all threads can be started");
888                         goto error;
889                 }
890                 launched++;
891         }
892
893         /* queue the start job */
894         job = job_create(NULL, 0, start, arg);
895         if (!job) {
896                 ERROR("out of memory");
897                 errno = ENOMEM;
898                 goto error;
899         }
900         job_add(job);
901         remains--;
902
903         /* run until end */
904         thread_run(&me);
905         rc = 0;
906 error:
907         pthread_mutex_unlock(&mutex);
908         return rc;
909 }
910
911 /**
912  * Terminate all the threads and cancel all pending jobs.
913  */
914 void jobs_terminate()
915 {
916         struct job *job, *head, *tail;
917         pthread_t me, *others;
918         struct thread *t;
919         int count;
920
921         /* how am i? */
922         me = pthread_self();
923
924         /* request all threads to stop */
925         pthread_mutex_lock(&mutex);
926         allowed = 0;
927
928         /* count the number of threads */
929         count = 0;
930         t = threads;
931         while (t) {
932                 if (!t->upper && !pthread_equal(t->tid, me))
933                         count++;
934                 t = t->next;
935         }
936
937         /* fill the array of threads */
938         others = alloca(count * sizeof *others);
939         count = 0;
940         t = threads;
941         while (t) {
942                 if (!t->upper && !pthread_equal(t->tid, me))
943                         others[count++] = t->tid;
944                 t = t->next;
945         }
946
947         /* stops the threads */
948         t = threads;
949         while (t) {
950                 t->stop = 1;
951                 t = t->next;
952         }
953
954         /* wait the threads */
955         pthread_cond_broadcast(&cond);
956         pthread_mutex_unlock(&mutex);
957         while (count)
958                 pthread_join(others[--count], NULL);
959         pthread_mutex_lock(&mutex);
960
961         /* cancel pending jobs of other threads */
962         remains = 0;
963         head = first_job;
964         first_job = NULL;
965         tail = NULL;
966         while (head) {
967                 /* unlink the job */
968                 job = head;
969                 head = job->next;
970
971                 /* search if job is stacked for current */
972                 t = current_thread;
973                 while (t && t->job != job)
974                         t = t->upper;
975                 if (t) {
976                         /* yes, relink it at end */
977                         if (tail)
978                                 tail->next = job;
979                         else
980                                 first_job = job;
981                         tail = job;
982                         job->next = NULL;
983                 } else {
984                         /* no cancel the job */
985                         pthread_mutex_unlock(&mutex);
986                         sig_monitor(0, job_cancel, job);
987                         free(job);
988                         pthread_mutex_lock(&mutex);
989                 }
990         }
991         pthread_mutex_unlock(&mutex);
992 }
993