jobs: Fix locks in event loops
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29 #include <sys/eventfd.h>
30
31 #include <systemd/sd-event.h>
32
33 #include "jobs.h"
34 #include "sig-monitor.h"
35 #include "verbose.h"
36
37 #if 0
38 #define _alert_ "do you really want to remove monitoring?"
39 #define sig_monitor_init_timeouts()  ((void)0)
40 #define sig_monitor_clean_timeouts() ((void)0)
41 #define sig_monitor(to,cb,arg)       (cb(0,arg))
42 #endif
43
44 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
45 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
46
47 /** Internal shortcut for callback */
48 typedef void (*job_cb_t)(int, void*);
49
50 /** Description of a pending job */
51 struct job
52 {
53         struct job *next;    /**< link to the next job enqueued */
54         const void *group;   /**< group of the request */
55         job_cb_t callback;   /**< processing callback */
56         void *arg;           /**< argument */
57         int timeout;         /**< timeout in second for processing the request */
58         unsigned blocked: 1; /**< is an other request blocking this one ? */
59         unsigned dropped: 1; /**< is removed ? */
60 };
61
62 /** Description of handled event loops */
63 struct evloop
64 {
65         unsigned state;        /**< encoded state */
66         int efd;               /**< event notification */
67         struct sd_event *sdev; /**< the systemd event loop */
68         pthread_cond_t  cond;  /**< condition */
69 };
70
71 #define EVLOOP_STATE_WAIT           1U
72 #define EVLOOP_STATE_RUN            2U
73 #define EVLOOP_STATE_LOCK           4U
74
75 /** Description of threads */
76 struct thread
77 {
78         struct thread *next;   /**< next thread of the list */
79         struct thread *upper;  /**< upper same thread */
80         struct job *job;       /**< currently processed job */
81         pthread_t tid;         /**< the thread id */
82         unsigned stop: 1;      /**< stop requested */
83         unsigned waits: 1;     /**< is waiting? */
84 };
85
86 /**
87  * Description of synchonous callback
88  */
89 struct sync
90 {
91         struct thread thread;   /**< thread loop data */
92         union {
93                 void (*callback)(int, void*);   /**< the synchronous callback */
94                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
95                                 /**< the entering synchronous routine */
96         };
97         void *arg;              /**< the argument of the callback */
98 };
99
100
101 /* synchronisation of threads */
102 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
103 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
104
105 /* count allowed, started and running threads */
106 static int allowed = 0; /** allowed count of threads */
107 static int started = 0; /** started count of threads */
108 static int running = 0; /** running count of threads */
109 static int remains = 0; /** allowed count of waiting jobs */
110
111 /* list of threads */
112 static struct thread *threads;
113 static _Thread_local struct thread *current_thread;
114 static _Thread_local struct evloop *current_evloop;
115
116 /* queue of pending jobs */
117 static struct job *first_job;
118 static struct job *free_jobs;
119
120 /* event loop */
121 static struct evloop evloop[1];
122
123 /**
124  * Create a new job with the given parameters
125  * @param group    the group of the job
126  * @param timeout  the timeout of the job (0 if none)
127  * @param callback the function that achieves the job
128  * @param arg      the argument of the callback
129  * @return the created job unblock or NULL when no more memory
130  */
131 static struct job *job_create(
132                 const void *group,
133                 int timeout,
134                 job_cb_t callback,
135                 void *arg)
136 {
137         struct job *job;
138
139         /* try recyle existing job */
140         job = free_jobs;
141         if (job)
142                 free_jobs = job->next;
143         else {
144                 /* allocation  without blocking */
145                 pthread_mutex_unlock(&mutex);
146                 job = malloc(sizeof *job);
147                 pthread_mutex_lock(&mutex);
148                 if (!job) {
149                         errno = -ENOMEM;
150                         goto end;
151                 }
152         }
153         /* initialises the job */
154         job->group = group;
155         job->timeout = timeout;
156         job->callback = callback;
157         job->arg = arg;
158         job->blocked = 0;
159         job->dropped = 0;
160 end:
161         return job;
162 }
163
164 /**
165  * Adds 'job' at the end of the list of jobs, marking it
166  * as blocked if an other job with the same group is pending.
167  * @param job the job to add
168  */
169 static void job_add(struct job *job)
170 {
171         const void *group;
172         struct job *ijob, **pjob;
173
174         /* prepare to add */
175         group = job->group;
176         job->next = NULL;
177
178         /* search end and blockers */
179         pjob = &first_job;
180         ijob = first_job;
181         while (ijob) {
182                 if (group && ijob->group == group)
183                         job->blocked = 1;
184                 pjob = &ijob->next;
185                 ijob = ijob->next;
186         }
187
188         /* queue the jobs */
189         *pjob = job;
190 }
191
192 /**
193  * Get the next job to process or NULL if none.
194  * @return the first job that isn't blocked or NULL
195  */
196 static inline struct job *job_get()
197 {
198         struct job *job = first_job;
199         while (job && job->blocked)
200                 job = job->next;
201         return job;
202 }
203
204 /**
205  * Releases the processed 'job': removes it
206  * from the list of jobs and unblock the first
207  * pending job of the same group if any.
208  * @param job the job to release
209  */
210 static inline void job_release(struct job *job)
211 {
212         struct job *ijob, **pjob;
213         const void *group;
214
215         /* first unqueue the job */
216         pjob = &first_job;
217         ijob = first_job;
218         while (ijob != job) {
219                 pjob = &ijob->next;
220                 ijob = ijob->next;
221         }
222         *pjob = job->next;
223
224         /* then unblock jobs of the same group */
225         group = job->group;
226         if (group) {
227                 ijob = job->next;
228                 while (ijob && ijob->group != group)
229                         ijob = ijob->next;
230                 if (ijob)
231                         ijob->blocked = 0;
232         }
233
234         /* recycle the job */
235         job->next = free_jobs;
236         free_jobs = job;
237 }
238
239 /**
240  * Monitored cancel callback for a job.
241  * This function is called by the monitor
242  * to cancel the job when the safe environment
243  * is set.
244  * @param signum 0 on normal flow or the number
245  *               of the signal that interrupted the normal
246  *               flow, isn't used
247  * @param arg    the job to run
248  */
249 static void job_cancel(int signum, void *arg)
250 {
251         struct job *job = arg;
252         job->callback(SIGABRT, job->arg);
253 }
254
255 /**
256  * Monitored normal callback for events.
257  * This function is called by the monitor
258  * to run the event loop when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow
263  * @param arg     the events to run
264  */
265 static void evloop_run(int signum, void *arg)
266 {
267         int rc;
268         struct sd_event *se;
269         struct evloop *el = arg;
270
271         if (!signum) {
272                 se = el->sdev;
273                 rc = sd_event_prepare(se);
274                 if (rc < 0) {
275                         errno = -rc;
276                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
277                 } else {
278                         if (rc == 0) {
279                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
280                                 if (rc < 0) {
281                                         errno = -rc;
282                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
283                                 }
284                         }
285                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
286
287                         if (rc > 0) {
288                                 rc = sd_event_dispatch(se);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                 }
295         }
296         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
297 }
298
299
300 /**
301  * Main processing loop of threads processing jobs.
302  * The loop must be called with the mutex locked
303  * and it returns with the mutex locked.
304  * @param me the description of the thread to use
305  * TODO: how are timeout handled when reentering?
306  */
307 static void thread_run(volatile struct thread *me)
308 {
309         struct thread **prv;
310         struct job *job;
311         struct evloop *el;
312
313         /* initialize description of itself and link it in the list */
314         me->tid = pthread_self();
315         me->stop = 0;
316         me->waits = 0;
317         me->upper = current_thread;
318         if (!current_thread) {
319                 started++;
320                 sig_monitor_init_timeouts();
321         }
322         me->next = threads;
323         threads = (struct thread*)me;
324         current_thread = (struct thread*)me;
325
326         /* loop until stopped */
327         while (!me->stop) {
328                 /* release the event loop */
329                 if (current_evloop) {
330                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
331                         current_evloop = NULL;
332                 }
333
334                 /* get a job */
335                 job = job_get(first_job);
336                 if (job) {
337                         /* prepare running the job */
338                         remains++; /* increases count of job that can wait */
339                         job->blocked = 1; /* mark job as blocked */
340                         me->job = job; /* record the job (only for terminate) */
341
342                         /* run the job */
343                         pthread_mutex_unlock(&mutex);
344                         sig_monitor(job->timeout, job->callback, job->arg);
345                         pthread_mutex_lock(&mutex);
346
347                         /* release the run job */
348                         job_release(job);
349                 } else {
350                         /* no job, check events */
351                         el = &evloop[0];
352                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
353                                 /* run the events */
354                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
355                                 current_evloop = el;
356                                 pthread_mutex_unlock(&mutex);
357                                 sig_monitor(0, evloop_run, el);
358                                 pthread_mutex_lock(&mutex);
359                         } else {
360                                 /* no job and not events */
361                                 running--;
362                                 if (!running)
363                                         ERROR("Entering job deep sleep! Check your bindings.");
364                                 me->waits = 1;
365                                 pthread_cond_wait(&cond, &mutex);
366                                 me->waits = 0;
367                                 running++;
368                         }
369                 }
370         }
371
372         /* release the event loop */
373         if (current_evloop) {
374                 __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
375                 current_evloop = NULL;
376         }
377
378         /* unlink the current thread and cleanup */
379         prv = &threads;
380         while (*prv != me)
381                 prv = &(*prv)->next;
382         *prv = me->next;
383         current_thread = me->upper;
384         if (!current_thread) {
385                 sig_monitor_clean_timeouts();
386                 started--;
387         }
388 }
389
390 /**
391  * Entry point for created threads.
392  * @param data not used
393  * @return NULL
394  */
395 static void *thread_main(void *data)
396 {
397         struct thread me;
398
399         pthread_mutex_lock(&mutex);
400         running++;
401         thread_run(&me);
402         running--;
403         pthread_mutex_unlock(&mutex);
404         return NULL;
405 }
406
407 /**
408  * Starts a new thread
409  * @return 0 in case of success or -1 in case of error
410  */
411 static int start_one_thread()
412 {
413         pthread_t tid;
414         int rc;
415
416         rc = pthread_create(&tid, NULL, thread_main, NULL);
417         if (rc != 0) {
418                 /* errno = rc; */
419                 WARNING("not able to start thread: %m");
420                 rc = -1;
421         }
422         return rc;
423 }
424
425 /**
426  * Queues a new asynchronous job represented by 'callback' and 'arg'
427  * for the 'group' and the 'timeout'.
428  * Jobs are queued FIFO and are possibly executed in parallel
429  * concurrently except for job of the same group that are
430  * executed sequentially in FIFO order.
431  * @param group    The group of the job or NULL when no group.
432  * @param timeout  The maximum execution time in seconds of the job
433  *                 or 0 for unlimited time.
434  * @param callback The function to execute for achieving the job.
435  *                 Its first parameter is either 0 on normal flow
436  *                 or the signal number that broke the normal flow.
437  *                 The remaining parameter is the parameter 'arg1'
438  *                 given here.
439  * @param arg      The second argument for 'callback'
440  * @return 0 in case of success or -1 in case of error
441  */
442 int jobs_queue(
443                 const void *group,
444                 int timeout,
445                 void (*callback)(int, void*),
446                 void *arg)
447 {
448         const char *info;
449         struct job *job;
450         int rc;
451
452         pthread_mutex_lock(&mutex);
453
454         /* allocates the job */
455         job = job_create(group, timeout, callback, arg);
456         if (!job) {
457                 errno = ENOMEM;
458                 info = "out of memory";
459                 goto error;
460         }
461
462         /* check availability */
463         if (remains == 0) {
464                 errno = EBUSY;
465                 info = "too many jobs";
466                 goto error2;
467         }
468
469         /* start a thread if needed */
470         if (running == started && started < allowed) {
471                 /* all threads are busy and a new can be started */
472                 rc = start_one_thread();
473                 if (rc < 0 && started == 0) {
474                         info = "can't start first thread";
475                         goto error2;
476                 }
477         }
478
479         /* queues the job */
480         remains--;
481         job_add(job);
482
483         /* signal an existing job */
484         pthread_cond_signal(&cond);
485         pthread_mutex_unlock(&mutex);
486         return 0;
487
488 error2:
489         job->next = free_jobs;
490         free_jobs = job;
491 error:
492         ERROR("can't process job with threads: %s, %m", info);
493         pthread_mutex_unlock(&mutex);
494         return -1;
495 }
496
497 /**
498  * Internal helper function for 'jobs_enter'.
499  * @see jobs_enter, jobs_leave
500  */
501 static void enter_cb(int signum, void *closure)
502 {
503         struct sync *sync = closure;
504         sync->enter(signum, sync->arg, (void*)&sync->thread);
505 }
506
507 /**
508  * Internal helper function for 'jobs_call'.
509  * @see jobs_call
510  */
511 static void call_cb(int signum, void *closure)
512 {
513         struct sync *sync = closure;
514         sync->callback(signum, sync->arg);
515         jobs_leave((void*)&sync->thread);
516 }
517
518 /**
519  * Internal helper for synchronous jobs. It enters
520  * a new thread loop for evaluating the given job
521  * as recorded by the couple 'sync_cb' and 'sync'.
522  * @see jobs_call, jobs_enter, jobs_leave
523  */
524 static int do_sync(
525                 const void *group,
526                 int timeout,
527                 void (*sync_cb)(int signum, void *closure),
528                 struct sync *sync
529 )
530 {
531         struct job *job;
532
533         pthread_mutex_lock(&mutex);
534
535         /* allocates the job */
536         job = job_create(group, timeout, sync_cb, sync);
537         if (!job) {
538                 ERROR("out of memory");
539                 errno = ENOMEM;
540                 pthread_mutex_unlock(&mutex);
541                 return -1;
542         }
543
544         /* queues the job */
545         job_add(job);
546
547         /* run until stopped */
548         thread_run(&sync->thread);
549         pthread_mutex_unlock(&mutex);
550         return 0;
551 }
552
553 /**
554  * Enter a synchronisation point: activates the job given by 'callback'
555  * and 'closure' using 'group' and 'timeout' to control sequencing and
556  * execution time.
557  * @param group the group for sequencing jobs
558  * @param timeout the time in seconds allocated to the job
559  * @param callback the callback that will handle the job.
560  *                 it receives 3 parameters: 'signum' that will be 0
561  *                 on normal flow or the catched signal number in case
562  *                 of interrupted flow, the context 'closure' as given and
563  *                 a 'jobloop' reference that must be used when the job is
564  *                 terminated to unlock the current execution flow.
565  * @param arg the argument to the callback
566  * @return 0 on success or -1 in case of error
567  */
568 int jobs_enter(
569                 const void *group,
570                 int timeout,
571                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
572                 void *closure
573 )
574 {
575         struct sync sync;
576
577         sync.enter = callback;
578         sync.arg = closure;
579         return do_sync(group, timeout, enter_cb, &sync);
580 }
581
582 /**
583  * Unlocks the execution flow designed by 'jobloop'.
584  * @param jobloop indication of the flow to unlock
585  * @return 0 in case of success of -1 on error
586  */
587 int jobs_leave(struct jobloop *jobloop)
588 {
589         struct thread *t;
590
591         pthread_mutex_lock(&mutex);
592         t = threads;
593         while (t && t != (struct thread*)jobloop)
594                 t = t->next;
595         if (!t) {
596                 errno = EINVAL;
597         } else {
598                 t->stop = 1;
599                 if (t->waits)
600                         pthread_cond_broadcast(&cond);
601         }
602         pthread_mutex_unlock(&mutex);
603         return -!t;
604 }
605
606 /**
607  * Calls synchronously the job represented by 'callback' and 'arg1'
608  * for the 'group' and the 'timeout' and waits for its completion.
609  * @param group    The group of the job or NULL when no group.
610  * @param timeout  The maximum execution time in seconds of the job
611  *                 or 0 for unlimited time.
612  * @param callback The function to execute for achieving the job.
613  *                 Its first parameter is either 0 on normal flow
614  *                 or the signal number that broke the normal flow.
615  *                 The remaining parameter is the parameter 'arg1'
616  *                 given here.
617  * @param arg      The second argument for 'callback'
618  * @return 0 in case of success or -1 in case of error
619  */
620 int jobs_call(
621                 const void *group,
622                 int timeout,
623                 void (*callback)(int, void*),
624                 void *arg)
625 {
626         struct sync sync;
627
628         sync.callback = callback;
629         sync.arg = arg;
630
631         return do_sync(group, timeout, call_cb, &sync);
632 }
633
634 /**
635  * Internal callback for evloop management.
636  * The effect of this function is hidden: it exits
637  * the waiting poll if any. Then it wakes up a thread
638  * awaiting the evloop using signal.
639  */
640 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
641 {
642         uint64_t x;
643         struct evloop *evloop = userdata;
644         read(evloop->efd, &x, sizeof x);
645         pthread_mutex_lock(&mutex);
646         pthread_cond_broadcast(&evloop->cond);  
647         pthread_mutex_unlock(&mutex);
648         return 1;
649 }
650
651 /**
652  * Gets a sd_event item for the current thread.
653  * @return a sd_event or NULL in case of error
654  */
655 struct sd_event *jobs_get_sd_event()
656 {
657         struct evloop *el;
658         uint64_t x;
659         int rc;
660
661         pthread_mutex_lock(&mutex);
662
663         /* creates the evloop on need */
664         el = &evloop[0];
665         if (!el->sdev) {
666                 /* start the creation */
667                 el->state = 0;
668                 /* creates the eventfd for waking up polls */
669                 el->efd = eventfd(0, EFD_CLOEXEC);
670                 if (el->efd < 0) {
671                         ERROR("can't make eventfd for events");
672                         goto error1;
673                 }
674                 /* create the systemd event loop */
675                 rc = sd_event_new(&el->sdev);
676                 if (rc < 0) {
677                         ERROR("can't make new event loop");
678                         goto error2;
679                 }
680                 /* put the eventfd in the event loop */
681                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
682                 if (rc < 0) {
683                         ERROR("can't register eventfd");
684                         sd_event_unref(el->sdev);
685                         el->sdev = NULL;
686 error2:
687                         close(el->efd);
688 error1:
689                         pthread_mutex_unlock(&mutex);
690                         return NULL;
691                 }
692         }
693
694         /* attach the event loop to the current thread */
695         if (current_evloop != el) {
696                 if (current_evloop)
697                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
698                 current_evloop = el;
699                 __atomic_add_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
700         }
701
702         /* wait for a modifiable event loop */
703         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
704                 x = 1;
705                 write(el->efd, &x, sizeof x);
706                 pthread_cond_wait(&el->cond, &mutex);
707         }
708
709         pthread_mutex_unlock(&mutex);
710         return el->sdev;
711 }
712
713 /**
714  * Enter the jobs processing loop.
715  * @param allowed_count Maximum count of thread for jobs including this one
716  * @param start_count   Count of thread to start now, must be lower.
717  * @param waiter_count  Maximum count of jobs that can be waiting.
718  * @param start         The start routine to activate (can't be NULL)
719  * @return 0 in case of success or -1 in case of error.
720  */
721 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
722 {
723         int rc, launched;
724         struct thread me;
725         struct job *job;
726
727         assert(allowed_count >= 1);
728         assert(start_count >= 0);
729         assert(waiter_count > 0);
730         assert(start_count <= allowed_count);
731
732         rc = -1;
733         pthread_mutex_lock(&mutex);
734
735         /* check whether already running */
736         if (current_thread || allowed) {
737                 ERROR("thread already started");
738                 errno = EINVAL;
739                 goto error;
740         }
741
742         /* start */
743         if (sig_monitor_init() < 0) {
744                 ERROR("failed to initialise signal handlers");
745                 goto error;
746         }
747
748         /* records the allowed count */
749         allowed = allowed_count;
750         started = 0;
751         running = 0;
752         remains = waiter_count;
753
754         /* start at least one thread */
755         launched = 0;
756         while ((launched + 1) < start_count) {
757                 if (start_one_thread() != 0) {
758                         ERROR("Not all threads can be started");
759                         goto error;
760                 }
761                 launched++;
762         }
763
764         /* queue the start job */
765         job = job_create(NULL, 0, (job_cb_t)start, NULL);
766         if (!job) {
767                 ERROR("out of memory");
768                 errno = ENOMEM;
769                 goto error;
770         }
771         job_add(job);
772         remains--;
773
774         /* run until end */
775         thread_run(&me);
776         rc = 0;
777 error:
778         pthread_mutex_unlock(&mutex);
779         return rc;
780 }
781
782 /**
783  * Terminate all the threads and cancel all pending jobs.
784  */
785 void jobs_terminate()
786 {
787         struct job *job, *head, *tail;
788         pthread_t me, *others;
789         struct thread *t;
790         int count;
791
792         /* how am i? */
793         me = pthread_self();
794
795         /* request all threads to stop */
796         pthread_mutex_lock(&mutex);
797         allowed = 0;
798
799         /* count the number of threads */
800         count = 0;
801         t = threads;
802         while (t) {
803                 if (!t->upper && !pthread_equal(t->tid, me))
804                         count++;
805                 t = t->next;
806         }
807
808         /* fill the array of threads */
809         others = alloca(count * sizeof *others);
810         count = 0;
811         t = threads;
812         while (t) {
813                 if (!t->upper && !pthread_equal(t->tid, me))
814                         others[count++] = t->tid;
815                 t = t->next;
816         }
817
818         /* stops the threads */
819         t = threads;
820         while (t) {
821                 t->stop = 1;
822                 t = t->next;
823         }
824
825         /* wait the threads */
826         pthread_cond_broadcast(&cond);
827         pthread_mutex_unlock(&mutex);
828         while (count)
829                 pthread_join(others[--count], NULL);
830         pthread_mutex_lock(&mutex);
831
832         /* cancel pending jobs of other threads */
833         remains = 0;
834         head = first_job;
835         first_job = NULL;
836         tail = NULL;
837         while (head) {
838                 /* unlink the job */
839                 job = head;
840                 head = job->next;
841
842                 /* search if job is stacked for current */
843                 t = current_thread;
844                 while (t && t->job != job)
845                         t = t->upper;
846                 if (t) {
847                         /* yes, relink it at end */
848                         if (tail)
849                                 tail->next = job;
850                         else
851                                 first_job = job;
852                         tail = job;
853                         job->next = NULL;
854                 } else {
855                         /* no cancel the job */
856                         pthread_mutex_unlock(&mutex);
857                         sig_monitor(0, job_cancel, job);
858                         free(job);
859                         pthread_mutex_lock(&mutex);
860                 }
861         }
862         pthread_mutex_unlock(&mutex);
863 }
864