reduce verbosity
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*, void *, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         void *group;         /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg1;          /**< first arg */
56         void *arg2;          /**< second arg */
57         void *arg3;          /**< third arg */
58         int timeout;         /**< timeout in second for processing the request */
59         unsigned blocked: 1; /**< is an other request blocking this one ? */
60         unsigned dropped: 1; /**< is removed ? */
61 };
62
63 /** Description of handled event loops */
64 struct events
65 {
66         struct events *next;
67         struct sd_event *event;
68         uint64_t timeout;
69         unsigned runs: 1;
70 };
71
72 /** Description of threads */
73 struct thread
74 {
75         struct thread *next;   /**< next thread of the list */
76         struct thread *upper;  /**< upper same thread */
77         struct job *job;       /**< currently processed job */
78         struct events *events; /**< currently processed job */
79         pthread_t tid;         /**< the thread id */
80         unsigned stop: 1;      /**< stop requested */
81         unsigned lowered: 1;   /**< has a lower same thread */
82         unsigned waits: 1;     /**< is waiting? */
83 };
84
85 /**
86  * Description of synchonous callback
87  */
88 struct sync
89 {
90         void (*callback)(int, void*);   /**< the synchrnous callback */
91         void *arg;                      /**< the argument of the callback */
92 };
93
94 /* synchronisation of threads */
95 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
96 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
97
98 /* count allowed, started and waiting threads */
99 static int allowed = 0; /** allowed count of threads */
100 static int started = 0; /** started count of threads */
101 static int waiting = 0; /** waiting count of threads */
102 static int remains = 0; /** allowed count of waiting jobs */
103 static int nevents = 0; /** count of events */
104
105 /* list of threads */
106 static struct thread *threads;
107 static _Thread_local struct thread *current;
108
109 /* queue of pending jobs */
110 static struct job *first_job;
111 static struct events *first_events;
112 static struct job *free_jobs;
113
114 /**
115  * Create a new job with the given parameters
116  * @param group    the group of the job
117  * @param timeout  the timeout of the job (0 if none)
118  * @param callback the function that achieves the job
119  * @param arg1     the first argument of the callback
120  * @param arg2     the second argument of the callback
121  * @param arg3     the third argument of the callback
122  * @return the created job unblock or NULL when no more memory
123  */
124 static struct job *job_create(
125                 void *group,
126                 int timeout,
127                 job_cb_t callback,
128                 void *arg1,
129                 void *arg2,
130                 void *arg3)
131 {
132         struct job *job;
133
134         /* try recyle existing job */
135         job = free_jobs;
136         if (job)
137                 free_jobs = job->next;
138         else {
139                 /* allocation  without blocking */
140                 pthread_mutex_unlock(&mutex);
141                 job = malloc(sizeof *job);
142                 pthread_mutex_lock(&mutex);
143                 if (!job) {
144                         errno = -ENOMEM;
145                         goto end;
146                 }
147         }
148         /* initialises the job */
149         job->group = group;
150         job->timeout = timeout;
151         job->callback = callback;
152         job->arg1 = arg1;
153         job->arg2 = arg2;
154         job->arg3 = arg3;
155         job->blocked = 0;
156         job->dropped = 0;
157 end:
158         return job;
159 }
160
161 /**
162  * Adds 'job' at the end of the list of jobs, marking it
163  * as blocked if an other job with the same group is pending.
164  * @param job the job to add
165  */
166 static void job_add(struct job *job)
167 {
168         void *group;
169         struct job *ijob, **pjob;
170
171         /* prepare to add */
172         group = job->group;
173         job->next = NULL;
174
175         /* search end and blockers */
176         pjob = &first_job;
177         ijob = first_job;
178         while (ijob) {
179                 if (group && ijob->group == group)
180                         job->blocked = 1;
181                 pjob = &ijob->next;
182                 ijob = ijob->next;
183         }
184
185         /* queue the jobs */
186         *pjob = job;
187 }
188
189 /**
190  * Get the next job to process or NULL if none.
191  * @return the first job that isn't blocked or NULL
192  */
193 static inline struct job *job_get()
194 {
195         struct job *job = first_job;
196         while (job && job->blocked)
197                 job = job->next;
198         return job;
199 }
200
201 /**
202  * Get the next events to process or NULL if none.
203  * @return the first events that isn't running or NULL
204  */
205 static inline struct events *events_get()
206 {
207         struct events *events = first_events;
208         while (events && events->runs)
209                 events = events->next;
210         return events;
211 }
212
213 /**
214  * Releases the processed 'job': removes it
215  * from the list of jobs and unblock the first
216  * pending job of the same group if any.
217  * @param job the job to release
218  */
219 static inline void job_release(struct job *job)
220 {
221         struct job *ijob, **pjob;
222         void *group;
223
224         /* first unqueue the job */
225         pjob = &first_job;
226         ijob = first_job;
227         while (ijob != job) {
228                 pjob = &ijob->next;
229                 ijob = ijob->next;
230         }
231         *pjob = job->next;
232
233         /* then unblock jobs of the same group */
234         group = job->group;
235         if (group) {
236                 ijob = job->next;
237                 while (ijob && ijob->group != group)
238                         ijob = ijob->next;
239                 if (ijob)
240                         ijob->blocked = 0;
241         }
242
243         /* recycle the job */
244         job->next = free_jobs;
245         free_jobs = job;
246 }
247
248 /**
249  * Monitored normal callback for a job.
250  * This function is called by the monitor
251  * to run the job when the safe environment
252  * is set.
253  * @param signum 0 on normal flow or the number
254  *               of the signal that interrupted the normal
255  *               flow
256  * @param arg     the job to run
257  */
258 static void job_call(int signum, void *arg)
259 {
260         struct job *job = arg;
261         job->callback(signum, job->arg1, job->arg2, job->arg3);
262 }
263
264 /**
265  * Monitored cancel callback for a job.
266  * This function is called by the monitor
267  * to cancel the job when the safe environment
268  * is set.
269  * @param signum 0 on normal flow or the number
270  *               of the signal that interrupted the normal
271  *               flow, isn't used
272  * @param arg    the job to run
273  */
274 static void job_cancel(int signum, void *arg)
275 {
276         job_call(SIGABRT, arg);
277 }
278
279 /**
280  * Monitored normal callback for events.
281  * This function is called by the monitor
282  * to run the event loop when the safe environment
283  * is set.
284  * @param signum 0 on normal flow or the number
285  *               of the signal that interrupted the normal
286  *               flow
287  * @param arg     the events to run
288  */
289 static void events_call(int signum, void *arg)
290 {
291         struct events *events = arg;
292         if (!signum)
293                 sd_event_run(events->event, events->timeout);
294 }
295
296 /**
297  * Main processing loop of threads processing jobs.
298  * The loop must be called with the mutex locked
299  * and it returns with the mutex locked.
300  * @param me the description of the thread to use
301  * TODO: how are timeout handled when reentering?
302  */
303 static void thread_run(volatile struct thread *me)
304 {
305         struct thread **prv;
306         struct job *job;
307         struct events *events;
308         uint64_t evto;
309
310         /* initialize description of itself and link it in the list */
311         me->tid = pthread_self();
312         me->stop = 0;
313         me->lowered = 0;
314         me->waits = 0;
315         me->upper = current;
316         if (current) {
317                 current->lowered = 1;
318                 evto = EVENT_TIMEOUT_CHILD;
319         } else {
320                 started++;
321                 sig_monitor_init_timeouts();
322                 evto = EVENT_TIMEOUT_TOP;
323         }
324         me->next = threads;
325         threads = (struct thread*)me;
326         current = (struct thread*)me;
327
328         /* loop until stopped */
329         me->events = NULL;
330         while (!me->stop) {
331                 /* get a job */
332                 job = job_get(first_job);
333                 if (job) {
334                         /* prepare running the job */
335                         remains++; /* increases count of job that can wait */
336                         job->blocked = 1; /* mark job as blocked */
337                         me->job = job; /* record the job (only for terminate) */
338
339                         /* run the job */
340                         pthread_mutex_unlock(&mutex);
341                         sig_monitor(job->timeout, job_call, job);
342                         pthread_mutex_lock(&mutex);
343
344                         /* release the run job */
345                         job_release(job);
346
347                         /* release event if any */
348                         events = me->events;
349                         if (events) {
350                                 events->runs = 0;
351                                 me->events = NULL;
352                         }
353                 } else {
354                         /* no job, check events */
355                         events = events_get();
356                         if (events) {
357                                 /* run the events */
358                                 events->runs = 1;
359                                 events->timeout = evto;
360                                 me->events = events;
361                                 pthread_mutex_unlock(&mutex);
362                                 sig_monitor(0, events_call, events);
363                                 pthread_mutex_lock(&mutex);
364                                 events->runs = 0;
365                                 me->events = NULL;
366                         } else {
367                                 /* no job and not events */
368                                 waiting++;
369                                 me->waits = 1;
370                                 pthread_cond_wait(&cond, &mutex);
371                                 me->waits = 0;
372                                 waiting--;
373                         }
374                 }
375         }
376
377         /* unlink the current thread and cleanup */
378         prv = &threads;
379         while (*prv != me)
380                 prv = &(*prv)->next;
381         *prv = me->next;
382         current = me->upper;
383         if (current) {
384                 current->lowered = 0;
385         } else {
386                 sig_monitor_clean_timeouts();
387                 started--;
388         }
389 }
390
391 /**
392  * Entry point for created threads.
393  * @param data not used
394  * @return NULL
395  */
396 static void *thread_main(void *data)
397 {
398         struct thread me;
399
400         pthread_mutex_lock(&mutex);
401         thread_run(&me);
402         pthread_mutex_unlock(&mutex);
403         return NULL;
404 }
405
406 /**
407  * Starts a new thread
408  * @return 0 in case of success or -1 in case of error
409  */
410 static int start_one_thread()
411 {
412         pthread_t tid;
413         int rc;
414
415         rc = pthread_create(&tid, NULL, thread_main, NULL);
416         if (rc != 0) {
417                 /* errno = rc; */
418                 WARNING("not able to start thread: %m");
419                 rc = -1;
420         }
421         return rc;
422 }
423
424 /**
425  * Queues a new asynchronous job represented by 'callback'
426  * for the 'group' and the 'timeout'.
427  * Jobs are queued FIFO and are possibly executed in parallel
428  * concurrently except for job of the same group that are
429  * executed sequentially in FIFO order.
430  * @param group    The group of the job or NULL when no group.
431  * @param timeout  The maximum execution time in seconds of the job
432  *                 or 0 for unlimited time.
433  * @param callback The function to execute for achieving the job.
434  *                 Its first parameter is either 0 on normal flow
435  *                 or the signal number that broke the normal flow.
436  * @return 0 in case of success or -1 in case of error
437  */
438 int jobs_queue0(
439                 void *group,
440                 int timeout,
441                 void (*callback)(int signum))
442 {
443         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
444 }
445
446 /**
447  * Queues a new asynchronous job represented by 'callback' and 'arg1'
448  * for the 'group' and the 'timeout'.
449  * Jobs are queued FIFO and are possibly executed in parallel
450  * concurrently except for job of the same group that are
451  * executed sequentially in FIFO order.
452  * @param group    The group of the job or NULL when no group.
453  * @param timeout  The maximum execution time in seconds of the job
454  *                 or 0 for unlimited time.
455  * @param callback The function to execute for achieving the job.
456  *                 Its first parameter is either 0 on normal flow
457  *                 or the signal number that broke the normal flow.
458  *                 The remaining parameter is the parameter 'arg1'
459  *                 given here.
460  * @param arg      The second argument for 'callback'
461  * @return 0 in case of success or -1 in case of error
462  */
463 int jobs_queue(
464                 void *group,
465                 int timeout,
466                 void (*callback)(int, void*),
467                 void *arg)
468 {
469         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
470 }
471
472 /**
473  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
474  * for the 'group' and the 'timeout'.
475  * Jobs are queued FIFO and are possibly executed in parallel
476  * concurrently except for job of the same group that are
477  * executed sequentially in FIFO order.
478  * @param group    The group of the job or NULL when no group.
479  * @param timeout  The maximum execution time in seconds of the job
480  *                 or 0 for unlimited time.
481  * @param callback The function to execute for achieving the job.
482  *                 Its first parameter is either 0 on normal flow
483  *                 or the signal number that broke the normal flow.
484  *                 The remaining parameters are the parameters 'arg[12]'
485  *                 given here.
486  * @param arg1     The second argument for 'callback'
487  * @param arg2     The third argument for 'callback'
488  * @return 0 in case of success or -1 in case of error
489  */
490 int jobs_queue2(
491                 void *group,
492                 int timeout,
493                 void (*callback)(int, void*, void*),
494                 void *arg1,
495                 void *arg2)
496 {
497         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
498 }
499
500 /**
501  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
502  * for the 'group' and the 'timeout'.
503  * Jobs are queued FIFO and are possibly executed in parallel
504  * concurrently except for job of the same group that are
505  * executed sequentially in FIFO order.
506  * @param group    The group of the job or NULL when no group.
507  * @param timeout  The maximum execution time in seconds of the job
508  *                 or 0 for unlimited time.
509  * @param callback The function to execute for achieving the job.
510  *                 Its first parameter is either 0 on normal flow
511  *                 or the signal number that broke the normal flow.
512  *                 The remaining parameters are the parameters 'arg[123]'
513  *                 given here.
514  * @param arg1     The second argument for 'callback'
515  * @param arg2     The third argument for 'callback'
516  * @param arg3     The forth argument for 'callback'
517  * @return 0 in case of success or -1 in case of error
518  */
519 int jobs_queue3(
520                 void *group,
521                 int timeout,
522                 void (*callback)(int, void*, void *, void*),
523                 void *arg1,
524                 void *arg2,
525                 void *arg3)
526 {
527         const char *info;
528         struct job *job;
529         int rc;
530
531         pthread_mutex_lock(&mutex);
532
533         /* allocates the job */
534         job = job_create(group, timeout, callback, arg1, arg2, arg3);
535         if (!job) {
536                 errno = ENOMEM;
537                 info = "out of memory";
538                 goto error;
539         }
540
541         /* check availability */
542         if (remains == 0) {
543                 errno = EBUSY;
544                 info = "too many jobs";
545                 goto error2;
546         }
547
548         /* start a thread if needed */
549         if (waiting == 0 && started < allowed) {
550                 /* all threads are busy and a new can be started */
551                 rc = start_one_thread();
552                 if (rc < 0 && started == 0) {
553                         info = "can't start first thread";
554                         goto error2;
555                 }
556         }
557
558         /* queues the job */
559         remains--;
560         job_add(job);
561
562         /* signal an existing job */
563         pthread_cond_signal(&cond);
564         pthread_mutex_unlock(&mutex);
565         return 0;
566
567 error2:
568         job->next = free_jobs;
569         free_jobs = job;
570 error:
571         ERROR("can't process job with threads: %s, %m", info);
572         pthread_mutex_unlock(&mutex);
573         return -1;
574 }
575
576 /**
577  * Enter a synchronisation point: activates the job given by 'callback'
578  * and 'closure' using 'group' and 'timeout' to control sequencing and
579  * execution time.
580  * @param group the group for sequencing jobs
581  * @param timeout the time in seconds allocated to the job
582  * @param callback the callback that will handle the job.
583  *                 it receives 3 parameters: 'signum' that will be 0
584  *                 on normal flow or the catched signal number in case
585  *                 of interrupted flow, the context 'closure' as given and
586  *                 a 'jobloop' reference that must be used when the job is
587  *                 terminated to unlock the current execution flow.
588  * @param closure the context completion closure for the callback
589  * @return 0 on success or -1 in case of error
590  */
591 int jobs_enter(
592                 void *group,
593                 int timeout,
594                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
595                 void *closure
596 )
597 {
598         
599         struct job *job;
600         struct thread me;
601
602         pthread_mutex_lock(&mutex);
603
604         /* allocates the job */
605         job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
606         if (!job) {
607                 ERROR("out of memory");
608                 errno = ENOMEM;
609                 pthread_mutex_unlock(&mutex);
610                 return -1;
611         }
612
613         /* queues the job */
614         job_add(job);
615
616         /* run until stopped */
617         thread_run(&me);
618         pthread_mutex_unlock(&mutex);
619         return 0;
620 }
621
622 /**
623  * Unlocks the execution flow designed by 'jobloop'.
624  * @param jobloop indication of the flow to unlock
625  * @return 0 in case of success of -1 on error
626  */
627 int jobs_leave(struct jobloop *jobloop)
628 {
629         struct thread *t;
630
631         pthread_mutex_lock(&mutex);
632         t = threads;
633         while (t && t != (struct thread*)jobloop)
634                 t = t->next;
635         if (!t) {
636                 errno = EINVAL;
637         } else {
638                 t->stop = 1;
639                 if (t->waits)
640                         pthread_cond_broadcast(&cond);
641         }
642         pthread_mutex_unlock(&mutex);
643         return -!t;
644 }
645
646 /**
647  * Internal helper function for 'jobs_call'.
648  * @see jobs_call, jobs_enter, jobs_leave
649  */
650 static void call_cb(int signum, void *closure, struct jobloop *jobloop)
651 {
652         struct sync *sync = closure;
653         sync->callback(signum, sync->arg);
654         jobs_leave(jobloop);
655 }
656
657 /**
658  * Calls synchronously the job represented by 'callback' and 'arg1'
659  * for the 'group' and the 'timeout' and waits for its completion.
660  * @param group    The group of the job or NULL when no group.
661  * @param timeout  The maximum execution time in seconds of the job
662  *                 or 0 for unlimited time.
663  * @param callback The function to execute for achieving the job.
664  *                 Its first parameter is either 0 on normal flow
665  *                 or the signal number that broke the normal flow.
666  *                 The remaining parameter is the parameter 'arg1'
667  *                 given here.
668  * @param arg      The second argument for 'callback'
669  * @return 0 in case of success or -1 in case of error
670  */
671 int jobs_call(
672                 void *group,
673                 int timeout,
674                 void (*callback)(int, void*),
675                 void *arg)
676 {
677         struct sync sync;
678
679         sync.callback = callback;
680         sync.arg = arg;
681         return jobs_enter(group, timeout, call_cb, &sync);
682 }
683
684 /**
685  * Gets a sd_event item for the current thread.
686  * @return a sd_event or NULL in case of error
687  */
688 struct sd_event *jobs_get_sd_event()
689 {
690         struct events *events;
691         struct thread *me;
692         int rc;
693
694         pthread_mutex_lock(&mutex);
695
696         /* search events on stack */
697         me = current;
698         while (me && !me->events)
699                 me = me->upper;
700         if (me)
701                 /* return the stacked events */
702                 events = me->events;
703         else {
704                 /* search an available events */
705                 events = events_get();
706                 if (!events) {
707                         /* not found, check if creation possible */
708                         if (nevents >= allowed) {
709                                 ERROR("not possible to add a new event");
710                                 events = NULL;
711                         } else {
712                                 events = malloc(sizeof *events);
713                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
714                                         if (nevents < started || start_one_thread() >= 0) {
715                                                 events->runs = 0;
716                                                 events->next = first_events;
717                                                 first_events = events;
718                                         } else {
719                                                 ERROR("can't start thread for events");
720                                                 sd_event_unref(events->event);
721                                                 free(events);
722                                                 events = NULL;
723                                         }
724                                 } else {
725                                         if (!events) {
726                                                 ERROR("out of memory");
727                                                 errno = ENOMEM;
728                                         } else {
729                                                 free(events);
730                                                 ERROR("creation of sd_event failed: %m");
731                                                 events = NULL;
732                                                 errno = -rc;
733                                         } 
734                                 }
735                         }
736                 }
737                 if (events) {
738                         /* */
739                         me = current;
740                         if (me) {
741                                 events->runs = 1;
742                                 me->events = events;
743                         } else {
744                                 WARNING("event returned for unknown thread!");
745                         }
746                 }
747         }
748         pthread_mutex_unlock(&mutex);
749         return events ? events->event : NULL;
750 }
751
752 /**
753  * Enter the jobs processing loop.
754  * @param allowed_count Maximum count of thread for jobs including this one
755  * @param start_count   Count of thread to start now, must be lower.
756  * @param waiter_count  Maximum count of jobs that can be waiting.
757  * @param start         The start routine to activate (can't be NULL)
758  * @return 0 in case of success or -1 in case of error.
759  */
760 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
761 {
762         int rc, launched;
763         struct thread me;
764         struct job *job;
765
766         assert(allowed_count >= 1);
767         assert(start_count >= 0);
768         assert(waiter_count > 0);
769         assert(start_count <= allowed_count);
770
771         rc = -1;
772         pthread_mutex_lock(&mutex);
773
774         /* check whether already running */
775         if (current || allowed) {
776                 ERROR("thread already started");
777                 errno = EINVAL;
778                 goto error;
779         }
780
781         /* start */
782         if (sig_monitor_init() < 0) {
783                 ERROR("failed to initialise signal handlers");
784                 goto error;
785         }
786
787         /* records the allowed count */
788         allowed = allowed_count;
789         started = 0;
790         waiting = 0;
791         remains = waiter_count;
792
793         /* start at least one thread */
794         launched = 0;
795         while ((launched + 1) < start_count) {
796                 if (start_one_thread() != 0) {
797                         ERROR("Not all threads can be started");
798                         goto error;
799                 }
800                 launched++;
801         }
802
803         /* queue the start job */
804         job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
805         if (!job) {
806                 ERROR("out of memory");
807                 errno = ENOMEM;
808                 goto error;
809         }
810         job_add(job);
811         remains--;
812
813         /* run until end */
814         thread_run(&me);
815         rc = 0;
816 error:
817         pthread_mutex_unlock(&mutex);
818         return rc;
819 }
820
821 /**
822  * Terminate all the threads and cancel all pending jobs.
823  */
824 void jobs_terminate()
825 {
826         struct job *job, *head, *tail;
827         pthread_t me, *others;
828         struct thread *t;
829         int count;
830
831         /* how am i? */
832         me = pthread_self();
833
834         /* request all threads to stop */
835         pthread_mutex_lock(&mutex);
836         allowed = 0;
837
838         /* count the number of threads */
839         count = 0;
840         t = threads;
841         while (t) {
842                 if (!t->upper && !pthread_equal(t->tid, me))
843                         count++;
844                 t = t->next;
845         }
846
847         /* fill the array of threads */
848         others = alloca(count * sizeof *others);
849         count = 0;
850         t = threads;
851         while (t) {
852                 if (!t->upper && !pthread_equal(t->tid, me))
853                         others[count++] = t->tid;
854                 t = t->next;
855         }
856
857         /* stops the threads */
858         t = threads;
859         while (t) {
860                 t->stop = 1;
861                 t = t->next;
862         }
863
864         /* wait the threads */
865         pthread_cond_broadcast(&cond);
866         pthread_mutex_unlock(&mutex);
867         while (count)
868                 pthread_join(others[--count], NULL);
869         pthread_mutex_lock(&mutex);
870
871         /* cancel pending jobs of other threads */
872         remains = 0;
873         head = first_job;
874         first_job = NULL;
875         tail = NULL;
876         while (head) {
877                 /* unlink the job */
878                 job = head;
879                 head = job->next;
880
881                 /* search if job is stacked for current */
882                 t = current;
883                 while (t && t->job != job)
884                         t = t->upper;
885                 if (t) {
886                         /* yes, relink it at end */
887                         if (tail)
888                                 tail->next = job;
889                         else
890                                 first_job = job;
891                         tail = job;
892                         job->next = NULL;
893                 } else {
894                         /* no cancel the job */
895                         pthread_mutex_unlock(&mutex);
896                         sig_monitor(0, job_cancel, job);
897                         free(job);
898                         pthread_mutex_lock(&mutex);
899                 }
900         }
901         pthread_mutex_unlock(&mutex);
902 }
903