jobs: Add argument to start
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29 #include <sys/eventfd.h>
30
31 #include <systemd/sd-event.h>
32 #ifndef NO_JOBS_WATCHDOG
33 #include <systemd/sd-daemon.h>
34 #endif
35
36 #include "jobs.h"
37 #include "sig-monitor.h"
38 #include "verbose.h"
39
40 #if 0
41 #define _alert_ "do you really want to remove monitoring?"
42 #define sig_monitor_init_timeouts()  ((void)0)
43 #define sig_monitor_clean_timeouts() ((void)0)
44 #define sig_monitor(to,cb,arg)       (cb(0,arg))
45 #endif
46
47 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
48 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
49
50 /** Internal shortcut for callback */
51 typedef void (*job_cb_t)(int, void*);
52
53 /** Description of a pending job */
54 struct job
55 {
56         struct job *next;    /**< link to the next job enqueued */
57         const void *group;   /**< group of the request */
58         job_cb_t callback;   /**< processing callback */
59         void *arg;           /**< argument */
60         int timeout;         /**< timeout in second for processing the request */
61         unsigned blocked: 1; /**< is an other request blocking this one ? */
62         unsigned dropped: 1; /**< is removed ? */
63 };
64
65 /** Description of handled event loops */
66 struct evloop
67 {
68         unsigned state;        /**< encoded state */
69         int efd;               /**< event notification */
70         struct sd_event *sdev; /**< the systemd event loop */
71         pthread_cond_t  cond;  /**< condition */
72 };
73
74 #define EVLOOP_STATE_WAIT           1U
75 #define EVLOOP_STATE_RUN            2U
76 #define EVLOOP_STATE_LOCK           4U
77
78 /** Description of threads */
79 struct thread
80 {
81         struct thread *next;   /**< next thread of the list */
82         struct thread *upper;  /**< upper same thread */
83         struct job *job;       /**< currently processed job */
84         pthread_t tid;         /**< the thread id */
85         unsigned stop: 1;      /**< stop requested */
86         unsigned waits: 1;     /**< is waiting? */
87 };
88
89 /**
90  * Description of synchonous callback
91  */
92 struct sync
93 {
94         struct thread thread;   /**< thread loop data */
95         union {
96                 void (*callback)(int, void*);   /**< the synchronous callback */
97                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
98                                 /**< the entering synchronous routine */
99         };
100         void *arg;              /**< the argument of the callback */
101 };
102
103
104 /* synchronisation of threads */
105 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
106 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
107
108 /* count allowed, started and running threads */
109 static int allowed = 0; /** allowed count of threads */
110 static int started = 0; /** started count of threads */
111 static int running = 0; /** running count of threads */
112 static int remains = 0; /** allowed count of waiting jobs */
113
114 /* list of threads */
115 static struct thread *threads;
116 static _Thread_local struct thread *current_thread;
117 static _Thread_local struct evloop *current_evloop;
118
119 /* queue of pending jobs */
120 static struct job *first_job;
121 static struct job *free_jobs;
122
123 /* event loop */
124 static struct evloop evloop[1];
125
126 /**
127  * Create a new job with the given parameters
128  * @param group    the group of the job
129  * @param timeout  the timeout of the job (0 if none)
130  * @param callback the function that achieves the job
131  * @param arg      the argument of the callback
132  * @return the created job unblock or NULL when no more memory
133  */
134 static struct job *job_create(
135                 const void *group,
136                 int timeout,
137                 job_cb_t callback,
138                 void *arg)
139 {
140         struct job *job;
141
142         /* try recyle existing job */
143         job = free_jobs;
144         if (job)
145                 free_jobs = job->next;
146         else {
147                 /* allocation  without blocking */
148                 pthread_mutex_unlock(&mutex);
149                 job = malloc(sizeof *job);
150                 pthread_mutex_lock(&mutex);
151                 if (!job) {
152                         errno = -ENOMEM;
153                         goto end;
154                 }
155         }
156         /* initialises the job */
157         job->group = group;
158         job->timeout = timeout;
159         job->callback = callback;
160         job->arg = arg;
161         job->blocked = 0;
162         job->dropped = 0;
163 end:
164         return job;
165 }
166
167 /**
168  * Adds 'job' at the end of the list of jobs, marking it
169  * as blocked if an other job with the same group is pending.
170  * @param job the job to add
171  */
172 static void job_add(struct job *job)
173 {
174         const void *group;
175         struct job *ijob, **pjob;
176
177         /* prepare to add */
178         group = job->group;
179         job->next = NULL;
180
181         /* search end and blockers */
182         pjob = &first_job;
183         ijob = first_job;
184         while (ijob) {
185                 if (group && ijob->group == group)
186                         job->blocked = 1;
187                 pjob = &ijob->next;
188                 ijob = ijob->next;
189         }
190
191         /* queue the jobs */
192         *pjob = job;
193 }
194
195 /**
196  * Get the next job to process or NULL if none.
197  * @return the first job that isn't blocked or NULL
198  */
199 static inline struct job *job_get()
200 {
201         struct job *job = first_job;
202         while (job && job->blocked)
203                 job = job->next;
204         return job;
205 }
206
207 /**
208  * Releases the processed 'job': removes it
209  * from the list of jobs and unblock the first
210  * pending job of the same group if any.
211  * @param job the job to release
212  */
213 static inline void job_release(struct job *job)
214 {
215         struct job *ijob, **pjob;
216         const void *group;
217
218         /* first unqueue the job */
219         pjob = &first_job;
220         ijob = first_job;
221         while (ijob != job) {
222                 pjob = &ijob->next;
223                 ijob = ijob->next;
224         }
225         *pjob = job->next;
226
227         /* then unblock jobs of the same group */
228         group = job->group;
229         if (group) {
230                 ijob = job->next;
231                 while (ijob && ijob->group != group)
232                         ijob = ijob->next;
233                 if (ijob)
234                         ijob->blocked = 0;
235         }
236
237         /* recycle the job */
238         job->next = free_jobs;
239         free_jobs = job;
240 }
241
242 /**
243  * Monitored cancel callback for a job.
244  * This function is called by the monitor
245  * to cancel the job when the safe environment
246  * is set.
247  * @param signum 0 on normal flow or the number
248  *               of the signal that interrupted the normal
249  *               flow, isn't used
250  * @param arg    the job to run
251  */
252 static void job_cancel(int signum, void *arg)
253 {
254         struct job *job = arg;
255         job->callback(SIGABRT, job->arg);
256 }
257
258 /**
259  * Monitored normal callback for events.
260  * This function is called by the monitor
261  * to run the event loop when the safe environment
262  * is set.
263  * @param signum 0 on normal flow or the number
264  *               of the signal that interrupted the normal
265  *               flow
266  * @param arg     the events to run
267  */
268 static void evloop_run(int signum, void *arg)
269 {
270         int rc;
271         struct sd_event *se;
272         struct evloop *el = arg;
273
274         if (!signum) {
275                 se = el->sdev;
276                 rc = sd_event_prepare(se);
277                 if (rc < 0) {
278                         errno = -rc;
279                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
280                 } else {
281                         if (rc == 0) {
282                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
283                                 if (rc < 0) {
284                                         errno = -rc;
285                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
286                                 }
287                         }
288                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
289
290                         if (rc > 0) {
291                                 rc = sd_event_dispatch(se);
292                                 if (rc < 0) {
293                                         errno = -rc;
294                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
295                                 }
296                         }
297                 }
298         }
299         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
300 }
301
302
303 /**
304  * Main processing loop of threads processing jobs.
305  * The loop must be called with the mutex locked
306  * and it returns with the mutex locked.
307  * @param me the description of the thread to use
308  * TODO: how are timeout handled when reentering?
309  */
310 static void thread_run(volatile struct thread *me)
311 {
312         struct thread **prv;
313         struct job *job;
314         struct evloop *el;
315
316         /* initialize description of itself and link it in the list */
317         me->tid = pthread_self();
318         me->stop = 0;
319         me->waits = 0;
320         me->upper = current_thread;
321         if (!current_thread) {
322                 started++;
323                 sig_monitor_init_timeouts();
324         }
325         me->next = threads;
326         threads = (struct thread*)me;
327         current_thread = (struct thread*)me;
328
329         /* loop until stopped */
330         while (!me->stop) {
331                 /* release the event loop */
332                 if (current_evloop) {
333                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
334                         current_evloop = NULL;
335                 }
336
337                 /* get a job */
338                 job = job_get(first_job);
339                 if (job) {
340                         /* prepare running the job */
341                         remains++; /* increases count of job that can wait */
342                         job->blocked = 1; /* mark job as blocked */
343                         me->job = job; /* record the job (only for terminate) */
344
345                         /* run the job */
346                         pthread_mutex_unlock(&mutex);
347                         sig_monitor(job->timeout, job->callback, job->arg);
348                         pthread_mutex_lock(&mutex);
349
350                         /* release the run job */
351                         job_release(job);
352                 } else {
353                         /* no job, check events */
354                         el = &evloop[0];
355                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
356                                 /* run the events */
357                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
358                                 current_evloop = el;
359                                 pthread_mutex_unlock(&mutex);
360                                 sig_monitor(0, evloop_run, el);
361                                 pthread_mutex_lock(&mutex);
362                         } else {
363                                 /* no job and not events */
364                                 running--;
365                                 if (!running)
366                                         ERROR("Entering job deep sleep! Check your bindings.");
367                                 me->waits = 1;
368                                 pthread_cond_wait(&cond, &mutex);
369                                 me->waits = 0;
370                                 running++;
371                         }
372                 }
373         }
374
375         /* release the event loop */
376         if (current_evloop) {
377                 __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
378                 current_evloop = NULL;
379         }
380
381         /* unlink the current thread and cleanup */
382         prv = &threads;
383         while (*prv != me)
384                 prv = &(*prv)->next;
385         *prv = me->next;
386         current_thread = me->upper;
387         if (!current_thread) {
388                 sig_monitor_clean_timeouts();
389                 started--;
390         }
391 }
392
393 /**
394  * Entry point for created threads.
395  * @param data not used
396  * @return NULL
397  */
398 static void *thread_main(void *data)
399 {
400         struct thread me;
401
402         pthread_mutex_lock(&mutex);
403         running++;
404         thread_run(&me);
405         running--;
406         pthread_mutex_unlock(&mutex);
407         return NULL;
408 }
409
410 /**
411  * Starts a new thread
412  * @return 0 in case of success or -1 in case of error
413  */
414 static int start_one_thread()
415 {
416         pthread_t tid;
417         int rc;
418
419         rc = pthread_create(&tid, NULL, thread_main, NULL);
420         if (rc != 0) {
421                 /* errno = rc; */
422                 WARNING("not able to start thread: %m");
423                 rc = -1;
424         }
425         return rc;
426 }
427
428 /**
429  * Queues a new asynchronous job represented by 'callback' and 'arg'
430  * for the 'group' and the 'timeout'.
431  * Jobs are queued FIFO and are possibly executed in parallel
432  * concurrently except for job of the same group that are
433  * executed sequentially in FIFO order.
434  * @param group    The group of the job or NULL when no group.
435  * @param timeout  The maximum execution time in seconds of the job
436  *                 or 0 for unlimited time.
437  * @param callback The function to execute for achieving the job.
438  *                 Its first parameter is either 0 on normal flow
439  *                 or the signal number that broke the normal flow.
440  *                 The remaining parameter is the parameter 'arg1'
441  *                 given here.
442  * @param arg      The second argument for 'callback'
443  * @return 0 in case of success or -1 in case of error
444  */
445 int jobs_queue(
446                 const void *group,
447                 int timeout,
448                 void (*callback)(int, void*),
449                 void *arg)
450 {
451         const char *info;
452         struct job *job;
453         int rc;
454
455         pthread_mutex_lock(&mutex);
456
457         /* allocates the job */
458         job = job_create(group, timeout, callback, arg);
459         if (!job) {
460                 errno = ENOMEM;
461                 info = "out of memory";
462                 goto error;
463         }
464
465         /* check availability */
466         if (remains == 0) {
467                 errno = EBUSY;
468                 info = "too many jobs";
469                 goto error2;
470         }
471
472         /* start a thread if needed */
473         if (running == started && started < allowed) {
474                 /* all threads are busy and a new can be started */
475                 rc = start_one_thread();
476                 if (rc < 0 && started == 0) {
477                         info = "can't start first thread";
478                         goto error2;
479                 }
480         }
481
482         /* queues the job */
483         remains--;
484         job_add(job);
485
486         /* signal an existing job */
487         pthread_cond_signal(&cond);
488         pthread_mutex_unlock(&mutex);
489         return 0;
490
491 error2:
492         job->next = free_jobs;
493         free_jobs = job;
494 error:
495         ERROR("can't process job with threads: %s, %m", info);
496         pthread_mutex_unlock(&mutex);
497         return -1;
498 }
499
500 /**
501  * Internal helper function for 'jobs_enter'.
502  * @see jobs_enter, jobs_leave
503  */
504 static void enter_cb(int signum, void *closure)
505 {
506         struct sync *sync = closure;
507         sync->enter(signum, sync->arg, (void*)&sync->thread);
508 }
509
510 /**
511  * Internal helper function for 'jobs_call'.
512  * @see jobs_call
513  */
514 static void call_cb(int signum, void *closure)
515 {
516         struct sync *sync = closure;
517         sync->callback(signum, sync->arg);
518         jobs_leave((void*)&sync->thread);
519 }
520
521 /**
522  * Internal helper for synchronous jobs. It enters
523  * a new thread loop for evaluating the given job
524  * as recorded by the couple 'sync_cb' and 'sync'.
525  * @see jobs_call, jobs_enter, jobs_leave
526  */
527 static int do_sync(
528                 const void *group,
529                 int timeout,
530                 void (*sync_cb)(int signum, void *closure),
531                 struct sync *sync
532 )
533 {
534         struct job *job;
535
536         pthread_mutex_lock(&mutex);
537
538         /* allocates the job */
539         job = job_create(group, timeout, sync_cb, sync);
540         if (!job) {
541                 ERROR("out of memory");
542                 errno = ENOMEM;
543                 pthread_mutex_unlock(&mutex);
544                 return -1;
545         }
546
547         /* queues the job */
548         job_add(job);
549
550         /* run until stopped */
551         thread_run(&sync->thread);
552         pthread_mutex_unlock(&mutex);
553         return 0;
554 }
555
556 /**
557  * Enter a synchronisation point: activates the job given by 'callback'
558  * and 'closure' using 'group' and 'timeout' to control sequencing and
559  * execution time.
560  * @param group the group for sequencing jobs
561  * @param timeout the time in seconds allocated to the job
562  * @param callback the callback that will handle the job.
563  *                 it receives 3 parameters: 'signum' that will be 0
564  *                 on normal flow or the catched signal number in case
565  *                 of interrupted flow, the context 'closure' as given and
566  *                 a 'jobloop' reference that must be used when the job is
567  *                 terminated to unlock the current execution flow.
568  * @param arg the argument to the callback
569  * @return 0 on success or -1 in case of error
570  */
571 int jobs_enter(
572                 const void *group,
573                 int timeout,
574                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
575                 void *closure
576 )
577 {
578         struct sync sync;
579
580         sync.enter = callback;
581         sync.arg = closure;
582         return do_sync(group, timeout, enter_cb, &sync);
583 }
584
585 /**
586  * Unlocks the execution flow designed by 'jobloop'.
587  * @param jobloop indication of the flow to unlock
588  * @return 0 in case of success of -1 on error
589  */
590 int jobs_leave(struct jobloop *jobloop)
591 {
592         struct thread *t;
593
594         pthread_mutex_lock(&mutex);
595         t = threads;
596         while (t && t != (struct thread*)jobloop)
597                 t = t->next;
598         if (!t) {
599                 errno = EINVAL;
600         } else {
601                 t->stop = 1;
602                 if (t->waits)
603                         pthread_cond_broadcast(&cond);
604         }
605         pthread_mutex_unlock(&mutex);
606         return -!t;
607 }
608
609 /**
610  * Calls synchronously the job represented by 'callback' and 'arg1'
611  * for the 'group' and the 'timeout' and waits for its completion.
612  * @param group    The group of the job or NULL when no group.
613  * @param timeout  The maximum execution time in seconds of the job
614  *                 or 0 for unlimited time.
615  * @param callback The function to execute for achieving the job.
616  *                 Its first parameter is either 0 on normal flow
617  *                 or the signal number that broke the normal flow.
618  *                 The remaining parameter is the parameter 'arg1'
619  *                 given here.
620  * @param arg      The second argument for 'callback'
621  * @return 0 in case of success or -1 in case of error
622  */
623 int jobs_call(
624                 const void *group,
625                 int timeout,
626                 void (*callback)(int, void*),
627                 void *arg)
628 {
629         struct sync sync;
630
631         sync.callback = callback;
632         sync.arg = arg;
633
634         return do_sync(group, timeout, call_cb, &sync);
635 }
636
637 /**
638  * Internal callback for evloop management.
639  * The effect of this function is hidden: it exits
640  * the waiting poll if any. Then it wakes up a thread
641  * awaiting the evloop using signal.
642  */
643 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
644 {
645         uint64_t x;
646         struct evloop *evloop = userdata;
647         read(evloop->efd, &x, sizeof x);
648         pthread_mutex_lock(&mutex);
649         pthread_cond_broadcast(&evloop->cond);  
650         pthread_mutex_unlock(&mutex);
651         return 1;
652 }
653
654 /**
655  * Gets a sd_event item for the current thread.
656  * @return a sd_event or NULL in case of error
657  */
658 static struct sd_event *get_sd_event_locked()
659 {
660         struct evloop *el;
661         uint64_t x;
662         int rc;
663
664         /* creates the evloop on need */
665         el = &evloop[0];
666         if (!el->sdev) {
667                 /* start the creation */
668                 el->state = 0;
669                 /* creates the eventfd for waking up polls */
670                 el->efd = eventfd(0, EFD_CLOEXEC);
671                 if (el->efd < 0) {
672                         ERROR("can't make eventfd for events");
673                         goto error1;
674                 }
675                 /* create the systemd event loop */
676                 rc = sd_event_new(&el->sdev);
677                 if (rc < 0) {
678                         ERROR("can't make new event loop");
679                         goto error2;
680                 }
681                 /* put the eventfd in the event loop */
682                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
683                 if (rc < 0) {
684                         ERROR("can't register eventfd");
685                         sd_event_unref(el->sdev);
686                         el->sdev = NULL;
687 error2:
688                         close(el->efd);
689 error1:
690                         return NULL;
691                 }
692         }
693
694         /* attach the event loop to the current thread */
695         if (current_evloop != el) {
696                 if (current_evloop)
697                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
698                 current_evloop = el;
699                 __atomic_add_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
700         }
701
702         /* wait for a modifiable event loop */
703         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
704                 x = 1;
705                 write(el->efd, &x, sizeof x);
706                 pthread_cond_wait(&el->cond, &mutex);
707         }
708
709         return el->sdev;
710 }
711
712 /**
713  * Gets a sd_event item for the current thread.
714  * @return a sd_event or NULL in case of error
715  */
716 struct sd_event *jobs_get_sd_event()
717 {
718         struct sd_event *result;
719
720         pthread_mutex_lock(&mutex);
721         result = get_sd_event_locked();
722         pthread_mutex_unlock(&mutex);
723
724         return result;
725 }
726
727 /**
728  * Enter the jobs processing loop.
729  * @param allowed_count Maximum count of thread for jobs including this one
730  * @param start_count   Count of thread to start now, must be lower.
731  * @param waiter_count  Maximum count of jobs that can be waiting.
732  * @param start         The start routine to activate (can't be NULL)
733  * @return 0 in case of success or -1 in case of error.
734  */
735 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
736 {
737         int rc, launched;
738         struct thread me;
739         struct job *job;
740
741         assert(allowed_count >= 1);
742         assert(start_count >= 0);
743         assert(waiter_count > 0);
744         assert(start_count <= allowed_count);
745
746         rc = -1;
747         pthread_mutex_lock(&mutex);
748
749         /* check whether already running */
750         if (current_thread || allowed) {
751                 ERROR("thread already started");
752                 errno = EINVAL;
753                 goto error;
754         }
755
756         /* start */
757         if (sig_monitor_init() < 0) {
758                 ERROR("failed to initialise signal handlers");
759                 goto error;
760         }
761
762         /* records the allowed count */
763         allowed = allowed_count;
764         started = 0;
765         running = 0;
766         remains = waiter_count;
767
768 #ifndef NO_JOBS_WATCHDOG
769         /* set the watchdog */
770         if (sd_watchdog_enabled(0, NULL))
771                 sd_event_set_watchdog(get_sd_event_locked(), 1);
772 #endif
773
774         /* start at least one thread */
775         launched = 0;
776         while ((launched + 1) < start_count) {
777                 if (start_one_thread() != 0) {
778                         ERROR("Not all threads can be started");
779                         goto error;
780                 }
781                 launched++;
782         }
783
784         /* queue the start job */
785         job = job_create(NULL, 0, start, arg);
786         if (!job) {
787                 ERROR("out of memory");
788                 errno = ENOMEM;
789                 goto error;
790         }
791         job_add(job);
792         remains--;
793
794         /* run until end */
795         thread_run(&me);
796         rc = 0;
797 error:
798         pthread_mutex_unlock(&mutex);
799         return rc;
800 }
801
802 /**
803  * Terminate all the threads and cancel all pending jobs.
804  */
805 void jobs_terminate()
806 {
807         struct job *job, *head, *tail;
808         pthread_t me, *others;
809         struct thread *t;
810         int count;
811
812         /* how am i? */
813         me = pthread_self();
814
815         /* request all threads to stop */
816         pthread_mutex_lock(&mutex);
817         allowed = 0;
818
819         /* count the number of threads */
820         count = 0;
821         t = threads;
822         while (t) {
823                 if (!t->upper && !pthread_equal(t->tid, me))
824                         count++;
825                 t = t->next;
826         }
827
828         /* fill the array of threads */
829         others = alloca(count * sizeof *others);
830         count = 0;
831         t = threads;
832         while (t) {
833                 if (!t->upper && !pthread_equal(t->tid, me))
834                         others[count++] = t->tid;
835                 t = t->next;
836         }
837
838         /* stops the threads */
839         t = threads;
840         while (t) {
841                 t->stop = 1;
842                 t = t->next;
843         }
844
845         /* wait the threads */
846         pthread_cond_broadcast(&cond);
847         pthread_mutex_unlock(&mutex);
848         while (count)
849                 pthread_join(others[--count], NULL);
850         pthread_mutex_lock(&mutex);
851
852         /* cancel pending jobs of other threads */
853         remains = 0;
854         head = first_job;
855         first_job = NULL;
856         tail = NULL;
857         while (head) {
858                 /* unlink the job */
859                 job = head;
860                 head = job->next;
861
862                 /* search if job is stacked for current */
863                 t = current_thread;
864                 while (t && t->job != job)
865                         t = t->upper;
866                 if (t) {
867                         /* yes, relink it at end */
868                         if (tail)
869                                 tail->next = job;
870                         else
871                                 first_job = job;
872                         tail = job;
873                         job->next = NULL;
874                 } else {
875                         /* no cancel the job */
876                         pthread_mutex_unlock(&mutex);
877                         sig_monitor(0, job_cancel, job);
878                         free(job);
879                         pthread_mutex_lock(&mutex);
880                 }
881         }
882         pthread_mutex_unlock(&mutex);
883 }
884