jobs: Fix possible race condition
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29 #include <sys/eventfd.h>
30
31 #include <systemd/sd-event.h>
32
33 #include "jobs.h"
34 #include "sig-monitor.h"
35 #include "verbose.h"
36
37 #if 0
38 #define _alert_ "do you really want to remove monitoring?"
39 #define sig_monitor_init_timeouts()  ((void)0)
40 #define sig_monitor_clean_timeouts() ((void)0)
41 #define sig_monitor(to,cb,arg)       (cb(0,arg))
42 #endif
43
44 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
45 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
46
47 /** Internal shortcut for callback */
48 typedef void (*job_cb_t)(int, void*);
49
50 /** Description of a pending job */
51 struct job
52 {
53         struct job *next;    /**< link to the next job enqueued */
54         const void *group;   /**< group of the request */
55         job_cb_t callback;   /**< processing callback */
56         void *arg;           /**< argument */
57         int timeout;         /**< timeout in second for processing the request */
58         unsigned blocked: 1; /**< is an other request blocking this one ? */
59         unsigned dropped: 1; /**< is removed ? */
60 };
61
62 /** Description of handled event loops */
63 struct evloop
64 {
65         unsigned state;        /**< encoded state */
66         int efd;               /**< event notification */
67         struct sd_event *sdev; /**< the systemd event loop */
68         pthread_cond_t  cond;  /**< condition */
69 };
70
71 #define EVLOOP_STATE_WAIT           1U
72 #define EVLOOP_STATE_RUN            2U
73 #define EVLOOP_STATE_LOCK           4U
74
75 /** Description of threads */
76 struct thread
77 {
78         struct thread *next;   /**< next thread of the list */
79         struct thread *upper;  /**< upper same thread */
80         struct job *job;       /**< currently processed job */
81         pthread_t tid;         /**< the thread id */
82         unsigned stop: 1;      /**< stop requested */
83         unsigned waits: 1;     /**< is waiting? */
84 };
85
86 /**
87  * Description of synchonous callback
88  */
89 struct sync
90 {
91         struct thread thread;   /**< thread loop data */
92         union {
93                 void (*callback)(int, void*);   /**< the synchronous callback */
94                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
95                                 /**< the entering synchronous routine */
96         };
97         void *arg;              /**< the argument of the callback */
98 };
99
100
101 /* synchronisation of threads */
102 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
103 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
104
105 /* count allowed, started and running threads */
106 static int allowed = 0; /** allowed count of threads */
107 static int started = 0; /** started count of threads */
108 static int running = 0; /** running count of threads */
109 static int remains = 0; /** allowed count of waiting jobs */
110
111 /* list of threads */
112 static struct thread *threads;
113 static _Thread_local struct thread *current_thread;
114 static _Thread_local struct evloop *current_evloop;
115
116 /* queue of pending jobs */
117 static struct job *first_job;
118 static struct job *free_jobs;
119
120 /* event loop */
121 static struct evloop evloop[1];
122
123 /**
124  * Create a new job with the given parameters
125  * @param group    the group of the job
126  * @param timeout  the timeout of the job (0 if none)
127  * @param callback the function that achieves the job
128  * @param arg      the argument of the callback
129  * @return the created job unblock or NULL when no more memory
130  */
131 static struct job *job_create(
132                 const void *group,
133                 int timeout,
134                 job_cb_t callback,
135                 void *arg)
136 {
137         struct job *job;
138
139         /* try recyle existing job */
140         job = free_jobs;
141         if (job)
142                 free_jobs = job->next;
143         else {
144                 /* allocation  without blocking */
145                 pthread_mutex_unlock(&mutex);
146                 job = malloc(sizeof *job);
147                 pthread_mutex_lock(&mutex);
148                 if (!job) {
149                         errno = -ENOMEM;
150                         goto end;
151                 }
152         }
153         /* initialises the job */
154         job->group = group;
155         job->timeout = timeout;
156         job->callback = callback;
157         job->arg = arg;
158         job->blocked = 0;
159         job->dropped = 0;
160 end:
161         return job;
162 }
163
164 /**
165  * Adds 'job' at the end of the list of jobs, marking it
166  * as blocked if an other job with the same group is pending.
167  * @param job the job to add
168  */
169 static void job_add(struct job *job)
170 {
171         const void *group;
172         struct job *ijob, **pjob;
173
174         /* prepare to add */
175         group = job->group;
176         job->next = NULL;
177
178         /* search end and blockers */
179         pjob = &first_job;
180         ijob = first_job;
181         while (ijob) {
182                 if (group && ijob->group == group)
183                         job->blocked = 1;
184                 pjob = &ijob->next;
185                 ijob = ijob->next;
186         }
187
188         /* queue the jobs */
189         *pjob = job;
190 }
191
192 /**
193  * Get the next job to process or NULL if none.
194  * @return the first job that isn't blocked or NULL
195  */
196 static inline struct job *job_get()
197 {
198         struct job *job = first_job;
199         while (job && job->blocked)
200                 job = job->next;
201         return job;
202 }
203
204 /**
205  * Releases the processed 'job': removes it
206  * from the list of jobs and unblock the first
207  * pending job of the same group if any.
208  * @param job the job to release
209  */
210 static inline void job_release(struct job *job)
211 {
212         struct job *ijob, **pjob;
213         const void *group;
214
215         /* first unqueue the job */
216         pjob = &first_job;
217         ijob = first_job;
218         while (ijob != job) {
219                 pjob = &ijob->next;
220                 ijob = ijob->next;
221         }
222         *pjob = job->next;
223
224         /* then unblock jobs of the same group */
225         group = job->group;
226         if (group) {
227                 ijob = job->next;
228                 while (ijob && ijob->group != group)
229                         ijob = ijob->next;
230                 if (ijob)
231                         ijob->blocked = 0;
232         }
233
234         /* recycle the job */
235         job->next = free_jobs;
236         free_jobs = job;
237 }
238
239 /**
240  * Monitored cancel callback for a job.
241  * This function is called by the monitor
242  * to cancel the job when the safe environment
243  * is set.
244  * @param signum 0 on normal flow or the number
245  *               of the signal that interrupted the normal
246  *               flow, isn't used
247  * @param arg    the job to run
248  */
249 static void job_cancel(int signum, void *arg)
250 {
251         struct job *job = arg;
252         job->callback(SIGABRT, job->arg);
253 }
254
255 /**
256  * Monitored normal callback for events.
257  * This function is called by the monitor
258  * to run the event loop when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow
263  * @param arg     the events to run
264  */
265 static void evloop_run(int signum, void *arg)
266 {
267         int rc;
268         struct sd_event *se;
269         struct evloop *el = arg;
270
271         if (!signum) {
272                 se = el->sdev;
273                 rc = sd_event_prepare(se);
274                 if (rc < 0) {
275                         errno = -rc;
276                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
277                 } else {
278                         if (rc == 0) {
279                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
280                                 if (rc < 0) {
281                                         errno = -rc;
282                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
283                                 }
284                         }
285                         el->state &= ~(EVLOOP_STATE_WAIT);
286
287                         if (rc > 0) {
288                                 rc = sd_event_dispatch(se);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                 }
295         }
296         el->state &= ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN);
297 }
298
299
300 /**
301  * Main processing loop of threads processing jobs.
302  * The loop must be called with the mutex locked
303  * and it returns with the mutex locked.
304  * @param me the description of the thread to use
305  * TODO: how are timeout handled when reentering?
306  */
307 static void thread_run(volatile struct thread *me)
308 {
309         struct thread **prv;
310         struct job *job;
311         struct evloop *el;
312
313         /* initialize description of itself and link it in the list */
314         me->tid = pthread_self();
315         me->stop = 0;
316         me->waits = 0;
317         me->upper = current_thread;
318         if (!current_thread) {
319                 started++;
320                 sig_monitor_init_timeouts();
321         }
322         me->next = threads;
323         threads = (struct thread*)me;
324         current_thread = (struct thread*)me;
325
326         /* loop until stopped */
327         while (!me->stop) {
328                 /* release the event loop */
329                 if (current_evloop && !(current_evloop->state & EVLOOP_STATE_RUN)) {
330                         current_evloop->state -= EVLOOP_STATE_LOCK;
331                         current_evloop = NULL;
332                 }
333
334                 /* get a job */
335                 job = job_get(first_job);
336                 if (job) {
337                         /* prepare running the job */
338                         remains++; /* increases count of job that can wait */
339                         job->blocked = 1; /* mark job as blocked */
340                         me->job = job; /* record the job (only for terminate) */
341
342                         /* run the job */
343                         pthread_mutex_unlock(&mutex);
344                         sig_monitor(job->timeout, job->callback, job->arg);
345                         pthread_mutex_lock(&mutex);
346
347                         /* release the run job */
348                         job_release(job);
349                 } else {
350                         /* no job, check events */
351                         el = &evloop[0];
352                         if (el->sdev && !el->state) {
353                                 /* run the events */
354                                 el->state = EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
355                                 current_evloop = el;
356                                 pthread_mutex_unlock(&mutex);
357                                 sig_monitor(0, evloop_run, el);
358                                 pthread_mutex_lock(&mutex);
359                         } else {
360                                 /* no job and not events */
361                                 running--;
362                                 if (!running)
363                                         ERROR("Entering job deep sleep! Check your bindings.");
364                                 me->waits = 1;
365                                 pthread_cond_wait(&cond, &mutex);
366                                 me->waits = 0;
367                                 running++;
368                         }
369                 }
370         }
371
372         /* unlink the current thread and cleanup */
373         prv = &threads;
374         while (*prv != me)
375                 prv = &(*prv)->next;
376         *prv = me->next;
377         current_thread = me->upper;
378         if (!current_thread) {
379                 sig_monitor_clean_timeouts();
380                 started--;
381         }
382 }
383
384 /**
385  * Entry point for created threads.
386  * @param data not used
387  * @return NULL
388  */
389 static void *thread_main(void *data)
390 {
391         struct thread me;
392
393         pthread_mutex_lock(&mutex);
394         running++;
395         thread_run(&me);
396         running--;
397         pthread_mutex_unlock(&mutex);
398         return NULL;
399 }
400
401 /**
402  * Starts a new thread
403  * @return 0 in case of success or -1 in case of error
404  */
405 static int start_one_thread()
406 {
407         pthread_t tid;
408         int rc;
409
410         rc = pthread_create(&tid, NULL, thread_main, NULL);
411         if (rc != 0) {
412                 /* errno = rc; */
413                 WARNING("not able to start thread: %m");
414                 rc = -1;
415         }
416         return rc;
417 }
418
419 /**
420  * Queues a new asynchronous job represented by 'callback' and 'arg'
421  * for the 'group' and the 'timeout'.
422  * Jobs are queued FIFO and are possibly executed in parallel
423  * concurrently except for job of the same group that are
424  * executed sequentially in FIFO order.
425  * @param group    The group of the job or NULL when no group.
426  * @param timeout  The maximum execution time in seconds of the job
427  *                 or 0 for unlimited time.
428  * @param callback The function to execute for achieving the job.
429  *                 Its first parameter is either 0 on normal flow
430  *                 or the signal number that broke the normal flow.
431  *                 The remaining parameter is the parameter 'arg1'
432  *                 given here.
433  * @param arg      The second argument for 'callback'
434  * @return 0 in case of success or -1 in case of error
435  */
436 int jobs_queue(
437                 const void *group,
438                 int timeout,
439                 void (*callback)(int, void*),
440                 void *arg)
441 {
442         const char *info;
443         struct job *job;
444         int rc;
445
446         pthread_mutex_lock(&mutex);
447
448         /* allocates the job */
449         job = job_create(group, timeout, callback, arg);
450         if (!job) {
451                 errno = ENOMEM;
452                 info = "out of memory";
453                 goto error;
454         }
455
456         /* check availability */
457         if (remains == 0) {
458                 errno = EBUSY;
459                 info = "too many jobs";
460                 goto error2;
461         }
462
463         /* start a thread if needed */
464         if (running == started && started < allowed) {
465                 /* all threads are busy and a new can be started */
466                 rc = start_one_thread();
467                 if (rc < 0 && started == 0) {
468                         info = "can't start first thread";
469                         goto error2;
470                 }
471         }
472
473         /* queues the job */
474         remains--;
475         job_add(job);
476
477         /* signal an existing job */
478         pthread_cond_signal(&cond);
479         pthread_mutex_unlock(&mutex);
480         return 0;
481
482 error2:
483         job->next = free_jobs;
484         free_jobs = job;
485 error:
486         ERROR("can't process job with threads: %s, %m", info);
487         pthread_mutex_unlock(&mutex);
488         return -1;
489 }
490
491 /**
492  * Internal helper function for 'jobs_enter'.
493  * @see jobs_enter, jobs_leave
494  */
495 static void enter_cb(int signum, void *closure)
496 {
497         struct sync *sync = closure;
498         sync->enter(signum, sync->arg, (void*)&sync->thread);
499 }
500
501 /**
502  * Internal helper function for 'jobs_call'.
503  * @see jobs_call
504  */
505 static void call_cb(int signum, void *closure)
506 {
507         struct sync *sync = closure;
508         sync->callback(signum, sync->arg);
509         jobs_leave((void*)&sync->thread);
510 }
511
512 /**
513  * Internal helper for synchronous jobs. It enters
514  * a new thread loop for evaluating the given job
515  * as recorded by the couple 'sync_cb' and 'sync'.
516  * @see jobs_call, jobs_enter, jobs_leave
517  */
518 static int do_sync(
519                 const void *group,
520                 int timeout,
521                 void (*sync_cb)(int signum, void *closure),
522                 struct sync *sync
523 )
524 {
525         struct job *job;
526
527         pthread_mutex_lock(&mutex);
528
529         /* allocates the job */
530         job = job_create(group, timeout, sync_cb, sync);
531         if (!job) {
532                 ERROR("out of memory");
533                 errno = ENOMEM;
534                 pthread_mutex_unlock(&mutex);
535                 return -1;
536         }
537
538         /* queues the job */
539         job_add(job);
540
541         /* run until stopped */
542         thread_run(&sync->thread);
543         pthread_mutex_unlock(&mutex);
544         return 0;
545 }
546
547 /**
548  * Enter a synchronisation point: activates the job given by 'callback'
549  * and 'closure' using 'group' and 'timeout' to control sequencing and
550  * execution time.
551  * @param group the group for sequencing jobs
552  * @param timeout the time in seconds allocated to the job
553  * @param callback the callback that will handle the job.
554  *                 it receives 3 parameters: 'signum' that will be 0
555  *                 on normal flow or the catched signal number in case
556  *                 of interrupted flow, the context 'closure' as given and
557  *                 a 'jobloop' reference that must be used when the job is
558  *                 terminated to unlock the current execution flow.
559  * @param arg the argument to the callback
560  * @return 0 on success or -1 in case of error
561  */
562 int jobs_enter(
563                 const void *group,
564                 int timeout,
565                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
566                 void *closure
567 )
568 {
569         struct sync sync;
570
571         sync.enter = callback;
572         sync.arg = closure;
573         return do_sync(group, timeout, enter_cb, &sync);
574 }
575
576 /**
577  * Unlocks the execution flow designed by 'jobloop'.
578  * @param jobloop indication of the flow to unlock
579  * @return 0 in case of success of -1 on error
580  */
581 int jobs_leave(struct jobloop *jobloop)
582 {
583         struct thread *t;
584
585         pthread_mutex_lock(&mutex);
586         t = threads;
587         while (t && t != (struct thread*)jobloop)
588                 t = t->next;
589         if (!t) {
590                 errno = EINVAL;
591         } else {
592                 t->stop = 1;
593                 if (t->waits)
594                         pthread_cond_broadcast(&cond);
595         }
596         pthread_mutex_unlock(&mutex);
597         return -!t;
598 }
599
600 /**
601  * Calls synchronously the job represented by 'callback' and 'arg1'
602  * for the 'group' and the 'timeout' and waits for its completion.
603  * @param group    The group of the job or NULL when no group.
604  * @param timeout  The maximum execution time in seconds of the job
605  *                 or 0 for unlimited time.
606  * @param callback The function to execute for achieving the job.
607  *                 Its first parameter is either 0 on normal flow
608  *                 or the signal number that broke the normal flow.
609  *                 The remaining parameter is the parameter 'arg1'
610  *                 given here.
611  * @param arg      The second argument for 'callback'
612  * @return 0 in case of success or -1 in case of error
613  */
614 int jobs_call(
615                 const void *group,
616                 int timeout,
617                 void (*callback)(int, void*),
618                 void *arg)
619 {
620         struct sync sync;
621
622         sync.callback = callback;
623         sync.arg = arg;
624
625         return do_sync(group, timeout, call_cb, &sync);
626 }
627
628 /**
629  * Internal callback for evloop management.
630  * The effect of this function is hidden: it exits
631  * the waiting poll if any. Then it wakes up a thread
632  * awaiting the evloop using signal.
633  */
634 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
635 {
636         uint64_t x;
637         struct evloop *evloop = userdata;
638         read(evloop->efd, &x, sizeof x);
639         pthread_mutex_lock(&mutex);
640         pthread_cond_broadcast(&evloop->cond);  
641         pthread_mutex_unlock(&mutex);
642         return 1;
643 }
644
645 /**
646  * Gets a sd_event item for the current thread.
647  * @return a sd_event or NULL in case of error
648  */
649 struct sd_event *jobs_get_sd_event()
650 {
651         struct evloop *el;
652         uint64_t x;
653         int rc;
654
655         pthread_mutex_lock(&mutex);
656
657         /* creates the evloop on need */
658         el = &evloop[0];
659         if (!el->sdev) {
660                 /* creates the eventfd for waking up polls */
661                 el->efd = eventfd(0, EFD_CLOEXEC);
662                 if (el->efd < 0) {
663                         ERROR("can't make eventfd for events");
664                         goto error1;
665                 }
666                 /* create the systemd event loop */
667                 rc = sd_event_new(&el->sdev);
668                 if (rc < 0) {
669                         ERROR("can't make new event loop");
670                         goto error2;
671                 }
672                 /* put the eventfd in the event loop */
673                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
674                 if (rc < 0) {
675                         ERROR("can't register eventfd");
676                         sd_event_unref(el->sdev);
677                         el->sdev = NULL;
678 error2:
679                         close(el->efd);
680 error1:
681                         pthread_mutex_unlock(&mutex);
682                         return NULL;
683                 }
684                 /* terminate creation */
685                 el->state = 0;
686         }
687
688         /* attach the event loop to the current thread */
689         if (current_evloop != el) {
690                 if (current_evloop)
691                         current_evloop->state -= EVLOOP_STATE_LOCK;
692                 current_evloop = el;
693                 el->state += EVLOOP_STATE_LOCK;
694         }
695
696         /* wait for a modifiable event loop */
697         while (el->state & EVLOOP_STATE_WAIT) {
698                 x = 1;
699                 write(el->efd, &x, sizeof x);
700                 pthread_cond_wait(&el->cond, &mutex);
701         }
702
703         pthread_mutex_unlock(&mutex);
704         return el->sdev;
705 }
706
707 /**
708  * Enter the jobs processing loop.
709  * @param allowed_count Maximum count of thread for jobs including this one
710  * @param start_count   Count of thread to start now, must be lower.
711  * @param waiter_count  Maximum count of jobs that can be waiting.
712  * @param start         The start routine to activate (can't be NULL)
713  * @return 0 in case of success or -1 in case of error.
714  */
715 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
716 {
717         int rc, launched;
718         struct thread me;
719         struct job *job;
720
721         assert(allowed_count >= 1);
722         assert(start_count >= 0);
723         assert(waiter_count > 0);
724         assert(start_count <= allowed_count);
725
726         rc = -1;
727         pthread_mutex_lock(&mutex);
728
729         /* check whether already running */
730         if (current_thread || allowed) {
731                 ERROR("thread already started");
732                 errno = EINVAL;
733                 goto error;
734         }
735
736         /* start */
737         if (sig_monitor_init() < 0) {
738                 ERROR("failed to initialise signal handlers");
739                 goto error;
740         }
741
742         /* records the allowed count */
743         allowed = allowed_count;
744         started = 0;
745         running = 0;
746         remains = waiter_count;
747
748         /* start at least one thread */
749         launched = 0;
750         while ((launched + 1) < start_count) {
751                 if (start_one_thread() != 0) {
752                         ERROR("Not all threads can be started");
753                         goto error;
754                 }
755                 launched++;
756         }
757
758         /* queue the start job */
759         job = job_create(NULL, 0, (job_cb_t)start, NULL);
760         if (!job) {
761                 ERROR("out of memory");
762                 errno = ENOMEM;
763                 goto error;
764         }
765         job_add(job);
766         remains--;
767
768         /* run until end */
769         thread_run(&me);
770         rc = 0;
771 error:
772         pthread_mutex_unlock(&mutex);
773         return rc;
774 }
775
776 /**
777  * Terminate all the threads and cancel all pending jobs.
778  */
779 void jobs_terminate()
780 {
781         struct job *job, *head, *tail;
782         pthread_t me, *others;
783         struct thread *t;
784         int count;
785
786         /* how am i? */
787         me = pthread_self();
788
789         /* request all threads to stop */
790         pthread_mutex_lock(&mutex);
791         allowed = 0;
792
793         /* count the number of threads */
794         count = 0;
795         t = threads;
796         while (t) {
797                 if (!t->upper && !pthread_equal(t->tid, me))
798                         count++;
799                 t = t->next;
800         }
801
802         /* fill the array of threads */
803         others = alloca(count * sizeof *others);
804         count = 0;
805         t = threads;
806         while (t) {
807                 if (!t->upper && !pthread_equal(t->tid, me))
808                         others[count++] = t->tid;
809                 t = t->next;
810         }
811
812         /* stops the threads */
813         t = threads;
814         while (t) {
815                 t->stop = 1;
816                 t = t->next;
817         }
818
819         /* wait the threads */
820         pthread_cond_broadcast(&cond);
821         pthread_mutex_unlock(&mutex);
822         while (count)
823                 pthread_join(others[--count], NULL);
824         pthread_mutex_lock(&mutex);
825
826         /* cancel pending jobs of other threads */
827         remains = 0;
828         head = first_job;
829         first_job = NULL;
830         tail = NULL;
831         while (head) {
832                 /* unlink the job */
833                 job = head;
834                 head = job->next;
835
836                 /* search if job is stacked for current */
837                 t = current_thread;
838                 while (t && t->job != job)
839                         t = t->upper;
840                 if (t) {
841                         /* yes, relink it at end */
842                         if (tail)
843                                 tail->next = job;
844                         else
845                                 first_job = job;
846                         tail = job;
847                         job->next = NULL;
848                 } else {
849                         /* no cancel the job */
850                         pthread_mutex_unlock(&mutex);
851                         sig_monitor(0, job_cancel, job);
852                         free(job);
853                         pthread_mutex_lock(&mutex);
854                 }
855         }
856         pthread_mutex_unlock(&mutex);
857 }
858