jobs: rewrite of the event loop handling
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <stdint.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/syscall.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <assert.h>
29 #include <sys/eventfd.h>
30
31 #include <systemd/sd-event.h>
32
33 #include "jobs.h"
34 #include "sig-monitor.h"
35 #include "verbose.h"
36
37 #if 0
38 #define _alert_ "do you really want to remove monitoring?"
39 #define sig_monitor_init_timeouts()  ((void)0)
40 #define sig_monitor_clean_timeouts() ((void)0)
41 #define sig_monitor(to,cb,arg)       (cb(0,arg))
42 #endif
43
44 #define EVENT_TIMEOUT_TOP       ((uint64_t)-1)
45 #define EVENT_TIMEOUT_CHILD     ((uint64_t)10000)
46
47 /** Internal shortcut for callback */
48 typedef void (*job_cb_t)(int, void*);
49
50 /** Description of a pending job */
51 struct job
52 {
53         struct job *next;    /**< link to the next job enqueued */
54         const void *group;   /**< group of the request */
55         job_cb_t callback;   /**< processing callback */
56         void *arg;           /**< argument */
57         int timeout;         /**< timeout in second for processing the request */
58         unsigned blocked: 1; /**< is an other request blocking this one ? */
59         unsigned dropped: 1; /**< is removed ? */
60 };
61
62 /** Description of handled event loops */
63 struct evloop
64 {
65         unsigned state;        /**< encoded state */
66         int efd;               /**< event notification */
67         struct sd_event *sdev; /**< the systemd event loop */
68         pthread_cond_t  cond;  /**< condition */
69 };
70
71 #define EVLOOP_STATE_WAIT           1U
72 #define EVLOOP_STATE_RUN            2U
73 #define EVLOOP_STATE_LOCK           4U
74
75 /** Description of threads */
76 struct thread
77 {
78         struct thread *next;   /**< next thread of the list */
79         struct thread *upper;  /**< upper same thread */
80         struct job *job;       /**< currently processed job */
81         pthread_t tid;         /**< the thread id */
82         unsigned stop: 1;      /**< stop requested */
83         unsigned waits: 1;     /**< is waiting? */
84 };
85
86 /**
87  * Description of synchonous callback
88  */
89 struct sync
90 {
91         struct thread thread;   /**< thread loop data */
92         union {
93                 void (*callback)(int, void*);   /**< the synchronous callback */
94                 void (*enter)(int signum, void *closure, struct jobloop *jobloop);
95                                 /**< the entering synchronous routine */
96         };
97         void *arg;              /**< the argument of the callback */
98 };
99
100
101 /* synchronisation of threads */
102 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
103 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
104
105 /* count allowed, started and running threads */
106 static int allowed = 0; /** allowed count of threads */
107 static int started = 0; /** started count of threads */
108 static int running = 0; /** running count of threads */
109 static int remains = 0; /** allowed count of waiting jobs */
110
111 /* list of threads */
112 static struct thread *threads;
113 static _Thread_local struct thread *current_thread;
114 static _Thread_local struct evloop *current_evloop;
115
116 /* queue of pending jobs */
117 static struct job *first_job;
118 static struct job *free_jobs;
119
120 /* event loop */
121 static struct evloop evloop[1];
122
123 /**
124  * Create a new job with the given parameters
125  * @param group    the group of the job
126  * @param timeout  the timeout of the job (0 if none)
127  * @param callback the function that achieves the job
128  * @param arg      the argument of the callback
129  * @return the created job unblock or NULL when no more memory
130  */
131 static struct job *job_create(
132                 const void *group,
133                 int timeout,
134                 job_cb_t callback,
135                 void *arg)
136 {
137         struct job *job;
138
139         /* try recyle existing job */
140         job = free_jobs;
141         if (job)
142                 free_jobs = job->next;
143         else {
144                 /* allocation  without blocking */
145                 pthread_mutex_unlock(&mutex);
146                 job = malloc(sizeof *job);
147                 pthread_mutex_lock(&mutex);
148                 if (!job) {
149                         errno = -ENOMEM;
150                         goto end;
151                 }
152         }
153         /* initialises the job */
154         job->group = group;
155         job->timeout = timeout;
156         job->callback = callback;
157         job->arg = arg;
158         job->blocked = 0;
159         job->dropped = 0;
160 end:
161         return job;
162 }
163
164 /**
165  * Adds 'job' at the end of the list of jobs, marking it
166  * as blocked if an other job with the same group is pending.
167  * @param job the job to add
168  */
169 static void job_add(struct job *job)
170 {
171         const void *group;
172         struct job *ijob, **pjob;
173
174         /* prepare to add */
175         group = job->group;
176         job->next = NULL;
177
178         /* search end and blockers */
179         pjob = &first_job;
180         ijob = first_job;
181         while (ijob) {
182                 if (group && ijob->group == group)
183                         job->blocked = 1;
184                 pjob = &ijob->next;
185                 ijob = ijob->next;
186         }
187
188         /* queue the jobs */
189         *pjob = job;
190 }
191
192 /**
193  * Get the next job to process or NULL if none.
194  * @return the first job that isn't blocked or NULL
195  */
196 static inline struct job *job_get()
197 {
198         struct job *job = first_job;
199         while (job && job->blocked)
200                 job = job->next;
201         return job;
202 }
203
204 /**
205  * Releases the processed 'job': removes it
206  * from the list of jobs and unblock the first
207  * pending job of the same group if any.
208  * @param job the job to release
209  */
210 static inline void job_release(struct job *job)
211 {
212         struct job *ijob, **pjob;
213         const void *group;
214
215         /* first unqueue the job */
216         pjob = &first_job;
217         ijob = first_job;
218         while (ijob != job) {
219                 pjob = &ijob->next;
220                 ijob = ijob->next;
221         }
222         *pjob = job->next;
223
224         /* then unblock jobs of the same group */
225         group = job->group;
226         if (group) {
227                 ijob = job->next;
228                 while (ijob && ijob->group != group)
229                         ijob = ijob->next;
230                 if (ijob)
231                         ijob->blocked = 0;
232         }
233
234         /* recycle the job */
235         job->next = free_jobs;
236         free_jobs = job;
237 }
238
239 /**
240  * Monitored cancel callback for a job.
241  * This function is called by the monitor
242  * to cancel the job when the safe environment
243  * is set.
244  * @param signum 0 on normal flow or the number
245  *               of the signal that interrupted the normal
246  *               flow, isn't used
247  * @param arg    the job to run
248  */
249 static void job_cancel(int signum, void *arg)
250 {
251         struct job *job = arg;
252         job->callback(SIGABRT, job->arg);
253 }
254
255 /**
256  * Monitored normal callback for events.
257  * This function is called by the monitor
258  * to run the event loop when the safe environment
259  * is set.
260  * @param signum 0 on normal flow or the number
261  *               of the signal that interrupted the normal
262  *               flow
263  * @param arg     the events to run
264  */
265 static void evloop_run(int signum, void *arg)
266 {
267         int rc;
268         struct sd_event *se;
269         struct evloop *el = arg;
270
271         if (!signum) {
272                 se = el->sdev;
273                 rc = sd_event_prepare(se);
274                 if (rc < 0) {
275                         errno = -rc;
276                         ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
277                 } else {
278                         if (rc == 0) {
279                                 rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
280                                 if (rc < 0) {
281                                         errno = -rc;
282                                         ERROR("sd_event_wait returned an error (state: %d): %m", sd_event_get_state(se));
283                                 }
284                         }
285                         el->state &= ~(EVLOOP_STATE_WAIT);
286
287                         if (rc > 0) {
288                                 rc = sd_event_dispatch(se);
289                                 if (rc < 0) {
290                                         errno = -rc;
291                                         ERROR("sd_event_dispatch returned an error (state: %d): %m", sd_event_get_state(se));
292                                 }
293                         }
294                 }
295         }
296         el->state &= ~(EVLOOP_STATE_WAIT|EVLOOP_STATE_RUN);
297 }
298
299
300 /**
301  * Main processing loop of threads processing jobs.
302  * The loop must be called with the mutex locked
303  * and it returns with the mutex locked.
304  * @param me the description of the thread to use
305  * TODO: how are timeout handled when reentering?
306  */
307 static void thread_run(volatile struct thread *me)
308 {
309         struct thread **prv;
310         struct job *job;
311         struct evloop *el;
312
313         /* initialize description of itself and link it in the list */
314         me->tid = pthread_self();
315         me->stop = 0;
316         me->waits = 0;
317         me->upper = current_thread;
318         if (!current_thread) {
319                 started++;
320                 sig_monitor_init_timeouts();
321         }
322         me->next = threads;
323         threads = (struct thread*)me;
324         current_thread = (struct thread*)me;
325
326         /* loop until stopped */
327         while (!me->stop) {
328                 /* release the event loop */
329                 if (current_evloop && !(current_evloop->state & EVLOOP_STATE_RUN)) {
330                         current_evloop->state -= EVLOOP_STATE_LOCK;
331                         current_evloop = NULL;
332                 }
333
334                 /* get a job */
335                 job = job_get(first_job);
336                 if (job) {
337                         /* prepare running the job */
338                         remains++; /* increases count of job that can wait */
339                         job->blocked = 1; /* mark job as blocked */
340                         me->job = job; /* record the job (only for terminate) */
341
342                         /* run the job */
343                         pthread_mutex_unlock(&mutex);
344                         sig_monitor(job->timeout, job->callback, job->arg);
345                         pthread_mutex_lock(&mutex);
346
347                         /* release the run job */
348                         job_release(job);
349                 } else {
350                         /* no job, check events */
351                         el = &evloop[0];
352                         if (el->sdev && !el->state) {
353                                 /* run the events */
354                                 el->state = EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT;
355                                 current_evloop = el;
356                                 pthread_mutex_unlock(&mutex);
357                                 sig_monitor(0, evloop_run, el);
358                                 pthread_mutex_lock(&mutex);
359                         } else {
360                                 /* no job and not events */
361                                 running--;
362                                 if (!running)
363                                         ERROR("Entering job deep sleep! Check your bindings.");
364                                 me->waits = 1;
365                                 pthread_cond_wait(&cond, &mutex);
366                                 me->waits = 0;
367                                 running++;
368                         }
369                 }
370         }
371
372         /* unlink the current thread and cleanup */
373         prv = &threads;
374         while (*prv != me)
375                 prv = &(*prv)->next;
376         *prv = me->next;
377         current_thread = me->upper;
378         if (!current_thread) {
379                 sig_monitor_clean_timeouts();
380                 started--;
381         }
382 }
383
384 /**
385  * Entry point for created threads.
386  * @param data not used
387  * @return NULL
388  */
389 static void *thread_main(void *data)
390 {
391         struct thread me;
392
393         pthread_mutex_lock(&mutex);
394         running++;
395         thread_run(&me);
396         running--;
397         pthread_mutex_unlock(&mutex);
398         return NULL;
399 }
400
401 /**
402  * Starts a new thread
403  * @return 0 in case of success or -1 in case of error
404  */
405 static int start_one_thread()
406 {
407         pthread_t tid;
408         int rc;
409
410         rc = pthread_create(&tid, NULL, thread_main, NULL);
411         if (rc != 0) {
412                 /* errno = rc; */
413                 WARNING("not able to start thread: %m");
414                 rc = -1;
415         }
416         return rc;
417 }
418
419 /**
420  * Queues a new asynchronous job represented by 'callback' and 'arg'
421  * for the 'group' and the 'timeout'.
422  * Jobs are queued FIFO and are possibly executed in parallel
423  * concurrently except for job of the same group that are
424  * executed sequentially in FIFO order.
425  * @param group    The group of the job or NULL when no group.
426  * @param timeout  The maximum execution time in seconds of the job
427  *                 or 0 for unlimited time.
428  * @param callback The function to execute for achieving the job.
429  *                 Its first parameter is either 0 on normal flow
430  *                 or the signal number that broke the normal flow.
431  *                 The remaining parameter is the parameter 'arg1'
432  *                 given here.
433  * @param arg      The second argument for 'callback'
434  * @return 0 in case of success or -1 in case of error
435  */
436 int jobs_queue(
437                 const void *group,
438                 int timeout,
439                 void (*callback)(int, void*),
440                 void *arg)
441 {
442         const char *info;
443         struct job *job;
444         int rc;
445
446         pthread_mutex_lock(&mutex);
447
448         /* allocates the job */
449         job = job_create(group, timeout, callback, arg);
450         if (!job) {
451                 errno = ENOMEM;
452                 info = "out of memory";
453                 goto error;
454         }
455
456         /* check availability */
457         if (remains == 0) {
458                 errno = EBUSY;
459                 info = "too many jobs";
460                 goto error2;
461         }
462
463         /* start a thread if needed */
464         if (running == started && started < allowed) {
465                 /* all threads are busy and a new can be started */
466                 rc = start_one_thread();
467                 if (rc < 0 && started == 0) {
468                         info = "can't start first thread";
469                         goto error2;
470                 }
471         }
472
473         /* queues the job */
474         remains--;
475         job_add(job);
476
477         /* signal an existing job */
478         pthread_cond_signal(&cond);
479         pthread_mutex_unlock(&mutex);
480         return 0;
481
482 error2:
483         job->next = free_jobs;
484         free_jobs = job;
485 error:
486         ERROR("can't process job with threads: %s, %m", info);
487         pthread_mutex_unlock(&mutex);
488         return -1;
489 }
490
491 /**
492  * Internal helper function for 'jobs_enter'.
493  * @see jobs_enter, jobs_leave
494  */
495 static void enter_cb(int signum, void *closure)
496 {
497         struct sync *sync = closure;
498         sync->enter(signum, sync->arg, (void*)&sync->thread);
499 }
500
501 /**
502  * Internal helper function for 'jobs_call'.
503  * @see jobs_call
504  */
505 static void call_cb(int signum, void *closure)
506 {
507         struct sync *sync = closure;
508         sync->callback(signum, sync->arg);
509         jobs_leave((void*)&sync->thread);
510 }
511
512 /**
513  * Internal helper for synchronous jobs. It enters
514  * a new thread loop for evaluating the given job
515  * as recorded by the couple 'sync_cb' and 'sync'.
516  * @see jobs_call, jobs_enter, jobs_leave
517  */
518 static int do_sync(
519                 const void *group,
520                 int timeout,
521                 void (*sync_cb)(int signum, void *closure),
522                 struct sync *sync
523 )
524 {
525         struct job *job;
526
527         pthread_mutex_lock(&mutex);
528
529         /* allocates the job */
530         job = job_create(group, timeout, sync_cb, sync);
531         if (!job) {
532                 ERROR("out of memory");
533                 errno = ENOMEM;
534                 pthread_mutex_unlock(&mutex);
535                 return -1;
536         }
537
538         /* queues the job */
539         job_add(job);
540
541         /* run until stopped */
542         thread_run(&sync->thread);
543         pthread_mutex_unlock(&mutex);
544         return 0;
545 }
546
547 /**
548  * Enter a synchronisation point: activates the job given by 'callback'
549  * and 'closure' using 'group' and 'timeout' to control sequencing and
550  * execution time.
551  * @param group the group for sequencing jobs
552  * @param timeout the time in seconds allocated to the job
553  * @param callback the callback that will handle the job.
554  *                 it receives 3 parameters: 'signum' that will be 0
555  *                 on normal flow or the catched signal number in case
556  *                 of interrupted flow, the context 'closure' as given and
557  *                 a 'jobloop' reference that must be used when the job is
558  *                 terminated to unlock the current execution flow.
559  * @param arg the argument to the callback
560  * @return 0 on success or -1 in case of error
561  */
562 int jobs_enter(
563                 const void *group,
564                 int timeout,
565                 void (*callback)(int signum, void *closure, struct jobloop *jobloop),
566                 void *closure
567 )
568 {
569         struct sync sync;
570
571         sync.enter = callback;
572         sync.arg = closure;
573         return do_sync(group, timeout, enter_cb, &sync);
574 }
575
576 /**
577  * Unlocks the execution flow designed by 'jobloop'.
578  * @param jobloop indication of the flow to unlock
579  * @return 0 in case of success of -1 on error
580  */
581 int jobs_leave(struct jobloop *jobloop)
582 {
583         struct thread *t;
584
585         pthread_mutex_lock(&mutex);
586         t = threads;
587         while (t && t != (struct thread*)jobloop)
588                 t = t->next;
589         if (!t) {
590                 errno = EINVAL;
591         } else {
592                 t->stop = 1;
593                 if (t->waits)
594                         pthread_cond_broadcast(&cond);
595         }
596         pthread_mutex_unlock(&mutex);
597         return -!t;
598 }
599
600 /**
601  * Calls synchronously the job represented by 'callback' and 'arg1'
602  * for the 'group' and the 'timeout' and waits for its completion.
603  * @param group    The group of the job or NULL when no group.
604  * @param timeout  The maximum execution time in seconds of the job
605  *                 or 0 for unlimited time.
606  * @param callback The function to execute for achieving the job.
607  *                 Its first parameter is either 0 on normal flow
608  *                 or the signal number that broke the normal flow.
609  *                 The remaining parameter is the parameter 'arg1'
610  *                 given here.
611  * @param arg      The second argument for 'callback'
612  * @return 0 in case of success or -1 in case of error
613  */
614 int jobs_call(
615                 const void *group,
616                 int timeout,
617                 void (*callback)(int, void*),
618                 void *arg)
619 {
620         struct sync sync;
621
622         sync.callback = callback;
623         sync.arg = arg;
624
625         return do_sync(group, timeout, call_cb, &sync);
626 }
627
628 /**
629  * Internal callback for evloop management.
630  * The effect of this function is hidden: it exits
631  * the waiting poll if any. Then it wakes up a thread
632  * awaiting the evloop using signal.
633  */
634 static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *userdata)
635 {
636         uint64_t x;
637         struct evloop *evloop = userdata;
638         read(evloop->efd, &x, sizeof x);
639         pthread_cond_signal(&evloop->cond);     
640         return 1;
641 }
642
643 /**
644  * Gets a sd_event item for the current thread.
645  * @return a sd_event or NULL in case of error
646  */
647 struct sd_event *jobs_get_sd_event()
648 {
649         struct evloop *el;
650         uint64_t x;
651         int rc;
652
653         pthread_mutex_lock(&mutex);
654
655         /* creates the evloop on need */
656         el = &evloop[0];
657         if (!el->sdev) {
658                 /* creates the eventfd for waking up polls */
659                 el->efd = eventfd(0, EFD_CLOEXEC|EFD_SEMAPHORE);
660                 if (el->efd < 0) {
661                         ERROR("can't make eventfd for events");
662                         goto error1;
663                 }
664                 /* create the systemd event loop */
665                 rc = sd_event_new(&el->sdev);
666                 if (rc < 0) {
667                         ERROR("can't make new event loop");
668                         goto error2;
669                 }
670                 /* put the eventfd in the event loop */
671                 rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
672                 if (rc < 0) {
673                         ERROR("can't register eventfd");
674                         sd_event_unref(el->sdev);
675                         el->sdev = NULL;
676 error2:
677                         close(el->efd);
678 error1:
679                         pthread_mutex_unlock(&mutex);
680                         return NULL;
681                 }
682                 /* terminate creation */
683                 el->state = 0;
684         }
685
686         /* attach the event loop to the current thread */
687         if (current_evloop != el) {
688                 if (current_evloop)
689                         current_evloop->state -= EVLOOP_STATE_LOCK;
690                 current_evloop = el;
691                 el->state += EVLOOP_STATE_LOCK;
692         }
693
694         /* wait for a modifiable event loop */
695         while (el->state & EVLOOP_STATE_WAIT) {
696                 x = 1;
697                 write(el->efd, &x, sizeof x);
698                 pthread_cond_wait(&el->cond, &mutex);
699         }
700
701         pthread_mutex_unlock(&mutex);
702         return el->sdev;
703 }
704
705 /**
706  * Enter the jobs processing loop.
707  * @param allowed_count Maximum count of thread for jobs including this one
708  * @param start_count   Count of thread to start now, must be lower.
709  * @param waiter_count  Maximum count of jobs that can be waiting.
710  * @param start         The start routine to activate (can't be NULL)
711  * @return 0 in case of success or -1 in case of error.
712  */
713 int jobs_start(int allowed_count, int start_count, int waiter_count, void (*start)(int signum))
714 {
715         int rc, launched;
716         struct thread me;
717         struct job *job;
718
719         assert(allowed_count >= 1);
720         assert(start_count >= 0);
721         assert(waiter_count > 0);
722         assert(start_count <= allowed_count);
723
724         rc = -1;
725         pthread_mutex_lock(&mutex);
726
727         /* check whether already running */
728         if (current_thread || allowed) {
729                 ERROR("thread already started");
730                 errno = EINVAL;
731                 goto error;
732         }
733
734         /* start */
735         if (sig_monitor_init() < 0) {
736                 ERROR("failed to initialise signal handlers");
737                 goto error;
738         }
739
740         /* records the allowed count */
741         allowed = allowed_count;
742         started = 0;
743         running = 0;
744         remains = waiter_count;
745
746         /* start at least one thread */
747         launched = 0;
748         while ((launched + 1) < start_count) {
749                 if (start_one_thread() != 0) {
750                         ERROR("Not all threads can be started");
751                         goto error;
752                 }
753                 launched++;
754         }
755
756         /* queue the start job */
757         job = job_create(NULL, 0, (job_cb_t)start, NULL);
758         if (!job) {
759                 ERROR("out of memory");
760                 errno = ENOMEM;
761                 goto error;
762         }
763         job_add(job);
764         remains--;
765
766         /* run until end */
767         thread_run(&me);
768         rc = 0;
769 error:
770         pthread_mutex_unlock(&mutex);
771         return rc;
772 }
773
774 /**
775  * Terminate all the threads and cancel all pending jobs.
776  */
777 void jobs_terminate()
778 {
779         struct job *job, *head, *tail;
780         pthread_t me, *others;
781         struct thread *t;
782         int count;
783
784         /* how am i? */
785         me = pthread_self();
786
787         /* request all threads to stop */
788         pthread_mutex_lock(&mutex);
789         allowed = 0;
790
791         /* count the number of threads */
792         count = 0;
793         t = threads;
794         while (t) {
795                 if (!t->upper && !pthread_equal(t->tid, me))
796                         count++;
797                 t = t->next;
798         }
799
800         /* fill the array of threads */
801         others = alloca(count * sizeof *others);
802         count = 0;
803         t = threads;
804         while (t) {
805                 if (!t->upper && !pthread_equal(t->tid, me))
806                         others[count++] = t->tid;
807                 t = t->next;
808         }
809
810         /* stops the threads */
811         t = threads;
812         while (t) {
813                 t->stop = 1;
814                 t = t->next;
815         }
816
817         /* wait the threads */
818         pthread_cond_broadcast(&cond);
819         pthread_mutex_unlock(&mutex);
820         while (count)
821                 pthread_join(others[--count], NULL);
822         pthread_mutex_lock(&mutex);
823
824         /* cancel pending jobs of other threads */
825         remains = 0;
826         head = first_job;
827         first_job = NULL;
828         tail = NULL;
829         while (head) {
830                 /* unlink the job */
831                 job = head;
832                 head = job->next;
833
834                 /* search if job is stacked for current */
835                 t = current_thread;
836                 while (t && t->job != job)
837                         t = t->upper;
838                 if (t) {
839                         /* yes, relink it at end */
840                         if (tail)
841                                 tail->next = job;
842                         else
843                                 first_job = job;
844                         tail = job;
845                         job->next = NULL;
846                 } else {
847                         /* no cancel the job */
848                         pthread_mutex_unlock(&mutex);
849                         sig_monitor(0, job_cancel, job);
850                         free(job);
851                         pthread_mutex_lock(&mutex);
852                 }
853         }
854         pthread_mutex_unlock(&mutex);
855 }
856