Add comments
[src/app-framework-binder.git] / src / jobs.c
1 /*
2  * Copyright (C) 2016, 2017 "IoT.bzh"
3  * Author José Bollo <jose.bollo@iot.bzh>
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *   http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #define _GNU_SOURCE
19
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <time.h>
24 #include <sys/syscall.h>
25 #include <pthread.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include "jobs.h"
30 #include "sig-monitor.h"
31 #include "verbose.h"
32
33 #if 0
34 #define _alert_ "do you really want to remove monitoring?"
35 #define sig_monitor_init_timeouts()  ((void)0)
36 #define sig_monitor_clean_timeouts() ((void)0)
37 #define sig_monitor(to,cb,arg)       (cb(0,arg))
38 #endif
39
40 /** Internal shortcut for callback */
41 typedef void (*job_cb_t)(int, void*, void *, void*);
42
43 /** Description of a pending job */
44 struct job
45 {
46         struct job *next;    /**< link to the next job enqueued */
47         void *group;         /**< group of the request */
48         job_cb_t callback;   /**< processing callback */
49         void *arg1;          /**< first arg */
50         void *arg2;          /**< second arg */
51         void *arg3;          /**< third arg */
52         int timeout;         /**< timeout in second for processing the request */
53         unsigned blocked: 1; /**< is an other request blocking this one ? */
54         unsigned dropped: 1; /**< is removed ? */
55 };
56
57 /** Description of threads */
58 struct thread
59 {
60         struct thread *next;  /**< next thread of the list */
61         struct thread *upper; /**< upper same thread */
62         struct job *job;      /**< currently processed job */
63         pthread_t tid;        /**< the thread id */
64         unsigned stop: 1;     /**< stop requested */
65         unsigned lowered: 1;  /**< has a lower same thread */
66         unsigned waits: 1;    /**< is waiting? */
67 };
68
69 /* synchronisation of threads */
70 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
71 static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
72
73 /* count allowed, started and waiting threads */
74 static int allowed = 0; /** allowed count of threads */
75 static int started = 0; /** started count of threads */
76 static int waiting = 0; /** waiting count of threads */
77 static int remains = 0; /** allowed count of waiting jobs */
78
79 /* list of threads */
80 static struct thread *threads;
81 static _Thread_local struct thread *current;
82
83 /* queue of pending jobs */
84 static struct job *first_job;
85 static struct job *first_events;
86 static struct job *free_jobs;
87
88 /**
89  * Create a new job with the given parameters
90  * @param group    the group of the job
91  * @param timeout  the timeout of the job (0 if none)
92  * @param callback the function that achieves the job
93  * @param arg1     the first argument of the callback
94  * @param arg2     the second argument of the callback
95  * @param arg3     the third argument of the callback
96  * @return the created job unblock or NULL when no more memory
97  */
98 static struct job *job_create(
99                 void *group,
100                 int timeout,
101                 job_cb_t callback,
102                 void *arg1,
103                 void *arg2,
104                 void *arg3)
105 {
106         struct job *job;
107
108         /* try recyle existing job */
109         job = free_jobs;
110         if (job)
111                 free_jobs = job->next;
112         else {
113                 /* allocation  without blocking */
114                 pthread_mutex_unlock(&mutex);
115                 job = malloc(sizeof *job);
116                 pthread_mutex_lock(&mutex);
117                 if (!job) {
118                         errno = -ENOMEM;
119                         goto end;
120                 }
121         }
122         /* initialises the job */
123         job->group = group;
124         job->timeout = timeout;
125         job->callback = callback;
126         job->arg1 = arg1;
127         job->arg2 = arg2;
128         job->arg3 = arg3;
129         job->blocked = 0;
130         job->dropped = 0;
131 end:
132         return job;
133 }
134
135 /**
136  * Adds 'job1' and 'job2' at the end of the list of jobs, marking it
137  * as blocked if an other job with the same group is pending.
138  * @param job1 the first job to add
139  * @param job2 the second job to add or NULL
140  */
141 static void job_add2(struct job *job1, struct job *job2)
142 {
143         void *group1, *group2, *group;
144         struct job *ijob, **pjob;
145
146         /* prepare to add */
147         group1 = job1->group;
148         job1->next = job2;
149         if (!job2)
150                 group2 = NULL;
151         else {
152                 job2->next = NULL;
153                 group2 = job2->group;
154                 if (group2 && group2 == group1)
155                         job2->blocked = 1;
156         }
157
158         /* search end and blockers */
159         pjob = &first_job;
160         ijob = first_job;
161         while (ijob) {
162                 group = ijob->group;
163                 if (group) {
164                         if (group == group1)
165                                 job1->blocked = 1;
166                         if (group == group2)
167                                 job2->blocked = 1;
168                 }
169                 pjob = &ijob->next;
170                 ijob = ijob->next;
171         }
172
173         /* queue the jobs */
174         *pjob = job1;
175 }
176
177 /**
178  * Get the next job to process or NULL if none.
179  * @param job the head of the list to search.
180  * @return the first job that isn't blocked or NULL
181  */
182 static inline struct job *job_get(struct job *job)
183 {
184         while (job && job->blocked)
185                 job = job->next;
186         return job;
187 }
188
189 /**
190  * Releases the processed 'job': removes it
191  * from the list of jobs and unblock the first
192  * pending job of the same group if any.
193  * @param job the job to release
194  */
195 static inline void job_release(struct job *job)
196 {
197         struct job *ijob, **pjob;
198         void *group;
199
200         /* first unqueue the job */
201         pjob = &first_job;
202         ijob = first_job;
203         while (ijob != job) {
204                 pjob = &ijob->next;
205                 ijob = ijob->next;
206         }
207         *pjob = job->next;
208
209         /* then unblock jobs of the same group */
210         group = job->group;
211         if (group) {
212                 ijob = job->next;
213                 while (ijob && ijob->group != group)
214                         ijob = ijob->next;
215                 if (ijob)
216                         ijob->blocked = 0;
217         }
218
219         /* recycle the job */
220         job->next = free_jobs;
221         free_jobs = job;
222 }
223
224 /**
225  * Releases the events 'job': removes it
226  * from the list of events.
227  * @param job the event to release
228  */
229 static inline void events_release(struct job *job)
230 {
231         struct job *ijob, **pjob;
232
233         /* first unqueue the job */
234         pjob = &first_events;
235         ijob = first_events;
236         while (ijob != job) {
237                 pjob = &ijob->next;
238                 ijob = ijob->next;
239         }
240         *pjob = job->next;
241
242         /* recycle the job */
243         job->next = free_jobs;
244         free_jobs = job;
245 }
246
247 /**
248  * Get the events of 'key' if existing.
249  * @param key the key to search
250  * @return the found events or NULL if none existing has key
251  */
252 static inline struct job *events_of_key(void *key)
253 {
254         struct job *job;
255
256         if (!key)
257                 job = NULL;
258         else {
259                 job = first_events;
260                 while (job && (job->dropped || job->group != key))
261                         job = job->next;
262         }
263         return job;
264 }
265
266 /**
267  * Monitored normal callback for a job.
268  * This function is called by the monitor
269  * to run the job when the safe environment
270  * is set.
271  * @param signum 0 on normal flow or the number
272  *               of the signal that interrupted the normal
273  *               flow
274  * @param arg     the job to run
275  */
276 static void job_call(int signum, void *arg)
277 {
278         struct job *job = arg;
279         job->callback(signum, job->arg1, job->arg2, job->arg3);
280 }
281
282 /**
283  * Monitored cancel callback for a job.
284  * This function is called by the monitor
285  * to cancel the job when the safe environment
286  * is set.
287  * @param signum 0 on normal flow or the number
288  *               of the signal that interrupted the normal
289  *               flow, isn't used
290  * @param arg    the job to run
291  */
292 static void job_cancel(int signum, void *arg)
293 {
294         job_call(SIGABRT, arg);
295 }
296
297 /**
298  * Main processing loop of threads processing jobs.
299  * The loop must be called with the mutex locked
300  * and it returns with the mutex locked.
301  * @param me the description of the thread to use
302  * TODO: how are timeout handled when reentering?
303  */
304 static void thread_run(struct thread *me)
305 {
306         struct thread **prv;
307         struct job *job;
308
309         /* initialize description of itself and link it in the list */
310         me->tid = pthread_self();
311         me->stop = 0;
312         me->lowered = 0;
313         me->waits = 0;
314         me->upper = current;
315         if (current)
316                 current->lowered = 1;
317         else
318                 sig_monitor_init_timeouts();
319         current = me;
320         me->next = threads;
321         threads = me;
322         started++;
323
324         /* loop until stopped */
325         while (!me->stop) {
326                 /* get a job */
327                 job = job_get(first_job);
328                 if (job) {
329                         /* prepare running the job */
330                         remains++; /* increases count of job that can wait */
331                         job->blocked = 1; /* mark job as blocked */
332                         me->job = job; /* record the job (only for terminate) */
333
334                         /* run the job */
335                         pthread_mutex_unlock(&mutex);
336                         sig_monitor(job->timeout, job_call, job);
337                         pthread_mutex_lock(&mutex);
338
339                         /* release the run job */
340                         job_release(job);
341                 } else {
342                         /* no job, check events */
343                         job = job_get(first_events);
344                         if (job) {
345                                 /* run the events */
346                                 job->blocked = 1;
347                                 pthread_mutex_unlock(&mutex);
348                                 sig_monitor(job->timeout, job_call, job);
349                                 pthread_mutex_lock(&mutex);
350                                 job->blocked = 0;
351                                 if (job->dropped)
352                                         events_release(job);
353                         } else {
354                                 /* no job and not events */
355                                 waiting++;
356                                 me->waits = 1;
357                                 pthread_cond_wait(&cond, &mutex);
358                                 me->waits = 0;
359                                 waiting--;
360                         }
361                 }
362         }
363
364         /* unlink the current thread and cleanup */
365         started--;
366         prv = &threads;
367         while (*prv != me)
368                 prv = &(*prv)->next;
369         *prv = me->next;
370         current = me->upper;
371         if (current)
372                 current->lowered = 0;
373         else
374                 sig_monitor_clean_timeouts();
375 }
376
377 /**
378  * Entry point for created threads.
379  * @param data not used
380  * @return NULL
381  */
382 static void *thread_main(void *data)
383 {
384         struct thread me;
385
386         pthread_mutex_lock(&mutex);
387         thread_run(&me);
388         pthread_mutex_unlock(&mutex);
389         return NULL;
390 }
391
392 /**
393  * Starts a new thread
394  * @return 0 in case of success or -1 in case of error
395  */
396 static int start_one_thread()
397 {
398         pthread_t tid;
399         int rc;
400
401         rc = pthread_create(&tid, NULL, thread_main, NULL);
402         if (rc != 0) {
403                 /* errno = rc; */
404                 WARNING("not able to start thread: %m");
405                 rc = -1;
406         }
407         return rc;
408 }
409
410 /**
411  * Queues a new asynchronous job represented by 'callback'
412  * for the 'group' and the 'timeout'.
413  * Jobs are queued FIFO and are possibly executed in parallel
414  * concurrently except for job of the same group that are
415  * executed sequentially in FIFO order.
416  * @param group    The group of the job or NULL when no group.
417  * @param timeout  The maximum execution time in seconds of the job
418  *                 or 0 for unlimited time.
419  * @param callback The function to execute for achieving the job.
420  *                 Its first parameter is either 0 on normal flow
421  *                 or the signal number that broke the normal flow.
422  * @return 0 in case of success or -1 in case of error
423  */
424 int jobs_queue0(
425                 void *group,
426                 int timeout,
427                 void (*callback)(int signum))
428 {
429         return jobs_queue3(group, timeout, (job_cb_t)callback, NULL, NULL, NULL);
430 }
431
432 /**
433  * Queues a new asynchronous job represented by 'callback' and 'arg1'
434  * for the 'group' and the 'timeout'.
435  * Jobs are queued FIFO and are possibly executed in parallel
436  * concurrently except for job of the same group that are
437  * executed sequentially in FIFO order.
438  * @param group    The group of the job or NULL when no group.
439  * @param timeout  The maximum execution time in seconds of the job
440  *                 or 0 for unlimited time.
441  * @param callback The function to execute for achieving the job.
442  *                 Its first parameter is either 0 on normal flow
443  *                 or the signal number that broke the normal flow.
444  *                 The remaining parameter is the parameter 'arg1'
445  *                 given here.
446  * @param arg1     The second argument for 'callback'
447  * @return 0 in case of success or -1 in case of error
448  */
449 int jobs_queue(
450                 void *group,
451                 int timeout,
452                 void (*callback)(int, void*),
453                 void *arg)
454 {
455         return jobs_queue3(group, timeout, (job_cb_t)callback, arg, NULL, NULL);
456 }
457
458 /**
459  * Queues a new asynchronous job represented by 'callback' and 'arg[12]'
460  * for the 'group' and the 'timeout'.
461  * Jobs are queued FIFO and are possibly executed in parallel
462  * concurrently except for job of the same group that are
463  * executed sequentially in FIFO order.
464  * @param group    The group of the job or NULL when no group.
465  * @param timeout  The maximum execution time in seconds of the job
466  *                 or 0 for unlimited time.
467  * @param callback The function to execute for achieving the job.
468  *                 Its first parameter is either 0 on normal flow
469  *                 or the signal number that broke the normal flow.
470  *                 The remaining parameters are the parameters 'arg[12]'
471  *                 given here.
472  * @param arg1     The second argument for 'callback'
473  * @param arg2     The third argument for 'callback'
474  * @return 0 in case of success or -1 in case of error
475  */
476 int jobs_queue2(
477                 void *group,
478                 int timeout,
479                 void (*callback)(int, void*, void*),
480                 void *arg1,
481                 void *arg2)
482 {
483         return jobs_queue3(group, timeout, (job_cb_t)callback, arg1, arg2, NULL);
484 }
485
486 /**
487  * Queues a new asynchronous job represented by 'callback' and 'arg[123]'
488  * for the 'group' and the 'timeout'.
489  * Jobs are queued FIFO and are possibly executed in parallel
490  * concurrently except for job of the same group that are
491  * executed sequentially in FIFO order.
492  * @param group    The group of the job or NULL when no group.
493  * @param timeout  The maximum execution time in seconds of the job
494  *                 or 0 for unlimited time.
495  * @param callback The function to execute for achieving the job.
496  *                 Its first parameter is either 0 on normal flow
497  *                 or the signal number that broke the normal flow.
498  *                 The remaining parameters are the parameters 'arg[123]'
499  *                 given here.
500  * @param arg1     The second argument for 'callback'
501  * @param arg2     The third argument for 'callback'
502  * @param arg3     The forth argument for 'callback'
503  * @return 0 in case of success or -1 in case of error
504  */
505 int jobs_queue3(
506                 void *group,
507                 int timeout,
508                 void (*callback)(int, void*, void *, void*),
509                 void *arg1,
510                 void *arg2,
511                 void *arg3)
512 {
513         const char *info;
514         struct job *job;
515         int rc;
516
517         pthread_mutex_lock(&mutex);
518
519         /* allocates the job */
520         job = job_create(group, timeout, callback, arg1, arg2, arg3);
521         if (!job) {
522                 errno = ENOMEM;
523                 info = "out of memory";
524                 goto error;
525         }
526
527         /* check availability */
528         if (remains == 0) {
529                 errno = EBUSY;
530                 info = "too many jobs";
531                 goto error2;
532         }
533
534         /* start a thread if needed */
535         if (waiting == 0 && started < allowed) {
536                 /* all threads are busy and a new can be started */
537                 rc = start_one_thread();
538                 if (rc < 0 && started == 0) {
539                         info = "can't start first thread";
540                         goto error2;
541                 }
542         }
543
544         /* queues the job */
545         remains--;
546         job_add2(job, NULL);
547
548         /* signal an existing job */
549         pthread_cond_signal(&cond);
550         pthread_mutex_unlock(&mutex);
551         return 0;
552
553 error2:
554         job->next = free_jobs;
555         free_jobs = job;
556 error:
557         ERROR("can't process job with threads: %s, %m", info);
558         pthread_mutex_unlock(&mutex);
559         return -1;
560 }
561
562 /**
563  * Run a asynchronous job represented by 'callback'
564  * with the 'timeout' but only returns after job completion.
565  * @param timeout  The maximum execution time in seconds of the job
566  *                 or 0 for unlimited time.
567  * @param callback The function to execute for achieving the job.
568  *                 Its first parameter is either 0 on normal flow
569  *                 or the signal number that broke the normal flow.
570  * @return 0 in case of success or -1 in case of error
571  */
572 int jobs_invoke0(
573                 int timeout,
574                 void (*callback)(int signum))
575 {
576         return jobs_invoke3(timeout, (job_cb_t)callback, NULL, NULL, NULL);
577 }
578
579 /**
580  * Run a asynchronous job represented by 'callback' and 'arg1'
581  * with the 'timeout' but only returns after job completion.
582  * @param timeout  The maximum execution time in seconds of the job
583  *                 or 0 for unlimited time.
584  * @param callback The function to execute for achieving the job.
585  *                 Its first parameter is either 0 on normal flow
586  *                 or the signal number that broke the normal flow.
587  *                 The remaining parameter is the parameter 'arg1'
588  *                 given here.
589  * @param arg1     The second argument for 'callback'
590  * @return 0 in case of success or -1 in case of error
591  */
592 int jobs_invoke(
593                 int timeout,
594                 void (*callback)(int, void*),
595                 void *arg)
596 {
597         return jobs_invoke3(timeout, (job_cb_t)callback, arg, NULL, NULL);
598 }
599
600 /**
601  * Run a asynchronous job represented by 'callback' and 'arg[12]'
602  * with the 'timeout' but only returns after job completion.
603  * @param timeout  The maximum execution time in seconds of the job
604  *                 or 0 for unlimited time.
605  * @param callback The function to execute for achieving the job.
606  *                 Its first parameter is either 0 on normal flow
607  *                 or the signal number that broke the normal flow.
608  *                 The remaining parameters are the parameters 'arg[12]'
609  *                 given here.
610  * @param arg1     The second argument for 'callback'
611  * @param arg2     The third argument for 'callback'
612  * @return 0 in case of success or -1 in case of error
613  */
614 int jobs_invoke2(
615                 int timeout,
616                 void (*callback)(int, void*, void*),
617                 void *arg1,
618                 void *arg2)
619 {
620         return jobs_invoke3(timeout, (job_cb_t)callback, arg1, arg2, NULL);
621 }
622
623 /**
624  * Stops the thread pointed by 'arg1'. Used with
625  * invoke familly to return to the caller after completion.
626  * @param signum Unused
627  * @param arg1   The thread to stop
628  * @param arg2   Unused
629  * @param arg3   Unused
630  */
631 static void unlock_invoker(int signum, void *arg1, void *arg2, void *arg3)
632 {
633         struct thread *t = arg1;
634         pthread_mutex_lock(&mutex);
635         t->stop = 1;
636         if (t->waits)
637                 pthread_cond_broadcast(&cond);
638         pthread_mutex_unlock(&mutex);
639 }
640
641 /**
642  * Run a asynchronous job represented by 'callback' and 'arg[123]'
643  * with the 'timeout' but only returns after job completion.
644  * @param timeout  The maximum execution time in seconds of the job
645  *                 or 0 for unlimited time.
646  * @param callback The function to execute for achieving the job.
647  *                 Its first parameter is either 0 on normal flow
648  *                 or the signal number that broke the normal flow.
649  *                 The remaining parameters are the parameters 'arg[123]'
650  *                 given here.
651  * @param arg1     The second argument for 'callback'
652  * @param arg2     The third argument for 'callback'
653  * @param arg3     The forth argument for 'callback'
654  * @return 0 in case of success or -1 in case of error
655  */
656 int jobs_invoke3(
657                 int timeout,
658                 void (*callback)(int, void*, void *, void*),
659                 void *arg1,
660                 void *arg2,
661                 void *arg3)
662 {
663         struct job *job1, *job2;
664         struct thread me;
665         
666         pthread_mutex_lock(&mutex);
667
668         /* allocates the job */
669         job1 = job_create(&me, timeout, callback, arg1, arg2, arg3);
670         job2 = job_create(&me, 0, unlock_invoker, &me, NULL, NULL);
671         if (!job1 || !job2) {
672                 ERROR("out of memory");
673                 errno = ENOMEM;
674                 if (job1) {
675                         job1->next = free_jobs;
676                         free_jobs = job1;
677                 }
678                 if (job2) {
679                         job2->next = free_jobs;
680                         free_jobs = job2;
681                 }
682                 pthread_mutex_unlock(&mutex);
683                 return -1;
684         }
685
686         /* queues the job */
687         job_add2(job1, job2);
688
689         /* run until stopped */
690         thread_run(&me);
691         pthread_mutex_unlock(&mutex);
692         return 0;
693 }
694
695 /**
696  * Initialise the job stuff.
697  * @param allowed_count Maximum count of thread for jobs (can be 0,
698  *                      see 'jobs_add_me' for merging new threads)
699  * @param start_count   Count of thread to start now, must be lower.
700  * @param waiter_count  Maximum count of jobs that can be waiting.
701  * @return 0 in case of success or -1 in case of error.
702  */
703 int jobs_init(int allowed_count, int start_count, int waiter_count)
704 {
705         int rc, launched;
706
707         assert(allowed_count >= 0);
708         assert(start_count >= 0);
709         assert(waiter_count > 0);
710         assert(start_count <= allowed_count);
711
712         /* records the allowed count */
713         allowed = allowed_count;
714         started = 0;
715         waiting = 0;
716         remains = waiter_count;
717
718         /* start at least one thread */
719         pthread_mutex_lock(&mutex);
720         launched = 0;
721         while (launched < start_count && start_one_thread() == 0)
722                 launched++;
723         rc = -(launched != start_count);
724         pthread_mutex_unlock(&mutex);
725
726         /* end */
727         if (rc)
728                 ERROR("Not all threads can be started");
729         return rc;
730 }
731
732 /**
733  * Terminate all the threads and cancel all pending jobs.
734  */
735 void jobs_terminate()
736 {
737         struct job *job, *head, *tail;
738         pthread_t me, *others;
739         struct thread *t;
740         int count;
741
742         /* how am i? */
743         me = pthread_self();
744
745         /* request all threads to stop */
746         pthread_mutex_lock(&mutex);
747         allowed = 0;
748
749         /* count the number of threads */
750         count = 0;
751         t = threads;
752         while (t) {
753                 if (!t->upper && !pthread_equal(t->tid, me))
754                         count++;
755                 t = t->next;
756         }
757
758         /* fill the array of threads */
759         others = alloca(count * sizeof *others);
760         count = 0;
761         t = threads;
762         while (t) {
763                 if (!t->upper && !pthread_equal(t->tid, me))
764                         others[count++] = t->tid;
765                 t = t->next;
766         }
767
768         /* stops the threads */
769         t = threads;
770         while (t) {
771                 t->stop = 1;
772                 t = t->next;
773         }
774
775         /* wait the threads */
776         pthread_cond_broadcast(&cond);
777         pthread_mutex_unlock(&mutex);
778         while (count)
779                 pthread_join(others[--count], NULL);
780         pthread_mutex_lock(&mutex);
781
782         /* cancel pending jobs of other threads */
783         remains = 0;
784         head = first_job;
785         first_job = NULL;
786         tail = NULL;
787         while (head) {
788                 /* unlink the job */
789                 job = head;
790                 head = job->next;
791
792                 /* search if job is stacked for current */
793                 t = current;
794                 while (t && t->job != job)
795                         t = t->upper;
796                 if (t) {
797                         /* yes, relink it at end */
798                         if (tail)
799                                 tail->next = job;
800                         else
801                                 first_job = job;
802                         tail = job;
803                         job->next = NULL;
804                 } else {
805                         /* no cancel the job */
806                         pthread_mutex_unlock(&mutex);
807                         sig_monitor(0, job_cancel, job);
808                         free(job);
809                         pthread_mutex_lock(&mutex);
810                 }
811         }
812         pthread_mutex_unlock(&mutex);
813 }
814
815 /**
816  * Adds the events waiter/dispatcher to the list of events waiters/dispatchers
817  * to monitor.
818  * @param key     A key to register the events waiter/dispatcher (see
819  *                'jobs_del_events')
820  * @param timeout Timeout in second of the function or 0 if none
821  * @param events  The callback, the first argument is 0 for normal
822  *                flow or the signal number when normal flow failed
823  * @param closure The closure to give to the callback as secondd argument
824  * @return 0 in case of success or -1 in case of error
825  */
826 int jobs_add_events(void *key, int timeout, void (*events)(int signum, void*), void *closure)
827 {
828         struct job *job;
829
830         pthread_mutex_lock(&mutex);
831
832         /* look at an already existsing events for same key */
833         job = events_of_key(key);
834         if (job) {
835                 pthread_mutex_unlock(&mutex);
836                 ERROR("events of key %p already exist", key);
837                 errno = EEXIST;
838                 return -1;
839         }
840
841         /* creates the job */
842         job = job_create(key, timeout, (job_cb_t)events, closure, NULL, NULL);
843         if (!job) {
844                 pthread_mutex_unlock(&mutex);
845                 ERROR("Can't create events, out of memory");
846                 errno = ENOMEM;
847                 return -1;
848         }
849
850         /* adds the loop */
851         job->next = first_events;
852         first_events = job;
853
854         /* signal the loop */
855         if (waiting)
856                 pthread_cond_signal(&cond);
857         pthread_mutex_unlock(&mutex);
858         return 0;
859 }
860
861 /**
862  * Removes the events of 'key'
863  * @param key The key of the events to remove
864  * @return 0 in case of success or -1 in case of error
865  */
866 int jobs_del_events(void *key)
867 {
868         struct job *job;
869
870         pthread_mutex_lock(&mutex);
871         job = events_of_key(key);
872         if (job)
873                 if (job->blocked)
874                         job->dropped = 1;
875                 else
876                         events_release(job);
877         pthread_mutex_unlock(&mutex);
878         if (!job) {
879                 ERROR("events of key %p not found", key);
880                 errno = ENOENT;
881         }
882         return -!job;
883 }
884
885 /**
886  * Adds the current thread to the pool of threads
887  * processing the jobs. Returns normally when the threads are
888  * terminated or immediately with an error if the thread is
889  * already in the pool.
890  * @return 0 in case of success or -1 in case of error
891  */
892 int jobs_add_me()
893 {
894         struct thread me;
895
896         /* check whether already running */
897         if (current) {
898                 ERROR("thread already running");
899                 errno = EINVAL;
900                 return -1;
901         }
902
903         /* allowed... */
904         pthread_mutex_lock(&mutex);
905         allowed++;
906         thread_run(&me);
907         allowed--;
908         pthread_mutex_unlock(&mutex);
909         return 0;
910 }
911
912