jobs: Cosmetic changes
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017, 2018 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #if defined(NO_JOBS_WATCHDOG)
21 #   define HAS_WATCHDOG 0
22 #else
23 #   define HAS_WATCHDOG 1
24 #endif
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <unistd.h>
29 #include <signal.h>
30 #include <time.h>
31 #include <sys/syscall.h>
32 #include <pthread.h>
33 #include <errno.h>
34 #include <assert.h>
35 #include <sys/eventfd.h>
36
37 #include <systemd/sd-event.h>
38 #if HAS_WATCHDOG
39 #include <systemd/sd-daemon.h>
40 #endif
41
42 #include "jobs.h"
43 #include "sig-monitor.h"
44 #include "verbose.h"
45
46 #if 0
47 #define _alert_ "do you really want to remove signal monitoring?"
48 #define sig_monitor_init_timeouts()  ((void)0)
49 #define sig_monitor_clean_timeouts() ((void)0)
50 #define sig_monitor(to,cb,arg)       (cb(0,arg))
51 #endif
52
53 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
54 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
55
56 /** Internal shortcut for callback */
57 typedef void (*job_cb_t)(int, void*);
58
59 /** Description of a pending job */
60 struct job
61 {
62         struct job *next;    /**< link to the next job enqueued */
63         const void *group;   /**< group of the request */
64         job_cb_t callback;   /**< processing callback */
65         void *arg;           /**< argument */
66         int timeout;         /**< timeout in second for processing the request */
67         unsigned blocked: 1; /**< is an other request blocking this one ? */
68         unsigned dropped: 1; /**< is removed ? */
69 };
70
71 /** Description of handled event loops */
72 struct evloop
73 {
74         unsigned state;        /**< encoded state */
75         int efd;               /**< event notification */
76         struct sd_event *sdev; /**< the systemd event loop */
77         pthread_cond_t  cond;  /**< condition */
78 };
79
80 #define EVLOOP_STATE_WAIT           1U
81 #define EVLOOP_STATE_RUN            2U
82 #define EVLOOP_STATE_LOCK           4U
83
84 /** Description of threads */
85 struct thread
86 {
87         struct thread *next;   /**< next thread of the list */
88         struct thread *upper;  /**< upper same thread */
89         struct job *job;       /**< currently processed job */
90         pthread_t tid;         /**< the thread id */
91         unsigned stop: 1;      /**< stop requested */
92         unsigned waits: 1;     /**< is waiting? */
93 };
94
95 /**
96  * Description of synchonous callback
97  */
98 struct sync
99 {
100         struct thread thread;   /**< thread loop data */
101         union {
102                 void (*callback)(int, void*);   /**< the synchronous callback */
103                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
104                                 /**< the entering synchronous routine */
105         };
106         void *arg;              /**< the argument of the callback */
107 };
108
109
110 /* synchronisation of threads */
111 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
112 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
113
114 /* count allowed, started and running threads */
115 static int allowed = 0; /** allowed count of threads */
116 static int started = 0; /** started count of threads */
117 static int running = 0; /** running count of threads */
118 static int remains = 0; /** allowed count of waiting jobs */
119
120 /* list of threads */
121 static struct thread *threads;
122 static _Thread_local struct thread *current_thread;
123 static _Thread_local struct evloop *current_evloop;
124
125 /* queue of pending jobs */
126 static struct job *first_job;
127 static struct job *free_jobs;
128
129 /* event loop */
130 static struct evloop evloop[1];
131
132 /**
133  * Create a new job with the given parameters
134  * @param group    the group of the job
135  * @param timeout  the timeout of the job (0 if none)
136  * @param callback the function that achieves the job
137  * @param arg      the argument of the callback
138  * @return the created job unblock or NULL when no more memory
139  */
140 static struct job *job_create(
141                 const void *group,
142                 int timeout,
143                 job_cb_t callback,
144                 void *arg)
145 {
146         struct job *job;
147
148         /* try recyle existing job */
149         job = free_jobs;
150         if (job)
151                 free_jobs = job->next;
152         else {
153                 /* allocation without blocking */
154                 pthread_mutex_unlock(&mutex);
155                 job = malloc(sizeof *job);
156                 pthread_mutex_lock(&mutex);
157                 if (!job) {
158                         errno = -ENOMEM;
159                         goto end;
160                 }
161         }
162         /* initialises the job */
163         job->group = group;
164         job->timeout = timeout;
165         job->callback = callback;
166         job->arg = arg;
167         job->blocked = 0;
168         job->dropped = 0;
169 end:
170         return job;
171 }
172
173 /**
174  * Adds 'job' at the end of the list of jobs, marking it
175  * as blocked if an other job with the same group is pending.
176  * @param job the job to add
177  */
178 static void job_add(struct job *job)
179 {
180         const void *group;
181         struct job *ijob, **pjob;
182
183         /* prepare to add */
184         group = job->group;
185         job->next = NULL;
186
187         /* search end and blockers */
188         pjob = &first_job;
189         ijob = first_job;
190         while (ijob) {
191                 if (group && ijob->group == group)
192                         job->blocked = 1;
193                 pjob = &ijob->next;
194                 ijob = ijob->next;
195         }
196
197         /* queue the jobs */
198         *pjob = job;
199 }
200
201 /**
202  * Get the next job to process or NULL if none.
203  * @return the first job that isn't blocked or NULL
204  */
205 static inline struct job *job_get()
206 {
207         struct job *job = first_job;
208         while (job && job->blocked)
209                 job = job->next;
210         return job;
211 }
212
213 /**
214  * Releases the processed 'job': removes it
215  * from the list of jobs and unblock the first
216  * pending job of the same group if any.
217  * @param job the job to release
218  */
219 static inline void job_release(struct job *job)
220 {
221         struct job *ijob, **pjob;
222         const void *group;
223
224         /* first unqueue the job */
225         pjob = &first_job;
226         ijob = first_job;
227         while (ijob != job) {
228                 pjob = &ijob->next;
229                 ijob = ijob->next;
230         }
231         *pjob = job->next;
232
233         /* then unblock jobs of the same group */
234         group = job->group;
235         if (group) {
236                 ijob = job->next;
237                 while (ijob && ijob->group != group)
238                         ijob = ijob->next;
239                 if (ijob)
240                         ijob->blocked = 0;
241         }
242
243         /* recycle the job */
244         job->next = free_jobs;
245         free_jobs = job;
246 }
247
248 /**
249  * Monitored cancel callback for a job.
250  * This function is called by the monitor
251  * to cancel the job when the safe environment
252  * is set.
253  * @param signum 0 on normal flow or the number
254  *               of the signal that interrupted the normal
255  *               flow, isn't used
256  * @param arg    the job to run
257  */
258 static void job_cancel(int signum, void *arg)
259 {
260         struct job *job = arg;
261         job->callback(SIGABRT, job->arg);
262 }
263
264 /**
265  * Monitored normal callback for events.
266  * This function is called by the monitor
267  * to run the event loop when the safe environment
268  * is set.
269  * @param signum 0 on normal flow or the number
270  *               of the signal that interrupted the normal
271  *               flow
272  * @param arg     the events to run
273  */
274 static void evloop_run(int signum, void *arg)
275 {
276         int rc;
277         struct sd_event *se;
278         struct evloop *el = arg;
279
280         if (!signum) {
281                 se = el->sdev;
282                 rc = sd_event_prepare(se);
283                 if (rc < 0) {
284                         errno = -rc;
285                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
286                 } else {
287                         if (rc == 0) {
288                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT), __ATOMIC_RELAXED);
295
296                         if (rc > 0) {
297                                 rc = sd_event_dispatch(se);
298                                 if (rc < 0) {
299                                         errno = -rc;
300                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
301                                 }
302                         }
303                 }
304         }
305         __atomic_and_fetch(&el->state, ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN), __ATOMIC_RELAXED);
306 }
307
308
309 /**
310  * Main processing loop of threads processing jobs.
311  * The loop must be called with the mutex locked
312  * and it returns with the mutex locked.
313  * @param me the description of the thread to use
314  * TODO: how are timeout handled when reentering?
315  */
316 static void thread_run(volatile struct thread *me)
317 {
318         struct thread **prv;
319         struct job *job;
320         struct evloop *el;
321
322         /* initialize description of itself and link it in the list */
323         me->tid = pthread_self();
324         me->stop = 0;
325         me->waits = 0;
326         me->upper = current_thread;
327         if (!current_thread) {
328                 started++;
329                 sig_monitor_init_timeouts();
330         }
331         me->next = threads;
332         threads = (struct thread*)me;
333         current_thread = (struct thread*)me;
334
335         /* loop until stopped */
336         while (!me->stop) {
337                 /* release the event loop */
338                 if (current_evloop) {
339                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
340                         current_evloop = NULL;
341                 }
342
343                 /* get a job */
344                 job = job_get(first_job);
345                 if (job) {
346                         /* prepare running the job */
347                         remains++; /* increases count of job that can wait */
348                         job->blocked = 1; /* mark job as blocked */
349                         me->job = job; /* record the job (only for terminate) */
350
351                         /* run the job */
352                         pthread_mutex_unlock(&mutex);
353                         sig_monitor(job->timeout, job->callback, job->arg);
354                         pthread_mutex_lock(&mutex);
355
356                         /* release the run job */
357                         job_release(job);
358                 } else {
359                         /* no job, check events */
360                         el = &evloop[0];
361                         if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
362                                 /* run the events */
363                                 __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
364                                 current_evloop = el;
365                                 pthread_mutex_unlock(&mutex);
366                                 sig_monitor(0, evloop_run, el);
367                                 pthread_mutex_lock(&mutex);
368                         } else {
369                                 /* no job and not events */
370                                 running--;
371                                 if (!running)
372                                         ERROR("Entering job deep sleep! Check your bindings.");
373                                 me->waits = 1;
374                                 pthread_cond_wait(&cond, &mutex);
375                                 me->waits = 0;
376                                 running++;
377                         }
378                 }
379         }
380
381         /* release the event loop */
382         if (current_evloop) {
383                 __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
384                 current_evloop = NULL;
385         }
386
387         /* unlink the current thread and cleanup */
388         prv = &threads;
389         while (*prv != me)
390                 prv = &(*prv)->next;
391         *prv = me->next;
392         current_thread = me->upper;
393         if (!current_thread) {
394                 sig_monitor_clean_timeouts();
395                 started--;
396         }
397 }
398
399 /**
400  * Entry point for created threads.
401  * @param data not used
402  * @return NULL
403  */
404 static void *thread_main(void *data)
405 {
406         struct thread me;
407
408         pthread_mutex_lock(&mutex);
409         running++;
410         thread_run(&me);
411         running--;
412         pthread_mutex_unlock(&mutex);
413         return NULL;
414 }
415
416 /**
417  * Starts a new thread
418  * @return 0 in case of success or -1 in case of error
419  */
420 static int start_one_thread()
421 {
422         pthread_t tid;
423         int rc;
424
425         rc = pthread_create(&tid, NULL, thread_main, NULL);
426         if (rc != 0) {
427                 /* errno = rc; */
428                 WARNING("not able to start thread: %m");
429                 rc = -1;
430         }
431         return rc;
432 }
433
434 /**
435  * Queues a new asynchronous job represented by 'callback' and 'arg'
436  * for the 'group' and the 'timeout'.
437  * Jobs are queued FIFO and are possibly executed in parallel
438  * concurrently except for job of the same group that are
439  * executed sequentially in FIFO order.
440  * @param group    The group of the job or NULL when no group.
441  * @param timeout  The maximum execution time in seconds of the job
442  *                 or 0 for unlimited time.
443  * @param callback The function to execute for achieving the job.
444  *                 Its first parameter is either 0 on normal flow
445  *                 or the signal number that broke the normal flow.
446  *                 The remaining parameter is the parameter 'arg1'
447  *                 given here.
448  * @param arg      The second argument for 'callback'
449  * @return 0 in case of success or -1 in case of error
450  */
451 int jobs_queue(
452                 const void *group,
453                 int timeout,
454                 void (*callback)(int, void*),
455                 void *arg)
456 {
457         const char *info;
458         struct job *job;
459         int rc;
460
461         pthread_mutex_lock(&mutex);
462
463         /* allocates the job */
464         job = job_create(group, timeout, callback, arg);
465         if (!job) {
466                 errno = ENOMEM;
467                 info = "out of memory";
468                 goto error;
469         }
470
471         /* check availability */
472         if (remains == 0) {
473                 errno = EBUSY;
474                 info = "too many jobs";
475                 goto error2;
476         }
477
478         /* start a thread if needed */
479         if (running == started && started < allowed) {
480                 /* all threads are busy and a new can be started */
481                 rc = start_one_thread();
482                 if (rc < 0 && started == 0) {
483                         info = "can't start first thread";
484                         goto error2;
485                 }
486         }
487
488         /* queues the job */
489         remains--;
490         job_add(job);
491
492         /* signal an existing job */
493         pthread_cond_signal(&cond);
494         pthread_mutex_unlock(&mutex);
495         return 0;
496
497 error2:
498         job->next = free_jobs;
499         free_jobs = job;
500 error:
501         ERROR("can't process job with threads: %s, %m", info);
502         pthread_mutex_unlock(&mutex);
503         return -1;
504 }
505
506 /**
507  * Internal helper function for 'jobs_enter'.
508  * @see jobs_enter, jobs_leave
509  */
510 static void enter_cb(int signum, void *closure)
511 {
512         struct sync *sync = closure;
513         sync->enter(signum, sync->arg, (void*)&sync->thread);
514 }
515
516 /**
517  * Internal helper function for 'jobs_call'.
518  * @see jobs_call
519  */
520 static void call_cb(int signum, void *closure)
521 {
522         struct sync *sync = closure;
523         sync->callback(signum, sync->arg);
524         jobs_leave((void*)&sync->thread);
525 }
526
527 /**
528  * Internal helper for synchronous jobs. It enters
529  * a new thread loop for evaluating the given job
530  * as recorded by the couple 'sync_cb' and 'sync'.
531  * @see jobs_call, jobs_enter, jobs_leave
532  */
533 static int do_sync(
534                 const void *group,
535                 int timeout,
536                 void (*sync_cb)(int signum, void *closure),
537                 struct sync *sync
538 )
539 {
540         struct job *job;
541
542         pthread_mutex_lock(&mutex);
543
544         /* allocates the job */
545         job = job_create(group, timeout, sync_cb, sync);
546         if (!job) {
547                 ERROR("out of memory");
548                 errno = ENOMEM;
549                 pthread_mutex_unlock(&mutex);
550                 return -1;
551         }
552
553         /* queues the job */
554         job_add(job);
555
556         /* run until stopped */
557         thread_run(&sync->thread);
558         pthread_mutex_unlock(&mutex);
559         return 0;
560 }
561
562 /**
563  * Enter a synchronisation point: activates the job given by 'callback'
564  * and 'closure' using 'group' and 'timeout' to control sequencing and
565  * execution time.
566  * @param group the group for sequencing jobs
567  * @param timeout the time in seconds allocated to the job
568  * @param callback the callback that will handle the job.
569  *                 it receives 3 parameters: 'signum' that will be 0
570  *                 on normal flow or the catched signal number in case
571  *                 of interrupted flow, the context 'closure' as given and
572  *                 a 'jobloop' reference that must be used when the job is
573  *                 terminated to unlock the current execution flow.
574  * @param arg the argument to the callback
575  * @return 0 on success or -1 in case of error
576  */
577 int jobs_enter(
578                 const void *group,
579                 int timeout,
580                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
581                 void *closure
582 )
583 {
584         struct sync sync;
585
586         sync.enter = callback;
587         sync.arg = closure;
588         return do_sync(group, timeout, enter_cb, &sync);
589 }
590
591 /**
592  * Unlocks the execution flow designed by 'jobloop'.
593  * @param jobloop indication of the flow to unlock
594  * @return 0 in case of success of -1 on error
595  */
596 int jobs_leave(struct jobloop *jobloop)
597 {
598         struct thread *t;
599
600         pthread_mutex_lock(&mutex);
601         t = threads;
602         while (t && t != (struct thread*)jobloop)
603                 t = t->next;
604         if (!t) {
605                 errno = EINVAL;
606         } else {
607                 t->stop = 1;
608                 if (t->waits)
609                         pthread_cond_broadcast(&cond);
610         }
611         pthread_mutex_unlock(&mutex);
612         return -!t;
613 }
614
615 /**
616  * Calls synchronously the job represented by 'callback' and 'arg1'
617  * for the 'group' and the 'timeout' and waits for its completion.
618  * @param group    The group of the job or NULL when no group.
619  * @param timeout  The maximum execution time in seconds of the job
620  *                 or 0 for unlimited time.
621  * @param callback The function to execute for achieving the job.
622  *                 Its first parameter is either 0 on normal flow
623  *                 or the signal number that broke the normal flow.
624  *                 The remaining parameter is the parameter 'arg1'
625  *                 given here.
626  * @param arg      The second argument for 'callback'
627  * @return 0 in case of success or -1 in case of error
628  */
629 int jobs_call(
630                 const void *group,
631                 int timeout,
632                 void (*callback)(int, void*),
633                 void *arg)
634 {
635         struct sync sync;
636
637         sync.callback = callback;
638         sync.arg = arg;
639
640         return do_sync(group, timeout, call_cb, &sync);
641 }
642
643 /**
644  * Internal callback for evloop management.
645  * The effect of this function is hidden: it exits
646  * the waiting poll if any. Then it wakes up a thread
647  * awaiting the evloop using signal.
648  */
649 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
650 {
651         uint64_t x;
652         struct evloop *evloop = userdata;
653         read(evloop->efd, &x, sizeof x);
654         pthread_mutex_lock(&mutex);
655         pthread_cond_broadcast(&evloop->cond);  
656         pthread_mutex_unlock(&mutex);
657         return 1;
658 }
659
660 /**
661  * Gets a sd_event item for the current thread.
662  * @return a sd_event or NULL in case of error
663  */
664 static struct sd_event *get_sd_event_locked()
665 {
666         struct evloop *el;
667         uint64_t x;
668         int rc;
669
670         /* creates the evloop on need */
671         el = &evloop[0];
672         if (!el->sdev) {
673                 /* start the creation */
674                 el->state = 0;
675                 /* creates the eventfd for waking up polls */
676                 el->efd = eventfd(0, EFD_CLOEXEC);
677                 if (el->efd < 0) {
678                         ERROR("can't make eventfd for events");
679                         goto error1;
680                 }
681                 /* create the systemd event loop */
682                 rc = sd_event_new(&el->sdev);
683                 if (rc < 0) {
684                         ERROR("can't make new event loop");
685                         goto error2;
686                 }
687                 /* put the eventfd in the event loop */
688                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
689                 if (rc < 0) {
690                         ERROR("can't register eventfd");
691                         sd_event_unref(el->sdev);
692                         el->sdev = NULL;
693 error2:
694                         close(el->efd);
695 error1:
696                         return NULL;
697                 }
698         }
699
700         /* attach the event loop to the current thread */
701         if (current_evloop != el) {
702                 if (current_evloop)
703                         __atomic_sub_fetch(&current_evloop->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
704                 current_evloop = el;
705                 __atomic_add_fetch(&el->state, EVLOOP_STATE_LOCK, __ATOMIC_RELAXED);
706         }
707
708         /* wait for a modifiable event loop */
709         while (__atomic_load_n(&el->state, __ATOMIC_RELAXED) & EVLOOP_STATE_WAIT) {
710                 x = 1;
711                 write(el->efd, &x, sizeof x);
712                 pthread_cond_wait(&el->cond, &mutex);
713         }
714
715         return el->sdev;
716 }
717
718 /**
719  * Gets a sd_event item for the current thread.
720  * @return a sd_event or NULL in case of error
721  */
722 struct sd_event *jobs_get_sd_event()
723 {
724         struct sd_event *result;
725
726         pthread_mutex_lock(&mutex);
727         result = get_sd_event_locked();
728         pthread_mutex_unlock(&mutex);
729
730         return result;
731 }
732
733 /**
734  * Enter the jobs processing loop.
735  * @param allowed_count Maximum count of thread for jobs including this one
736  * @param start_count   Count of thread to start now, must be lower.
737  * @param waiter_count  Maximum count of jobs that can be waiting.
738  * @param start         The start routine to activate (can't be NULL)
739  * @return 0 in case of success or -1 in case of error.
740  */
741 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum, void* arg), void *arg)
742 {
743         int rc, launched;
744         struct thread me;
745         struct job *job;
746
747         assert(allowed_count >= 1);
748         assert(start_count >= 0);
749         assert(waiter_count > 0);
750         assert(start_count <= allowed_count);
751
752         rc = -1;
753         pthread_mutex_lock(&mutex);
754
755         /* check whether already running */
756         if (current_thread || allowed) {
757                 ERROR("thread already started");
758                 errno = EINVAL;
759                 goto error;
760         }
761
762         /* start */
763         if (sig_monitor_init() < 0) {
764                 ERROR("failed to initialise signal handlers");
765                 goto error;
766         }
767
768         /* records the allowed count */
769         allowed = allowed_count;
770         started = 0;
771         running = 0;
772         remains = waiter_count;
773
774 #if HAS_WATCHDOG
775         /* set the watchdog */
776         if (sd_watchdog_enabled(0, NULL))
777                 sd_event_set_watchdog(get_sd_event_locked(), 1);
778 #endif
779
780         /* start at least one thread */
781         launched = 0;
782         while ((launched + 1) < start_count) {
783                 if (start_one_thread() != 0) {
784                         ERROR("Not all threads can be started");
785                         goto error;
786                 }
787                 launched++;
788         }
789
790         /* queue the start job */
791         job = job_create(NULL, 0, start, arg);
792         if (!job) {
793                 ERROR("out of memory");
794                 errno = ENOMEM;
795                 goto error;
796         }
797         job_add(job);
798         remains--;
799
800         /* run until end */
801         thread_run(&me);
802         rc = 0;
803 error:
804         pthread_mutex_unlock(&mutex);
805         return rc;
806 }
807
808 /**
809  * Terminate all the threads and cancel all pending jobs.
810  */
811 void jobs_terminate()
812 {
813         struct job *job, *head, *tail;
814         pthread_t me, *others;
815         struct thread *t;
816         int count;
817
818         /* how am i? */
819         me = pthread_self();
820
821         /* request all threads to stop */
822         pthread_mutex_lock(&mutex);
823         allowed = 0;
824
825         /* count the number of threads */
826         count = 0;
827         t = threads;
828         while (t) {
829                 if (!t->upper && !pthread_equal(t->tid, me))
830                         count++;
831                 t = t->next;
832         }
833
834         /* fill the array of threads */
835         others = alloca(count * sizeof *others);
836         count = 0;
837         t = threads;
838         while (t) {
839                 if (!t->upper && !pthread_equal(t->tid, me))
840                         others[count++] = t->tid;
841                 t = t->next;
842         }
843
844         /* stops the threads */
845         t = threads;
846         while (t) {
847                 t->stop = 1;
848                 t = t->next;
849         }
850
851         /* wait the threads */
852         pthread_cond_broadcast(&cond);
853         pthread_mutex_unlock(&mutex);
854         while (count)
855                 pthread_join(others[--count], NULL);
856         pthread_mutex_lock(&mutex);
857
858         /* cancel pending jobs of other threads */
859         remains = 0;
860         head = first_job;
861         first_job = NULL;
862         tail = NULL;
863         while (head) {
864                 /* unlink the job */
865                 job = head;
866                 head = job->next;
867
868                 /* search if job is stacked for current */
869                 t = current_thread;
870                 while (t && t->job != job)
871                         t = t->upper;
872                 if (t) {
873                         /* yes, relink it at end */
874                         if (tail)
875                                 tail->next = job;
876                         else
877                                 first_job = job;
878                         tail = job;
879                         job->next = NULL;
880                 } else {
881                         /* no cancel the job */
882                         pthread_mutex_unlock(&mutex);
883                         sig_monitor(0, job_cancel, job);
884                         free(job);
885                         pthread_mutex_lock(&mutex);
886                 }
887         }
888         pthread_mutex_unlock(&mutex);
889 }
890