system & jobs: Reverse link and acquiring events
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016-2019 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <time.h>
26 #include <sys/syscall.h>
27 #include <pthread.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <sys/eventfd.h>
31
32 #include <systemd/sd-event.h>
33
34 #include "jobs.h"
35 #include "sig-monitor.h"
36 #include "verbose.h"
37 #include "systemd.h"
38
39 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
40 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
41
42 struct thread;
43
44 /** Internal shortcut for callback */
45 typedef void (*job_cb_t)(int, void*);
46
47 /** Description of a pending job */
48 struct job
49 {
50         struct job *next;    /**< link to the next job enqueued */
51         const void *group;   /**< group of the request */
52         job_cb_t callback;   /**< processing callback */
53         void *arg;           /**< argument */
54         int timeout;         /**< timeout in second for processing the request */
55         unsigned blocked: 1; /**< is an other request blocking this one ? */
56         unsigned dropped: 1; /**< is removed ? */
57 };
58
59 /** Description of handled event loops */
60 struct evloop
61 {
62         unsigned state;        /**< encoded state */
63         int efd;               /**< event notification */
64         struct sd_event *sdev; /**< the systemd event loop */
65         struct thread *holder; /**< holder of the evloop */
66 };
67
68 #define EVLOOP_STATE_WAIT           1U
69 #define EVLOOP_STATE_RUN            2U
70
71 /** Description of threads */
72 struct thread
73 {
74         struct thread *next;   /**< next thread of the list */
75         struct thread *upper;  /**< upper same thread */
76         struct thread *nholder;/**< next holder for evloop */
77         pthread_cond_t *cwhold;/**< condition wait for holding */
78         struct job *job;       /**< currently processed job */
79         pthread_t tid;         /**< the thread id */
80         volatile unsigned stop: 1;      /**< stop requested */
81         volatile unsigned waits: 1;     /**< is waiting? */
82 };
83
84 /**
85  * Description of synchronous callback
86  */
87 struct sync
88 {
89         struct thread thread;   /**< thread loop data */
90         union {
91                 void (*callback)(int, void*);   /**< the synchronous callback */
92                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
93                                 /**< the entering synchronous routine */
94         };
95         void *arg;              /**< the argument of the callback */
96 };
97
98
99 /* synchronisation of threads */
100 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
102
103 /* count allowed, started and running threads */
104 static int allowed = 0; /** allowed count of threads */
105 static int started = 0; /** started count of threads */
106 static int running = 0; /** running count of threads */
107 static int remains = 0; /** allowed count of waiting jobs */
108
109 /* list of threads */
110 static struct thread *threads;
111 static _Thread_local struct thread *current_thread;
112
113 /* queue of pending jobs */
114 static struct job *first_job;
115 static struct job *free_jobs;
116
117 /* event loop */
118 static struct evloop evloop;
119
120 /**
121  * Create a new job with the given parameters
122  * @param group    the group of the job
123  * @param timeout  the timeout of the job (0 if none)
124  * @param callback the function that achieves the job
125  * @param arg      the argument of the callback
126  * @return the created job unblock or NULL when no more memory
127  */
128 static struct job *job_create(
129                 const void *group,
130                 int timeout,
131                 job_cb_t callback,
132                 void *arg)
133 {
134         struct job *job;
135
136         /* try recyle existing job */
137         job = free_jobs;
138         if (job)
139                 free_jobs = job->next;
140         else {
141                 /* allocation without blocking */
142                 pthread_mutex_unlock(&mutex);
143                 job = malloc(sizeof *job);
144                 pthread_mutex_lock(&mutex);
145                 if (!job) {
146                         ERROR("out of memory");
147                         errno = ENOMEM;
148                         goto end;
149                 }
150         }
151         /* initialises the job */
152         job->group = group;
153         job->timeout = timeout;
154         job->callback = callback;
155         job->arg = arg;
156         job->blocked = 0;
157         job->dropped = 0;
158 end:
159         return job;
160 }
161
162 /**
163  * Adds 'job' at the end of the list of jobs, marking it
164  * as blocked if an other job with the same group is pending.
165  * @param job the job to add
166  */
167 static void job_add(struct job *job)
168 {
169         const void *group;
170         struct job *ijob, **pjob;
171
172         /* prepare to add */
173         group = job->group;
174         job->next = NULL;
175
176         /* search end and blockers */
177         pjob = &first_job;
178         ijob = first_job;
179         while (ijob) {
180                 if (group && ijob->group == group)
181                         job->blocked = 1;
182                 pjob = &ijob->next;
183                 ijob = ijob->next;
184         }
185
186         /* queue the jobs */
187         *pjob = job;
188         remains--;
189 }
190
191 /**
192  * Get the next job to process or NULL if none.
193  * @return the first job that isn't blocked or NULL
194  */
195 static inline struct job *job_get()
196 {
197         struct job *job = first_job;
198         while (job && job->blocked)
199                 job = job->next;
200         if (job)
201                 remains++;
202         return job;
203 }
204
205 /**
206  * Releases the processed 'job': removes it
207  * from the list of jobs and unblock the first
208  * pending job of the same group if any.
209  * @param job the job to release
210  */
211 static inline void job_release(struct job *job)
212 {
213         struct job *ijob, **pjob;
214         const void *group;
215
216         /* first unqueue the job */
217         pjob = &first_job;
218         ijob = first_job;
219         while (ijob != job) {
220                 pjob = &ijob->next;
221                 ijob = ijob->next;
222         }
223         *pjob = job->next;
224
225         /* then unblock jobs of the same group */
226         group = job->group;
227         if (group) {
228                 ijob = job->next;
229                 while (ijob && ijob->group != group)
230                         ijob = ijob->next;
231                 if (ijob)
232                         ijob->blocked = 0;
233         }
234
235         /* recycle the job */
236         job->next = free_jobs;
237         free_jobs = job;
238 }
239
240 /**
241  * Monitored cancel callback for a job.
242  * This function is called by the monitor
243  * to cancel the job when the safe environment
244  * is set.
245  * @param signum 0 on normal flow or the number
246  *               of the signal that interrupted the normal
247  *               flow, isn't used
248  * @param arg    the job to run
249  */
250 static void job_cancel(int signum, void *arg)
251 {
252         struct job *job = arg;
253         job->callback(SIGABRT, job->arg);
254 }
255
256 /**
257  * Monitored normal callback for events.
258  * This function is called by the monitor
259  * to run the event loop when the safe environment
260  * is set.
261  * @param signum 0 on normal flow or the number
262  *               of the signal that interrupted the normal
263  *               flow
264  * @param arg     the events to run
265  */
266 static void evloop_run(int signum, void *arg)
267 {
268         int rc;
269         struct sd_event *se;
270
271         if (!signum) {
272                 se = evloop.sdev;
273                 rc = sd_event_prepare(se);
274                 if (rc < 0) {
275                         errno = -rc;
276                         CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
277                         abort();
278                 } else {
279                         if (rc == 0) {
280                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
281                                 if (rc < 0) {
282                                         errno = -rc;
283                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
284                                 }
285                         }
286                         evloop.state = EVLOOP_STATE_RUN;
287                         if (rc > 0) {
288                                 rc = sd_event_dispatch(se);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                 }
295         }
296 }
297
298 /**
299  * Internal callback for evloop management.
300  * The effect of this function is hidden: it exits
301  * the waiting poll if any.
302  */
303 static void evloop_on_efd_event()
304 {
305         uint64_t x;
306         read(evloop.efd, &x, sizeof x);
307 }
308
309 /**
310  * wakeup the event loop if needed by sending
311  * an event.
312  */
313 static void evloop_wakeup()
314 {
315         uint64_t x;
316
317         if (evloop.state & EVLOOP_STATE_WAIT) {
318                 x = 1;
319                 write(evloop.efd, &x, sizeof x);
320         }
321 }
322
323 /**
324  * Release the currently held event loop
325  */
326 static void evloop_release()
327 {
328         struct thread *nh, *ct = current_thread;
329
330         if (ct && evloop.holder == ct) {
331                 nh = ct->nholder;
332                 evloop.holder = nh;
333                 if (nh)
334                         pthread_cond_signal(nh->cwhold);
335         }
336 }
337
338 /**
339  * get the eventloop for the current thread
340  */
341 static int evloop_get()
342 {
343         struct thread *ct = current_thread;
344
345         if (evloop.holder)
346                 return evloop.holder == ct;
347
348         if (!evloop.sdev)
349                 return 0;
350
351         ct->nholder = NULL;
352         evloop.holder = ct;
353         return 1;
354 }
355
356 /**
357  * acquire the eventloop for the current thread
358  */
359 static void evloop_acquire()
360 {
361         struct thread **pwait, *ct;
362         pthread_cond_t cond;
363
364         /* try to get the evloop */
365         if (!evloop_get()) {
366                 /* failed, init waiting state */
367                 ct = current_thread;
368                 ct->nholder = NULL;
369                 ct->cwhold = &cond;
370                 pthread_cond_init(&cond, NULL);
371
372                 /* queue current thread in holder list */
373                 pwait = &evloop.holder;
374                 while (*pwait)
375                         pwait = &(*pwait)->nholder;
376                 *pwait = ct;
377
378                 /* wake up the evloop */
379                 evloop_wakeup();
380
381                 /* wait to acquire the evloop */
382                 pthread_cond_wait(&cond, &mutex);
383                 pthread_cond_destroy(&cond);
384         }
385 }
386
387 /**
388  * Enter the thread
389  * @param me the description of the thread to enter
390  */
391 static void thread_enter(volatile struct thread *me)
392 {
393         evloop_release();
394         /* initialize description of itself and link it in the list */
395         me->tid = pthread_self();
396         me->stop = 0;
397         me->waits = 0;
398         me->upper = current_thread;
399         me->next = threads;
400         threads = (struct thread*)me;
401         current_thread = (struct thread*)me;
402 }
403
404 /**
405  * leave the thread
406  * @param me the description of the thread to leave
407  */
408 static void thread_leave()
409 {
410         struct thread **prv, *me;
411
412         /* unlink the current thread and cleanup */
413         me = current_thread;
414         prv = &threads;
415         while (*prv != me)
416                 prv = &(*prv)->next;
417         *prv = me->next;
418
419         current_thread = me->upper;
420 }
421
422 /**
423  * Main processing loop of internal threads with processing jobs.
424  * The loop must be called with the mutex locked
425  * and it returns with the mutex locked.
426  * @param me the description of the thread to use
427  * TODO: how are timeout handled when reentering?
428  */
429 static void thread_run_internal(volatile struct thread *me)
430 {
431         struct job *job;
432
433         /* enter thread */
434         thread_enter(me);
435
436         /* loop until stopped */
437         while (!me->stop) {
438                 /* release the current event loop */
439                 evloop_release();
440
441                 /* get a job */
442                 job = job_get();
443                 if (job) {
444                         /* prepare running the job */
445                         job->blocked = 1; /* mark job as blocked */
446                         me->job = job; /* record the job (only for terminate) */
447
448                         /* run the job */
449                         pthread_mutex_unlock(&mutex);
450                         sig_monitor(job->timeout, job->callback, job->arg);
451                         pthread_mutex_lock(&mutex);
452
453                         /* release the run job */
454                         job_release(job);
455                 /* no job, check event loop wait */
456                 } else if (evloop_get()) {
457                         if (evloop.state != 0) {
458                                 /* busy ? */
459                                 CRITICAL("Can't enter dispatch while in dispatch!");
460                                 abort();
461                         }
462                         /* run the events */
463                         evloop.state = EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
464                         pthread_mutex_unlock(&mutex);
465                         sig_monitor(0, evloop_run, NULL);
466                         pthread_mutex_lock(&mutex);
467                         evloop.state = 0;
468                 } else {
469                         /* no job and no event loop */
470                         running--;
471                         if (!running)
472                                 ERROR("Entering job deep sleep! Check your bindings.");
473                         me->waits = 1;
474                         pthread_cond_wait(&cond, &mutex);
475                         me->waits = 0;
476                         running++;
477                 }
478         }
479         /* cleanup */
480         evloop_release();
481         thread_leave();
482 }
483
484 /**
485  * Main processing loop of external threads.
486  * The loop must be called with the mutex locked
487  * and it returns with the mutex locked.
488  * @param me the description of the thread to use
489  */
490 static void thread_run_external(volatile struct thread *me)
491 {
492         /* enter thread */
493         thread_enter(me);
494
495         /* loop until stopped */
496         me->waits = 1;
497         while (!me->stop)
498                 pthread_cond_wait(&cond, &mutex);
499         me->waits = 0;
500         thread_leave();
501 }
502
503 /**
504  * Root for created threads.
505  */
506 static void thread_main()
507 {
508         struct thread me;
509
510         running++;
511         started++;
512         sig_monitor_init_timeouts();
513         thread_run_internal(&me);
514         sig_monitor_clean_timeouts();
515         started--;
516         running--;
517 }
518
519 /**
520  * Entry point for created threads.
521  * @param data not used
522  * @return NULL
523  */
524 static void *thread_starter(void *data)
525 {
526         pthread_mutex_lock(&mutex);
527         thread_main();
528         pthread_mutex_unlock(&mutex);
529         return NULL;
530 }
531
532 /**
533  * Starts a new thread
534  * @return 0 in case of success or -1 in case of error
535  */
536 static int start_one_thread()
537 {
538         pthread_t tid;
539         int rc;
540
541         rc = pthread_create(&tid, NULL, thread_starter, NULL);
542         if (rc != 0) {
543                 /* errno = rc; */
544                 WARNING("not able to start thread: %m");
545                 rc = -1;
546         }
547         return rc;
548 }
549
550 /**
551  * Queues a new asynchronous job represented by 'callback' and 'arg'
552  * for the 'group' and the 'timeout'.
553  * Jobs are queued FIFO and are possibly executed in parallel
554  * concurrently except for job of the same group that are
555  * executed sequentially in FIFO order.
556  * @param group    The group of the job or NULL when no group.
557  * @param timeout  The maximum execution time in seconds of the job
558  *                 or 0 for unlimited time.
559  * @param callback The function to execute for achieving the job.
560  *                 Its first parameter is either 0 on normal flow
561  *                 or the signal number that broke the normal flow.
562  *                 The remaining parameter is the parameter 'arg1'
563  *                 given here.
564  * @param arg      The second argument for 'callback'
565  * @return 0 in case of success or -1 in case of error
566  */
567 int jobs_queue(
568                 const void *group,
569                 int timeout,
570                 void (*callback)(int, void*),
571                 void *arg)
572 {
573         struct job *job;
574         int rc;
575
576         pthread_mutex_lock(&mutex);
577
578         /* allocates the job */
579         job = job_create(group, timeout, callback, arg);
580         if (!job)
581                 goto error;
582
583         /* check availability */
584         if (remains <= 0) {
585                 ERROR("can't process job with threads: too many jobs");
586                 errno = EBUSY;
587                 goto error2;
588         }
589
590         /* start a thread if needed */
591         if (running == started && started < allowed) {
592                 /* all threads are busy and a new can be started */
593                 rc = start_one_thread();
594                 if (rc < 0 && started == 0) {
595                         ERROR("can't start initial thread: %m");
596                         goto error2;
597                 }
598         }
599
600         /* queues the job */
601         job_add(job);
602
603         /* signal an existing job */
604         pthread_cond_signal(&cond);
605         pthread_mutex_unlock(&mutex);
606         return 0;
607
608 error2:
609         job->next = free_jobs;
610         free_jobs = job;
611 error:
612         pthread_mutex_unlock(&mutex);
613         return -1;
614 }
615
616 /**
617  * Internal helper function for 'jobs_enter'.
618  * @see jobs_enter, jobs_leave
619  */
620 static void enter_cb(int signum, void *closure)
621 {
622         struct sync *sync = closure;
623         sync->enter(signum, sync->arg, (void*)&sync->thread);
624 }
625
626 /**
627  * Internal helper function for 'jobs_call'.
628  * @see jobs_call
629  */
630 static void call_cb(int signum, void *closure)
631 {
632         struct sync *sync = closure;
633         sync->callback(signum, sync->arg);
634         jobs_leave((void*)&sync->thread);
635 }
636
637 /**
638  * Internal helper for synchronous jobs. It enters
639  * a new thread loop for evaluating the given job
640  * as recorded by the couple 'sync_cb' and 'sync'.
641  * @see jobs_call, jobs_enter, jobs_leave
642  */
643 static int do_sync(
644                 const void *group,
645                 int timeout,
646                 void (*sync_cb)(int signum, void *closure),
647                 struct sync *sync
648 )
649 {
650         struct job *job;
651
652         pthread_mutex_lock(&mutex);
653
654         /* allocates the job */
655         job = job_create(group, timeout, sync_cb, sync);
656         if (!job) {
657                 pthread_mutex_unlock(&mutex);
658                 return -1;
659         }
660
661         /* queues the job */
662         job_add(job);
663
664         /* run until stopped */
665         if (current_thread)
666                 thread_run_internal(&sync->thread);
667         else
668                 thread_run_external(&sync->thread);
669         pthread_mutex_unlock(&mutex);
670         return 0;
671 }
672
673 /**
674  * Enter a synchronisation point: activates the job given by 'callback'
675  * and 'closure' using 'group' and 'timeout' to control sequencing and
676  * execution time.
677  * @param group the group for sequencing jobs
678  * @param timeout the time in seconds allocated to the job
679  * @param callback the callback that will handle the job.
680  *                 it receives 3 parameters: 'signum' that will be 0
681  *                 on normal flow or the catched signal number in case
682  *                 of interrupted flow, the context 'closure' as given and
683  *                 a 'jobloop' reference that must be used when the job is
684  *                 terminated to unlock the current execution flow.
685  * @param closure the argument to the callback
686  * @return 0 on success or -1 in case of error
687  */
688 int jobs_enter(
689                 const void *group,
690                 int timeout,
691                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
692                 void *closure
693 )
694 {
695         struct sync sync;
696
697         sync.enter = callback;
698         sync.arg = closure;
699         return do_sync(group, timeout, enter_cb, &sync);
700 }
701
702 /**
703  * Unlocks the execution flow designed by 'jobloop'.
704  * @param jobloop indication of the flow to unlock
705  * @return 0 in case of success of -1 on error
706  */
707 int jobs_leave(struct jobloop *jobloop)
708 {
709         struct thread *t;
710
711         pthread_mutex_lock(&mutex);
712         t = threads;
713         while (t && t != (struct thread*)jobloop)
714                 t = t->next;
715         if (!t) {
716                 errno = EINVAL;
717         } else {
718                 t->stop = 1;
719                 if (t->waits)
720                         pthread_cond_broadcast(&cond);
721                 else
722                         evloop_wakeup();
723         }
724         pthread_mutex_unlock(&mutex);
725         return -!t;
726 }
727
728 /**
729  * Calls synchronously the job represented by 'callback' and 'arg1'
730  * for the 'group' and the 'timeout' and waits for its completion.
731  * @param group    The group of the job or NULL when no group.
732  * @param timeout  The maximum execution time in seconds of the job
733  *                 or 0 for unlimited time.
734  * @param callback The function to execute for achieving the job.
735  *                 Its first parameter is either 0 on normal flow
736  *                 or the signal number that broke the normal flow.
737  *                 The remaining parameter is the parameter 'arg1'
738  *                 given here.
739  * @param arg      The second argument for 'callback'
740  * @return 0 in case of success or -1 in case of error
741  */
742 int jobs_call(
743                 const void *group,
744                 int timeout,
745                 void (*callback)(int, void*),
746                 void *arg)
747 {
748         struct sync sync;
749
750         sync.callback = callback;
751         sync.arg = arg;
752
753         return do_sync(group, timeout, call_cb, &sync);
754 }
755
756 /**
757  * Internal callback for evloop management.
758  * The effect of this function is hidden: it exits
759  * the waiting poll if any. Then it wakes up a thread
760  * awaiting the evloop using signal.
761  */
762 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
763 {
764         evloop_on_efd_event();
765         return 1;
766 }
767
768 /**
769  * Gets a sd_event item for the current thread.
770  * @return a sd_event or NULL in case of error
771  */
772 static struct sd_event *get_sd_event_locked()
773 {
774         int rc;
775
776         /* creates the evloop on need */
777         if (!evloop.sdev) {
778                 /* start the creation */
779                 evloop.state = 0;
780                 /* creates the eventfd for waking up polls */
781                 evloop.efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
782                 if (evloop.efd < 0) {
783                         ERROR("can't make eventfd for events");
784                         goto error1;
785                 }
786                 /* create the systemd event loop */
787                 evloop.sdev = systemd_get_event_loop();
788                 if (!evloop.sdev) {
789                         ERROR("can't make event loop");
790                         goto error2;
791                 }
792                 /* put the eventfd in the event loop */
793                 rc = sd_event_add_io(evloop.sdev, NULL, evloop.efd, EPOLLIN, on_evloop_efd, NULL);
794                 if (rc < 0) {
795                         ERROR("can't register eventfd");
796                         evloop.sdev = NULL;
797 error2:
798                         close(evloop.efd);
799 error1:
800                         return NULL;
801                 }
802         }
803
804         /* acquire the event loop */
805         evloop_acquire();
806
807         return evloop.sdev;
808 }
809
810 /**
811  * Ensure that the current running thread can control the event loop.
812  */
813 void jobs_acquire_event_manager()
814 {
815         struct thread lt;
816
817         /* ensure an existing thread environment */
818         if (!current_thread) {
819                 memset(&lt, 0, sizeof lt);
820                 current_thread = &lt;
821         }
822
823         /* process */
824         pthread_mutex_lock(&mutex);
825         get_sd_event_locked();
826         pthread_mutex_unlock(&mutex);
827
828         /* release the faked thread environment if needed */
829         if (current_thread == &lt) {
830                 /*
831                  * Releasing it is needed because there is no way to guess
832                  * when it has to be released really. But here is where it is
833                  * hazardous: if the caller modifies the eventloop when it
834                  * is waiting, there is no way to make the change effective.
835                  * A workaround to achieve that goal is for the caller to
836                  * require the event loop a second time after having modified it.
837                  */
838                 NOTICE("Requiring sd_event loop out of binder callbacks is hazardous!");
839                 if (verbose_wants(Log_Level_Info))
840                         sig_monitor_dumpstack();
841                 evloop_release();
842                 current_thread = NULL;
843         }
844 }
845
846 /**
847  * Enter the jobs processing loop.
848  * @param allowed_count Maximum count of thread for jobs including this one
849  * @param start_count   Count of thread to start now, must be lower.
850  * @param waiter_count  Maximum count of jobs that can be waiting.
851  * @param start         The start routine to activate (can't be NULL)
852  * @return 0 in case of success or -1 in case of error.
853  */
854 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
855 {
856         int rc, launched;
857         struct job *job;
858
859         assert(allowed_count >= 1);
860         assert(start_count >= 0);
861         assert(waiter_count > 0);
862         assert(start_count <= allowed_count);
863
864         rc = -1;
865         pthread_mutex_lock(&mutex);
866
867         /* check whether already running */
868         if (current_thread || allowed) {
869                 ERROR("thread already started");
870                 errno = EINVAL;
871                 goto error;
872         }
873
874         /* records the allowed count */
875         allowed = allowed_count;
876         started = 0;
877         running = 0;
878         remains = waiter_count;
879
880         /* start at least one thread: the current one */
881         launched = 1;
882         while (launched < start_count) {
883                 if (start_one_thread() != 0) {
884                         ERROR("Not all threads can be started");
885                         goto error;
886                 }
887                 launched++;
888         }
889
890         /* queue the start job */
891         job = job_create(NULL, 0, start, arg);
892         if (!job)
893                 goto error;
894         job_add(job);
895
896         /* run until end */
897         thread_main();
898         rc = 0;
899 error:
900         pthread_mutex_unlock(&mutex);
901         return rc;
902 }
903
904 /**
905  * Terminate all the threads and cancel all pending jobs.
906  */
907 void jobs_terminate()
908 {
909         struct job *job, *head, *tail;
910         pthread_t me, *others;
911         struct thread *t;
912         int count;
913
914         /* how am i? */
915         me = pthread_self();
916
917         /* request all threads to stop */
918         pthread_mutex_lock(&mutex);
919         allowed = 0;
920
921         /* count the number of threads */
922         count = 0;
923         t = threads;
924         while (t) {
925                 if (!t->upper && !pthread_equal(t->tid, me))
926                         count++;
927                 t = t->next;
928         }
929
930         /* fill the array of threads */
931         others = alloca(count * sizeof *others);
932         count = 0;
933         t = threads;
934         while (t) {
935                 if (!t->upper && !pthread_equal(t->tid, me))
936                         others[count++] = t->tid;
937                 t = t->next;
938         }
939
940         /* stops the threads */
941         t = threads;
942         while (t) {
943                 t->stop = 1;
944                 t = t->next;
945         }
946
947         /* wait the threads */
948         pthread_cond_broadcast(&cond);
949         pthread_mutex_unlock(&mutex);
950         while (count)
951                 pthread_join(others[--count], NULL);
952         pthread_mutex_lock(&mutex);
953
954         /* cancel pending jobs of other threads */
955         remains = 0;
956         head = first_job;
957         first_job = NULL;
958         tail = NULL;
959         while (head) {
960                 /* unlink the job */
961                 job = head;
962                 head = job->next;
963
964                 /* search if job is stacked for current */
965                 t = current_thread;
966                 while (t && t->job != job)
967                         t = t->upper;
968                 if (t) {
969                         /* yes, relink it at end */
970                         if (tail)
971                                 tail->next = job;
972                         else
973                                 first_job = job;
974                         tail = job;
975                         job->next = NULL;
976                 } else {
977                         /* no cancel the job */
978                         pthread_mutex_unlock(&mutex);
979                         sig_monitor(0, job_cancel, job);
980                         free(job);
981                         pthread_mutex_lock(&mutex);
982                 }
983         }
984         pthread_mutex_unlock(&mutex);
985 }
986