Update copyright date
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <string.h>
31 #include <time.h>
32 #include <sys/syscall.h>
33 #include <pthread.h>
34 #include <errno.h>
35 #include <assert.h>
36 #include <sys/eventfd.h>
37
38 #include <systemd/sd-event.h>
39 #include "fdev.h"
40 #if HAS_WATCHDOG
41 #include <systemd/sd-daemon.h>
42 #endif
43
44 #include "jobs.h"
45 #include "sig-monitor.h"
46 #include "verbose.h"
47
48 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
49 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
50
51 struct thread;
52
53 /** Internal shortcut for callback */
54 typedef void (*job_cb_t)(int, void*);
55
56 /** Description of a pending job */
57 struct job
58 {
59         struct job *next;    /**< link to the next job enqueued */
60         const void *group;   /**< group of the request */
61         job_cb_t callback;   /**< processing callback */
62         void *arg;           /**< argument */
63         int timeout;         /**< timeout in second for processing the request */
64         unsigned blocked: 1; /**< is an other request blocking this one ? */
65         unsigned dropped: 1; /**< is removed ? */
66 };
67
68 /** Description of handled event loops */
69 struct evloop
70 {
71         unsigned state;        /**< encoded state */
72         int efd;               /**< event notification */
73         struct sd_event *sdev; /**< the systemd event loop */
74         struct fdev *fdev;     /**< handling of events */
75         struct thread *holder; /**< holder of the evloop */
76 };
77
78 #define EVLOOP_STATE_WAIT           1U
79 #define EVLOOP_STATE_RUN            2U
80
81 /** Description of threads */
82 struct thread
83 {
84         struct thread *next;   /**< next thread of the list */
85         struct thread *upper;  /**< upper same thread */
86         struct thread *nholder;/**< next holder for evloop */
87         pthread_cond_t *cwhold;/**< condition wait for holding */
88         struct job *job;       /**< currently processed job */
89         pthread_t tid;         /**< the thread id */
90         volatile unsigned stop: 1;      /**< stop requested */
91         volatile unsigned waits: 1;     /**< is waiting? */
92 };
93
94 /**
95  * Description of synchronous callback
96  */
97 struct sync
98 {
99         struct thread thread;   /**< thread loop data */
100         union {
101                 void (*callback)(int, void*);   /**< the synchronous callback */
102                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
103                                 /**< the entering synchronous routine */
104         };
105         void *arg;              /**< the argument of the callback */
106 };
107
108
109 /* synchronisation of threads */
110 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
111 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
112
113 /* count allowed, started and running threads */
114 static int allowed = 0; /** allowed count of threads */
115 static int started = 0; /** started count of threads */
116 static int running = 0; /** running count of threads */
117 static int remains = 0; /** allowed count of waiting jobs */
118
119 /* list of threads */
120 static struct thread *threads;
121 static _Thread_local struct thread *current_thread;
122
123 /* queue of pending jobs */
124 static struct job *first_job;
125 static struct job *free_jobs;
126
127 /* event loop */
128 static struct evloop evloop;
129
130 /**
131  * Create a new job with the given parameters
132  * @param group    the group of the job
133  * @param timeout  the timeout of the job (0 if none)
134  * @param callback the function that achieves the job
135  * @param arg      the argument of the callback
136  * @return the created job unblock or NULL when no more memory
137  */
138 static struct job *job_create(
139                 const void *group,
140                 int timeout,
141                 job_cb_t callback,
142                 void *arg)
143 {
144         struct job *job;
145
146         /* try recyle existing job */
147         job = free_jobs;
148         if (job)
149                 free_jobs = job->next;
150         else {
151                 /* allocation without blocking */
152                 pthread_mutex_unlock(&mutex);
153                 job = malloc(sizeof *job);
154                 pthread_mutex_lock(&mutex);
155                 if (!job) {
156                         ERROR("out of memory");
157                         errno = ENOMEM;
158                         goto end;
159                 }
160         }
161         /* initialises the job */
162         job->group = group;
163         job->timeout = timeout;
164         job->callback = callback;
165         job->arg = arg;
166         job->blocked = 0;
167         job->dropped = 0;
168 end:
169         return job;
170 }
171
172 /**
173  * Adds 'job' at the end of the list of jobs, marking it
174  * as blocked if an other job with the same group is pending.
175  * @param job the job to add
176  */
177 static void job_add(struct job *job)
178 {
179         const void *group;
180         struct job *ijob, **pjob;
181
182         /* prepare to add */
183         group = job->group;
184         job->next = NULL;
185
186         /* search end and blockers */
187         pjob = &first_job;
188         ijob = first_job;
189         while (ijob) {
190                 if (group && ijob->group == group)
191                         job->blocked = 1;
192                 pjob = &ijob->next;
193                 ijob = ijob->next;
194         }
195
196         /* queue the jobs */
197         *pjob = job;
198         remains--;
199 }
200
201 /**
202  * Get the next job to process or NULL if none.
203  * @return the first job that isn't blocked or NULL
204  */
205 static inline struct job *job_get()
206 {
207         struct job *job = first_job;
208         while (job && job->blocked)
209                 job = job->next;
210         if (job)
211                 remains++;
212         return job;
213 }
214
215 /**
216  * Releases the processed 'job': removes it
217  * from the list of jobs and unblock the first
218  * pending job of the same group if any.
219  * @param job the job to release
220  */
221 static inline void job_release(struct job *job)
222 {
223         struct job *ijob, **pjob;
224         const void *group;
225
226         /* first unqueue the job */
227         pjob = &first_job;
228         ijob = first_job;
229         while (ijob != job) {
230                 pjob = &ijob->next;
231                 ijob = ijob->next;
232         }
233         *pjob = job->next;
234
235         /* then unblock jobs of the same group */
236         group = job->group;
237         if (group) {
238                 ijob = job->next;
239                 while (ijob && ijob->group != group)
240                         ijob = ijob->next;
241                 if (ijob)
242                         ijob->blocked = 0;
243         }
244
245         /* recycle the job */
246         job->next = free_jobs;
247         free_jobs = job;
248 }
249
250 /**
251  * Monitored cancel callback for a job.
252  * This function is called by the monitor
253  * to cancel the job when the safe environment
254  * is set.
255  * @param signum 0 on normal flow or the number
256  *               of the signal that interrupted the normal
257  *               flow, isn't used
258  * @param arg    the job to run
259  */
260 static void job_cancel(int signum, void *arg)
261 {
262         struct job *job = arg;
263         job->callback(SIGABRT, job->arg);
264 }
265
266 /**
267  * Monitored normal callback for events.
268  * This function is called by the monitor
269  * to run the event loop when the safe environment
270  * is set.
271  * @param signum 0 on normal flow or the number
272  *               of the signal that interrupted the normal
273  *               flow
274  * @param arg     the events to run
275  */
276 static void evloop_run(int signum, void *arg)
277 {
278         int rc;
279         struct sd_event *se;
280
281         if (!signum) {
282                 se = evloop.sdev;
283                 rc = sd_event_prepare(se);
284                 if (rc < 0) {
285                         errno = -rc;
286                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
287                         abort();
288                 } else {
289                         if (rc == 0) {
290                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
291                                 if (rc < 0) {
292                                         errno = -rc;
293                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
294                                 }
295                         }
296                         evloop.state = EVLOOP_STATE_RUN;
297                         if (rc > 0) {
298                                 rc = sd_event_dispatch(se);
299                                 if (rc < 0) {
300                                         errno = -rc;
301                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
302                                 }
303                         }
304                 }
305         }
306 }
307
308 /**
309  * Internal callback for evloop management.
310  * The effect of this function is hidden: it exits
311  * the waiting poll if any.
312  */
313 static void evloop_on_efd_event()
314 {
315         uint64_t x;
316         read(evloop.efd, &x, sizeof x);
317 }
318
319 /**
320  * wakeup the event loop if needed by sending
321  * an event.
322  */
323 static void evloop_wakeup()
324 {
325         uint64_t x;
326
327         if (evloop.state & EVLOOP_STATE_WAIT) {
328                 x = 1;
329                 write(evloop.efd, &x, sizeof x);
330         }
331 }
332
333 /**
334  * Release the currently held event loop
335  */
336 static void evloop_release()
337 {
338         struct thread *nh, *ct = current_thread;
339
340         if (ct && evloop.holder == ct) {
341                 nh = ct->nholder;
342                 evloop.holder = nh;
343                 if (nh)
344                         pthread_cond_signal(nh->cwhold);
345         }
346 }
347
348 /**
349  * get the eventloop for the current thread
350  */
351 static int evloop_get()
352 {
353         struct thread *ct = current_thread;
354
355         if (evloop.holder)
356                 return evloop.holder == ct;
357
358         ct->nholder = NULL;
359         evloop.holder = ct;
360         return 1;
361 }
362
363 /**
364  * acquire the eventloop for the current thread
365  */
366 static void evloop_acquire()
367 {
368         struct thread **pwait, *ct;
369         pthread_cond_t cond;
370
371         /* try to get the evloop */
372         if (!evloop_get()) {
373                 /* failed, init waiting state */
374                 ct = current_thread;
375                 ct->nholder = NULL;
376                 ct->cwhold = &cond;
377                 pthread_cond_init(&cond, NULL);
378
379                 /* queue current thread in holder list */
380                 pwait = &evloop.holder;
381                 while (*pwait)
382                         pwait = &(*pwait)->nholder;
383                 *pwait = ct;
384
385                 /* wake up the evloop */
386                 evloop_wakeup();
387
388                 /* wait to acquire the evloop */
389                 pthread_cond_wait(&cond, &mutex);
390                 pthread_cond_destroy(&cond);
391         }
392 }
393
394 /**
395  * Enter the thread
396  * @param me the description of the thread to enter
397  */
398 static void thread_enter(volatile struct thread *me)
399 {
400         evloop_release();
401         /* initialize description of itself and link it in the list */
402         me->tid = pthread_self();
403         me->stop = 0;
404         me->waits = 0;
405         me->upper = current_thread;
406         me->next = threads;
407         threads = (struct thread*)me;
408         current_thread = (struct thread*)me;
409 }
410
411 /**
412  * leave the thread
413  * @param me the description of the thread to leave
414  */
415 static void thread_leave()
416 {
417         struct thread **prv, *me;
418
419         /* unlink the current thread and cleanup */
420         me = current_thread;
421         prv = &threads;
422         while (*prv != me)
423                 prv = &(*prv)->next;
424         *prv = me->next;
425
426         current_thread = me->upper;
427 }
428
429 /**
430  * Main processing loop of internal threads with processing jobs.
431  * The loop must be called with the mutex locked
432  * and it returns with the mutex locked.
433  * @param me the description of the thread to use
434  * TODO: how are timeout handled when reentering?
435  */
436 static void thread_run_internal(volatile struct thread *me)
437 {
438         struct job *job;
439
440         /* enter thread */
441         thread_enter(me);
442
443         /* loop until stopped */
444         while (!me->stop) {
445                 /* release the current event loop */
446                 evloop_release();
447
448                 /* get a job */
449                 job = job_get();
450                 if (job) {
451                         /* prepare running the job */
452                         job->blocked = 1; /* mark job as blocked */
453                         me->job = job; /* record the job (only for terminate) */
454
455                         /* run the job */
456                         pthread_mutex_unlock(&mutex);
457                         sig_monitor(job->timeout, job->callback, job->arg);
458                         pthread_mutex_lock(&mutex);
459
460                         /* release the run job */
461                         job_release(job);
462                 /* no job, check event loop wait */
463                 } else if (evloop_get()) {
464                         if (evloop.state != 0) {
465                                 /* busy ? */
466                                 CRITICAL("Can't enter dispatch while in dispatch!");
467                                 abort();
468                         }
469                         /* run the events */
470                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
471                         pthread_mutex_unlock(&mutex);
472                         sig_monitor(0, evloop_run, NULL);
473                         pthread_mutex_lock(&mutex);
474                         evloop.state = 0;
475                 } else {
476                         /* no job and no event loop */
477                         running--;
478                         if (!running)
479                                 ERROR("Entering job deep sleep! Check your bindings.");
480                         me->waits = 1;
481                         pthread_cond_wait(&cond, &mutex);
482                         me->waits = 0;
483                         running++;
484                 }
485         }
486         /* cleanup */
487         evloop_release();
488         thread_leave();
489 }
490
491 /**
492  * Main processing loop of external threads.
493  * The loop must be called with the mutex locked
494  * and it returns with the mutex locked.
495  * @param me the description of the thread to use
496  */
497 static void thread_run_external(volatile struct thread *me)
498 {
499         /* enter thread */
500         thread_enter(me);
501
502         /* loop until stopped */
503         me->waits = 1;
504         while (!me->stop)
505                 pthread_cond_wait(&cond, &mutex);
506         me->waits = 0;
507         thread_leave();
508 }
509
510 /**
511  * Root for created threads.
512  */
513 static void thread_main()
514 {
515         struct thread me;
516
517         running++;
518         started++;
519         sig_monitor_init_timeouts();
520         thread_run_internal(&me);
521         sig_monitor_clean_timeouts();
522         started--;
523         running--;
524 }
525
526 /**
527  * Entry point for created threads.
528  * @param data not used
529  * @return NULL
530  */
531 static void *thread_starter(void *data)
532 {
533         pthread_mutex_lock(&mutex);
534         thread_main();
535         pthread_mutex_unlock(&mutex);
536         return NULL;
537 }
538
539 /**
540  * Starts a new thread
541  * @return 0 in case of success or -1 in case of error
542  */
543 static int start_one_thread()
544 {
545         pthread_t tid;
546         int rc;
547
548         rc = pthread_create(&tid, NULL, thread_starter, NULL);
549         if (rc != 0) {
550                 /* errno = rc; */
551                 WARNING("not able to start thread: %m");
552                 rc = -1;
553         }
554         return rc;
555 }
556
557 /**
558  * Queues a new asynchronous job represented by 'callback' and 'arg'
559  * for the 'group' and the 'timeout'.
560  * Jobs are queued FIFO and are possibly executed in parallel
561  * concurrently except for job of the same group that are
562  * executed sequentially in FIFO order.
563  * @param group    The group of the job or NULL when no group.
564  * @param timeout  The maximum execution time in seconds of the job
565  *                 or 0 for unlimited time.
566  * @param callback The function to execute for achieving the job.
567  *                 Its first parameter is either 0 on normal flow
568  *                 or the signal number that broke the normal flow.
569  *                 The remaining parameter is the parameter 'arg1'
570  *                 given here.
571  * @param arg      The second argument for 'callback'
572  * @return 0 in case of success or -1 in case of error
573  */
574 int jobs_queue(
575                 const void *group,
576                 int timeout,
577                 void (*callback)(int, void*),
578                 void *arg)
579 {
580         struct job *job;
581         int rc;
582
583         pthread_mutex_lock(&mutex);
584
585         /* allocates the job */
586         job = job_create(group, timeout, callback, arg);
587         if (!job)
588                 goto error;
589
590         /* check availability */
591         if (remains <= 0) {
592                 ERROR("can't process job with threads: too many jobs");
593                 errno = EBUSY;
594                 goto error2;
595         }
596
597         /* start a thread if needed */
598         if (running == started && started < allowed) {
599                 /* all threads are busy and a new can be started */
600                 rc = start_one_thread();
601                 if (rc < 0 && started == 0) {
602                         ERROR("can't start initial thread: %m");
603                         goto error2;
604                 }
605         }
606
607         /* queues the job */
608         job_add(job);
609
610         /* signal an existing job */
611         pthread_cond_signal(&cond);
612         pthread_mutex_unlock(&mutex);
613         return 0;
614
615 error2:
616         job->next = free_jobs;
617         free_jobs = job;
618 error:
619         pthread_mutex_unlock(&mutex);
620         return -1;
621 }
622
623 /**
624  * Internal helper function for 'jobs_enter'.
625  * @see jobs_enter, jobs_leave
626  */
627 static void enter_cb(int signum, void *closure)
628 {
629         struct sync *sync = closure;
630         sync->enter(signum, sync->arg, (void*)&sync->thread);
631 }
632
633 /**
634  * Internal helper function for 'jobs_call'.
635  * @see jobs_call
636  */
637 static void call_cb(int signum, void *closure)
638 {
639         struct sync *sync = closure;
640         sync->callback(signum, sync->arg);
641         jobs_leave((void*)&sync->thread);
642 }
643
644 /**
645  * Internal helper for synchronous jobs. It enters
646  * a new thread loop for evaluating the given job
647  * as recorded by the couple 'sync_cb' and 'sync'.
648  * @see jobs_call, jobs_enter, jobs_leave
649  */
650 static int do_sync(
651                 const void *group,
652                 int timeout,
653                 void (*sync_cb)(int signum, void *closure),
654                 struct sync *sync
655 )
656 {
657         struct job *job;
658
659         pthread_mutex_lock(&mutex);
660
661         /* allocates the job */
662         job = job_create(group, timeout, sync_cb, sync);
663         if (!job) {
664                 pthread_mutex_unlock(&mutex);
665                 return -1;
666         }
667
668         /* queues the job */
669         job_add(job);
670
671         /* run until stopped */
672         if (current_thread)
673                 thread_run_internal(&sync->thread);
674         else
675                 thread_run_external(&sync->thread);
676         pthread_mutex_unlock(&mutex);
677         return 0;
678 }
679
680 /**
681  * Enter a synchronisation point: activates the job given by 'callback'
682  * and 'closure' using 'group' and 'timeout' to control sequencing and
683  * execution time.
684  * @param group the group for sequencing jobs
685  * @param timeout the time in seconds allocated to the job
686  * @param callback the callback that will handle the job.
687  *                 it receives 3 parameters: 'signum' that will be 0
688  *                 on normal flow or the catched signal number in case
689  *                 of interrupted flow, the context 'closure' as given and
690  *                 a 'jobloop' reference that must be used when the job is
691  *                 terminated to unlock the current execution flow.
692  * @param closure the argument to the callback
693  * @return 0 on success or -1 in case of error
694  */
695 int jobs_enter(
696                 const void *group,
697                 int timeout,
698                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
699                 void *closure
700 )
701 {
702         struct sync sync;
703
704         sync.enter = callback;
705         sync.arg = closure;
706         return do_sync(group, timeout, enter_cb, &sync);
707 }
708
709 /**
710  * Unlocks the execution flow designed by 'jobloop'.
711  * @param jobloop indication of the flow to unlock
712  * @return 0 in case of success of -1 on error
713  */
714 int jobs_leave(struct jobloop *jobloop)
715 {
716         struct thread *t;
717
718         pthread_mutex_lock(&mutex);
719         t = threads;
720         while (t && t != (struct thread*)jobloop)
721                 t = t->next;
722         if (!t) {
723                 errno = EINVAL;
724         } else {
725                 t->stop = 1;
726                 if (t->waits)
727                         pthread_cond_broadcast(&cond);
728                 else
729                         evloop_wakeup();
730         }
731         pthread_mutex_unlock(&mutex);
732         return -!t;
733 }
734
735 /**
736  * Calls synchronously the job represented by 'callback' and 'arg1'
737  * for the 'group' and the 'timeout' and waits for its completion.
738  * @param group    The group of the job or NULL when no group.
739  * @param timeout  The maximum execution time in seconds of the job
740  *                 or 0 for unlimited time.
741  * @param callback The function to execute for achieving the job.
742  *                 Its first parameter is either 0 on normal flow
743  *                 or the signal number that broke the normal flow.
744  *                 The remaining parameter is the parameter 'arg1'
745  *                 given here.
746  * @param arg      The second argument for 'callback'
747  * @return 0 in case of success or -1 in case of error
748  */
749 int jobs_call(
750                 const void *group,
751                 int timeout,
752                 void (*callback)(int, void*),
753                 void *arg)
754 {
755         struct sync sync;
756
757         sync.callback = callback;
758         sync.arg = arg;
759
760         return do_sync(group, timeout, call_cb, &sync);
761 }
762
763 /**
764  * Internal callback for evloop management.
765  * The effect of this function is hidden: it exits
766  * the waiting poll if any. Then it wakes up a thread
767  * awaiting the evloop using signal.
768  */
769 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
770 {
771         evloop_on_efd_event();
772         return 1;
773 }
774
775 /**
776  * Gets a sd_event item for the current thread.
777  * @return a sd_event or NULL in case of error
778  */
779 static struct sd_event *get_sd_event_locked()
780 {
781         int rc;
782
783         /* creates the evloop on need */
784         if (!evloop.sdev) {
785                 /* start the creation */
786                 evloop.state = 0;
787                 /* creates the eventfd for waking up polls */
788                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
789                 if (evloop.efd < 0) {
790                         ERROR("can't make eventfd for events");
791                         goto error1;
792                 }
793                 /* create the systemd event loop */
794                 rc = sd_event_new(&evloop.sdev);
795                 if (rc < 0) {
796                         ERROR("can't make new event loop");
797                         goto error2;
798                 }
799                 /* put the eventfd in the event loop */
800                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
801                 if (rc < 0) {
802                         ERROR("can't register eventfd");
803                         sd_event_unref(evloop.sdev);
804                         evloop.sdev = NULL;
805 error2:
806                         close(evloop.efd);
807 error1:
808                         return NULL;
809                 }
810         }
811
812         /* acquire the event loop */
813         evloop_acquire();
814
815         return evloop.sdev;
816 }
817
818 /**
819  * Gets a sd_event item for the current thread.
820  * @return a sd_event or NULL in case of error
821  */
822 struct sd_event *jobs_get_sd_event()
823 {
824         struct sd_event *result;
825         struct thread lt;
826
827         /* ensure an existing thread environment */
828         if (!current_thread) {
829                 memset(&lt, 0, sizeof lt);
830                 current_thread = &lt;
831         }
832
833         /* process */
834         pthread_mutex_lock(&mutex);
835         result = get_sd_event_locked();
836         pthread_mutex_unlock(&mutex);
837
838         /* release the faked thread environment if needed */
839         if (current_thread == &lt) {
840                 /*
841                  * Releasing it is needed because there is no way to guess
842                  * when it has to be released really. But here is where it is
843                  * hazardous: if the caller modifies the eventloop when it
844                  * is waiting, there is no way to make the change effective.
845                  * A workaround to achieve that goal is for the caller to
846                  * require the event loop a second time after having modified it.
847                  */
848                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
849                 if (verbose_wants(Log_Level_Info))
850                         sig_monitor_dumpstack();
851                 evloop_release();
852                 current_thread = NULL;
853         }
854
855         return result;
856 }
857
858 /**
859  * Enter the jobs processing loop.
860  * @param allowed_count Maximum count of thread for jobs including this one
861  * @param start_count   Count of thread to start now, must be lower.
862  * @param waiter_count  Maximum count of jobs that can be waiting.
863  * @param start         The start routine to activate (can't be NULL)
864  * @return 0 in case of success or -1 in case of error.
865  */
866 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
867 {
868         int rc, launched;
869         struct job *job;
870
871         assert(allowed_count >= 1);
872         assert(start_count >= 0);
873         assert(waiter_count > 0);
874         assert(start_count <= allowed_count);
875
876         rc = -1;
877         pthread_mutex_lock(&mutex);
878
879         /* check whether already running */
880         if (current_thread || allowed) {
881                 ERROR("thread already started");
882                 errno = EINVAL;
883                 goto error;
884         }
885
886         /* records the allowed count */
887         allowed = allowed_count;
888         started = 0;
889         running = 0;
890         remains = waiter_count;
891
892 #if HAS_WATCHDOG
893         /* set the watchdog */
894         if (sd_watchdog_enabled(0, NULL))
895                 sd_event_set_watchdog(get_sd_event_locked(), 1);
896 #endif
897
898         /* start at least one thread: the current one */
899         launched = 1;
900         while (launched < start_count) {
901                 if (start_one_thread() != 0) {
902                         ERROR("Not all threads can be started");
903                         goto error;
904                 }
905                 launched++;
906         }
907
908         /* queue the start job */
909         job = job_create(NULL, 0, start, arg);
910         if (!job)
911                 goto error;
912         job_add(job);
913
914         /* run until end */
915         thread_main();
916         rc = 0;
917 error:
918         pthread_mutex_unlock(&mutex);
919         return rc;
920 }
921
922 /**
923  * Terminate all the threads and cancel all pending jobs.
924  */
925 void jobs_terminate()
926 {
927         struct job *job, *head, *tail;
928         pthread_t me, *others;
929         struct thread *t;
930         int count;
931
932         /* how am i? */
933         me = pthread_self();
934
935         /* request all threads to stop */
936         pthread_mutex_lock(&mutex);
937         allowed = 0;
938
939         /* count the number of threads */
940         count = 0;
941         t = threads;
942         while (t) {
943                 if (!t->upper && !pthread_equal(t->tid, me))
944                         count++;
945                 t = t->next;
946         }
947
948         /* fill the array of threads */
949         others = alloca(count * sizeof *others);
950         count = 0;
951         t = threads;
952         while (t) {
953                 if (!t->upper && !pthread_equal(t->tid, me))
954                         others[count++] = t->tid;
955                 t = t->next;
956         }
957
958         /* stops the threads */
959         t = threads;
960         while (t) {
961                 t->stop = 1;
962                 t = t->next;
963         }
964
965         /* wait the threads */
966         pthread_cond_broadcast(&cond);
967         pthread_mutex_unlock(&mutex);
968         while (count)
969                 pthread_join(others[--count], NULL);
970         pthread_mutex_lock(&mutex);
971
972         /* cancel pending jobs of other threads */
973         remains = 0;
974         head = first_job;
975         first_job = NULL;
976         tail = NULL;
977         while (head) {
978                 /* unlink the job */
979                 job = head;
980                 head = job->next;
981
982                 /* search if job is stacked for current */
983                 t = current_thread;
984                 while (t && t->job != job)
985                         t = t->upper;
986                 if (t) {
987                         /* yes, relink it at end */
988                         if (tail)
989                                 tail->next = job;
990                         else
991                                 first_job = job;
992                         tail = job;
993                         job->next = NULL;
994                 } else {
995                         /* no cancel the job */
996                         pthread_mutex_unlock(&mutex);
997                         sig_monitor(0, job_cancel, job);
998                         free(job);
999                         pthread_mutex_lock(&mutex);
1000                 }
1001         }
1002         pthread_mutex_unlock(&mutex);
1003 }
1004