27b7dfbf63776d36738dff10078023a8240c5324
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #if defined(REMOVE_SYSTEMD_EVENT)
49 #include "fdev-epoll.h"
50 #endif
51
52 #if 0
53 #define _alert_ "do you really want to remove signal monitoring?"
54 #define sig_monitor_init_timeouts()  ((void)0)
55 #define sig_monitor_clean_timeouts() ((void)0)
56 #define sig_monitor(to,cb,arg)       (cb(0,arg))
57 #endif
58
59 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
60 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
61
62 /** Internal shortcut for callback */
63 typedef void (*job_cb_t)(int, void*);
64
65 /** Description of a pending job */
66 struct job
67 {
68         struct job *next;    /**< link to the next job enqueued */
69         const void *group;   /**< group of the request */
70         job_cb_t callback;   /**< processing callback */
71         void *arg;           /**< argument */
72         int timeout;         /**< timeout in second for processing the request */
73         unsigned blocked: 1; /**< is an other request blocking this one ? */
74         unsigned dropped: 1; /**< is removed ? */
75 };
76
77 /** Description of handled event loops */
78 struct evloop
79 {
80         unsigned state;        /**< encoded state */
81         int efd;               /**< event notification */
82         struct sd_event *sdev; /**< the systemd event loop */
83         pthread_cond_t  cond;  /**< condition */
84         struct fdev *fdev;     /**< handling of events */
85 };
86
87 #define EVLOOP_STATE_WAIT           1U
88 #define EVLOOP_STATE_RUN            2U
89 #define EVLOOP_STATE_LOCK           4U
90
91 /** Description of threads */
92 struct thread
93 {
94         struct thread *next;   /**< next thread of the list */
95         struct thread *upper;  /**< upper same thread */
96         struct job *job;       /**< currently processed job */
97         pthread_t tid;         /**< the thread id */
98         volatile unsigned stop: 1;      /**< stop requested */
99         volatile unsigned waits: 1;     /**< is waiting? */
100 };
101
102 /**
103  * Description of synchonous callback
104  */
105 struct sync
106 {
107         struct thread thread;   /**< thread loop data */
108         union {
109                 void (*callback)(int, void*);   /**< the synchronous callback */
110                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
111                                 /**< the entering synchronous routine */
112         };
113         void *arg;              /**< the argument of the callback */
114 };
115
116
117 /* synchronisation of threads */
118 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
119 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
120
121 /* count allowed, started and running threads */
122 static int allowed = 0; /** allowed count of threads */
123 static int started = 0; /** started count of threads */
124 static int running = 0; /** running count of threads */
125 static int remains = 0; /** allowed count of waiting jobs */
126
127 /* list of threads */
128 static struct thread *threads;
129 static _Thread_local struct thread *current_thread;
130 static _Thread_local struct evloop *current_evloop;
131
132 /* queue of pending jobs */
133 static struct job *first_job;
134 static struct job *free_jobs;
135
136 /* event loop */
137 static struct evloop evloop[1];
138
139 #if defined(REMOVE_SYSTEMD_EVENT)
140 static struct fdev_epoll *fdevepoll;
141 static int waitevt;
142 #endif
143
144 /**
145  * Create a new job with the given parameters
146  * @param group    the group of the job
147  * @param timeout  the timeout of the job (0 if none)
148  * @param callback the function that achieves the job
149  * @param arg      the argument of the callback
150  * @return the created job unblock or NULL when no more memory
151  */
152 static struct job *job_create(
153                 const void *group,
154                 int timeout,
155                 job_cb_t callback,
156                 void *arg)
157 {
158         struct job *job;
159
160         /* try recyle existing job */
161         job = free_jobs;
162         if (job)
163                 free_jobs = job->next;
164         else {
165                 /* allocation without blocking */
166                 pthread_mutex_unlock(&mutex);
167                 job = malloc(sizeof *job);
168                 pthread_mutex_lock(&mutex);
169                 if (!job) {
170                         errno = -ENOMEM;
171                         goto end;
172                 }
173         }
174         /* initialises the job */
175         job->group = group;
176         job->timeout = timeout;
177         job->callback = callback;
178         job->arg = arg;
179         job->blocked = 0;
180         job->dropped = 0;
181 end:
182         return job;
183 }
184
185 /**
186  * Adds 'job' at the end of the list of jobs, marking it
187  * as blocked if an other job with the same group is pending.
188  * @param job the job to add
189  */
190 static void job_add(struct job *job)
191 {
192         const void *group;
193         struct job *ijob, **pjob;
194
195         /* prepare to add */
196         group = job->group;
197         job->next = NULL;
198
199         /* search end and blockers */
200         pjob = &first_job;
201         ijob = first_job;
202         while (ijob) {
203                 if (group && ijob->group == group)
204                         job->blocked = 1;
205                 pjob = &ijob->next;
206                 ijob = ijob->next;
207         }
208
209         /* queue the jobs */
210         *pjob = job;
211 }
212
213 /**
214  * Get the next job to process or NULL if none.
215  * @return the first job that isn't blocked or NULL
216  */
217 static inline struct job *job_get()
218 {
219         struct job *job = first_job;
220         while (job && job->blocked)
221                 job = job->next;
222         return job;
223 }
224
225 /**
226  * Releases the processed 'job': removes it
227  * from the list of jobs and unblock the first
228  * pending job of the same group if any.
229  * @param job the job to release
230  */
231 static inline void job_release(struct job *job)
232 {
233         struct job *ijob, **pjob;
234         const void *group;
235
236         /* first unqueue the job */
237         pjob = &first_job;
238         ijob = first_job;
239         while (ijob != job) {
240                 pjob = &ijob->next;
241                 ijob = ijob->next;
242         }
243         *pjob = job->next;
244
245         /* then unblock jobs of the same group */
246         group = job->group;
247         if (group) {
248                 ijob = job->next;
249                 while (ijob && ijob->group != group)
250                         ijob = ijob->next;
251                 if (ijob)
252                         ijob->blocked = 0;
253         }
254
255         /* recycle the job */
256         job->next = free_jobs;
257         free_jobs = job;
258 }
259
260 /**
261  * Monitored cancel callback for a job.
262  * This function is called by the monitor
263  * to cancel the job when the safe environment
264  * is set.
265  * @param signum 0 on normal flow or the number
266  *               of the signal that interrupted the normal
267  *               flow, isn't used
268  * @param arg    the job to run
269  */
270 static void job_cancel(int signum, void *arg)
271 {
272         struct job *job = arg;
273         job->callback(SIGABRT, job->arg);
274 }
275
276 #if defined(REMOVE_SYSTEMD_EVENT)
277 /**
278  * Gets a fdev_epoll item.
279  * @return a fdev_epoll or NULL in case of error
280  */
281 static struct fdev_epoll *get_fdevepoll()
282 {
283         struct fdev_epoll *result;
284
285         result = fdevepoll;
286         if (!result)
287                 result = fdevepoll = fdev_epoll_create();
288
289         return result;
290 }
291 #endif
292
293 /**
294  * Monitored normal callback for events.
295  * This function is called by the monitor
296  * to run the event loop when the safe environment
297  * is set.
298  * @param signum 0 on normal flow or the number
299  *               of the signal that interrupted the normal
300  *               flow
301  * @param arg     the events to run
302  */
303 static void evloop_run(int signum, void *arg)
304 {
305         int rc;
306         struct sd_event *se;
307         struct evloop *el = arg;
308
309         if (!signum) {
310                 current_evloop = el;
311                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
312                 se = el->sdev;
313                 rc = sd_event_prepare(se);
314                 if (rc < 0) {
315                         errno = -rc;
316                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
317                         abort();
318                 } else {
319                         if (rc == 0) {
320                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
321                                 if (rc < 0) {
322                                         errno = -rc;
323                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
324                                 }
325                         }
326                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
327
328                         if (rc > 0) {
329                                 rc = sd_event_dispatch(se);
330                                 if (rc < 0) {
331                                         errno = -rc;
332                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
333                                 }
334                         }
335                 }
336         }
337         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
338 }
339
340
341 #if defined(REMOVE_SYSTEMD_EVENT)
342 /**
343  * Monitored normal loop for waiting events.
344  * @param signum 0 on normal flow or the number
345  *               of the signal that interrupted the normal
346  *               flow
347  * @param arg     the events to run
348  */
349 static void monitored_wait_and_dispatch(int signum, void *arg)
350 {
351         struct fdev_epoll *fdev_epoll = arg;
352         if (!signum) {
353                 fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
354         }
355 }
356 #endif
357
358 /**
359  * Main processing loop of threads processing jobs.
360  * The loop must be called with the mutex locked
361  * and it returns with the mutex locked.
362  * @param me the description of the thread to use
363  * TODO: how are timeout handled when reentering?
364  */
365 static void thread_run(volatile struct thread *me)
366 {
367         struct thread **prv;
368         struct job *job;
369 #if !defined(REMOVE_SYSTEMD_EVENT)
370         struct evloop *el;
371 #endif
372
373         /* initialize description of itself and link it in the list */
374         me->tid = pthread_self();
375         me->stop = 0;
376         me->waits = 0;
377         me->upper = current_thread;
378         if (!current_thread) {
379                 started++;
380                 sig_monitor_init_timeouts();
381         }
382         me->next = threads;
383         threads = (struct thread*)me;
384         current_thread = (struct thread*)me;
385
386         /* loop until stopped */
387         while (!me->stop) {
388                 /* release the event loop */
389                 if (current_evloop) {
390                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
391                         current_evloop = NULL;
392                 }
393
394                 /* get a job */
395                 job = job_get();
396                 if (job) {
397                         /* prepare running the job */
398                         remains++; /* increases count of job that can wait */
399                         job->blocked = 1; /* mark job as blocked */
400                         me->job = job; /* record the job (only for terminate) */
401
402                         /* run the job */
403                         pthread_mutex_unlock(&mutex);
404                         sig_monitor(job->timeout, job->callback, job->arg);
405                         pthread_mutex_lock(&mutex);
406
407                         /* release the run job */
408                         job_release(job);
409 #if !defined(REMOVE_SYSTEMD_EVENT)
410                 } else {
411                         /* no job, check events */
412                         el = &evloop[0];
413                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
414                                 /* run the events */
415                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
416                                 current_evloop = el;
417                                 pthread_mutex_unlock(&mutex);
418                                 sig_monitor(0, evloop_run, el);
419                                 pthread_mutex_lock(&mutex);
420                         } else {
421                                 /* no job and not events */
422                                 running--;
423                                 if (!running)
424                                         ERROR("Entering job deep sleep! Check your bindings.");
425                                 me->waits = 1;
426                                 pthread_cond_wait(&cond, &mutex);
427                                 me->waits = 0;
428                                 running++;
429                         }
430 #else
431                 } else if (waitevt) {
432                         /* no job and not events */
433                         running--;
434                         if (!running)
435                                 ERROR("Entering job deep sleep! Check your bindings.");
436                         me->waits = 1;
437                         pthread_cond_wait(&cond, &mutex);
438                         me->waits = 0;
439                         running++;
440                 } else {
441                         /* wait for events */
442                         waitevt = 1;
443                         pthread_mutex_unlock(&mutex);
444                         sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
445                         pthread_mutex_lock(&mutex);
446                         waitevt = 0;
447 #endif
448                 }
449         }
450
451         /* release the event loop */
452         if (current_evloop) {
453                 __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
454                 current_evloop = NULL;
455         }
456
457         /* unlink the current thread and cleanup */
458         prv = &threads;
459         while (*prv != me)
460                 prv = &(*prv)->next;
461         *prv = me->next;
462         current_thread = me->upper;
463         if (!current_thread) {
464                 sig_monitor_clean_timeouts();
465                 started--;
466         }
467 }
468
469 /**
470  * Entry point for created threads.
471  * @param data not used
472  * @return NULL
473  */
474 static void *thread_main(void *data)
475 {
476         struct thread me;
477
478         pthread_mutex_lock(&mutex);
479         running++;
480         thread_run(&me);
481         running--;
482         pthread_mutex_unlock(&mutex);
483         return NULL;
484 }
485
486 /**
487  * Starts a new thread
488  * @return 0 in case of success or -1 in case of error
489  */
490 static int start_one_thread()
491 {
492         pthread_t tid;
493         int rc;
494
495         rc = pthread_create(&tid, NULL, thread_main, NULL);
496         if (rc != 0) {
497                 /* errno = rc; */
498                 WARNING("not able to start thread: %m");
499                 rc = -1;
500         }
501         return rc;
502 }
503
504 /**
505  * Queues a new asynchronous job represented by 'callback' and 'arg'
506  * for the 'group' and the 'timeout'.
507  * Jobs are queued FIFO and are possibly executed in parallel
508  * concurrently except for job of the same group that are
509  * executed sequentially in FIFO order.
510  * @param group    The group of the job or NULL when no group.
511  * @param timeout  The maximum execution time in seconds of the job
512  *                 or 0 for unlimited time.
513  * @param callback The function to execute for achieving the job.
514  *                 Its first parameter is either 0 on normal flow
515  *                 or the signal number that broke the normal flow.
516  *                 The remaining parameter is the parameter 'arg1'
517  *                 given here.
518  * @param arg      The second argument for 'callback'
519  * @return 0 in case of success or -1 in case of error
520  */
521 int jobs_queue(
522                 const void *group,
523                 int timeout,
524                 void (*callback)(int, void*),
525                 void *arg)
526 {
527         const char *info;
528         struct job *job;
529         int rc;
530
531         pthread_mutex_lock(&mutex);
532
533         /* allocates the job */
534         job = job_create(group, timeout, callback, arg);
535         if (!job) {
536                 errno = ENOMEM;
537                 info = "out of memory";
538                 goto error;
539         }
540
541         /* check availability */
542         if (remains == 0) {
543                 errno = EBUSY;
544                 info = "too many jobs";
545                 goto error2;
546         }
547
548         /* start a thread if needed */
549         if (running == started && started < allowed) {
550                 /* all threads are busy and a new can be started */
551                 rc = start_one_thread();
552                 if (rc < 0 && started == 0) {
553                         info = "can't start first thread";
554                         goto error2;
555                 }
556         }
557
558         /* queues the job */
559         remains--;
560         job_add(job);
561
562         /* signal an existing job */
563         pthread_cond_signal(&cond);
564         pthread_mutex_unlock(&mutex);
565         return 0;
566
567 error2:
568         job->next = free_jobs;
569         free_jobs = job;
570 error:
571         ERROR("can't process job with threads: %s, %m", info);
572         pthread_mutex_unlock(&mutex);
573         return -1;
574 }
575
576 /**
577  * Internal helper function for 'jobs_enter'.
578  * @see jobs_enter, jobs_leave
579  */
580 static void enter_cb(int signum, void *closure)
581 {
582         struct sync *sync = closure;
583         sync->enter(signum, sync->arg, (void*)&sync->thread);
584 }
585
586 /**
587  * Internal helper function for 'jobs_call'.
588  * @see jobs_call
589  */
590 static void call_cb(int signum, void *closure)
591 {
592         struct sync *sync = closure;
593         sync->callback(signum, sync->arg);
594         jobs_leave((void*)&sync->thread);
595 }
596
597 /**
598  * Internal helper for synchronous jobs. It enters
599  * a new thread loop for evaluating the given job
600  * as recorded by the couple 'sync_cb' and 'sync'.
601  * @see jobs_call, jobs_enter, jobs_leave
602  */
603 static int do_sync(
604                 const void *group,
605                 int timeout,
606                 void (*sync_cb)(int signum, void *closure),
607                 struct sync *sync
608 )
609 {
610         struct job *job;
611
612         pthread_mutex_lock(&mutex);
613
614         /* allocates the job */
615         job = job_create(group, timeout, sync_cb, sync);
616         if (!job) {
617                 ERROR("out of memory");
618                 errno = ENOMEM;
619                 pthread_mutex_unlock(&mutex);
620                 return -1;
621         }
622
623         /* queues the job */
624         job_add(job);
625
626         /* run until stopped */
627         thread_run(&sync->thread);
628         pthread_mutex_unlock(&mutex);
629         return 0;
630 }
631
632 /**
633  * Enter a synchronisation point: activates the job given by 'callback'
634  * and 'closure' using 'group' and 'timeout' to control sequencing and
635  * execution time.
636  * @param group the group for sequencing jobs
637  * @param timeout the time in seconds allocated to the job
638  * @param callback the callback that will handle the job.
639  *                 it receives 3 parameters: 'signum' that will be 0
640  *                 on normal flow or the catched signal number in case
641  *                 of interrupted flow, the context 'closure' as given and
642  *                 a 'jobloop' reference that must be used when the job is
643  *                 terminated to unlock the current execution flow.
644  * @param closure the argument to the callback
645  * @return 0 on success or -1 in case of error
646  */
647 int jobs_enter(
648                 const void *group,
649                 int timeout,
650                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
651                 void *closure
652 )
653 {
654         struct sync sync;
655
656         sync.enter = callback;
657         sync.arg = closure;
658         return do_sync(group, timeout, enter_cb, &sync);
659 }
660
661 /**
662  * Unlocks the execution flow designed by 'jobloop'.
663  * @param jobloop indication of the flow to unlock
664  * @return 0 in case of success of -1 on error
665  */
666 int jobs_leave(struct jobloop *jobloop)
667 {
668         struct thread *t;
669
670         pthread_mutex_lock(&mutex);
671         t = threads;
672         while (t && t != (struct thread*)jobloop)
673                 t = t->next;
674         if (!t) {
675                 errno = EINVAL;
676         } else {
677                 t->stop = 1;
678                 if (t->waits)
679                         pthread_cond_broadcast(&cond);
680         }
681         pthread_mutex_unlock(&mutex);
682         return -!t;
683 }
684
685 /**
686  * Calls synchronously the job represented by 'callback' and 'arg1'
687  * for the 'group' and the 'timeout' and waits for its completion.
688  * @param group    The group of the job or NULL when no group.
689  * @param timeout  The maximum execution time in seconds of the job
690  *                 or 0 for unlimited time.
691  * @param callback The function to execute for achieving the job.
692  *                 Its first parameter is either 0 on normal flow
693  *                 or the signal number that broke the normal flow.
694  *                 The remaining parameter is the parameter 'arg1'
695  *                 given here.
696  * @param arg      The second argument for 'callback'
697  * @return 0 in case of success or -1 in case of error
698  */
699 int jobs_call(
700                 const void *group,
701                 int timeout,
702                 void (*callback)(int, void*),
703                 void *arg)
704 {
705         struct sync sync;
706
707         sync.callback = callback;
708         sync.arg = arg;
709
710         return do_sync(group, timeout, call_cb, &sync);
711 }
712
713 /**
714  * Internal callback for evloop management.
715  * The effect of this function is hidden: it exits
716  * the waiting poll if any. Then it wakes up a thread
717  * awaiting the evloop using signal.
718  */
719 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
720 {
721         uint64_t x;
722         struct evloop *evloop = userdata;
723         read(evloop->efd, &x, sizeof x);
724         pthread_mutex_lock(&mutex);
725         pthread_cond_broadcast(&evloop->cond);
726         pthread_mutex_unlock(&mutex);
727         return 1;
728 }
729
730 /* temporary hack */
731 #if !defined(REMOVE_SYSTEMD_EVENT)
732 __attribute__((unused))
733 #endif
734 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
735 {
736         sig_monitor(0, evloop_run, arg);
737 }
738
739 /**
740  * Gets a sd_event item for the current thread.
741  * @return a sd_event or NULL in case of error
742  */
743 static struct sd_event *get_sd_event_locked()
744 {
745         struct evloop *el;
746         uint64_t x;
747         int rc;
748
749         /* creates the evloop on need */
750         el = &evloop[0];
751         if (!el->sdev) {
752                 /* start the creation */
753                 el->state = 0;
754                 /* creates the eventfd for waking up polls */
755                 el->efd = eventfd(0, EFD_CLOEXEC);
756                 if (el->efd < 0) {
757                         ERROR("can't make eventfd for events");
758                         goto error1;
759                 }
760                 /* create the systemd event loop */
761                 rc = sd_event_new(&el->sdev);
762                 if (rc < 0) {
763                         ERROR("can't make new event loop");
764                         goto error2;
765                 }
766                 /* put the eventfd in the event loop */
767                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
768                 if (rc < 0) {
769                         ERROR("can't register eventfd");
770 #if !defined(REMOVE_SYSTEMD_EVENT)
771                         sd_event_unref(el->sdev);
772                         el->sdev = NULL;
773 error2:
774                         close(el->efd);
775 error1:
776                         return NULL;
777                 }
778 #else
779                         goto error3;
780                 }
781                 /* handle the event loop */
782                 el->fdev = fdev_epoll_add(get_fdevepoll(), sd_event_get_fd(el->sdev));
783                 if (!el->fdev) {
784                         ERROR("can't create fdev");
785 error3:
786                         sd_event_unref(el->sdev);
787 error2:
788                         close(el->efd);
789 error1:
790                         memset(el, 0, sizeof *el);
791                         return NULL;
792                 }
793                 fdev_set_autoclose(el->fdev, 0);
794                 fdev_set_events(el->fdev, EPOLLIN);
795                 fdev_set_callback(el->fdev, evloop_callback, el);
796 #endif
797         }
798
799         /* attach the event loop to the current thread */
800         if (current_evloop != el) {
801                 if (current_evloop)
802                         __atomic_and_fetch(&current_evloop->state, ~EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
803                 current_evloop = el;
804                 __atomic_or_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
805         }
806
807         /* wait for a modifiable event loop */
808         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
809                 x = 1;
810                 write(el->efd, &x, sizeof x);
811                 pthread_cond_wait(&el->cond, &mutex);
812         }
813
814         return el->sdev;
815 }
816
817 /**
818  * Gets a sd_event item for the current thread.
819  * @return a sd_event or NULL in case of error
820  */
821 struct sd_event *jobs_get_sd_event()
822 {
823         struct sd_event *result;
824
825         pthread_mutex_lock(&mutex);
826         result = get_sd_event_locked();
827         pthread_mutex_unlock(&mutex);
828
829         return result;
830 }
831
832 #if defined(REMOVE_SYSTEMD_EVENT)
833 /**
834  * Gets the fdev_epoll item.
835  * @return a fdev_epoll or NULL in case of error
836  */
837 struct fdev_epoll *jobs_get_fdev_epoll()
838 {
839         struct fdev_epoll *result;
840
841         pthread_mutex_lock(&mutex);
842         result = get_fdevepoll();
843         pthread_mutex_unlock(&mutex);
844
845         return result;
846 }
847 #endif
848
849 /**
850  * Enter the jobs processing loop.
851  * @param allowed_count Maximum count of thread for jobs including this one
852  * @param start_count   Count of thread to start now, must be lower.
853  * @param waiter_count  Maximum count of jobs that can be waiting.
854  * @param start         The start routine to activate (can't be NULL)
855  * @return 0 in case of success or -1 in case of error.
856  */
857 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
858 {
859         int rc, launched;
860         struct thread me;
861         struct job *job;
862
863         assert(allowed_count >= 1);
864         assert(start_count >= 0);
865         assert(waiter_count > 0);
866         assert(start_count <= allowed_count);
867
868         rc = -1;
869         pthread_mutex_lock(&mutex);
870
871         /* check whether already running */
872         if (current_thread || allowed) {
873                 ERROR("thread already started");
874                 errno = EINVAL;
875                 goto error;
876         }
877
878         /* start */
879         if (sig_monitor_init() < 0) {
880                 ERROR("failed to initialise signal handlers");
881                 goto error;
882         }
883
884         /* records the allowed count */
885         allowed = allowed_count;
886         started = 0;
887         running = 0;
888         remains = waiter_count;
889
890 #if HAS_WATCHDOG
891         /* set the watchdog */
892         if (sd_watchdog_enabled(0, NULL))
893                 sd_event_set_watchdog(get_sd_event_locked(), 1);
894 #endif
895
896         /* start at least one thread */
897         launched = 0;
898         while ((launched + 1) < start_count) {
899                 if (start_one_thread() != 0) {
900                         ERROR("Not all threads can be started");
901                         goto error;
902                 }
903                 launched++;
904         }
905
906         /* queue the start job */
907         job = job_create(NULL, 0, start, arg);
908         if (!job) {
909                 ERROR("out of memory");
910                 errno = ENOMEM;
911                 goto error;
912         }
913         job_add(job);
914         remains--;
915
916         /* run until end */
917         thread_run(&me);
918         rc = 0;
919 error:
920         pthread_mutex_unlock(&mutex);
921         return rc;
922 }
923
924 /**
925  * Terminate all the threads and cancel all pending jobs.
926  */
927 void jobs_terminate()
928 {
929         struct job *job, *head, *tail;
930         pthread_t me, *others;
931         struct thread *t;
932         int count;
933
934         /* how am i? */
935         me = pthread_self();
936
937         /* request all threads to stop */
938         pthread_mutex_lock(&mutex);
939         allowed = 0;
940
941         /* count the number of threads */
942         count = 0;
943         t = threads;
944         while (t) {
945                 if (!t->upper && !pthread_equal(t->tid, me))
946                         count++;
947                 t = t->next;
948         }
949
950         /* fill the array of threads */
951         others = alloca(count * sizeof *others);
952         count = 0;
953         t = threads;
954         while (t) {
955                 if (!t->upper && !pthread_equal(t->tid, me))
956                         others[count++] = t->tid;
957                 t = t->next;
958         }
959
960         /* stops the threads */
961         t = threads;
962         while (t) {
963                 t->stop = 1;
964                 t = t->next;
965         }
966
967         /* wait the threads */
968         pthread_cond_broadcast(&cond);
969         pthread_mutex_unlock(&mutex);
970         while (count)
971                 pthread_join(others[--count], NULL);
972         pthread_mutex_lock(&mutex);
973
974         /* cancel pending jobs of other threads */
975         remains = 0;
976         head = first_job;
977         first_job = NULL;
978         tail = NULL;
979         while (head) {
980                 /* unlink the job */
981                 job = head;
982                 head = job->next;
983
984                 /* search if job is stacked for current */
985                 t = current_thread;
986                 while (t && t->job != job)
987                         t = t->upper;
988                 if (t) {
989                         /* yes, relink it at end */
990                         if (tail)
991                                 tail->next = job;
992                         else
993                                 first_job = job;
994                         tail = job;
995                         job->next = NULL;
996                 } else {
997                         /* no cancel the job */
998                         pthread_mutex_unlock(&mutex);
999                         sig_monitor(0, job_cancel, job);
1000                         free(job);
1001                         pthread_mutex_lock(&mutex);
1002                 }
1003         }
1004         pthread_mutex_unlock(&mutex);
1005 }
1006