Add an easy function for synchronous calls
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29
30 #include <systemd/sd-event.h>
31
32 #include "jobs.h"
33 #include "sig-monitor.h"
34 #include "verbose.h"
35
36 #if 0
37 #define _alert_ "do you really want to remove monitoring?"
38 #define sig_monitor_init_timeouts()  ((void)0)
39 #define sig_monitor_clean_timeouts() ((void)0)
40 #define sig_monitor(to,cb,arg)       (cb(0,arg))
41 #endif
42
43 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
44 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
45
46 /** Internal shortcut for callback */
47 typedef void (*job_cb_t)(int, void*, void *, void*);
48
49 /** Description of a pending job */
50 struct job
51 {
52         struct job *next;    /**< link to the next job enqueued */
53         void *group;         /**< group of the request */
54         job_cb_t callback;   /**< processing callback */
55         void *arg1;          /**< first arg */
56         void *arg2;          /**< second arg */
57         void *arg3;          /**< third arg */
58         int timeout;         /**< timeout in second for processing the request */
59         unsigned blocked: 1; /**< is an other request blocking this one ? */
60         unsigned dropped: 1; /**< is removed ? */
61 };
62
63 /** Description of handled event loops */
64 struct events
65 {
66         struct events *next;
67         struct sd_event *event;
68         uint64_t timeout;
69         unsigned runs: 1;
70 };
71
72 /** Description of threads */
73 struct thread
74 {
75         struct thread *next;   /**< next thread of the list */
76         struct thread *upper;  /**< upper same thread */
77         struct job *job;       /**< currently processed job */
78         struct events *events; /**< currently processed job */
79         pthread_t tid;         /**< the thread id */
80         unsigned stop: 1;      /**< stop requested */
81         unsigned lowered: 1;   /**< has a lower same thread */
82         unsigned waits: 1;     /**< is waiting? */
83 };
84
85 /**
86  * Description of synchonous callback
87  */
88 struct sync
89 {
90         void (*callback)(int, void*);   /**< the synchrnous callback */
91         void *arg;                      /**< the argument of the callback */
92 };
93
94 /* synchronisation of threads */
95 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
96 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
97
98 /* count allowed, started and waiting threads */
99 static int allowed = 0; /** allowed count of threads */
100 static int started = 0; /** started count of threads */
101 static int waiting = 0; /** waiting count of threads */
102 static int remains = 0; /** allowed count of waiting jobs */
103 static int nevents = 0; /** count of events */
104
105 /* list of threads */
106 static struct thread *threads;
107 static _Thread_local struct thread *current;
108
109 /* queue of pending jobs */
110 static struct job *first_job;
111 static struct events *first_events;
112 static struct job *free_jobs;
113
114 /**
115  * Create a new job with the given parameters
116  * @param group    the group of the job
117  * @param timeout  the timeout of the job (0 if none)
118  * @param callback the function that achieves the job
119  * @param arg1     the first argument of the callback
120  * @param arg2     the second argument of the callback
121  * @param arg3     the third argument of the callback
122  * @return the created job unblock or NULL when no more memory
123  */
124 static struct job *job_create(
125                 void *group,
126                 int timeout,
127                 job_cb_t callback,
128                 void *arg1,
129                 void *arg2,
130                 void *arg3)
131 {
132         struct job *job;
133
134         /* try recyle existing job */
135         job = free_jobs;
136         if (job)
137                 free_jobs = job->next;
138         else {
139                 /* allocation  without blocking */
140                 pthread_mutex_unlock(&mutex);
141                 job = malloc(sizeof *job);
142                 pthread_mutex_lock(&mutex);
143                 if (!job) {
144                         errno = -ENOMEM;
145                         goto end;
146                 }
147         }
148         /* initialises the job */
149         job->group = group;
150         job->timeout = timeout;
151         job->callback = callback;
152         job->arg1 = arg1;
153         job->arg2 = arg2;
154         job->arg3 = arg3;
155         job->blocked = 0;
156         job->dropped = 0;
157 end:
158         return job;
159 }
160
161 /**
162  * Adds 'job' at the end of the list of jobs, marking it
163  * as blocked if an other job with the same group is pending.
164  * @param job the job to add
165  */
166 static void job_add(struct job *job)
167 {
168         void *group;
169         struct job *ijob, **pjob;
170
171         /* prepare to add */
172         group = job->group;
173         job->next = NULL;
174
175         /* search end and blockers */
176         pjob = &first_job;
177         ijob = first_job;
178         while (ijob) {
179                 if (group && ijob->group == group)
180                         job->blocked = 1;
181                 pjob = &ijob->next;
182                 ijob = ijob->next;
183         }
184
185         /* queue the jobs */
186         *pjob = job;
187 }
188
189 /**
190  * Get the next job to process or NULL if none.
191  * @return the first job that isn't blocked or NULL
192  */
193 static inline struct job *job_get()
194 {
195         struct job *job = first_job;
196         while (job && job->blocked)
197                 job = job->next;
198         return job;
199 }
200
201 /**
202  * Get the next events to process or NULL if none.
203  * @return the first events that isn't running or NULL
204  */
205 static inline struct events *events_get()
206 {
207         struct events *events = first_events;
208         while (events && events->runs)
209                 events = events->next;
210         return events;
211 }
212
213 /**
214  * Releases the processed 'job': removes it
215  * from the list of jobs and unblock the first
216  * pending job of the same group if any.
217  * @param job the job to release
218  */
219 static inline void job_release(struct job *job)
220 {
221         struct job *ijob, **pjob;
222         void *group;
223
224         /* first unqueue the job */
225         pjob = &first_job;
226         ijob = first_job;
227         while (ijob != job) {
228                 pjob = &ijob->next;
229                 ijob = ijob->next;
230         }
231         *pjob = job->next;
232
233         /* then unblock jobs of the same group */
234         group = job->group;
235         if (group) {
236                 ijob = job->next;
237                 while (ijob && ijob->group != group)
238                         ijob = ijob->next;
239                 if (ijob)
240                         ijob->blocked = 0;
241         }
242
243         /* recycle the job */
244         job->next = free_jobs;
245         free_jobs = job;
246 }
247
248 /**
249  * Monitored normal callback for a job.
250  * This function is called by the monitor
251  * to run the job when the safe environment
252  * is set.
253  * @param signum 0 on normal flow or the number
254  *               of the signal that interrupted the normal
255  *               flow
256  * @param arg     the job to run
257  */
258 static void job_call(int signum, void *arg)
259 {
260         struct job *job = arg;
261         job->callback(signum, job->arg1, job->arg2, job->arg3);
262 }
263
264 /**
265  * Monitored cancel callback for a job.
266  * This function is called by the monitor
267  * to cancel the job when the safe environment
268  * is set.
269  * @param signum 0 on normal flow or the number
270  *               of the signal that interrupted the normal
271  *               flow, isn't used
272  * @param arg    the job to run
273  */
274 static void job_cancel(int signum, void *arg)
275 {
276         job_call(SIGABRT, arg);
277 }
278
279 /**
280  * Monitored normal callback for events.
281  * This function is called by the monitor
282  * to run the event loop when the safe environment
283  * is set.
284  * @param signum 0 on normal flow or the number
285  *               of the signal that interrupted the normal
286  *               flow
287  * @param arg     the events to run
288  */
289 static void events_call(int signum, void *arg)
290 {
291         struct events *events = arg;
292         if (!signum)
293                 sd_event_run(events->event, events->timeout);
294 }
295
296 /**
297  * Main processing loop of threads processing jobs.
298  * The loop must be called with the mutex locked
299  * and it returns with the mutex locked.
300  * @param me the description of the thread to use
301  * TODO: how are timeout handled when reentering?
302  */
303 static void thread_run(volatile struct thread *me)
304 {
305         struct thread **prv;
306         struct job *job;
307         struct events *events;
308         uint64_t evto;
309
310         /* initialize description of itself and link it in the list */
311         me->tid = pthread_self();
312         me->stop = 0;
313         me->lowered = 0;
314         me->waits = 0;
315         me->upper = current;
316         if (current) {
317                 current->lowered = 1;
318                 evto = EVENT_TIMEOUT_CHILD;
319         } else {
320                 started++;
321                 sig_monitor_init_timeouts();
322                 evto = EVENT_TIMEOUT_TOP;
323         }
324         me->next = threads;
325         threads = (struct thread*)me;
326         current = (struct thread*)me;
327
328         NOTICE("job thread starting %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
329
330         /* loop until stopped */
331         me->events = NULL;
332         while (!me->stop) {
333                 /* get a job */
334                 job = job_get(first_job);
335                 if (job) {
336                         /* prepare running the job */
337                         remains++; /* increases count of job that can wait */
338                         job->blocked = 1; /* mark job as blocked */
339                         me->job = job; /* record the job (only for terminate) */
340
341                         /* run the job */
342                         pthread_mutex_unlock(&mutex);
343                         sig_monitor(job->timeout, job_call, job);
344                         pthread_mutex_lock(&mutex);
345
346                         /* release the run job */
347                         job_release(job);
348
349                         /* release event if any */
350                         events = me->events;
351                         if (events) {
352                                 events->runs = 0;
353                                 me->events = NULL;
354                         }
355                 } else {
356                         /* no job, check events */
357                         events = events_get();
358                         if (events) {
359                                 /* run the events */
360                                 events->runs = 1;
361                                 events->timeout = evto;
362                                 me->events = events;
363                                 pthread_mutex_unlock(&mutex);
364                                 sig_monitor(0, events_call, events);
365                                 pthread_mutex_lock(&mutex);
366                                 events->runs = 0;
367                                 me->events = NULL;
368                         } else {
369                                 /* no job and not events */
370                                 waiting++;
371                                 me->waits = 1;
372                                 pthread_cond_wait(&cond, &mutex);
373                                 me->waits = 0;
374                                 waiting--;
375                         }
376                 }
377         }
378         NOTICE("job thread stoping %d(/%d) %s", started, allowed, me->upper ? "child" : "parent");
379
380         /* unlink the current thread and cleanup */
381         prv = &threads;
382         while (*prv != me)
383                 prv = &(*prv)->next;
384         *prv = me->next;
385         current = me->upper;
386         if (current) {
387                 current->lowered = 0;
388         } else {
389                 sig_monitor_clean_timeouts();
390                 started--;
391         }
392 }
393
394 /**
395  * Entry point for created threads.
396  * @param data not used
397  * @return NULL
398  */
399 static void *thread_main(void *data)
400 {
401         struct thread me;
402
403         pthread_mutex_lock(&mutex);
404         thread_run(&me);
405         pthread_mutex_unlock(&mutex);
406         return NULL;
407 }
408
409 /**
410  * Starts a new thread
411  * @return 0 in case of success or -1 in case of error
412  */
413 static int start_one_thread()
414 {
415         pthread_t tid;
416         int rc;
417
418         rc = pthread_create(&tid, NULL, thread_main, NULL);
419         if (rc != 0) {
420                 /* errno = rc; */
421                 WARNING("not able to start thread: %m");
422                 rc = -1;
423         }
424         return rc;
425 }
426
427 /**
428  * Queues a new asynchronous job represented by 'callback'
429  * for the 'group' and the 'timeout'.
430  * Jobs are queued FIFO and are possibly executed in parallel
431  * concurrently except for job of the same group that are
432  * executed sequentially in FIFO order.
433  * @param group    The group of the job or NULL when no group.
434  * @param timeout  The maximum execution time in seconds of the job
435  *                 or 0 for unlimited time.
436  * @param callback The function to execute for achieving the job.
437  *                 Its first parameter is either 0 on normal flow
438  *                 or the signal number that broke the normal flow.
439  * @return 0 in case of success or -1 in case of error
440  */
441 int jobs_queue0(
442                 void *group,
443                 int timeout,
444                 void (*callback)(int signum))
445 {
446         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
447 }
448
449 /**
450  * Queues a new asynchronous job represented by 'callback' and 'arg1'
451  * for the 'group' and the 'timeout'.
452  * Jobs are queued FIFO and are possibly executed in parallel
453  * concurrently except for job of the same group that are
454  * executed sequentially in FIFO order.
455  * @param group    The group of the job or NULL when no group.
456  * @param timeout  The maximum execution time in seconds of the job
457  *                 or 0 for unlimited time.
458  * @param callback The function to execute for achieving the job.
459  *                 Its first parameter is either 0 on normal flow
460  *                 or the signal number that broke the normal flow.
461  *                 The remaining parameter is the parameter 'arg1'
462  *                 given here.
463  * @param arg      The second argument for 'callback'
464  * @return 0 in case of success or -1 in case of error
465  */
466 int jobs_queue(
467                 void *group,
468                 int timeout,
469                 void (*callback)(int, void*),
470                 void *arg)
471 {
472         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
473 }
474
475 /**
476  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
477  * for the 'group' and the 'timeout'.
478  * Jobs are queued FIFO and are possibly executed in parallel
479  * concurrently except for job of the same group that are
480  * executed sequentially in FIFO order.
481  * @param group    The group of the job or NULL when no group.
482  * @param timeout  The maximum execution time in seconds of the job
483  *                 or 0 for unlimited time.
484  * @param callback The function to execute for achieving the job.
485  *                 Its first parameter is either 0 on normal flow
486  *                 or the signal number that broke the normal flow.
487  *                 The remaining parameters are the parameters 'arg[12]'
488  *                 given here.
489  * @param arg1     The second argument for 'callback'
490  * @param arg2     The third argument for 'callback'
491  * @return 0 in case of success or -1 in case of error
492  */
493 int jobs_queue2(
494                 void *group,
495                 int timeout,
496                 void (*callback)(int, void*, void*),
497                 void *arg1,
498                 void *arg2)
499 {
500         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
501 }
502
503 /**
504  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
505  * for the 'group' and the 'timeout'.
506  * Jobs are queued FIFO and are possibly executed in parallel
507  * concurrently except for job of the same group that are
508  * executed sequentially in FIFO order.
509  * @param group    The group of the job or NULL when no group.
510  * @param timeout  The maximum execution time in seconds of the job
511  *                 or 0 for unlimited time.
512  * @param callback The function to execute for achieving the job.
513  *                 Its first parameter is either 0 on normal flow
514  *                 or the signal number that broke the normal flow.
515  *                 The remaining parameters are the parameters 'arg[123]'
516  *                 given here.
517  * @param arg1     The second argument for 'callback'
518  * @param arg2     The third argument for 'callback'
519  * @param arg3     The forth argument for 'callback'
520  * @return 0 in case of success or -1 in case of error
521  */
522 int jobs_queue3(
523                 void *group,
524                 int timeout,
525                 void (*callback)(int, void*, void *, void*),
526                 void *arg1,
527                 void *arg2,
528                 void *arg3)
529 {
530         const char *info;
531         struct job *job;
532         int rc;
533
534         pthread_mutex_lock(&mutex);
535
536         /* allocates the job */
537         job = job_create(group, timeout, callback, arg1, arg2, arg3);
538         if (!job) {
539                 errno = ENOMEM;
540                 info = "out of memory";
541                 goto error;
542         }
543
544         /* check availability */
545         if (remains == 0) {
546                 errno = EBUSY;
547                 info = "too many jobs";
548                 goto error2;
549         }
550
551         /* start a thread if needed */
552         if (waiting == 0 && started < allowed) {
553                 /* all threads are busy and a new can be started */
554                 rc = start_one_thread();
555                 if (rc < 0 && started == 0) {
556                         info = "can't start first thread";
557                         goto error2;
558                 }
559         }
560
561         /* queues the job */
562         remains--;
563         job_add(job);
564
565         /* signal an existing job */
566         pthread_cond_signal(&cond);
567         pthread_mutex_unlock(&mutex);
568         return 0;
569
570 error2:
571         job->next = free_jobs;
572         free_jobs = job;
573 error:
574         ERROR("can't process job with threads: %s, %m", info);
575         pthread_mutex_unlock(&mutex);
576         return -1;
577 }
578
579 /**
580  * Enter a synchronisation point: activates the job given by 'callback'
581  * and 'closure' using 'group' and 'timeout' to control sequencing and
582  * execution time.
583  * @param group the group for sequencing jobs
584  * @param timeout the time in seconds allocated to the job
585  * @param callback the callback that will handle the job.
586  *                 it receives 3 parameters: 'signum' that will be 0
587  *                 on normal flow or the catched signal number in case
588  *                 of interrupted flow, the context 'closure' as given and
589  *                 a 'jobloop' reference that must be used when the job is
590  *                 terminated to unlock the current execution flow.
591  * @param closure the context completion closure for the callback
592  * @return 0 on success or -1 in case of error
593  */
594 int jobs_enter(
595                 void *group,
596                 int timeout,
597                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
598                 void *closure
599 )
600 {
601         
602         struct job *job;
603         struct thread me;
604
605         pthread_mutex_lock(&mutex);
606
607         /* allocates the job */
608         job = job_create(group, timeout, (job_cb_t)callback, closure, &me, NULL);
609         if (!job) {
610                 ERROR("out of memory");
611                 errno = ENOMEM;
612                 pthread_mutex_unlock(&mutex);
613                 return -1;
614         }
615
616         /* queues the job */
617         job_add(job);
618
619         /* run until stopped */
620         thread_run(&me);
621         pthread_mutex_unlock(&mutex);
622         return 0;
623 }
624
625 /**
626  * Unlocks the execution flow designed by 'jobloop'.
627  * @param jobloop indication of the flow to unlock
628  * @return 0 in case of success of -1 on error
629  */
630 int jobs_leave(struct jobloop *jobloop)
631 {
632         struct thread *t;
633
634         pthread_mutex_lock(&mutex);
635         t = threads;
636         while (t && t != (struct thread*)jobloop)
637                 t = t->next;
638         if (!t) {
639                 errno = EINVAL;
640         } else {
641                 t->stop = 1;
642                 if (t->waits)
643                         pthread_cond_broadcast(&cond);
644         }
645         pthread_mutex_unlock(&mutex);
646         return -!t;
647 }
648
649 /**
650  * Internal helper function for 'jobs_call'.
651  * @see jobs_call, jobs_enter, jobs_leave
652  */
653 static void call_cb(int signum, void *closure, struct jobloop *jobloop)
654 {
655         struct sync *sync = closure;
656         sync->callback(signum, sync->arg);
657         jobs_leave(jobloop);
658 }
659
660 /**
661  * Calls synchronously the job represented by 'callback' and 'arg1'
662  * for the 'group' and the 'timeout' and waits for its completion.
663  * @param group    The group of the job or NULL when no group.
664  * @param timeout  The maximum execution time in seconds of the job
665  *                 or 0 for unlimited time.
666  * @param callback The function to execute for achieving the job.
667  *                 Its first parameter is either 0 on normal flow
668  *                 or the signal number that broke the normal flow.
669  *                 The remaining parameter is the parameter 'arg1'
670  *                 given here.
671  * @param arg      The second argument for 'callback'
672  * @return 0 in case of success or -1 in case of error
673  */
674 int jobs_call(
675                 void *group,
676                 int timeout,
677                 void (*callback)(int, void*),
678                 void *arg)
679 {
680         struct sync sync;
681
682         sync.callback = callback;
683         sync.arg = arg;
684         return jobs_enter(group, timeout, call_cb, &sync);
685 }
686
687 /**
688  * Gets a sd_event item for the current thread.
689  * @return a sd_event or NULL in case of error
690  */
691 struct sd_event *jobs_get_sd_event()
692 {
693         struct events *events;
694         struct thread *me;
695         int rc;
696
697         pthread_mutex_lock(&mutex);
698
699         /* search events on stack */
700         me = current;
701         while (me && !me->events)
702                 me = me->upper;
703         if (me)
704                 /* return the stacked events */
705                 events = me->events;
706         else {
707                 /* search an available events */
708                 events = events_get();
709                 if (!events) {
710                         /* not found, check if creation possible */
711                         if (nevents >= allowed) {
712                                 ERROR("not possible to add a new event");
713                                 events = NULL;
714                         } else {
715                                 events = malloc(sizeof *events);
716                                 if (events && (rc = sd_event_new(&events->event)) >= 0) {
717                                         if (nevents < started || start_one_thread() >= 0) {
718                                                 events->runs = 0;
719                                                 events->next = first_events;
720                                                 first_events = events;
721                                         } else {
722                                                 ERROR("can't start thread for events");
723                                                 sd_event_unref(events->event);
724                                                 free(events);
725                                                 events = NULL;
726                                         }
727                                 } else {
728                                         if (!events) {
729                                                 ERROR("out of memory");
730                                                 errno = ENOMEM;
731                                         } else {
732                                                 free(events);
733                                                 ERROR("creation of sd_event failed: %m");
734                                                 events = NULL;
735                                                 errno = -rc;
736                                         } 
737                                 }
738                         }
739                 }
740                 if (events) {
741                         /* */
742                         me = current;
743                         if (me) {
744                                 events->runs = 1;
745                                 me->events = events;
746                         } else {
747                                 WARNING("event returned for unknown thread!");
748                         }
749                 }
750         }
751         pthread_mutex_unlock(&mutex);
752         return events ? events->event : NULL;
753 }
754
755 /**
756  * Enter the jobs processing loop.
757  * @param allowed_count Maximum count of thread for jobs including this one
758  * @param start_count   Count of thread to start now, must be lower.
759  * @param waiter_count  Maximum count of jobs that can be waiting.
760  * @param start         The start routine to activate (can't be NULL)
761  * @return 0 in case of success or -1 in case of error.
762  */
763 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)())
764 {
765         int rc, launched;
766         struct thread me;
767         struct job *job;
768
769         assert(allowed_count >= 1);
770         assert(start_count >= 0);
771         assert(waiter_count > 0);
772         assert(start_count <= allowed_count);
773
774         rc = -1;
775         pthread_mutex_lock(&mutex);
776
777         /* check whether already running */
778         if (current || allowed) {
779                 ERROR("thread already started");
780                 errno = EINVAL;
781                 goto error;
782         }
783
784         /* start */
785         if (sig_monitor_init() < 0) {
786                 ERROR("failed to initialise signal handlers");
787                 goto error;
788         }
789
790         /* records the allowed count */
791         allowed = allowed_count;
792         started = 0;
793         waiting = 0;
794         remains = waiter_count;
795
796         /* start at least one thread */
797         launched = 0;
798         while ((launched + 1) < start_count) {
799                 if (start_one_thread() != 0) {
800                         ERROR("Not all threads can be started");
801                         goto error;
802                 }
803                 launched++;
804         }
805
806         /* queue the start job */
807         job = job_create(NULL, 0, (job_cb_t)start, NULL, NULL, NULL);
808         if (!job) {
809                 ERROR("out of memory");
810                 errno = ENOMEM;
811                 goto error;
812         }
813         job_add(job);
814         remains--;
815
816         /* run until end */
817         thread_run(&me);
818         rc = 0;
819 error:
820         pthread_mutex_unlock(&mutex);
821         return rc;
822 }
823
824 /**
825  * Terminate all the threads and cancel all pending jobs.
826  */
827 void jobs_terminate()
828 {
829         struct job *job, *head, *tail;
830         pthread_t me, *others;
831         struct thread *t;
832         int count;
833
834         /* how am i? */
835         me = pthread_self();
836
837         /* request all threads to stop */
838         pthread_mutex_lock(&mutex);
839         allowed = 0;
840
841         /* count the number of threads */
842         count = 0;
843         t = threads;
844         while (t) {
845                 if (!t->upper && !pthread_equal(t->tid, me))
846                         count++;
847                 t = t->next;
848         }
849
850         /* fill the array of threads */
851         others = alloca(count * sizeof *others);
852         count = 0;
853         t = threads;
854         while (t) {
855                 if (!t->upper && !pthread_equal(t->tid, me))
856                         others[count++] = t->tid;
857                 t = t->next;
858         }
859
860         /* stops the threads */
861         t = threads;
862         while (t) {
863                 t->stop = 1;
864                 t = t->next;
865         }
866
867         /* wait the threads */
868         pthread_cond_broadcast(&cond);
869         pthread_mutex_unlock(&mutex);
870         while (count)
871                 pthread_join(others[--count], NULL);
872         pthread_mutex_lock(&mutex);
873
874         /* cancel pending jobs of other threads */
875         remains = 0;
876         head = first_job;
877         first_job = NULL;
878         tail = NULL;
879         while (head) {
880                 /* unlink the job */
881                 job = head;
882                 head = job->next;
883
884                 /* search if job is stacked for current */
885                 t = current;
886                 while (t && t->job != job)
887                         t = t->upper;
888                 if (t) {
889                         /* yes, relink it at end */
890                         if (tail)
891                                 tail->next = job;
892                         else
893                                 first_job = job;
894                         tail = job;
895                         job->next = NULL;
896                 } else {
897                         /* no cancel the job */
898                         pthread_mutex_unlock(&mutex);
899                         sig_monitor(0, job_cancel, job);
900                         free(job);
901                         pthread_mutex_lock(&mutex);
902                 }
903         }
904         pthread_mutex_unlock(&mutex);
905 }
906